/*
 * Patch summary (two files).
 *
 * src/margo-core.c, __margo_hg_progress_fn():
 *   - adds an unconditional ABT_thread_yield() right after the callback
 *     trigger loop;
 *   - in the `pending || size > 1` branch, drops the separate
 *     margo_internal_progress(mid, 0) call (and its yield on HG_TIMEOUT) and
 *     only sets hg_progress_timeout = 0;
 *   - moves the single margo_internal_progress(mid, hg_progress_timeout) call
 *     and its error check after the if/else, so both branches go through it.
 *
 * tests/unit-tests/margo-init.c:
 *   - adds the /multiple-pools-progress-loop test and its two ULT bodies
 *     (kill_test, my_func), and registers the test in tests[].
 *
 * NOTE(review): this copy has lost its line breaks, and the header names after
 * the three `#include` tokens in the first tests hunk are missing; restore
 * both from the original commit before trying to apply the patch.
 * NOTE(review): the added test lines differ from the original commit (named
 * parameter and `static` on my_func, checked margo_init_ext and
 * margo_find_pool_by_name results, extra comments), so the hunk line counts
 * were adjusted and the `index` hashes for margo-init.c no longer describe
 * the resulting file.
 * NOTE(review): the watchdog flag `x` is a plain `volatile int` written by the
 * test body and read by a ULT created in pool p2 (served by xstream es1 in the
 * JSON config); consider an atomic type -- TODO confirm with the test author.
 */
diff --git a/src/margo-core.c b/src/margo-core.c index ecc9644..02c6fef 100644 --- a/src/margo-core.c +++ b/src/margo-core.c @@ -1979,6 +1979,10 @@ void __margo_hg_progress_fn(void* foo) ret = margo_internal_trigger(mid, 0, 1, &actual_count); } while ((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag); + /* once we have processed callbacks, give the ES an opportunity to + * run other ULTs if it needs to. + */ + ABT_thread_yield(); /* Check to see if there are any runnable ULTs in the pool now. If * so, then we yield here to allow them a chance to execute. @@ -2017,27 +2021,7 @@ void __margo_hg_progress_fn(void* foo) ABT_mutex_unlock(mid->pending_operations_mtx); if (pending || size > 1) { - /* TODO: a custom ABT scheduler could optimize this further by - * delaying Mercury progress until all other runnable ULTs have - * been given a chance to execute. This will often happen - * anyway, but not guaranteed. - */ - - ret = margo_internal_progress(mid, 0); - if (ret == HG_SUCCESS) { - /* Mercury completed something; loop around to trigger - * callbacks - */ - } else if (ret == HG_TIMEOUT) { - /* No completion; yield here to allow other ULTs to run */ - ABT_thread_yield(); - } else { - /* TODO: error handling */ - MARGO_CRITICAL( - mid, "unexpected return code (%d: %s) from HG_Progress()", - ret, HG_Error_to_string(ret)); - assert(0); - } + hg_progress_timeout = 0; } else { hg_progress_timeout = mid->hg_progress_timeout_ub; ret = __margo_timer_get_next_expiration(mid, &next_timer_exp); @@ -2053,14 +2037,15 @@ void __margo_hg_progress_fn(void* foo) hg_progress_timeout = 0; } } - ret = margo_internal_progress(mid, hg_progress_timeout); - if (ret != HG_SUCCESS && ret != HG_TIMEOUT) { - /* TODO: error handling */ - MARGO_CRITICAL( - mid, "unexpected return code (%d: %s) from HG_Progress()", - ret, HG_Error_to_string(ret)); - assert(0); - } + } + + ret = margo_internal_progress(mid, hg_progress_timeout); + if (ret != HG_SUCCESS && ret != HG_TIMEOUT) { + 
/* TODO: error handling */ + MARGO_CRITICAL(mid, + "unexpected return code (%d: %s) from HG_Progress()", + ret, HG_Error_to_string(ret)); + assert(0); } /* check for any expired timers */ diff --git a/tests/unit-tests/margo-init.c b/tests/unit-tests/margo-init.c index 13bdc4c..dee606f 100644 --- a/tests/unit-tests/margo-init.c +++ b/tests/unit-tests/margo-init.c @@ -1,5 +1,7 @@ #include +#include +#include #include "helper-server.h" #include "munit/munit.h" @@ -184,6 +186,77 @@ static MunitResult ref_incr_and_release(const MunitParameter params[], void* dat return MUNIT_OK; } +/* Watchdog ULT: yields for about one second, then returns if *args is 1; + * otherwise it deliberately writes through NULL to abort the test run. */ +static void kill_test(void* args) { + volatile int* x = (volatile int*)args; + double t = ABT_get_wtime(); + while(ABT_get_wtime() - t < 1.0) { + ABT_thread_yield(); + } + if(*x == 1) return; + else { + x = NULL; + *x = 1; // segfault + } +} + +/* No-op ULT body; the test only waits for it to complete. */ +static void my_func(void* arg) { (void)arg; } + +/* Starts margo with the progress loop and RPC handlers in pool p1, which the + * primary xstream's scheduler also serves, then checks that a ULT created in + * p2 can still be joined from the test body before the watchdog fires. */ +static MunitResult multiple_pools_progress_loop(const MunitParameter params[], void* data) +{ + const char* config = "{\n" + "\"argobots\": {" + "\"pools\": [" + "{\"name\":\"__primary__\",\"access\":\"mpmc\",\"kind\":\"fifo_wait\"}," + "{\"name\":\"p1\",\"access\":\"mpmc\",\"kind\":\"fifo_wait\"}," + "{\"name\":\"p2\",\"access\":\"mpmc\",\"kind\":\"fifo_wait\"}" + "]," + "\"xstreams\": [" + "{\"name\":\"__primary__\"," + "\"scheduler\":{" + "\"pools\":[\"__primary__\",\"p1\"]," + "\"type\":\"basic_wait\"" + "}" + "}," + "{\"name\":\"es1\"," + "\"scheduler\":{" + "\"pools\":[\"p2\"]," + "\"type\":\"basic_wait\"" + "}" + "}" + "]" + "}," + "\"progress_pool\":\"p1\"," + "\"rpc_pool\":\"p1\"" + "}"; + + struct margo_init_info info = MARGO_INIT_INFO_INITIALIZER; + info.json_config = config; + margo_instance_id mid = margo_init_ext("na+sm", MARGO_SERVER_MODE, &info); + munit_assert_not_null(mid); + + struct margo_pool_info p2 = {0}; + munit_assert_int(margo_find_pool_by_name(mid, "p2", &p2), ==, HG_SUCCESS); + + ABT_thread ult, killer; + volatile int x = 0; + ABT_thread_create(p2.pool, kill_test, (void*)&x, ABT_THREAD_ATTR_NULL, &killer); + ABT_thread_create(p2.pool, my_func, NULL, ABT_THREAD_ATTR_NULL, 
&ult); + ABT_thread_join(ult); + x = 1; + ABT_thread_free(&ult); + ABT_thread_join(killer); + ABT_thread_free(&killer); + + margo_finalize(mid); + return MUNIT_OK; +} + static char* protocol_params[] = { "na+sm", NULL }; @@ -208,6 +281,7 @@ static MunitTest tests[] = { { "/init-cycle-server", init_cycle_server, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, { "/finalize-and-wait", finalize_and_wait, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, { "/ref-incr-and-release", ref_incr_and_release, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, + { "/multiple-pools-progress-loop", multiple_pools_progress_loop, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, NULL}, { NULL, NULL, NULL, NULL, MUNIT_TEST_OPTION_NONE, NULL } };