Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix yielding in progress loop #278

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 14 additions & 29 deletions src/margo-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -1979,6 +1979,10 @@
ret = margo_internal_trigger(mid, 0, 1, &actual_count);
} while ((ret == HG_SUCCESS) && actual_count
&& !mid->hg_progress_shutdown_flag);
/* once we have processed callbacks, give the ES an opportunity to
* run other ULTs if it needs to.
*/
ABT_thread_yield();

/* Check to see if there are any runnable ULTs in the pool now. If
* so, then we yield here to allow them a chance to execute.
Expand Down Expand Up @@ -2017,27 +2021,7 @@
ABT_mutex_unlock(mid->pending_operations_mtx);

if (pending || size > 1) {
/* TODO: a custom ABT scheduler could optimize this further by
* delaying Mercury progress until all other runnable ULTs have
* been given a chance to execute. This will often happen
* anyway, but not guaranteed.
*/

ret = margo_internal_progress(mid, 0);
if (ret == HG_SUCCESS) {
/* Mercury completed something; loop around to trigger
* callbacks
*/
} else if (ret == HG_TIMEOUT) {
/* No completion; yield here to allow other ULTs to run */
ABT_thread_yield();
} else {
/* TODO: error handling */
MARGO_CRITICAL(
mid, "unexpected return code (%d: %s) from HG_Progress()",
ret, HG_Error_to_string(ret));
assert(0);
}
hg_progress_timeout = 0;
} else {
hg_progress_timeout = mid->hg_progress_timeout_ub;
ret = __margo_timer_get_next_expiration(mid, &next_timer_exp);
Expand All @@ -2053,14 +2037,15 @@
hg_progress_timeout = 0;
}
}
ret = margo_internal_progress(mid, hg_progress_timeout);
if (ret != HG_SUCCESS && ret != HG_TIMEOUT) {
/* TODO: error handling */
MARGO_CRITICAL(
mid, "unexpected return code (%d: %s) from HG_Progress()",
ret, HG_Error_to_string(ret));
assert(0);
}
}

ret = margo_internal_progress(mid, hg_progress_timeout);
if (ret != HG_SUCCESS && ret != HG_TIMEOUT) {
/* TODO: error handling */
MARGO_CRITICAL(mid,

Check warning on line 2045 in src/margo-core.c

View check run for this annotation

Codecov / codecov/patch

src/margo-core.c#L2045

Added line #L2045 was not covered by tests
"unexpected return code (%d: %s) from HG_Progress()",
ret, HG_Error_to_string(ret));
assert(0);

Check warning on line 2048 in src/margo-core.c

View check run for this annotation

Codecov / codecov/patch

src/margo-core.c#L2048

Added line #L2048 was not covered by tests
}

/* check for any expired timers */
Expand Down
67 changes: 67 additions & 0 deletions tests/unit-tests/margo-init.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@

#include <margo.h>
#include <stdlib.h>
#include <unistd.h>
#include "helper-server.h"
#include "munit/munit.h"

Expand Down Expand Up @@ -184,6 +186,70 @@
return MUNIT_OK;
}

static void kill_test(void* args) {
volatile int* x = (int*)args;
double t = ABT_get_wtime();
while(ABT_get_wtime() - t < 1.0) {
ABT_thread_yield();
}
if(*x == 1) return;
else {
x = NULL;
*x = 1; // segfault

Check warning on line 198 in tests/unit-tests/margo-init.c

View check run for this annotation

Codecov / codecov/patch

tests/unit-tests/margo-init.c#L197-L198

Added lines #L197 - L198 were not covered by tests
}
}

/*
 * No-op ULT body: takes the generic void* argument and does nothing with it.
 * The parameter is named (and explicitly marked unused) because omitting a
 * parameter name in a function definition is only valid starting with C23.
 */
void my_func(void* arg) { (void)arg; }

static MunitResult multiple_pools_progress_loop(const MunitParameter params[], void* data)
{
const char* config = "{\n"
"\"argobots\": {"
"\"pools\": ["
"{\"name\":\"__primary__\",\"access\":\"mpmc\",\"kind\":\"fifo_wait\"},"
"{\"name\":\"p1\",\"access\":\"mpmc\",\"kind\":\"fifo_wait\"},"
"{\"name\":\"p2\",\"access\":\"mpmc\",\"kind\":\"fifo_wait\"}"
"],"
"\"xstreams\": ["
"{\"name\":\"__primary__\","
"\"scheduler\":{"
"\"pools\":[\"__primary__\",\"p1\"],"
"\"type\":\"basic_wait\""
"}"
"},"
"{\"name\":\"es1\","
"\"scheduler\":{"
"\"pools\":[\"p2\"],"
"\"type\":\"basic_wait\""
"}"
"}"
"]"
"},"
"\"progress_pool\":\"p1\","
"\"rpc_pool\":\"p1\""
"}";

struct margo_init_info info = MARGO_INIT_INFO_INITIALIZER;
info.json_config = config;
margo_instance_id mid = margo_init_ext("na+sm", MARGO_SERVER_MODE, &info);

struct margo_pool_info p2 = {0};
margo_find_pool_by_name(mid, "p2", &p2);

ABT_thread ult, killer;
volatile int x = 0;
ABT_thread_create(p2.pool, kill_test, (void*)&x, ABT_THREAD_ATTR_NULL, &killer);
ABT_thread_create(p2.pool, my_func, NULL, ABT_THREAD_ATTR_NULL, &ult);
ABT_thread_join(ult);
x = 1;
ABT_thread_free(&ult);
ABT_thread_join(killer);
ABT_thread_free(&killer);

margo_finalize(mid);
return MUNIT_OK;
}

/* NULL-terminated list of protocol strings; currently only "na+sm". */
static char* protocol_params[] = { "na+sm", NULL };
Expand All @@ -208,6 +274,7 @@
{ "/init-cycle-server", init_cycle_server, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
{ "/finalize-and-wait", finalize_and_wait, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
{ "/ref-incr-and-release", ref_incr_and_release, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
{ "/multiple-pools-progress-loop", multiple_pools_progress_loop, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, NULL},
{ NULL, NULL, NULL, NULL, MUNIT_TEST_OPTION_NONE, NULL }
};

Expand Down
Loading