Skip to content

Commit

Permalink
Merge pull request #392 from nats-io/fix_391
Browse files Browse the repository at this point in the history
[ADDED] natsOptions_SetFailRequestsOnDisconnect() option
  • Loading branch information
kozlovic authored Dec 29, 2020
2 parents caa998c + e7b0332 commit 242427c
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 6 deletions.
16 changes: 11 additions & 5 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,7 @@ natsConn_disposeRespInfo(natsConnection *nc, respInfo *resp, bool needsLock)
natsConn_Lock(nc);

resp->closed = false;
resp->closedSts = NATS_OK;
resp->removed = false;
resp->msg = NULL;

Expand Down Expand Up @@ -1360,7 +1361,7 @@ natsConn_initResp(natsConnection *nc, natsMsgHandler cb)
// This will clear any pending Request calls.
// Lock is assumed to be held by the caller.
static void
_clearPendingRequestCalls(natsConnection *nc)
_clearPendingRequestCalls(natsConnection *nc, natsStatus reason)
{
natsStrHashIter iter;
void *p = NULL;
Expand All @@ -1374,6 +1375,7 @@ _clearPendingRequestCalls(natsConnection *nc)
respInfo *val = (respInfo*) p;
natsMutex_Lock(val->mu);
val->closed = true;
val->closedSts = reason;
val->removed = true;
natsCondition_Signal(val->cond);
natsMutex_Unlock(val->mu);
Expand Down Expand Up @@ -1410,9 +1412,6 @@ _doReconnect(void *arg)

natsConn_Lock(nc);

// Kick out all calls to natsConnection_Flush[Timeout]().
_clearPendingFlushRequests(nc);

// Clear any error.
nc->err = NATS_OK;
nc->errStr[0] = '\0';
Expand Down Expand Up @@ -2031,6 +2030,13 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
nc->sockCtx.fd = NATS_SOCK_INVALID;
}

// Fail pending flush requests.
if (ls == NATS_OK)
_clearPendingFlushRequests(nc);
// If option set, also fail pending requests.
if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect)
_clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED);

// Create the pending buffer to hold all write requests while we try
// to reconnect.
if (ls == NATS_OK)
Expand Down Expand Up @@ -2382,7 +2388,7 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC
_clearPendingFlushRequests(nc);

// Kick out any queued and blocking requests.
_clearPendingRequestCalls(nc);
_clearPendingRequestCalls(nc, NATS_CONNECTION_CLOSED);

if (nc->ptmr != NULL)
natsTimer_Stop(nc->ptmr);
Expand Down
17 changes: 17 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,23 @@ natsOptions_SetSendAsap(natsOptions *opts, bool sendAsap);
NATS_EXTERN natsStatus
natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStyle);

/** \brief Fails pending requests on disconnect event.
*
* If this option is enabled, all pending #natsConnection_Request() family
* calls will fail with the #NATS_CONNECTION_DISCONNECTED status.
*
* \note This does not apply to requests from connections that use the
* old style requests.
*
* @see natsOptions_UseOldRequestStyle
*
* @param opts the pointer to the #natsOptions object.
* @param failRequests a boolean indicating if pending requests should fail
* when a disconnect event occurs.
*/
NATS_EXTERN natsStatus
natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests);

/** \brief Sets if connection receives its own messages.
*
* This configures whether the server will echo back messages
Expand Down
5 changes: 5 additions & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ struct __natsOptions
// not rely on the flusher.
bool sendAsap;

// If set to true, pending requests will fail with NATS_CONNECTION_DISCONNECTED
// when the library detects a disconnection.
bool failRequestsOnDisconnect;

// NoEcho configures whether the server will echo back messages
// that are sent on this connection if we also have matching subscriptions.
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
Expand Down Expand Up @@ -451,6 +455,7 @@ typedef struct __respInfo
natsCondition *cond;
natsMsg *msg;
bool closed;
natsStatus closedSts;
bool removed;
bool pooled;

Expand Down
10 changes: 10 additions & 0 deletions src/opts.c
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,16 @@ natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStype)
return NATS_OK;
}

natsStatus
natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests)
{
LOCK_AND_CHECK_OPTIONS(opts, 0);
opts->failRequestsOnDisconnect = failRequests;
UNLOCK_OPTS(opts);

return NATS_OK;
}

static void
_freeUserCreds(userCreds *uc)
{
Expand Down
2 changes: 1 addition & 1 deletion src/pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,
{
// Set the correct error status that we return to the user
if (resp->closed)
s = NATS_CONNECTION_CLOSED;
s = resp->closedSts;
else
s = NATS_TIMEOUT;
}
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ IsReconnectingAndStatus
ReconnectBufSize
RetryOnFailedConnect
NoPartialOnReconnect
ReconnectFailsPendingRequests
ErrOnConnectAndDeadlock
ErrOnMaxPayloadLimit
Auth
Expand Down
65 changes: 65 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -17121,6 +17121,70 @@ test_NoPartialOnReconnect(void)
_stopServer(pid);
}

static void
_stopServerInThread(void *closure)
{
natsPid pid = *((natsPid*) closure);

nats_Sleep(150);
_stopServer(pid);
}

static void
test_ReconnectFailsPendingRequest(void)
{
natsStatus s;
natsOptions *opts = NULL;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsThread *t = NULL;
natsPid pid = NATS_INVALID_PID;
bool failr = false;
int iter;

for (iter=1; iter<=2; iter++)
{
failr = (iter == 2 ? true : false);

test("Create options: ");
s = natsOptions_Create(&opts);
IFOK(s, natsOptions_SetFailRequestsOnDisconnect(opts, failr));
testCond(s == NATS_OK);

test("Start server: ");
pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true);
CHECK_SERVER_STARTED(pid);
testCond(true);

test("Connect: ");
s = natsConnection_Connect(&nc, opts);
testCond(s == NATS_OK);

test("Create service provider: ");
s = natsConnection_SubscribeSync(&sub, nc, "requests");
testCond(s == NATS_OK);

test("Start thread that will stop server: ");
s = natsThread_Create(&t, _stopServerInThread, (void*) &pid);
testCond(s == NATS_OK);

test((failr ? "Fails due to disconnect: " : "Fails due to timeout: "));
s = natsConnection_RequestString(&msg, nc, "requests", "help", 300);
testCond(s == (failr ? NATS_CONNECTION_DISCONNECTED : NATS_TIMEOUT));

natsThread_Join(t);
natsThread_Destroy(t);
t = NULL;
natsSubscription_Destroy(sub);
sub = NULL;
natsConnection_Destroy(nc);
nc = NULL;
natsOptions_Destroy(opts);
opts = NULL;
}
}

static void
test_HeadersNotSupported(void)
{
Expand Down Expand Up @@ -20531,6 +20595,7 @@ static testInfo allTests[] =
{"ReconnectBufSize", test_ReconnectBufSize},
{"RetryOnFailedConnect", test_RetryOnFailedConnect},
{"NoPartialOnReconnect", test_NoPartialOnReconnect},
{"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest},

{"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock},
{"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},
Expand Down

0 comments on commit 242427c

Please sign in to comment.