Skip to content

Commit

Permalink
broker: add support for PMIx bootstrap
Browse files Browse the repository at this point in the history
Problem: on some platforms PMIx is the preferred mechanism
to use for bootstrapping Flux.

Add support to the broker's pmiutil.c to use PMIx if the PMIx
server environment variables are set.

For now, keep the PMIx integration as simple as possible, and use
the PMIx_*() functions directly.  We can consider other options
such as indirection through dlopen() later, if we run into problems.

This implementation was guided by the PMI-1 compatibility code here:
  https://github.com/openpmix/pmi-shim

Since Flux does not require all of PMI-1, our code is much simpler.
In addition, some PMIx differences from PMI-1 with respect to key scope
could be dealt with directly, compared to the shim:

- add a 'from_rank' to broker_pmi_kvs_get() so that PMIx_Get() can set
  proc.rank to this instead of PMIX_RANK_UNDEF.  This avoids a hang
  with the dstore gds component, as described in openpmix/pmi-shim#3

- if 'from_rank' is set to -1, then set proc.rank to PMIX_RANK_UNDEF,
  and set the PMIX_OPTIONAL attribute to 1 so PMIx_Get() fails immediately
  if the key is not set.  This is used when the broker tries to fetch the
  'flux.instance-level' key, which the flux shell places in the KVS,
  and is not expected to exist when Flux is launched by a foreign resource
  manager.  Note to future implementor of flux shell PMIx plugin (flux-framework#3536):
  this assumes that 'flux.instance-level' would be set using
  PMIx_server_register_nspace() or equivalent, which would push the key
  to the client at initialization.

Add some well-known PMIx environment variables to the blocklist in runat.c,
so they do not propagate to the initial program when Flux is launched by
a PMIx process manager.

Co-authored-by: Jim Garlick <[email protected]>
  • Loading branch information
ggouaillardet and garlick committed Mar 9, 2021
1 parent e04e5dc commit 920c192
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 7 deletions.
7 changes: 4 additions & 3 deletions src/broker/boot_pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ static int set_instance_level_attr (struct pmi_handle *pmi,
kvsname,
"flux.instance-level",
val,
sizeof (val));
sizeof (val),
-1);
if (result == PMI_SUCCESS)
level = val;
if (attr_add (attrs, "instance-level", level, FLUX_ATTRFLAG_IMMUTABLE) < 0)
Expand Down Expand Up @@ -261,7 +262,7 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs, int tbon_k)
goto error;
}
result = broker_pmi_kvs_get (pmi, pmi_params.kvsname,
key, val, sizeof (val));
key, val, sizeof (val), rank);
if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_kvs_get %s: %s", key, pmi_strerror (result));
goto error;
Expand Down Expand Up @@ -295,7 +296,7 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs, int tbon_k)
goto error;
}
result = broker_pmi_kvs_get (pmi, pmi_params.kvsname,
key, val, sizeof (val));
key, val, sizeof (val), rank);

if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_kvs_get %s: %s", key, pmi_strerror (result));
Expand Down
231 changes: 229 additions & 2 deletions src/broker/pmiutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#include <dlfcn.h>
#include <assert.h>
#include <limits.h>
#include <czmq.h>
#ifdef HAVE_LIBPMIX
#include <pmix.h>
#endif

#include "src/common/libutil/log.h"
#include "src/common/libutil/iterators.h"
Expand All @@ -26,10 +29,14 @@
#include "pmiutil.h"
#include "liblist.h"


typedef enum {
PMI_MODE_SINGLETON,
PMI_MODE_DLOPEN,
PMI_MODE_WIRE1,
#ifdef HAVE_LIBPMIX
PMI_MODE_PMIX,
#endif
} pmi_mode_t;

