From 7884d0a61c2469bbf90423c4bdab4d6fd3486187 Mon Sep 17 00:00:00 2001 From: Adam Moody Date: Mon, 3 May 2021 17:27:57 -0700 Subject: [PATCH] enable servers to use pmi to get rank and count --- common/src/unifyfs_keyval.c | 120 +++++++++++++++++------- common/src/unifyfs_keyval.h | 7 +- server/src/Makefile.am | 10 ++ server/src/margo_server.c | 165 ++++++++++++++++++--------------- server/src/unifyfs_global.h | 1 - server/src/unifyfs_server.c | 176 +++++++++++++++++++----------------- 6 files changed, 287 insertions(+), 192 deletions(-) diff --git a/common/src/unifyfs_keyval.c b/common/src/unifyfs_keyval.c index 4975a4794..151661d72 100644 --- a/common/src/unifyfs_keyval.c +++ b/common/src/unifyfs_keyval.c @@ -37,7 +37,6 @@ const char* const key_unifyfsd_socket = "unifyfsd.socket"; const char* const key_unifyfsd_margo_shm = "unifyfsd.margo-shm"; const char* const key_unifyfsd_margo_svr = "unifyfsd.margo-svr"; -const char* const key_unifyfsd_pmi_rank = "unifyfsd.pmi-rank"; // key-value store state static int kv_initialized; // = 0 @@ -58,6 +57,9 @@ static size_t kv_max_vallen; // = 0 # define UNIFYFS_MAX_KV_VALLEN 4096 #endif +/* PMI information */ +int glb_pmi_rank = -1; +int glb_pmi_size; /* = 0 */ //--------------------- PMI2 K-V Store --------------------- #if defined(USE_PMI2) @@ -127,12 +129,17 @@ static void unifyfs_pmi2_errstr(int rc) } // initialize PMI2 -static int unifyfs_pmi2_init(void) +int unifyfs_pmi2_init(void) { int nprocs, rank, rc, val, len, found; int pmi_world_rank = -1; int pmi_world_nprocs = -1; + /* return success if we're already initialized */ + if (pmi2_initialized) { + return (int)UNIFYFS_SUCCESS; + } + kv_max_keylen = PMI2_MAX_KEYLEN; kv_max_vallen = PMI2_MAX_VALLEN; @@ -191,6 +198,9 @@ static int unifyfs_pmi2_init(void) kv_myrank = pmi_world_rank; kv_nranks = pmi_world_nprocs; + glb_pmi_rank = kv_myrank; + glb_pmi_size = kv_nranks; + LOGDBG("PMI2 Job Id: %s, Rank: %d of %d, hasNameServer=%d", pmi_jobid, kv_myrank, kv_nranks, pmi2_has_nameserv); @@ -239,6 +249,26 @@ static int unifyfs_pmi2_lookup(const char* key, LOGERR("PMI2_KVS_Get(%s) failed: %s", key, pmi2_errstr); return (int)UNIFYFS_ERROR_PMI; } + + // HACK: replace '!' with ';' for SLURM PMI2 + // This assumes the value does not actually use "!" + // + // At least one version of SLURM PMI2 seems to use ";" + // characters to separate key/value pairs, so the following: + // + // PMI2_KVS_Put("unifyfs.margo-svr", "ofi+tcp;ofi_rxm://ip:port") + // + // leads to an error like: + // + // slurmstepd: error: mpi/pmi2: no value for key ;ofi_rxm://ip:port; in req + char* p = pmi2_val; + while (*p != '\0') { + if (*p == '!') { + *p = ';'; + } + p++; + } + *oval = strdup(pmi2_val); return (int)UNIFYFS_SUCCESS; } @@ -257,6 +287,26 @@ static int unifyfs_pmi2_publish(const char* key, strncpy(pmi2_key, key, sizeof(pmi2_key)); strncpy(pmi2_val, val, sizeof(pmi2_val)); + + // HACK: replace ';' with '!' for SLURM PMI2 + // This assumes the value does not actually use "!" + // + // At least one version of SLURM PMI2 seems to use ";" + // characters to separate key/value pairs, so the following: + // + // PMI2_KVS_Put("unifyfs.margo-svr", "ofi+tcp;ofi_rxm://ip:port") + // + // leads to an error like: + // + // slurmstepd: error: mpi/pmi2: no value for key ;ofi_rxm://ip:port; in req + char* p = pmi2_val; + while (*p != '\0') { + if (*p == ';') { + *p = '!'; + } + p++; + } + rc = PMI2_KVS_Put(pmi2_key, pmi2_val); if (rc != PMI2_SUCCESS) { unifyfs_pmi2_errstr(rc); @@ -294,7 +344,7 @@ static pmix_proc_t pmix_myproc; #endif // initialize PMIx -static int unifyfs_pmix_init(void) +int unifyfs_pmix_init(void) { int rc; size_t pmix_univ_nprocs; @@ -302,6 +352,11 @@ static int unifyfs_pmix_init(void) pmix_value_t* valp = &value; pmix_proc_t proc; + /* return success if we're already initialized */ + if (pmix_initialized) { + return (int)UNIFYFS_SUCCESS; + } + /* init PMIx */ PMIX_PROC_CONSTRUCT(&pmix_myproc); rc = PMIx_Init(&pmix_myproc, NULL, 0); @@ -328,6 +383,9 @@ static int unifyfs_pmix_init(void) kv_myrank = pmix_myproc.rank; kv_nranks = (int)pmix_univ_nprocs; + glb_pmi_rank = kv_myrank; + glb_pmi_size = kv_nranks; + LOGDBG("PMIX Job Id: %s, Rank: %d of %d", pmix_myproc.nspace, kv_myrank, kv_nranks); @@ -490,7 +548,6 @@ static int unifyfs_fskv_init(unifyfs_cfg_t* cfg) int rc, err; struct stat s; - if (NULL == cfg) { LOGERR("NULL config"); return EINVAL; @@ -715,6 +772,27 @@ static int unifyfs_fskv_lookup_local(const char* key, return (int)UNIFYFS_SUCCESS; } +// publish a key-value pair +static int unifyfs_fskv_publish_local(const char* key, + const char* val) +{ + FILE* kvf; + char kvfile[UNIFYFS_MAX_FILENAME]; + + scnprintf(kvfile, sizeof(kvfile), "%s/%s", + localfs_kvdir, key); + kvf = fopen(kvfile, "w"); + if (NULL == kvf) { + LOGERR("failed to create kvstore entry %s", kvfile); + return (int)UNIFYFS_ERROR_KEYVAL; + } + fprintf(kvf, "%s\n", val); + fclose(kvf); + + return (int)UNIFYFS_SUCCESS; +} + +#if (!defined(USE_PMI2)) && (!defined(USE_PMIX)) static int unifyfs_fskv_lookup_remote(int rank, const char* key, char** oval) @@ -748,26 +826,6 @@ static int unifyfs_fskv_lookup_remote(int rank, return (int)UNIFYFS_SUCCESS; } -// publish a key-value pair -static int unifyfs_fskv_publish_local(const char* key, - const char* val) -{ - FILE* kvf; - char kvfile[UNIFYFS_MAX_FILENAME]; - - scnprintf(kvfile, sizeof(kvfile), "%s/%s", - localfs_kvdir, key); - kvf = fopen(kvfile, "w"); - if (NULL == kvf) { - LOGERR("failed to create kvstore entry %s", kvfile); - return (int)UNIFYFS_ERROR_KEYVAL; - } - fprintf(kvf, "%s\n", val); - fclose(kvf); - - return (int)UNIFYFS_SUCCESS; -} - static int unifyfs_fskv_publish_remote(const char* key, const char* val) { @@ -791,7 +849,6 @@ static int unifyfs_fskv_publish_remote(const char* key, return (int)UNIFYFS_SUCCESS; } -#if (!defined(USE_PMI2)) && (!defined(USE_PMIX)) static int unifyfs_fskv_fence(void) { if (!have_sharedfs_kvstore) { @@ -840,6 +897,7 @@ int unifyfs_keyval_init(unifyfs_cfg_t* cfg, kv_nranks = *nranks; } #endif + // NOTE: do this after getting rank/n_ranks info rc = unifyfs_fskv_init(cfg); if (rc != (int)UNIFYFS_SUCCESS) { @@ -855,6 +913,7 @@ int unifyfs_keyval_init(unifyfs_cfg_t* cfg, if (NULL != nranks) { *nranks = kv_nranks; } + return (int)UNIFYFS_SUCCESS; } @@ -959,11 +1018,8 @@ int unifyfs_keyval_lookup_remote(int rank, #elif defined(USE_PMI2) rc = unifyfs_pmi2_lookup(rank_key, oval); #else - rc = (int)UNIFYFS_FAILURE; + rc = unifyfs_fskv_lookup_remote(rank, key, oval); #endif - if (rc != (int)UNIFYFS_SUCCESS) { - rc = unifyfs_fskv_lookup_remote(rank, key, oval); - } if (rc != (int)UNIFYFS_SUCCESS) { LOGERR("remote keyval lookup for '%s' failed", key); } @@ -1043,12 +1099,8 @@ int unifyfs_keyval_publish_remote(const char* key, #elif defined(USE_PMI2) rc = unifyfs_pmi2_publish(rank_key, val); #else - rc = (int)UNIFYFS_FAILURE; + rc = unifyfs_fskv_publish_remote(key, val); #endif - if (rc != (int)UNIFYFS_SUCCESS) { - rc = unifyfs_fskv_publish_remote(key, val); - } - if (rc != (int)UNIFYFS_SUCCESS) { LOGERR("remote keyval publish for '%s' failed", key); } else { diff --git a/common/src/unifyfs_keyval.h b/common/src/unifyfs_keyval.h index d56e884cd..27d4b96e9 100644 --- a/common/src/unifyfs_keyval.h +++ b/common/src/unifyfs_keyval.h @@ -21,11 +21,16 @@ extern "C" { #endif +extern int glb_pmi_rank; +extern int glb_pmi_size; + +int unifyfs_pmix_init(void); +int unifyfs_pmi2_init(void); + // keys we use extern const char* const key_unifyfsd_socket; // server domain socket path extern const char* const key_unifyfsd_margo_shm; // client-server margo address extern const char* const key_unifyfsd_margo_svr; // server-server margo address -extern const char* const key_unifyfsd_pmi_rank; // server-server pmi rank // initialize key-value store int unifyfs_keyval_init(unifyfs_cfg_t* cfg, diff --git a/server/src/Makefile.am b/server/src/Makefile.am index ca5c62191..634503de2 100644 --- a/server/src/Makefile.am +++ b/server/src/Makefile.am @@ -72,6 +72,16 @@ else # ! USE_MDHIM endif # USE_MDHIM +if USE_PMIX + OPT_C_FLAGS += -DUSE_PMIX + OPT_LIBS += -lpmix +endif + +if USE_PMI2 + OPT_C_FLAGS += -DUSE_PMI2 + OPT_LIBS += -lpmi2 +endif + unifyfsd_CFLAGS = $(AM_CFLAGS) $(UNIFYFS_COMMON_FLAGS) $(OPT_C_FLAGS) unifyfsd_LDFLAGS = $(OPT_LD_FLAGS) unifyfsd_LDADD = $(UNIFYFS_COMMON_LIBS) $(OPT_LIBS) diff --git a/server/src/margo_server.c b/server/src/margo_server.c index 25a037c84..0065fbd2b 100644 --- a/server/src/margo_server.c +++ b/server/src/margo_server.c @@ -32,6 +32,10 @@ int margo_client_server_pool_sz = 4; int margo_server_server_pool_sz = 4; int margo_use_progress_thread = 1; +// records pmi rank, server address string, and server address +// for each server for use in server-to-server rpcs +static server_info_t* server_infos; // array of server_info_t + #if defined(NA_HAS_SM) static const char* PROTOCOL_MARGO_SHM = "na+sm"; #else @@ -58,30 +62,51 @@ static const char* PROTOCOL_MARGO_OFI_TCP; static const char* PROTOCOL_MARGO_OFI_RMA; #endif -/* setup_remote_target - Initializes the server-server margo target */ -static margo_instance_id setup_remote_target(void) +/* Given a margo instance ID (mid), return its corresponding + * address as a newly allocated string to be freed by caller. + * Returns NULL on error. */ +static char* get_margo_addr_str(margo_instance_id mid) { - /* initialize margo */ - hg_return_t hret; + /* get margo address for given instance */ hg_addr_t addr_self; + hg_return_t hret = margo_addr_self(mid, &addr_self); + if (hret != HG_SUCCESS) { + LOGERR("margo_addr_self() failed"); + return NULL; + } + + /* convert margo address to a string */ char self_string[128]; hg_size_t self_string_sz = sizeof(self_string); - margo_instance_id mid; - const char* margo_protocol; + hret = margo_addr_to_string(mid, + self_string, &self_string_sz, addr_self); + if (hret != HG_SUCCESS) { + LOGERR("margo_addr_to_string() failed"); + margo_addr_free(mid, addr_self); + return NULL; + } + margo_addr_free(mid, addr_self); + + /* return address in newly allocated string */ + char* addr = strdup(self_string); + return addr; +} +/* setup_remote_target - Initializes the server-server margo target */ +static margo_instance_id setup_remote_target(void) +{ /* by default we try to use ofi */ - margo_protocol = margo_use_tcp ? + const char* margo_protocol = margo_use_tcp ? PROTOCOL_MARGO_OFI_TCP : PROTOCOL_MARGO_OFI_RMA; - - /* when ofi is not available, fallback to using bmi */ if (!margo_protocol) { + /* when ofi is not available, fallback to using bmi */ LOGWARN("OFI is not available, using BMI for margo rpc"); margo_protocol = PROTOCOL_MARGO_BMI_TCP; } - mid = margo_init(margo_protocol, MARGO_SERVER_MODE, - margo_use_progress_thread, - margo_server_server_pool_sz); + /* initialize margo */ + margo_instance_id mid = margo_init(margo_protocol, MARGO_SERVER_MODE, + margo_use_progress_thread, margo_server_server_pool_sz); if (mid == MARGO_INSTANCE_NULL) { LOGERR("margo_init(%s, SERVER_MODE, %d, %d) failed", margo_protocol, margo_use_progress_thread, @@ -101,28 +126,20 @@ static margo_instance_id setup_remote_target(void) } } - /* figure out what address this server is listening on */ - hret = margo_addr_self(mid, &addr_self); - if (hret != HG_SUCCESS) { - LOGERR("margo_addr_self() failed"); - margo_finalize(mid); - return MARGO_INSTANCE_NULL; - } - hret = margo_addr_to_string(mid, - self_string, &self_string_sz, - addr_self); - if (hret != HG_SUCCESS) { - LOGERR("margo_addr_to_string() failed"); - margo_addr_free(mid, addr_self); + /* get our address for server-server rpcs */ + char* self_string = get_margo_addr_str(mid); + if (NULL == self_string) { + LOGERR("invalid value to publish server-server margo rpc address"); margo_finalize(mid); return MARGO_INSTANCE_NULL; } LOGINFO("margo RPC server: %s", self_string); - margo_addr_free(mid, addr_self); /* publish rpc address of server for remote servers */ rpc_publish_remote_server_addr(self_string); + free(self_string); + return mid; } @@ -225,12 +242,7 @@ static margo_instance_id setup_local_target(void) { /* initialize margo */ const char* margo_protocol = PROTOCOL_MARGO_SHM; - hg_return_t hret; - hg_addr_t addr_self; - char self_string[128]; - hg_size_t self_string_sz = sizeof(self_string); - margo_instance_id mid; - mid = margo_init(margo_protocol, MARGO_SERVER_MODE, + margo_instance_id mid = margo_init(margo_protocol, MARGO_SERVER_MODE, margo_use_progress_thread, margo_client_server_pool_sz); if (mid == MARGO_INSTANCE_NULL) { LOGERR("margo_init(%s, SERVER_MODE, %d, %d) failed", margo_protocol, @@ -239,27 +251,19 @@ static margo_instance_id setup_local_target(void) } /* figure out what address this server is listening on */ - hret = margo_addr_self(mid, &addr_self); - if (hret != HG_SUCCESS) { + char* self_string = get_margo_addr_str(mid); + if (NULL == self_string) { LOGERR("margo_addr_self() failed"); margo_finalize(mid); return MARGO_INSTANCE_NULL; } - hret = margo_addr_to_string(mid, - self_string, &self_string_sz, - addr_self); - if (hret != HG_SUCCESS) { - LOGERR("margo_addr_to_string() failed"); - margo_addr_free(mid, addr_self); - margo_finalize(mid); - return MARGO_INSTANCE_NULL; - } LOGINFO("shared-memory margo RPC server: %s", self_string); - margo_addr_free(mid, addr_self); /* publish rpc address of server for local clients */ rpc_publish_local_server_addr(self_string); + free(self_string); + return mid; } @@ -425,19 +429,21 @@ int margo_server_rpc_finalize(void) /* free global server addresses */ for (int i = 0; i < glb_num_servers; i++) { - if (glb_servers[i].margo_svr_addr != HG_ADDR_NULL) { - margo_addr_free(ctx->svr_mid, glb_servers[i].margo_svr_addr); - glb_servers[i].margo_svr_addr = HG_ADDR_NULL; + server_info_t* server = &server_infos[i]; + if (server->margo_svr_addr != HG_ADDR_NULL) { + margo_addr_free(ctx->svr_mid, server->margo_svr_addr); + server->margo_svr_addr = HG_ADDR_NULL; } - if (NULL != glb_servers[i].margo_svr_addr_str) { - free(glb_servers[i].margo_svr_addr_str); - glb_servers[i].margo_svr_addr_str = NULL; + if (NULL != server->margo_svr_addr_str) { + free(server->margo_svr_addr_str); + server->margo_svr_addr_str = NULL; } } /* shut down margo */ LOGDBG("finalizing server-server margo"); margo_finalize(ctx->svr_mid); + /* NOTE: 2nd call to margo_finalize() sometimes crashes - Margo bug? */ LOGDBG("finalizing client-server margo"); margo_finalize(ctx->shm_mid); @@ -454,18 +460,21 @@ int margo_connect_server(int rank) assert(rank < glb_num_servers); int ret = UNIFYFS_SUCCESS; + + server_info_t* server = &server_infos[rank]; + + /* lookup rpc address for this server */ char* margo_addr_str = rpc_lookup_remote_server_addr(rank); if (NULL == margo_addr_str) { - LOGERR("server index=%d - margo server lookup failed", rank); - ret = UNIFYFS_ERROR_KEYVAL; - return ret; + LOGERR("server index=%zu - margo server lookup failed", rank); + return (int)UNIFYFS_ERROR_KEYVAL; } - glb_servers[rank].margo_svr_addr_str = margo_addr_str; LOGDBG("server rank=%d, margo_addr=%s", rank, margo_addr_str); + server->margo_svr_addr_str = margo_addr_str; hg_return_t hret = margo_addr_lookup(unifyfsd_rpc_context->svr_mid, - glb_servers[rank].margo_svr_addr_str, - &(glb_servers[rank].margo_svr_addr)); + server->margo_svr_addr_str, + &(server->margo_svr_addr)); if (hret != HG_SUCCESS) { LOGERR("server index=%zu - margo_addr_lookup(%s) failed", rank, margo_addr_str); @@ -477,30 +486,43 @@ int margo_connect_server(int rank) /* margo_connect_servers * - * Using address strings found in glb_servers, resolve - * each peer server's margo address. + * Gather pmi rank and margo address string for all servers, + * and optionally connect to each one. */ int margo_connect_servers(void) { int rc; - int ret = UNIFYFS_SUCCESS; - int i; - // block until a margo_svr key pair published by all servers + int ret = (int)UNIFYFS_SUCCESS; + + /* block until all servers have published their address */ rc = unifyfs_keyval_fence_remote(); if ((int)UNIFYFS_SUCCESS != rc) { LOGERR("keyval fence on margo_svr key failed"); - ret = UNIFYFS_ERROR_KEYVAL; - return ret; + return (int)UNIFYFS_ERROR_KEYVAL; } - for (i = 0; i < (int)glb_num_servers; i++) { - glb_servers[i].pmi_rank = i; - glb_servers[i].margo_svr_addr = HG_ADDR_NULL; - glb_servers[i].margo_svr_addr_str = NULL; + /* allocate array of structs to record address for each server */ + server_infos = (server_info_t*) calloc(glb_num_servers, + sizeof(server_info_t)); + if (NULL == server_infos) { + LOGERR("failed to allocate server_info array"); + return ENOMEM; + } + + /* lookup address string for each server, and optionally connect */ + size_t i; + for (i = 0; i < glb_num_servers; i++) { + /* record values on struct for this server */ + server_info_t* server = &server_infos[i]; + server->pmi_rank = i; + server->margo_svr_addr = HG_ADDR_NULL; + server->margo_svr_addr_str = NULL; + + /* connect to each server now if not using lazy connect */ if (!margo_lazy_connect) { rc = margo_connect_server(i); - if (rc != UNIFYFS_SUCCESS) { + if (UNIFYFS_SUCCESS != rc) { ret = rc; } } @@ -512,11 +534,12 @@ int margo_connect_servers(void) hg_addr_t get_margo_server_address(int rank) { assert(rank < glb_num_servers); - hg_addr_t addr = glb_servers[rank].margo_svr_addr; + server_info_t* server = &server_infos[rank]; + hg_addr_t addr = server->margo_svr_addr; if ((HG_ADDR_NULL == addr) && margo_lazy_connect) { int rc = margo_connect_server(rank); - if (rc == UNIFYFS_SUCCESS) { - addr = glb_servers[rank].margo_svr_addr; + if (UNIFYFS_SUCCESS == rc) { + addr = server->margo_svr_addr; } } return addr; diff --git a/server/src/unifyfs_global.h b/server/src/unifyfs_global.h index 068c1d3b5..e3abb081d 100644 --- a/server/src/unifyfs_global.h +++ b/server/src/unifyfs_global.h @@ -74,7 +74,6 @@ typedef struct { int pmi_rank; } server_info_t; -extern server_info_t* glb_servers; /* array of server info structs */ extern size_t glb_num_servers; /* number of entries in glb_servers array */ extern struct unifyfs_inode_tree* global_inode_tree; /* global inode tree */ diff --git a/server/src/unifyfs_server.c b/server/src/unifyfs_server.c index 9519c5a8f..edbc63380 100644 --- a/server/src/unifyfs_server.c +++ b/server/src/unifyfs_server.c @@ -45,16 +45,11 @@ // margo rpcs #include "margo_server.h" -/* PMI information */ -int glb_pmi_rank; /* = 0 */ -int glb_pmi_size = 1; // for standalone server tests int server_pid; char glb_host[UNIFYFS_MAX_HOSTNAME]; -size_t glb_host_ndx; // index of localhost in glb_servers size_t glb_num_servers; // size of glb_servers array -server_info_t* glb_servers; // array of server_info_t unifyfs_cfg_t server_cfg; @@ -154,74 +149,32 @@ void exit_request(int sig) } } -#if defined(UNIFYFSD_USE_MPI) -static void init_MPI(int* argc, char*** argv) -{ - int rc, provided; - rc = MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &provided); - if (rc != MPI_SUCCESS) { - exit(1); - } - - rc = MPI_Comm_rank(MPI_COMM_WORLD, &glb_pmi_rank); - if (rc != MPI_SUCCESS) { - exit(1); - } - - rc = MPI_Comm_size(MPI_COMM_WORLD, &glb_pmi_size); - if (rc != MPI_SUCCESS) { - exit(1); - } -} - -static void fini_MPI(void) -{ - MPI_Finalize(); -} -#endif // UNIFYFSD_USE_MPI - -static int allocate_servers(size_t n_servers) -{ - glb_num_servers = n_servers; - glb_servers = (server_info_t*) calloc(n_servers, sizeof(server_info_t)); - if (NULL == glb_servers) { - LOGERR("failed to allocate server_info array"); - return ENOMEM; - } - return (int)UNIFYFS_SUCCESS; -} - static int process_servers_hostfile(const char* hostfile) { - int rc; - size_t i, cnt; - FILE* fp = NULL; - char hostbuf[UNIFYFS_MAX_HOSTNAME+1]; - if (NULL == hostfile) { return EINVAL; } - fp = fopen(hostfile, "r"); + + FILE* fp = fopen(hostfile, "r"); if (!fp) { LOGERR("failed to open hostfile %s", hostfile); return (int)UNIFYFS_FAILURE; } // scan first line: number of hosts - rc = fscanf(fp, "%zu\n", &cnt); + size_t cnt = 0; + int rc = fscanf(fp, "%zu\n", &cnt); if (1 != rc) { LOGERR("failed to scan hostfile host count"); fclose(fp); return (int)UNIFYFS_FAILURE; } - rc = allocate_servers(cnt); - if ((int)UNIFYFS_SUCCESS != rc) { - fclose(fp); - return (int)UNIFYFS_FAILURE; - } - // scan host lines + // scan host lines to find index of host of this process + size_t i; + size_t ndx = 0; for (i = 0; i < cnt; i++) { + char hostbuf[UNIFYFS_MAX_HOSTNAME + 1]; memset(hostbuf, 0, sizeof(hostbuf)); rc = fscanf(fp, "%s\n", hostbuf); if (1 != rc) { @@ -230,21 +183,60 @@ static int process_servers_hostfile(const char* hostfile) return (int)UNIFYFS_FAILURE; } + // check whether this line matches our hostname // NOTE: following assumes one server per host if (0 == strcmp(glb_host, hostbuf)) { - glb_host_ndx = (int)i; - LOGDBG("found myself at hostfile index=%zu, pmi_rank=%d", - glb_host_ndx, glb_pmi_rank); + ndx = (int)i; + LOGDBG("found myself at hostfile index=%zu", ndx); } } fclose(fp); - if (glb_pmi_size < cnt) { - glb_pmi_rank = (int)glb_host_ndx; - glb_pmi_size = (int)cnt; - LOGDBG("set pmi rank to host index %d", glb_pmi_rank); + glb_pmi_rank = (int)ndx; + glb_pmi_size = (int)cnt; + + LOGDBG("set pmi rank to host index %d", glb_pmi_rank); + + return (int)UNIFYFS_SUCCESS; +} + +/* Ensure that glb_pmi_rank, glb_pmi_size, and glb_num_server values are set. */ +static int get_server_rank_and_size(const unifyfs_cfg_t* cfg) +{ + int rc; + +#if defined(UNIFYFSD_USE_MPI) + /* use rank and size of MPI communicator */ + rc = MPI_Comm_rank(MPI_COMM_WORLD, &glb_pmi_rank); + if (rc != MPI_SUCCESS) { + exit(1); + } + + rc = MPI_Comm_size(MPI_COMM_WORLD, &glb_pmi_size); + if (rc != MPI_SUCCESS) { + exit(1); + } +#elif !defined(USE_PMIX) && !defined(USE_PMI2) + /* if not using PMIX or PMI2, + * initialize rank/size to assume a singleton job */ + glb_pmi_rank = 0; + glb_pmi_size = 1; +#endif + + /* If the user has specified a hostfile, + * extract glb_pmi_rank and glb_pmi_size from there + * overriding any settings from MPI/PMI. */ + if (NULL != cfg->server_hostfile) { + rc = process_servers_hostfile(cfg->server_hostfile); + if (rc != (int)UNIFYFS_SUCCESS) { + LOGERR("failed to gather server information"); + exit(1); + } } + /* TODO: can we just use glb_pmi_size everywhere instead? */ + glb_num_servers = glb_pmi_size; + return (int)UNIFYFS_SUCCESS; } @@ -287,7 +279,6 @@ int main(int argc, char* argv[]) int kv_rank, kv_nranks; bool daemon = true; struct sigaction sa; - char rank_str[16] = {0}; char dbg_fname[UNIFYFS_MAX_FILENAME] = {0}; rc = unifyfs_config_init(&server_cfg, argc, argv, 0, NULL); @@ -343,12 +334,10 @@ int main(int argc, char* argv[]) // initialize empty app_configs[] memset(app_configs, 0, sizeof(app_configs)); -#if defined(UNIFYFSD_USE_MPI) - init_MPI(&argc, &argv); -#endif + // record hostname of this server in global variable + gethostname(glb_host, sizeof(glb_host)); // start logging - gethostname(glb_host, sizeof(glb_host)); snprintf(dbg_fname, sizeof(dbg_fname), "%s/%s.%s", server_cfg.log_dir, server_cfg.log_file, glb_host); rc = unifyfs_log_open(dbg_fname); @@ -359,12 +348,40 @@ int main(int argc, char* argv[]) // print config unifyfs_config_print(&server_cfg, unifyfs_log_stream); - if (NULL != server_cfg.server_hostfile) { - rc = process_servers_hostfile(server_cfg.server_hostfile); - if (rc != (int)UNIFYFS_SUCCESS) { - LOGERR("failed to gather server information"); - exit(1); - } + // initialize MPI and PMI if we're using them +#if defined(UNIFYFSD_USE_MPI) + int provided; + rc = MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided); + if (rc != MPI_SUCCESS) { + LOGERR("failed to initialize MPI"); + exit(1); + } +#elif defined(USE_PMIX) + rc = unifyfs_pmix_init(); + if (rc != (int)UNIFYFS_SUCCESS) { + LOGERR("failed to initialize PMIX"); + exit(1); + } +#elif defined(USE_PMI2) + rc = unifyfs_pmi2_init(); + if (rc != (int)UNIFYFS_SUCCESS) { + LOGERR("failed to initialize PMI2"); + exit(1); + } +#endif + + /* get rank of this server process and number of servers, + * set glb_pmi_rank and glb_pmi_size */ + rc = get_server_rank_and_size(&server_cfg); + if (rc != (int)UNIFYFS_SUCCESS) { + LOGERR("failed to get server rank and size"); + exit(1); + } + + /* bail out if we don't have our server rank and group size defined */ + if (glb_pmi_size <= 0) { + LOGERR("failed to read rank and size of server group"); + exit(1); } kv_rank = glb_pmi_rank; @@ -384,17 +401,6 @@ int main(int argc, char* argv[]) glb_pmi_size = kv_nranks; } - snprintf(rank_str, sizeof(rank_str), "%d", glb_pmi_rank); - rc = unifyfs_keyval_publish_remote(key_unifyfsd_pmi_rank, rank_str); - if (rc != (int)UNIFYFS_SUCCESS) { - exit(1); - } - - if (NULL == server_cfg.server_hostfile) { - //glb_svr_rank = kv_rank; - rc = allocate_servers((size_t)kv_nranks); - } - LOGDBG("initializing rpc service"); rc = configurator_bool_val(server_cfg.margo_lazy_connect, &margo_lazy_connect); @@ -674,7 +680,7 @@ static int unifyfs_exit(void) #if defined(UNIFYFSD_USE_MPI) LOGDBG("finalizing MPI"); - fini_MPI(); + MPI_Finalize(); #endif LOGDBG("all done!");