Skip to content

Commit

Permalink
add heartbeat rpc to detect failed clients
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelBrim authored and adammoody committed Aug 3, 2021
1 parent fc7cb5b commit ea2f5f5
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 18 deletions.
81 changes: 68 additions & 13 deletions client/src/margo_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ static void register_client_rpcs(client_rpc_context_t* ctx)

hg_id_t hgid;

/* client-to-server RPCs */

#define CLIENT_REGISTER_RPC(name) \
do { \
hgid = MARGO_REGISTER(mid, "unifyfs_" #name "_rpc", \
Expand All @@ -42,15 +44,6 @@ static void register_client_rpcs(client_rpc_context_t* ctx)
ctx->rpcs.name##_id = hgid; \
} while (0)

#define CLIENT_REGISTER_RPC_HANDLER(name) \
do { \
hgid = MARGO_REGISTER(mid, "unifyfs_" #name "_rpc", \
unifyfs_##name##_in_t, \
unifyfs_##name##_out_t, \
unifyfs_##name##_rpc); \
ctx->rpcs.name##_id = hgid; \
} while (0)

CLIENT_REGISTER_RPC(attach);
CLIENT_REGISTER_RPC(mount);
CLIENT_REGISTER_RPC(unmount);
Expand All @@ -63,11 +56,25 @@ static void register_client_rpcs(client_rpc_context_t* ctx)
CLIENT_REGISTER_RPC(laminate);
CLIENT_REGISTER_RPC(fsync);
CLIENT_REGISTER_RPC(mread);

#undef CLIENT_REGISTER_RPC

/* server-to-client RPCs */

#define CLIENT_REGISTER_RPC_HANDLER(name) \
do { \
hgid = MARGO_REGISTER(mid, "unifyfs_" #name "_rpc", \
unifyfs_##name##_in_t, \
unifyfs_##name##_out_t, \
unifyfs_##name##_rpc); \
ctx->rpcs.name##_id = hgid; \
} while (0)

CLIENT_REGISTER_RPC_HANDLER(heartbeat);
CLIENT_REGISTER_RPC_HANDLER(mread_req_data);
CLIENT_REGISTER_RPC_HANDLER(mread_req_complete);
CLIENT_REGISTER_RPC_HANDLER(transfer_complete);

#undef CLIENT_REGISTER_RPC
#undef CLIENT_REGISTER_RPC_HANDLER
}

Expand Down Expand Up @@ -191,6 +198,8 @@ int unifyfs_client_rpc_finalize(void)
return UNIFYFS_SUCCESS;
}

/*--- Invocation methods for client-to-server RPCs ---*/

/* create and return a margo handle for given rpc id */
static hg_handle_t create_handle(hg_id_t id)
{
Expand Down Expand Up @@ -847,6 +856,53 @@ int invoke_client_mread_rpc(unifyfs_client* client,
return ret;
}

/*--- Handler methods for server-to-client RPCs ---*/

/* simple heartbeat ping rpc */
static void unifyfs_heartbeat_rpc(hg_handle_t handle)
{
int ret;

/* get input params */
unifyfs_heartbeat_in_t in;
hg_return_t hret = margo_get_input(handle, &in);
if (hret != HG_SUCCESS) {
LOGERR("margo_get_input() failed");
ret = UNIFYFS_ERROR_MARGO;
} else {
/* lookup client */
unifyfs_client* client;
int client_app = (int) in.app_id;
int client_id = (int) in.client_id;
client = unifyfs_find_client(client_app, client_id, NULL);
if (NULL == client) {
/* unknown client */
ret = EINVAL;
} else if (client->state.is_mounted) {
/* client is still active */
ret = UNIFYFS_SUCCESS;
} else {
ret = UNIFYFS_FAILURE;
}
margo_free_input(handle, &in);
}

/* set rpc result status */
unifyfs_heartbeat_out_t out;
out.ret = ret;

/* return to caller */
LOGDBG("responding");
hret = margo_respond(handle, &out);
if (hret != HG_SUCCESS) {
LOGERR("margo_respond() failed");
}

/* free margo resources */
margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(unifyfs_heartbeat_rpc)

/* for client read request identified by mread_id and request index, copy bulk
* data to request's user buffer at given byte offset from start of request */
static void unifyfs_mread_req_data_rpc(hg_handle_t handle)
Expand Down Expand Up @@ -954,9 +1010,8 @@ static void unifyfs_mread_req_data_rpc(hg_handle_t handle)
unifyfs_mread_req_data_out_t out;
out.ret = ret;

LOGDBG("responding");

/* return to caller */
LOGDBG("responding");
hret = margo_respond(handle, &out);
if (hret != HG_SUCCESS) {
LOGERR("margo_respond() failed");
Expand Down Expand Up @@ -1034,7 +1089,7 @@ static void unifyfs_transfer_complete_rpc(hg_handle_t handle)
LOGERR("margo_get_input() failed");
ret = UNIFYFS_ERROR_MARGO;
} else {
/* lookup client mread request */
/* lookup client transfer request */
unifyfs_client* client;
int client_app = (int) in.app_id;
int client_id = (int) in.client_id;
Expand Down
4 changes: 4 additions & 0 deletions client/src/margo_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <margo.h>

typedef struct ClientRpcIds {
/* client-to-server */
hg_id_t attach_id;
hg_id_t mount_id;
hg_id_t unmount_id;
Expand All @@ -36,6 +37,9 @@ typedef struct ClientRpcIds {
hg_id_t laminate_id;
hg_id_t fsync_id;
hg_id_t mread_id;

/* server-to-client */
hg_id_t heartbeat_id;
hg_id_t mread_req_data_id;
hg_id_t mread_req_complete_id;
hg_id_t transfer_complete_id;
Expand Down
11 changes: 7 additions & 4 deletions common/src/arraylist.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,15 @@ int arraylist_free(arraylist_t* arr)
return -1;
}

int i;
for (i = 0; i < arr->cap; i++) {
if (arr->elems[i] != NULL) {
free(arr->elems[i]);
if (NULL != arr->elems) {
for (int i = 0; i < arr->cap; i++) {
if (arr->elems[i] != NULL) {
free(arr->elems[i]);
}
}
free(arr->elems);
}

free(arr);

return 0;
Expand Down
9 changes: 9 additions & 0 deletions common/src/unifyfs_client_rpcs.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,15 @@ MERCURY_GEN_PROC(unifyfs_mread_req_complete_in_t,
MERCURY_GEN_PROC(unifyfs_mread_req_complete_out_t, ((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(unifyfs_mread_req_complete_rpc)

/* unifyfs_heartbeat_rpc (server => client)
*
* Used to detect when client unexpectedly goes away */
MERCURY_GEN_PROC(unifyfs_heartbeat_in_t,
((int32_t)(app_id))
((int32_t)(client_id)))
MERCURY_GEN_PROC(unifyfs_heartbeat_out_t, ((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(unifyfs_heartbeat_rpc)

#ifdef __cplusplus
} // extern "C"
#endif
Expand Down
55 changes: 55 additions & 0 deletions server/src/margo_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ static void register_client_server_rpcs(margo_instance_id mid)
unifyfs_mread_rpc);

/* register the RPCs we call (and capture assigned hg_id_t) */
unifyfsd_rpc_context->rpcs.client_heartbeat_id =
MARGO_REGISTER(mid, "unifyfs_heartbeat_rpc",
unifyfs_heartbeat_in_t,
unifyfs_heartbeat_out_t,
NULL);

unifyfsd_rpc_context->rpcs.client_mread_data_id =
MARGO_REGISTER(mid, "unifyfs_mread_req_data_rpc",
unifyfs_mread_req_data_in_t,
Expand Down Expand Up @@ -618,6 +624,55 @@ static hg_handle_t create_client_handle(hg_id_t id,
return handle;
}

/* invokes the heartbeat rpc function */
int invoke_client_heartbeat_rpc(int app_id,
int client_id)
{
hg_return_t hret;

/* check that we have initialized margo */
if (NULL == unifyfsd_rpc_context) {
return UNIFYFS_FAILURE;
}

/* fill input struct */
unifyfs_heartbeat_in_t in;
in.app_id = (int32_t) app_id;
in.client_id = (int32_t) client_id;

/* get handle to rpc function */
hg_id_t rpc_id = unifyfsd_rpc_context->rpcs.client_heartbeat_id;
hg_handle_t handle = create_client_handle(rpc_id, app_id, client_id);

/* call rpc function */
LOGDBG("invoking the heartbeat rpc function in client");
double timeout_msec = 500; /* half a second */
hret = margo_forward_timed(handle, &in, timeout_msec);
if (hret != HG_SUCCESS) {
LOGERR("margo_forward_timed() failed");
margo_destroy(handle);
return UNIFYFS_ERROR_MARGO;
}

/* decode response */
int ret;
unifyfs_heartbeat_out_t out;
hret = margo_get_output(handle, &out);
if (hret == HG_SUCCESS) {
LOGDBG("Got response ret=%" PRIi32, out.ret);
ret = (int) out.ret;
margo_free_output(handle, &out);
} else {
LOGERR("margo_get_output() failed");
ret = UNIFYFS_ERROR_MARGO;
}

/* free resources */
margo_destroy(handle);

return ret;
}

/* invokes the client mread request data response rpc function */
int invoke_client_mread_req_data_rpc(int app_id,
int client_id,
Expand Down
4 changes: 4 additions & 0 deletions server/src/margo_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ typedef struct ServerRpcIds {
hg_id_t unlink_bcast_id;

/* client-server rpcs */
hg_id_t client_heartbeat_id;
hg_id_t client_mread_data_id;
hg_id_t client_mread_complete_id;
hg_id_t client_transfer_complete_id;
Expand Down Expand Up @@ -82,6 +83,9 @@ void* pull_margo_bulk_buffer(hg_handle_t rpc_hdl,
hg_size_t bulk_sz,
hg_bulk_t* local_bulk);

/* invokes the client heartbeat rpc function */
int invoke_client_heartbeat_rpc(int app_id, int client_id);

/* invokes the client mread request data response rpc function */
int invoke_client_mread_req_data_rpc(int app_id,
int client_id,
Expand Down
6 changes: 6 additions & 0 deletions server/src/unifyfs_global.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ typedef struct {
struct reqmgr_thrd;



/**
* Structure to maintain application client state, including
* logio and shared memory contexts, margo rpc address, etc.
Expand Down Expand Up @@ -187,6 +188,11 @@ unifyfs_rc disconnect_app_client(app_client* clnt);
unifyfs_rc cleanup_app_client(app_config* app, app_client* clnt);


/* arraylist to track failed clients */
arraylist_t* failed_clients; // = NULL
unifyfs_rc add_failed_client(int app_id, int client_id);


/* publish the pids of all servers to a shared file */
int unifyfs_publish_server_pids(void);

Expand Down
2 changes: 1 addition & 1 deletion server/src/unifyfs_group_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ static int collective_finish(coll_request* coll_req)
if (NULL != coll_req->child_reqs) {
margo_request* creq;
hg_handle_t* chdl;
/* MJB TODO - use margo_wait_any() instead of our own loop */
/* TODO: use margo_wait_any() instead of our own loop */
for (i = 0; i < child_count; i++) {
chdl = coll_req->child_hdls + i;
creq = coll_req->child_reqs + i;
Expand Down
38 changes: 38 additions & 0 deletions server/src/unifyfs_request_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,38 @@ static int rm_process_client_requests(reqmgr_thrd_t* reqmgr)
return ret;
}

static int rm_heartbeat(reqmgr_thrd_t* reqmgr)
{
static time_t last_check; // = 0
static int check_interval = 30; /* seconds */

int ret = UNIFYFS_SUCCESS;

/* send a heartbeat rpc to associated client every 30 seconds */
time_t now = time(NULL);
if (0 == last_check) {
last_check = now;
}

time_t elapsed = now - last_check;
if (elapsed >= check_interval) {
last_check = now;

/* invoke heartbeat rpc */
LOGDBG("sending heartbeat rpc");
int app = reqmgr->app_id;
int clid = reqmgr->client_id;
int rc = invoke_client_heartbeat_rpc(app, clid);
if (rc != UNIFYFS_SUCCESS) {
ret = rc;
LOGDBG("heartbeat rpc for client[%d:%d] failed", app, clid);
add_failed_client(app, clid);
}
}

return ret;
}

/* Entry point for request manager thread. One thread is created
* for each client process to retrieve remote data and notify the
* client when data is ready.
Expand Down Expand Up @@ -1381,6 +1413,12 @@ void* request_manager_thread(void* arg)
thrd_ctrl->waiting_for_work = 0;
RM_UNLOCK(thrd_ctrl);

rc = rm_heartbeat(thrd_ctrl);
if (rc != UNIFYFS_SUCCESS) {
/* detected failure of our client, time to exit */
break;
}

/* bail out if we've been told to exit */
if (thrd_ctrl->exit_flag == 1) {
break;
Expand Down
Loading

0 comments on commit ea2f5f5

Please sign in to comment.