struct pmi_dso {
Expand All @@ -51,6 +58,9 @@ struct pmi_handle {
int debug;
pmi_mode_t mode;
int rank;
#ifdef HAVE_LIBPMIX
pmix_proc_t myproc;
#endif
};

static void vdebugf (struct pmi_handle *pmi, const char *fmt, va_list ap)
Expand All @@ -62,7 +72,11 @@ static void vdebugf (struct pmi_handle *pmi, const char *fmt, va_list ap)
fprintf (stderr, "pmi-debug-%s[%d]: %s\n",
pmi->mode == PMI_MODE_SINGLETON ? "singleton" :
pmi->mode == PMI_MODE_WIRE1 ? "wire.1" :
pmi->mode == PMI_MODE_DLOPEN ? "dlopen" : "unknown",
pmi->mode == PMI_MODE_DLOPEN ? "dlopen" :
#ifdef HAVE_LIBPMIX
pmi->mode == PMI_MODE_PMIX ? "pmix" :
#endif
"unknown",
pmi->rank,
buf);
}
Expand All @@ -76,6 +90,93 @@ static void debugf (struct pmi_handle *pmi, const char *fmt, ...)
va_end (ap);
}

#ifdef HAVE_LIBPMIX
/* Translate a PMIx status code into the corresponding PMI-1 result code.
 * PMIX_SUCCESS maps to PMI_SUCCESS, each PMIx status listed in the table
 * maps to the PMI-1 error of the same name, and any status that is not
 * listed collapses to PMI_FAIL.
 */
static int convert_err (pmix_status_t rc)
{
    struct errmap {
        pmix_status_t pmix_code;
        int pmi_code;
    };
    static const struct errmap errtab[] = {
        { PMIX_SUCCESS,                PMI_SUCCESS },
        { PMIX_ERR_INIT,               PMI_ERR_INIT },
        { PMIX_ERR_NOMEM,              PMI_ERR_NOMEM },
        { PMIX_ERR_INVALID_ARG,        PMI_ERR_INVALID_ARG },
        { PMIX_ERR_INVALID_KEY,        PMI_ERR_INVALID_KEY },
        { PMIX_ERR_INVALID_KEY_LENGTH, PMI_ERR_INVALID_KEY_LENGTH },
        { PMIX_ERR_INVALID_VAL,        PMI_ERR_INVALID_VAL },
        { PMIX_ERR_INVALID_VAL_LENGTH, PMI_ERR_INVALID_VAL_LENGTH },
        { PMIX_ERR_INVALID_LENGTH,     PMI_ERR_INVALID_LENGTH },
        { PMIX_ERR_INVALID_NUM_ARGS,   PMI_ERR_INVALID_NUM_ARGS },
        { PMIX_ERR_INVALID_ARGS,       PMI_ERR_INVALID_ARGS },
        { PMIX_ERR_INVALID_NUM_PARSED, PMI_ERR_INVALID_NUM_PARSED },
        { PMIX_ERR_INVALID_KEYVALP,    PMI_ERR_INVALID_KEYVALP },
        { PMIX_ERR_INVALID_SIZE,       PMI_ERR_INVALID_SIZE },
    };
    const struct errmap *end = errtab + sizeof (errtab) / sizeof (errtab[0]);
    const struct errmap *ent;

    for (ent = errtab; ent != end; ent++) {
        if (ent->pmix_code == rc)
            return ent->pmi_code;
    }
    /* No PMI-1 equivalent: report a generic failure. */
    return PMI_FAIL;
}

static pmix_status_t convert_int (int *value, pmix_value_t *kv)
{
switch (kv->type) {
case PMIX_INT:
*value = kv->data.integer;
break;
case PMIX_INT8:
*value = kv->data.int8;
break;
case PMIX_INT16:
*value = kv->data.int16;
break;
case PMIX_INT32:
*value = kv->data.int32;
break;
case PMIX_INT64:
*value = kv->data.int64;
break;
case PMIX_UINT:
*value = kv->data.uint;
break;
case PMIX_UINT8:
*value = kv->data.uint8;
break;
case PMIX_UINT16:
*value = kv->data.uint16;
break;
case PMIX_UINT32:
*value = kv->data.uint32;
break;
case PMIX_UINT64:
*value = kv->data.uint64;
break;
case PMIX_BYTE:
*value = kv->data.byte;
break;
case PMIX_SIZE:
*value = kv->data.size;
break;
case PMIX_BOOL:
*value = kv->data.flag;
break;
default:
/* not an integer type */
return PMIX_ERR_BAD_PARAM;
}
return PMIX_SUCCESS;
}
#endif

