Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove local completion for group construct #2112

Merged
merged 1 commit into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/mca/grpcomm/direct/grpcomm_direct_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ int prte_grpcomm_direct_group(pmix_group_operation_t op, char *grpid,
prte_pmix_grp_caddy_t *cd;

PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:group for \"%s\" with %lu procs",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), grpid, nprocs));
"%s grpcomm:direct:group %s for \"%s\" with %lu procs",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME),
PMIx_Group_operation_string(op), grpid, nprocs));

cd = PMIX_NEW(prte_pmix_grp_caddy_t);
cd->op = op;
Expand Down Expand Up @@ -442,8 +443,8 @@ void prte_grpcomm_direct_grp_recv(int status, pmix_proc_t *sender,

if (PRTE_PROC_IS_MASTER) {
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct group HNP reports complete",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
"%s grpcomm:direct group HNP reports complete for %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), coll->sig->groupID));

/* the allgather is complete - send the xcast */
if (PMIX_GROUP_CONSTRUCT == sig->op) {
Expand Down Expand Up @@ -702,7 +703,7 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
PMIX_ACQUIRE_OBJECT(cd);

pmix_output_verbose(2, prte_pmix_server_globals.output,
"%s group request complete",
"%s group release recvd",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME));

// unpack the signature
Expand Down
174 changes: 0 additions & 174 deletions src/prted/pmix/pmix_server_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,158 +72,12 @@ pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,

#else

/* Release callback handed to the PMIx server along with the returned info:
 * the only work to do is to drop our reference on the group caddy. */
static void relcb(void *cbdata)
{
    prte_pmix_grp_caddy_t *caddy;

    caddy = (prte_pmix_grp_caddy_t *) cbdata;
    PMIX_RELEASE(caddy);
}

/* Completion callback for the local event notification: the status is
 * intentionally ignored and the caddy that carried the event info is
 * released. */
static void opcbfunc(int status, void *cbdata)
{
    prte_pmix_grp_caddy_t *caddy;
    PRTE_HIDE_UNUSED_PARAMS(status);

    caddy = (prte_pmix_grp_caddy_t *) cbdata;
    PMIX_RELEASE(caddy);
}

static void local_complete(int sd, short args, void *cbdata)
{
prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata;
prte_pmix_grp_caddy_t *cd2;
pmix_server_pset_t *pset;
pmix_data_array_t members = PMIX_DATA_ARRAY_STATIC_INIT;
pmix_proc_t *addmembers = NULL;
size_t nmembers = 0, naddmembers = 0;
pmix_proc_t *p;
void *ilist;
pmix_status_t rc;
size_t n;
pmix_data_array_t darray;
pmix_data_buffer_t dbuf;
pmix_byte_object_t bo;
PRTE_HIDE_UNUSED_PARAMS(sd, args);

if (PMIX_GROUP_CONSTRUCT == cd->op) {

PMIX_INFO_LIST_START(ilist);

for (n=0; n < cd->ndirs; n++) {
// check if they gave us any grp or endpt info
if (PMIX_CHECK_KEY(&cd->directives[n], PMIX_PROC_DATA) ||
PMIX_CHECK_KEY(&cd->directives[n], PMIX_GROUP_INFO)) {
rc = PMIx_Info_list_add_value(ilist, cd->directives[n].key, &cd->directives[n].value);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}
// check for add members - server lib would have aggregated them
} else if (PMIX_CHECK_KEY(&cd->directives[n], PMIX_GROUP_ADD_MEMBERS)) {
naddmembers = cd->directives[n].value.data.darray->size;
addmembers = (pmix_proc_t*)cd->directives[n].value.data.darray->array;
}
}

// construct the final group membership
nmembers = cd->nprocs + naddmembers;
PMIX_DATA_ARRAY_CONSTRUCT(&members, nmembers, PMIX_PROC);
p = (pmix_proc_t*)members.array;
memcpy(p, cd->procs, cd->nprocs * sizeof(pmix_proc_t));
if (0 < naddmembers) {
memcpy(&p[cd->nprocs], addmembers, naddmembers * sizeof(pmix_proc_t));
}
PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_MEMBERSHIP, &members, PMIX_DATA_ARRAY);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}

PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_ID, cd->grpid, PMIX_STRING);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}

