diff --git a/src/conn.c b/src/conn.c index f6a5f38f3..b78bed208 100644 --- a/src/conn.c +++ b/src/conn.c @@ -1203,6 +1203,7 @@ natsConn_disposeRespInfo(natsConnection *nc, respInfo *resp, bool needsLock) natsConn_Lock(nc); resp->closed = false; + resp->closedSts = NATS_OK; resp->removed = false; resp->msg = NULL; @@ -1360,7 +1361,7 @@ natsConn_initResp(natsConnection *nc, natsMsgHandler cb) // This will clear any pending Request calls. // Lock is assumed to be held by the caller. static void -_clearPendingRequestCalls(natsConnection *nc) +_clearPendingRequestCalls(natsConnection *nc, natsStatus reason) { natsStrHashIter iter; void *p = NULL; @@ -1374,6 +1375,7 @@ _clearPendingRequestCalls(natsConnection *nc) respInfo *val = (respInfo*) p; natsMutex_Lock(val->mu); val->closed = true; + val->closedSts = reason; val->removed = true; natsCondition_Signal(val->cond); natsMutex_Unlock(val->mu); @@ -1410,9 +1412,6 @@ _doReconnect(void *arg) natsConn_Lock(nc); - // Kick out all calls to natsConnection_Flush[Timeout](). - _clearPendingFlushRequests(nc); - // Clear any error. nc->err = NATS_OK; nc->errStr[0] = '\0'; @@ -2031,6 +2030,13 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) nc->sockCtx.fd = NATS_SOCK_INVALID; } + // Fail pending flush requests. + if (ls == NATS_OK) + _clearPendingFlushRequests(nc); + // If option set, also fail pending requests. + if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect) + _clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED); + // Create the pending buffer to hold all write requests while we try // to reconnect. if (ls == NATS_OK) @@ -2382,7 +2388,7 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC _clearPendingFlushRequests(nc); // Kick out any queued and blocking requests. - _clearPendingRequestCalls(nc); + _clearPendingRequestCalls(nc, NATS_CONNECTION_CLOSED); if (nc->ptmr != NULL) natsTimer_Stop(nc->ptmr); diff --git a/src/nats.h b/src/nats.h index 7888cf16c..2d5028714 100644 --- a/src/nats.h +++ b/src/nats.h @@ -1534,6 +1534,23 @@ natsOptions_SetSendAsap(natsOptions *opts, bool sendAsap); NATS_EXTERN natsStatus natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStyle); +/** \brief Fails pending requests on disconnect event. + * + * If this option is enabled, all pending #natsConnection_Request() family + * calls will fail with the #NATS_CONNECTION_DISCONNECTED status. + * + * \note This does not apply to requests from connections that use the + * old style requests. + * + * @see natsOptions_UseOldRequestStyle + * + * @param opts the pointer to the #natsOptions object. + * @param failRequests a boolean indicating if pending requests should fail + * when a disconnect event occurs. + */ +NATS_EXTERN natsStatus +natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests); + /** \brief Sets if connection receives its own messages. * * This configures whether the server will echo back messages diff --git a/src/natsp.h b/src/natsp.h index cce014f40..117fee738 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -261,6 +261,10 @@ struct __natsOptions // not rely on the flusher. bool sendAsap; + // If set to true, pending requests will fail with NATS_CONNECTION_DISCONNECTED + // when the library detects a disconnection. + bool failRequestsOnDisconnect; + // NoEcho configures whether the server will echo back messages // that are sent on this connection if we also have matching subscriptions. // Note this is supported on servers >= version 1.2. Proto 1 or greater. @@ -451,6 +455,7 @@ typedef struct __respInfo natsCondition *cond; natsMsg *msg; bool closed; + natsStatus closedSts; bool removed; bool pooled; diff --git a/src/opts.c b/src/opts.c index a8a92902e..81748cc7a 100644 --- a/src/opts.c +++ b/src/opts.c @@ -1077,6 +1077,16 @@ natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStype) return NATS_OK; } +natsStatus +natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests) +{ + LOCK_AND_CHECK_OPTIONS(opts, 0); + opts->failRequestsOnDisconnect = failRequests; + UNLOCK_OPTS(opts); + + return NATS_OK; +} + static void _freeUserCreds(userCreds *uc) { diff --git a/src/pub.c b/src/pub.c index 63d3affbd..038f211fc 100644 --- a/src/pub.c +++ b/src/pub.c @@ -492,7 +492,7 @@ natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc, { // Set the correct error status that we return to the user if (resp->closed) - s = NATS_CONNECTION_CLOSED; + s = resp->closedSts; else s = NATS_TIMEOUT; } diff --git a/test/list.txt b/test/list.txt index 687991702..db0e84519 100644 --- a/test/list.txt +++ b/test/list.txt @@ -78,6 +78,7 @@ IsReconnectingAndStatus ReconnectBufSize RetryOnFailedConnect NoPartialOnReconnect +ReconnectFailsPendingRequests ErrOnConnectAndDeadlock ErrOnMaxPayloadLimit Auth diff --git a/test/test.c b/test/test.c index e0299373b..9ed2977de 100644 --- a/test/test.c +++ b/test/test.c @@ -17121,6 +17121,70 @@ test_NoPartialOnReconnect(void) _stopServer(pid); } +static void +_stopServerInThread(void *closure) +{ + natsPid pid = *((natsPid*) closure); + + nats_Sleep(150); + _stopServer(pid); +} + +static void +test_ReconnectFailsPendingRequest(void) +{ + natsStatus s; + natsOptions *opts = NULL; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; + natsMsg *msg = NULL; + natsThread *t = NULL; + natsPid pid = NATS_INVALID_PID; + bool failr = false; + int iter; + + for (iter=1; iter<=2; iter++) + { + failr = (iter == 2 ? true : false); + + test("Create options: "); + s = natsOptions_Create(&opts); + IFOK(s, natsOptions_SetFailRequestsOnDisconnect(opts, failr)); + testCond(s == NATS_OK); + + test("Start server: "); + pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true); + CHECK_SERVER_STARTED(pid); + testCond(true); + + test("Connect: "); + s = natsConnection_Connect(&nc, opts); + testCond(s == NATS_OK); + + test("Create service provider: "); + s = natsConnection_SubscribeSync(&sub, nc, "requests"); + testCond(s == NATS_OK); + + test("Start thread that will stop server: "); + s = natsThread_Create(&t, _stopServerInThread, (void*) &pid); + testCond(s == NATS_OK); + + test((failr ? "Fails due to disconnect: " : "Fails due to timeout: ")); + s = natsConnection_RequestString(&msg, nc, "requests", "help", 300); + testCond(s == (failr ? NATS_CONNECTION_DISCONNECTED : NATS_TIMEOUT)); + + natsThread_Join(t); + natsThread_Destroy(t); + t = NULL; + natsSubscription_Destroy(sub); + sub = NULL; + natsConnection_Destroy(nc); + nc = NULL; + natsOptions_Destroy(opts); + opts = NULL; + } +} + static void test_HeadersNotSupported(void) { @@ -20531,6 +20595,7 @@ static testInfo allTests[] = {"ReconnectBufSize", test_ReconnectBufSize}, {"RetryOnFailedConnect", test_RetryOnFailedConnect}, {"NoPartialOnReconnect", test_NoPartialOnReconnect}, + {"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest}, {"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock}, {"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},