static void broker_pmi_dlclose (struct pmi_dso *dso)
{
if (dso) {
Expand Down Expand Up @@ -156,6 +257,9 @@ static struct pmi_dso *broker_pmi_dlopen (const char *pmi_library, int debug)
int broker_pmi_kvs_commit (struct pmi_handle *pmi, const char *kvsname)
{
int ret = PMI_SUCCESS;
#ifdef HAVE_LIBPMIX
pmix_status_t rc;
#endif

switch (pmi->mode) {
case PMI_MODE_SINGLETON:
Expand All @@ -165,6 +269,12 @@ int broker_pmi_kvs_commit (struct pmi_handle *pmi, const char *kvsname)
case PMI_MODE_DLOPEN:
ret = pmi->dso->kvs_commit (kvsname);
break;
#ifdef HAVE_LIBPMIX
case PMI_MODE_PMIX:
rc = PMIx_Commit ();
ret = convert_err (rc);
break;
#endif
}
debugf (pmi,
"kvs_commit (kvsname=%s) = %s",
Expand All @@ -179,6 +289,10 @@ int broker_pmi_kvs_put (struct pmi_handle *pmi,
const char *value)
{
int ret = PMI_SUCCESS;
#ifdef HAVE_LIBPMIX
pmix_status_t rc;
pmix_value_t val;
#endif

switch (pmi->mode) {
case PMI_MODE_SINGLETON:
Expand All @@ -189,6 +303,14 @@ int broker_pmi_kvs_put (struct pmi_handle *pmi,
case PMI_MODE_DLOPEN:
ret = pmi->dso->kvs_put (kvsname, key, value);
break;
#ifdef HAVE_LIBPMIX
case PMI_MODE_PMIX:
val.type = PMIX_STRING;
val.data.string = (char*)value;
rc = PMIx_Put (PMIX_GLOBAL, key, &val);
ret = convert_err (rc);
break;
#endif
}
debugf (pmi,
"kvs_put (kvsname=%s key=%s value=%s) = %s",
Expand All @@ -203,9 +325,17 @@ int broker_pmi_kvs_get (struct pmi_handle *pmi,
const char *kvsname,
const char *key,
char *value,
int len)
int len,
int from_rank)
{
int ret = PMI_FAIL;
#ifdef HAVE_LIBPMIX
pmix_value_t *val;
pmix_proc_t proc;
pmix_status_t rc;
pmix_info_t info[1];
bool val_optional = 1;
#endif

switch (pmi->mode) {
case PMI_MODE_SINGLETON:
Expand All @@ -216,6 +346,38 @@ int broker_pmi_kvs_get (struct pmi_handle *pmi,
case PMI_MODE_DLOPEN:
ret = pmi->dso->kvs_get (kvsname, key, value, len);
break;
#ifdef HAVE_LIBPMIX
case PMI_MODE_PMIX:
strncpy (proc.nspace, kvsname, PMIX_MAX_NSLEN);
/* If from_rank < 0, assume that value was stored by
* enclosing instance using PMIx_server_register_nspace()
* or equivalent, so that it's either in the client cache,
* or fails immediately.
*/
if (from_rank < 0) {
proc.rank = PMIX_RANK_UNDEF;
PMIX_INFO_CONSTRUCT (&info[0]);
PMIX_INFO_LOAD (&info[0], PMIX_OPTIONAL, &val_optional,
PMIX_BOOL);
rc = PMIx_Get (&proc, key, info, 1, &val);
}
else {
proc.rank = from_rank;
rc = PMIx_Get (&proc, key, NULL, 0, &val);
}
if (rc == PMIX_SUCCESS && val != NULL) {
if (val->type != PMIX_STRING) {
rc = PMIX_ERROR;
}
else if (val->data.string != NULL) {
strncpy (value, val->data.string, len-1);
}
PMIX_VALUE_RELEASE (val);
}

ret = convert_err (rc);
break;
#endif
}
debugf (pmi,
"kvs_get (kvsname=%s key=%s value=%s) = %s",
Expand All @@ -229,6 +391,13 @@ int broker_pmi_kvs_get (struct pmi_handle *pmi,
int broker_pmi_barrier (struct pmi_handle *pmi)
{
int ret = PMI_SUCCESS;
#ifdef HAVE_LIBPMIX
pmix_status_t rc;
pmix_info_t buf;
int ninfo;
pmix_info_t *info;
bool val = 1;
#endif

switch (pmi->mode) {
case PMI_MODE_SINGLETON:
Expand All @@ -239,6 +408,17 @@ int broker_pmi_barrier (struct pmi_handle *pmi)
case PMI_MODE_DLOPEN:
ret = pmi->dso->barrier();
break;
#ifdef HAVE_LIBPMIX
case PMI_MODE_PMIX:
info = &buf;
PMIX_INFO_CONSTRUCT (info);
PMIX_INFO_LOAD (info, PMIX_COLLECT_DATA, &val, PMIX_BOOL);
ninfo = 1;
rc = PMIx_Fence (NULL, 0, info, ninfo);
PMIX_INFO_DESTRUCT (info);
ret = convert_err (rc);
break;
#endif
}
debugf (pmi, "barrier = %s", pmi_strerror (ret));
return ret;
Expand All @@ -248,6 +428,14 @@ int broker_pmi_get_params (struct pmi_handle *pmi,
struct pmi_params *params)
{
int ret = PMI_SUCCESS;
#ifdef HAVE_LIBPMIX
pmix_status_t rc;
pmix_value_t *val;
pmix_info_t info[1];
bool val_optional = 1;
pmix_proc_t proc = pmi->myproc;
proc.rank = PMIX_RANK_WILDCARD;
#endif

switch (pmi->mode) {
case PMI_MODE_SINGLETON:
Expand All @@ -270,6 +458,26 @@ int broker_pmi_get_params (struct pmi_handle *pmi,
ret = pmi->dso->kvs_get_my_name (params->kvsname,
sizeof (params->kvsname));
break;
#ifdef HAVE_LIBPMIX
case PMI_MODE_PMIX:
params->rank = pmi->myproc.rank;

strncpy (params->kvsname, pmi->myproc.nspace,
sizeof(params->kvsname)-1);

PMIX_INFO_CONSTRUCT (&info[0]);
PMIX_INFO_LOAD (&info[0], PMIX_OPTIONAL, &val_optional, PMIX_BOOL);

rc = PMIx_Get (&proc, PMIX_JOB_SIZE, info, 1, &val);
if (rc == PMIX_SUCCESS) {
rc = convert_int (&params->size, val);
PMIX_VALUE_RELEASE (val);
}

PMIX_INFO_DESTRUCT (&info[0]);
ret = convert_err (rc);
break;
#endif
}
if (ret == PMI_SUCCESS)
pmi->rank = params->rank;
Expand All @@ -296,6 +504,12 @@ int broker_pmi_init (struct pmi_handle *pmi)
case PMI_MODE_DLOPEN:
ret = pmi->dso->init(&spawned);
break;
#ifdef HAVE_LIBPMIX
case PMI_MODE_PMIX:
if (PMIx_Init (&pmi->myproc, NULL, 0) != PMIX_SUCCESS)
ret = PMI_ERR_INIT;
break;
#endif
}
debugf (pmi, "init = %s", pmi_strerror (ret));
return ret;
Expand All @@ -314,6 +528,11 @@ int broker_pmi_finalize (struct pmi_handle *pmi)
case PMI_MODE_DLOPEN:
ret = pmi->dso->finalize ();
break;
#ifdef HAVE_LIBPMIX
case PMI_MODE_PMIX:
PMIx_Finalize (NULL, 0);
break;
#endif
}
debugf (pmi, "finalize = %s", pmi_strerror (ret));
return PMI_SUCCESS;
Expand All @@ -324,6 +543,9 @@ void broker_pmi_destroy (struct pmi_handle *pmi)
if (pmi) {
int saved_errno = errno;
switch (pmi->mode) {
#ifdef HAVE_LIBPMIX
case PMI_MODE_PMIX:
#endif
case PMI_MODE_SINGLETON:
break;
case PMI_MODE_WIRE1:
Expand Down Expand Up @@ -358,6 +580,11 @@ struct pmi_handle *broker_pmi_create (void)
NULL))) {
pmi->mode = PMI_MODE_WIRE1;
}
#ifdef HAVE_LIBPMIX
else if (getenv ("PMIX_SERVER_URI") || getenv ("PMIX_SERVER_URI2")) {
pmi->mode = PMI_MODE_PMIX;
}
#endif
/* N.B. SLURM boldly installs its libpmi.so into the system libdir,
* so it will be found here, even if not running in a SLURM job.
* Fortunately it emulates singleton in that case, in lieu of failing.
Expand Down
Loading

0 comments on commit 920c192

Please sign in to comment.