Skip to content

Commit

Permalink
server: move glb_servers to local variable in margo_server
Browse files Browse the repository at this point in the history
  • Loading branch information
adammoody committed Jul 20, 2021
1 parent 444f4aa commit 50a3089
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 68 deletions.
32 changes: 0 additions & 32 deletions common/src/unifyfs_rpc_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,6 @@ void rpc_publish_local_server_addr(const char* addr)
}
}

/* publishes server-server RPC address */
void rpc_publish_remote_server_addr(const char* addr)
{
LOGDBG("publishing server-server rpc address '%s'", addr);

// publish client-server margo address
unifyfs_keyval_publish_remote(key_unifyfsd_margo_svr, addr);
}

/* lookup address of server, returns NULL if server address is not found,
* otherwise returns server address in newly allocated string that caller
* must free */
Expand Down Expand Up @@ -87,29 +78,6 @@ char* rpc_lookup_local_server_addr(void)
return addr;
}

/* lookup address of server, returns NULL if server address is not found,
* otherwise returns server address in newly allocated string that caller
* must free */
char* rpc_lookup_remote_server_addr(int srv_rank)
{
/* returns NULL if we can't find server address */
char* addr = NULL;
char* valstr = NULL;

// lookup server-server margo address
if (0 == unifyfs_keyval_lookup_remote(srv_rank, key_unifyfsd_margo_svr,
&valstr)) {
addr = strdup(valstr);
free(valstr);
}

/* print sserver address (debugging) */
if (NULL != addr) {
LOGDBG("found server %d rpc address '%s'", srv_rank, addr);
}
return addr;
}

