Skip to content

Commit

Permalink
server: publish remote address through direct keyval calls
Browse files Browse the repository at this point in the history
  • Loading branch information
adammoody committed Jul 20, 2021
1 parent fd7bb20 commit 444f4aa
Showing 1 changed file with 63 additions and 54 deletions.
117 changes: 63 additions & 54 deletions server/src/margo_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,42 @@ static void register_server_server_rpcs(margo_instance_id mid)
unlink_bcast_rpc);
}

/* Given a margo instance ID (mid), return its corresponding
* address as a newly allocated string to be freed by caller.
* Returns NULL on error. */
static char* get_margo_addr(margo_instance_id mid)
{
/* get margo address for given instance */
hg_addr_t addr_self;
hg_return_t hret = margo_addr_self(mid, &addr_self);
if (hret != HG_SUCCESS) {
LOGERR("margo_addr_self() failed");
return NULL;
}

/* convert margo address to a string */
char self_string[128];
hg_size_t self_string_sz = sizeof(self_string);
hret = margo_addr_to_string(mid,
self_string, &self_string_sz, addr_self);
if (hret != HG_SUCCESS) {
LOGERR("margo_addr_to_string() failed");
margo_addr_free(mid, addr_self);
return NULL;
}
margo_addr_free(mid, addr_self);

/* return address in newly allocated string */
char* addr = strdup(self_string);
return addr;
}

/* setup_local_target - Initializes the client-server margo target */
static margo_instance_id setup_local_target(void)
{
/* initialize margo */
const char* margo_protocol = PROTOCOL_MARGO_SHM;
hg_return_t hret;
hg_addr_t addr_self;
char self_string[128];
hg_size_t self_string_sz = sizeof(self_string);
margo_instance_id mid;
mid = margo_init(margo_protocol, MARGO_SERVER_MODE,
margo_instance_id mid = margo_init(margo_protocol, MARGO_SERVER_MODE,
margo_use_progress_thread, margo_client_server_pool_sz);
if (mid == MARGO_INSTANCE_NULL) {
LOGERR("margo_init(%s, SERVER_MODE, %d, %d) failed", margo_protocol,
Expand All @@ -198,23 +223,13 @@ static margo_instance_id setup_local_target(void)
}

/* figure out what address this server is listening on */
hret = margo_addr_self(mid, &addr_self);
if (hret != HG_SUCCESS) {
char* self_string = get_margo_addr(mid);
if (NULL == self_string) {
LOGERR("margo_addr_self() failed");
margo_finalize(mid);
return MARGO_INSTANCE_NULL;
}
hret = margo_addr_to_string(mid,
self_string, &self_string_sz,
addr_self);
if (hret != HG_SUCCESS) {
LOGERR("margo_addr_to_string() failed");
margo_addr_free(mid, addr_self);
margo_finalize(mid);
return MARGO_INSTANCE_NULL;
}
LOGINFO("shared-memory margo RPC server: %s", self_string);
margo_addr_free(mid, addr_self);

/* publish rpc address of server for local clients */
rpc_publish_local_server_addr(self_string);
Expand Down Expand Up @@ -430,48 +445,41 @@ int margo_connect_servers(void)

int ret = (int)UNIFYFS_SUCCESS;

/* publish the pmi rank of this server */
/* publish our pmi rank */
char rank_str[16] = {0};
snprintf(rank_str, sizeof(rank_str), "%d", glb_pmi_rank);
rc = unifyfs_keyval_publish_remote(key_unifyfsd_pmi_rank, rank_str);
if (rc != (int)UNIFYFS_SUCCESS) {
exit(1);
}

/* get margo address of our server context */
hg_addr_t addr_self;
margo_instance_id mid = unifyfsd_rpc_context->svr_mid;
hret = margo_addr_self(mid, &addr_self);
if (hret != HG_SUCCESS) {
LOGERR("margo_addr_self() failed");
exit(1);
if ((int)UNIFYFS_SUCCESS != rc) {
LOGERR("failed to publish PMI rank of server");
return (int)UNIFYFS_ERROR_KEYVAL;
}

/* convert our margo address to string format */
char self_string[128];
hg_size_t self_string_sz = sizeof(self_string);
hret = margo_addr_to_string(mid,
self_string, &self_string_sz, addr_self);
if (hret != HG_SUCCESS) {
LOGERR("margo_addr_to_string() failed");
margo_addr_free(mid, addr_self);
exit(1);
/* get our address for server-server rpcs */
char* self_string = get_margo_addr(unifyfsd_rpc_context->svr_mid);
if (NULL == self_string) {
LOGERR("invalid value to publish server-server margo rpc address");
return (int)UNIFYFS_ERROR_KEYVAL;
}
LOGINFO("margo RPC server: %s", self_string);
margo_addr_free(mid, addr_self);

/* publish rpc address of server for remote servers */
rpc_publish_remote_server_addr(self_string);
/* publish our address */
LOGDBG("publishing server-server rpc address '%s'", self_string);
rc = unifyfs_keyval_publish_remote(key_unifyfsd_margo_svr, self_string);
if ((int)UNIFYFS_SUCCESS != rc) {
LOGERR("failed to publish server-server margo rpc address");
free(self_string);
return (int)UNIFYFS_ERROR_KEYVAL;
}
free(self_string);

/* block until a margo_svr key pair published by all servers */
/* block until all servers have published their address */
rc = unifyfs_keyval_fence_remote();
if ((int)UNIFYFS_SUCCESS != rc) {
LOGERR("keyval fence on margo_svr key failed");
ret = UNIFYFS_ERROR_KEYVAL;
return ret;
return (int)UNIFYFS_ERROR_KEYVAL;
}

/* allocate array of structs to record margo address for each server */
/* allocate array of structs to record address for each server */
glb_servers = (server_info_t*) calloc(glb_num_servers,
sizeof(server_info_t));
if (NULL == glb_servers) {
Expand All @@ -489,24 +497,25 @@ int margo_connect_servers(void)
&pmi_rank_str);
if ((int)UNIFYFS_SUCCESS != rc) {
LOGERR("server index=%zu - pmi rank lookup failed", i);
ret = (int)UNIFYFS_FAILURE;
return ret;
return (int)UNIFYFS_FAILURE;
}

/* convert pmi rank string to int and record it */
/* convert pmi rank string to int */
int remote_pmi_rank = -1;
if (NULL != pmi_rank_str) {
remote_pmi_rank = atoi(pmi_rank_str);
free(pmi_rank_str);
}

/* lookup margo address string for this server */
char* margo_addr_str = rpc_lookup_remote_server_addr(i);
if (NULL == margo_addr_str) {
/* lookup rpc address for this server */
char* margo_addr_str = NULL;
rc = unifyfs_keyval_lookup_remote(i, key_unifyfsd_margo_svr,
&margo_addr_str);
if ((int)UNIFYFS_SUCCESS != rc) {
LOGERR("server index=%zu - margo server lookup failed", i);
ret = (int)UNIFYFS_FAILURE;
return ret;
return (int)UNIFYFS_FAILURE;
}
LOGDBG("found server %d rpc address '%s'", i, margo_addr_str);

/* record values on struct for this server */
glb_servers[i].pmi_rank = remote_pmi_rank;
Expand Down

0 comments on commit 444f4aa

Please sign in to comment.