From e7b033220a9c1fe1499d14225265bc6f258f2441 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 28 Dec 2020 15:24:51 -0700 Subject: [PATCH] [ADDED] natsOptions_SetFailRequestsOnDisconnect() option If enabled, any pending request (using new style) will fail with NATS_CONNECTION_DISCONNECTED status. Also fixed the failing of pending flush requests which was done in the doReconnect thread while it should be done prior to creating the pending buffer and starting the reconnect thread. Resolves #391 Signed-off-by: Ivan Kozlovic --- src/conn.c | 16 +++++++++---- src/nats.h | 17 ++++++++++++++ src/natsp.h | 5 ++++ src/opts.c | 10 ++++++++ src/pub.c | 2 +- test/list.txt | 1 + test/test.c | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 110 insertions(+), 6 deletions(-) diff --git a/src/conn.c b/src/conn.c index f6a5f38f3..b78bed208 100644 --- a/src/conn.c +++ b/src/conn.c @@ -1203,6 +1203,7 @@ natsConn_disposeRespInfo(natsConnection *nc, respInfo *resp, bool needsLock) natsConn_Lock(nc); resp->closed = false; + resp->closedSts = NATS_OK; resp->removed = false; resp->msg = NULL; @@ -1360,7 +1361,7 @@ natsConn_initResp(natsConnection *nc, natsMsgHandler cb) // This will clear any pending Request calls. // Lock is assumed to be held by the caller. static void -_clearPendingRequestCalls(natsConnection *nc) +_clearPendingRequestCalls(natsConnection *nc, natsStatus reason) { natsStrHashIter iter; void *p = NULL; @@ -1374,6 +1375,7 @@ _clearPendingRequestCalls(natsConnection *nc) respInfo *val = (respInfo*) p; natsMutex_Lock(val->mu); val->closed = true; + val->closedSts = reason; val->removed = true; natsCondition_Signal(val->cond); natsMutex_Unlock(val->mu); @@ -1410,9 +1412,6 @@ _doReconnect(void *arg) natsConn_Lock(nc); - // Kick out all calls to natsConnection_Flush[Timeout](). - _clearPendingFlushRequests(nc); - // Clear any error. nc->err = NATS_OK; nc->errStr[0] = '\0'; @@ -2031,6 +2030,13 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) nc->sockCtx.fd = NATS_SOCK_INVALID; } + // Fail pending flush requests. + if (ls == NATS_OK) + _clearPendingFlushRequests(nc); + // If option set, also fail pending requests. + if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect) + _clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED); + // Create the pending buffer to hold all write requests while we try // to reconnect. if (ls == NATS_OK) @@ -2382,7 +2388,7 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC _clearPendingFlushRequests(nc); // Kick out any queued and blocking requests. - _clearPendingRequestCalls(nc); + _clearPendingRequestCalls(nc, NATS_CONNECTION_CLOSED); if (nc->ptmr != NULL) natsTimer_Stop(nc->ptmr); diff --git a/src/nats.h b/src/nats.h index 7888cf16c..2d5028714 100644 --- a/src/nats.h +++ b/src/nats.h @@ -1534,6 +1534,23 @@ natsOptions_SetSendAsap(natsOptions *opts, bool sendAsap); NATS_EXTERN natsStatus natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStyle); +/** \brief Fails pending requests on disconnect event. + * + * If this option is enabled, all pending #natsConnection_Request() family + * calls will fail with the #NATS_CONNECTION_DISCONNECTED status. + * + * \note This does not apply to requests from connections that use the + * old style requests. + * + * @see natsOptions_UseOldRequestStyle + * + * @param opts the pointer to the #natsOptions object. + * @param failRequests a boolean indicating if pending requests should fail + * when a disconnect event occurs. + */ +NATS_EXTERN natsStatus +natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests); + /** \brief Sets if connection receives its own messages. * * This configures whether the server will echo back messages diff --git a/src/natsp.h b/src/natsp.h index cce014f40..117fee738 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -261,6 +261,10 @@ struct __natsOptions // not rely on the flusher. bool sendAsap; + // If set to true, pending requests will fail with NATS_CONNECTION_DISCONNECTED + // when the library detects a disconnection. + bool failRequestsOnDisconnect; + // NoEcho configures whether the server will echo back messages // that are sent on this connection if we also have matching subscriptions. // Note this is supported on servers >= version 1.2. Proto 1 or greater. @@ -451,6 +455,7 @@ typedef struct __respInfo natsCondition *cond; natsMsg *msg; bool closed; + natsStatus closedSts; bool removed; bool pooled; diff --git a/src/opts.c b/src/opts.c index a8a92902e..81748cc7a 100644 --- a/src/opts.c +++ b/src/opts.c @@ -1077,6 +1077,16 @@ natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStype) return NATS_OK; } +natsStatus +natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests) +{ + LOCK_AND_CHECK_OPTIONS(opts, 0); + opts->failRequestsOnDisconnect = failRequests; + UNLOCK_OPTS(opts); + + return NATS_OK; +} + static void _freeUserCreds(userCreds *uc) { diff --git a/src/pub.c b/src/pub.c index 63d3affbd..038f211fc 100644 --- a/src/pub.c +++ b/src/pub.c @@ -492,7 +492,7 @@ natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc, { // Set the correct error status that we return to the user if (resp->closed) - s = NATS_CONNECTION_CLOSED; + s = resp->closedSts; else s = NATS_TIMEOUT; } diff --git a/test/list.txt b/test/list.txt index 687991702..db0e84519 100644 --- a/test/list.txt +++ b/test/list.txt @@ -78,6 +78,7 @@ IsReconnectingAndStatus ReconnectBufSize RetryOnFailedConnect NoPartialOnReconnect +ReconnectFailsPendingRequests ErrOnConnectAndDeadlock ErrOnMaxPayloadLimit Auth diff --git a/test/test.c b/test/test.c index e0299373b..9ed2977de 100644 --- a/test/test.c +++ b/test/test.c @@ -17121,6 +17121,70 @@ test_NoPartialOnReconnect(void) _stopServer(pid); } +static void +_stopServerInThread(void *closure) +{ + natsPid pid = *((natsPid*) closure); + + nats_Sleep(150); + _stopServer(pid); +} + +static void +test_ReconnectFailsPendingRequest(void) +{ + natsStatus s; + natsOptions *opts = NULL; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; + natsMsg *msg = NULL; + natsThread *t = NULL; + natsPid pid = NATS_INVALID_PID; + bool failr = false; + int iter; + + for (iter=1; iter<=2; iter++) + { + failr = (iter == 2 ? true : false); + + test("Create options: "); + s = natsOptions_Create(&opts); + IFOK(s, natsOptions_SetFailRequestsOnDisconnect(opts, failr)); + testCond(s == NATS_OK); + + test("Start server: "); + pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true); + CHECK_SERVER_STARTED(pid); + testCond(true); + + test("Connect: "); + s = natsConnection_Connect(&nc, opts); + testCond(s == NATS_OK); + + test("Create service provider: "); + s = natsConnection_SubscribeSync(&sub, nc, "requests"); + testCond(s == NATS_OK); + + test("Start thread that will stop server: "); + s = natsThread_Create(&t, _stopServerInThread, (void*) &pid); + testCond(s == NATS_OK); + + test((failr ? "Fails due to disconnect: " : "Fails due to timeout: ")); + s = natsConnection_RequestString(&msg, nc, "requests", "help", 300); + testCond(s == (failr ? NATS_CONNECTION_DISCONNECTED : NATS_TIMEOUT)); + + natsThread_Join(t); + natsThread_Destroy(t); + t = NULL; + natsSubscription_Destroy(sub); + sub = NULL; + natsConnection_Destroy(nc); + nc = NULL; + natsOptions_Destroy(opts); + opts = NULL; + } +} + static void test_HeadersNotSupported(void) { @@ -20531,6 +20595,7 @@ static testInfo allTests[] = {"ReconnectBufSize", test_ReconnectBufSize}, {"RetryOnFailedConnect", test_RetryOnFailedConnect}, {"NoPartialOnReconnect", test_NoPartialOnReconnect}, + {"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest}, {"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock}, {"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},