/* remove local server RPC address file */
void rpc_clean_local_server_addr(void)
{
Expand Down
2 changes: 0 additions & 2 deletions common/src/unifyfs_rpc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ extern "C" {

/* publish the address of the server */
void rpc_publish_local_server_addr(const char* addr);
void rpc_publish_remote_server_addr(const char* addr);

/* lookup address of server */
char* rpc_lookup_local_server_addr(void);
char* rpc_lookup_remote_server_addr(int srv_rank);

/* remove server rpc address file */
void rpc_clean_local_server_addr(void);
Expand Down
66 changes: 34 additions & 32 deletions server/src/margo_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ int margo_client_server_pool_sz = 4;
int margo_server_server_pool_sz = 4;
int margo_use_progress_thread = 1;

// records pmi rank, server address string, and server address
// for each server for use in server-to-server rpcs
static server_info_t* server_infos; // array of server_info_t

#if defined(NA_HAS_SM)
static const char* PROTOCOL_MARGO_SHM = "na+sm";
#else
Expand Down Expand Up @@ -383,19 +387,21 @@ int margo_server_rpc_finalize(void)

/* free global server addresses */
for (int i = 0; i < glb_num_servers; i++) {
if (glb_servers[i].margo_svr_addr != HG_ADDR_NULL) {
margo_addr_free(ctx->svr_mid, glb_servers[i].margo_svr_addr);
glb_servers[i].margo_svr_addr = HG_ADDR_NULL;
server_info_t* server = &server_infos[i];
if (server->margo_svr_addr != HG_ADDR_NULL) {
margo_addr_free(ctx->svr_mid, server->margo_svr_addr);
server->margo_svr_addr = HG_ADDR_NULL;
}
if (NULL != glb_servers[i].margo_svr_addr_str) {
free(glb_servers[i].margo_svr_addr_str);
glb_servers[i].margo_svr_addr_str = NULL;
if (NULL != server->margo_svr_addr_str) {
free(server->margo_svr_addr_str);
server->margo_svr_addr_str = NULL;
}
}

/* shut down margo */
LOGDBG("finalizing server-server margo");
margo_finalize(ctx->svr_mid);

/* NOTE: 2nd call to margo_finalize() sometimes crashes - Margo bug? */
LOGDBG("finalizing client-server margo");
margo_finalize(ctx->shm_mid);
Expand All @@ -412,18 +418,19 @@ int margo_connect_server(int rank)
assert(rank < glb_num_servers);

int ret = UNIFYFS_SUCCESS;
char* margo_addr_str = rpc_lookup_remote_server_addr(rank);

server_info_t* server = &server_infos[rank];

char* margo_addr_str = server->margo_svr_addr_str;
if (NULL == margo_addr_str) {
LOGERR("server index=%d - margo server lookup failed", rank);
ret = UNIFYFS_ERROR_KEYVAL;
return ret;
return (int)UNIFYFS_ERROR_KEYVAL;
}
glb_servers[rank].margo_svr_addr_str = margo_addr_str;
LOGDBG("server rank=%d, margo_addr=%s", rank, margo_addr_str);

hg_return_t hret = margo_addr_lookup(unifyfsd_rpc_context->svr_mid,
glb_servers[rank].margo_svr_addr_str,
&(glb_servers[rank].margo_svr_addr));
server->margo_svr_addr_str,
&(server->margo_svr_addr));
if (hret != HG_SUCCESS) {
LOGERR("server index=%zu - margo_addr_lookup(%s) failed",
rank, margo_addr_str);
Expand All @@ -435,20 +442,17 @@ int margo_connect_server(int rank)

/* margo_connect_servers
*
* Using address strings found in glb_servers, resolve
* each peer server's margo address.
* Gather pmi rank and margo address string for all servers,
* and optionally connect to each one.
*/
int margo_connect_servers(void)
{
int rc;
hg_return_t hret;

int ret = (int)UNIFYFS_SUCCESS;

/* publish our pmi rank */
char rank_str[16] = {0};
snprintf(rank_str, sizeof(rank_str), "%d", glb_pmi_rank);
rc = unifyfs_keyval_publish_remote(key_unifyfsd_pmi_rank, rank_str);
int rc = unifyfs_keyval_publish_remote(key_unifyfsd_pmi_rank, rank_str);
if ((int)UNIFYFS_SUCCESS != rc) {
LOGERR("failed to publish PMI rank of server");
return (int)UNIFYFS_ERROR_KEYVAL;
Expand Down Expand Up @@ -480,9 +484,9 @@ int margo_connect_servers(void)
}

/* allocate array of structs to record address for each server */
glb_servers = (server_info_t*) calloc(glb_num_servers,
server_infos = (server_info_t*) calloc(glb_num_servers,
sizeof(server_info_t));
if (NULL == glb_servers) {
if (NULL == server_infos) {
LOGERR("failed to allocate server_info array");
return ENOMEM;
}
Expand Down Expand Up @@ -518,20 +522,17 @@ int margo_connect_servers(void)
LOGDBG("found server %d rpc address '%s'", i, margo_addr_str);

/* record values on struct for this server */
glb_servers[i].pmi_rank = remote_pmi_rank;
glb_servers[i].margo_svr_addr = HG_ADDR_NULL;
glb_servers[i].margo_svr_addr_str = margo_addr_str;
server_info_t* server = &server_infos[i];
server->pmi_rank = remote_pmi_rank;
server->margo_svr_addr = HG_ADDR_NULL;
server->margo_svr_addr_str = margo_addr_str;
LOGDBG("server index=%zu, pmi_rank=%d, margo_addr=%s",
i, remote_pmi_rank, margo_addr_str);

/* connect to each server now if not using lazy connect */
if (!margo_lazy_connect) {
hret = margo_addr_lookup(unifyfsd_rpc_context->svr_mid,
glb_servers[i].margo_svr_addr_str,
&(glb_servers[i].margo_svr_addr));
if (hret != HG_SUCCESS) {
LOGERR("server index=%zu - margo_addr_lookup(%s) failed",
i, margo_addr_str);
rc = margo_connect_server(i);
if (UNIFYFS_SUCCESS != rc) {
ret = (int)UNIFYFS_FAILURE;
}
}
Expand All @@ -543,11 +544,12 @@ int margo_connect_servers(void)
hg_addr_t get_margo_server_address(int rank)
{
assert(rank < glb_num_servers);
hg_addr_t addr = glb_servers[rank].margo_svr_addr;
server_info_t* server = &server_infos[rank];
hg_addr_t addr = server->margo_svr_addr;
if ((HG_ADDR_NULL == addr) && margo_lazy_connect) {
int rc = margo_connect_server(rank);
if (rc == UNIFYFS_SUCCESS) {
addr = glb_servers[rank].margo_svr_addr;
if (UNIFYFS_SUCCESS == rc) {
addr = server->margo_svr_addr;
}
}
return addr;
Expand Down
1 change: 0 additions & 1 deletion server/src/unifyfs_global.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ typedef struct {
int pmi_rank;
} server_info_t;

extern server_info_t* glb_servers; /* array of server info structs */
extern size_t glb_num_servers; /* number of entries in glb_servers array */

extern struct unifyfs_inode_tree* global_inode_tree; /* global inode tree */
Expand Down
1 change: 0 additions & 1 deletion server/src/unifyfs_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ int server_pid;
char glb_host[UNIFYFS_MAX_HOSTNAME];

size_t glb_num_servers; // size of glb_servers array
server_info_t* glb_servers; // array of server_info_t

unifyfs_cfg_t server_cfg;

Expand Down

0 comments on commit 50a3089

Please sign in to comment.