From 17dc90d9f8a8ed0fc91386ba9f4ec05c522ef099 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Tue, 17 May 2022 17:56:03 -0400 Subject: [PATCH] statuses with code 409 are known statuses (#643) --- .../client/impl/PullStatusMessageManager.java | 2 +- .../client/impl/PushStatusMessageManager.java | 22 ++++++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/nats/client/impl/PullStatusMessageManager.java b/src/main/java/io/nats/client/impl/PullStatusMessageManager.java index 252cded0d..de9a3e454 100644 --- a/src/main/java/io/nats/client/impl/PullStatusMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullStatusMessageManager.java @@ -21,7 +21,7 @@ class PullStatusMessageManager extends MessageManager { - private static final List PULL_KNOWN_STATUS_CODES = Arrays.asList(404, 408); + private static final List PULL_KNOWN_STATUS_CODES = Arrays.asList(404, 408, 409); private int lastStatusCode = -1; diff --git a/src/main/java/io/nats/client/impl/PushStatusMessageManager.java b/src/main/java/io/nats/client/impl/PushStatusMessageManager.java index 8b3bd9bf9..d1cdcf627 100644 --- a/src/main/java/io/nats/client/impl/PushStatusMessageManager.java +++ b/src/main/java/io/nats/client/impl/PushStatusMessageManager.java @@ -20,6 +20,8 @@ import io.nats.client.api.ConsumerConfiguration; import io.nats.client.support.Status; +import java.util.Arrays; +import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicLong; @@ -28,6 +30,8 @@ class PushStatusMessageManager extends MessageManager { + private static final List PUSH_KNOWN_STATUS_CODES = Arrays.asList(409); + private static final int THRESHOLD = 3; private final NatsConnection conn; @@ -175,22 +179,20 @@ boolean manage(Message msg) { if (fc) { _processFlowControl(msg.getReplyTo(), ErrorListener.FlowControlSource.FLOW_CONTROL); } - return true; } - - if (status.isHeartbeat()) { + else if (status.isHeartbeat()) { if (fc) { // status flowControlSubject is set in the beforeQueueProcessor _processFlowControl(extractFcSubject(msg), ErrorListener.FlowControlSource.HEARTBEAT); } - return true; } - - // this status is unknown to us, always use the error handler. - // If it's a sync call, also throw an exception - conn.getOptions().getErrorListener().unhandledStatus(conn, sub, status); - if (syncMode) { - throw new JetStreamStatusException(sub, status); + else if (!PUSH_KNOWN_STATUS_CODES.contains(status.getCode())) { + // If this status is unknown to us, always use the error handler. + // If it's a sync call, also throw an exception + conn.getOptions().getErrorListener().unhandledStatus(conn, sub, status); + if (syncMode) { + throw new JetStreamStatusException(sub, status); + } } return true; }