/* add it to our list of known groups */
pset = PMIX_NEW(pmix_server_pset_t);
pset->name = strdup(cd->grpid);
pset->num_members = nmembers;
PMIX_PROC_CREATE(pset->members, nmembers);
memcpy(pset->members, p, nmembers * sizeof(pmix_proc_t));
pmix_list_append(&prte_pmix_server_globals.groups, &pset->super);

// convert the info list
PMIX_INFO_LIST_CONVERT(rc, ilist, &darray);
cd->info = (pmix_info_t*)darray.array;
cd->ninfo = darray.size;
PMIX_INFO_LIST_RELEASE(ilist);

// generate events for any add members as they are waiting for notification
if (NULL != addmembers) {

cd2 = PMIX_NEW(prte_pmix_grp_caddy_t);
cd2->ninfo = cd->ninfo + 3;
PMIX_INFO_CREATE(cd2->info, cd2->ninfo);
// carry over the info we created
for (n=0; n < cd->ninfo; n++) {
rc = PMIx_Info_xfer(&cd2->info[n], &cd->info[n]);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}
}

// set the range to be only procs that were added
darray.type = PMIX_PROC;
darray.array = addmembers;
darray.size = naddmembers;
// load the array - note: this copies the array!
PMIX_INFO_LOAD(&cd2->info[n], PMIX_EVENT_CUSTOM_RANGE, &darray, PMIX_DATA_ARRAY);
++n;

// mark that this event stays local and does not go up to the host
PMIX_INFO_LOAD(&cd2->info[n], PMIX_EVENT_STAYS_LOCAL, NULL, PMIX_BOOL);
++n;

// add the job-level info
PMIX_DATA_BUFFER_CONSTRUCT(&dbuf);
rc = PMIx_server_collect_job_info(p, nmembers, &dbuf);
if (PMIX_SUCCESS == rc) {
PMIx_Data_buffer_unload(&dbuf, &bo.bytes, &bo.size);
PMIX_INFO_LOAD(&cd2->info[n], PMIX_GROUP_JOB_INFO, &bo, PMIX_BYTE_OBJECT);
PMIX_BYTE_OBJECT_DESTRUCT(&bo);
}
PMIX_DATA_BUFFER_DESTRUCT(&dbuf);

// notify local procs
PMIx_Notify_event(PMIX_GROUP_INVITED, &prte_process_info.myproc,
PMIX_RANGE_CUSTOM,
cd2->info, cd2->ninfo, opcbfunc, cd2);
}

// return this to the PMIx server
cd->cbfunc(PMIX_SUCCESS, cd->info, cd->ninfo, cd->cbdata, relcb, cd);

} else {
/* find this group ID on our list of groups and remove it */
PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t)
{
if (0 == strcmp(pset->name, cd->grpid)) {
pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super);
PMIX_RELEASE(pset);
break;
}
}

// return their callback
cd->cbfunc(PMIX_SUCCESS, NULL, 0, cd->cbdata, relcb, cd);
}
}

pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,
const pmix_proc_t procs[], size_t nprocs,
const pmix_info_t directives[], size_t ndirs,
pmix_info_cbfunc_t cbfunc, void *cbdata)
{
prte_pmix_grp_caddy_t *cd;
int rc;
size_t i;
bool force_local = false;

pmix_output_verbose(2, prte_pmix_server_globals.output,
"%s Group request %s recvd with %lu directives",
Expand All @@ -235,34 +89,6 @@ pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,
return PMIX_ERR_BAD_PARAM;
}

/* check the directives */
for (i = 0; i < ndirs; i++) {
/* see if this is local only */
if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_LOCAL_ONLY)) {
force_local = PMIX_INFO_TRUE(&directives[i]);
break;
}
}

/* if they insist on forcing local completion of the operation, then
* we are done */
if (force_local) {
pmix_output_verbose(2, prte_pmix_server_globals.output,
"%s group request - purely local",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME));
cd = PMIX_NEW(prte_pmix_grp_caddy_t);
cd->op = op;
cd->grpid = strdup(grpid);
cd->procs = procs;
cd->nprocs = nprocs;
cd->directives = directives;
cd->ndirs = ndirs;
cd->cbfunc = cbfunc;
cd->cbdata = cbdata;
PRTE_PMIX_THREADSHIFT(cd, prte_event_base, local_complete);
return PMIX_SUCCESS;
}

rc = prte_grpcomm.group(op, grpid, procs, nprocs,
directives, ndirs, cbfunc, cbdata);
if (PRTE_SUCCESS != rc) {
Expand Down
Loading