diff --git a/src/examples/java/io/nats/examples/benchmark/NatsBench2.java b/src/examples/java/io/nats/examples/benchmark/NatsBench2.java index 94cba0e0a..bfa4eee91 100644 --- a/src/examples/java/io/nats/examples/benchmark/NatsBench2.java +++ b/src/examples/java/io/nats/examples/benchmark/NatsBench2.java @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL; + /** * A utility class for measuring NATS performance, similar to the version in go * and node. The various tradeoffs to make this code act/work like the other @@ -279,7 +281,7 @@ public void run() { nc.publish(subject, payload); success = true; } catch (IllegalStateException ex) { - if (ex.getMessage().contains("Output queue is full")) { + if (ex.getMessage().contains(OUTPUT_QUEUE_IS_FULL)) { success = false; Thread.sleep(1000); } else { diff --git a/src/main/java/io/nats/client/Connection.java b/src/main/java/io/nats/client/Connection.java index 4b053ccf8..b63a9dfbc 100644 --- a/src/main/java/io/nats/client/Connection.java +++ b/src/main/java/io/nats/client/Connection.java @@ -541,15 +541,14 @@ enum Status { /** * Immediately flushes the underlying connection buffer if the connection is valid. * @throws IOException the connection flush fails - * @throws IllegalStateException the connection is not connected */ void flushBuffer() throws IOException; /** * Forces reconnect behavior. Stops the current connection including the reading and writing, * copies already queued outgoing messages, and then begins the reconnect logic. - * @throws IOException the force reconnected fails - * @throws InterruptedException if one is thrown, in order to propagate it up + * @throws IOException the forceReconnect fails + * @throws InterruptedException the connection is not connected */ void forceReconnect() throws IOException, InterruptedException; diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 7b02c575c..9c97fc5d9 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -120,6 +120,11 @@ public class Options { */ public static final Duration DEFAULT_SOCKET_WRITE_TIMEOUT = Duration.ofMinutes(1); + /** + * Constant used for calculating if a socket write timeout is large enough. + */ + public static final long MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT = 100; + /** * Default server ping interval. The client will send a ping to the server on this interval to insure liveness. * The server may send pings to the client as well, these are handled automatically by the library, @@ -1240,11 +1245,23 @@ public Builder maxControlLine(int bytes) { * Set the timeout for connection attempts. Each server in the options is allowed this timeout * so if 3 servers are tried with a timeout of 5s the total time could be 15s. * - * @param time the time to wait + * @param connectionTimeout the time to wait + * @return the Builder for chaining + */ + public Builder connectionTimeout(Duration connectionTimeout) { + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Set the timeout for connection attempts. Each server in the options is allowed this timeout + * so if 3 servers are tried with a timeout of 5s the total time could be 15s. + * + * @param connectionTimeoutMillis the time to wait in milliseconds * @return the Builder for chaining */ - public Builder connectionTimeout(Duration time) { - this.connectionTimeout = time; + public Builder connectionTimeout(long connectionTimeoutMillis) { + this.connectionTimeout = Duration.ofMillis(connectionTimeoutMillis); return this; } @@ -1728,9 +1745,17 @@ else if (useDefaultTls) { new DefaultThreadFactory(threadPrefix)); } - if (socketWriteTimeout != null && socketWriteTimeout.toMillis() < 1) { + if (socketWriteTimeout == null || socketWriteTimeout.toMillis() < 1) { socketWriteTimeout = null; } + else { + long swtMin = connectionTimeout.toMillis() + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT; + if (socketWriteTimeout.toMillis() < swtMin) { + throw new IllegalStateException("Socket Write Timeout must be at least " + + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT + + " milliseconds greater than the Connection Timeout"); + } + } if (errorListener == null) { errorListener = new ErrorListenerLoggerImpl(); diff --git a/src/main/java/io/nats/client/impl/MessageQueue.java b/src/main/java/io/nats/client/impl/MessageQueue.java index 9ae7111bc..a3c6c5e78 100644 --- a/src/main/java/io/nats/client/impl/MessageQueue.java +++ b/src/main/java/io/nats/client/impl/MessageQueue.java @@ -24,6 +24,7 @@ import java.util.function.Predicate; import static io.nats.client.support.NatsConstants.EMPTY_BODY; +import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL; class MessageQueue { protected static final int STOPPED = 0; @@ -35,9 +36,10 @@ class MessageQueue { protected final AtomicInteger running; protected final boolean singleReaderMode; protected final LinkedBlockingQueue queue; - protected final Lock filterLock; + protected final Lock editLock; protected final int publishHighwaterMark; protected final boolean discardWhenFull; + protected final long offerLockMillis; protected final long offerTimeoutMillis; protected final Duration requestCleanupInterval; @@ -47,7 +49,11 @@ class MessageQueue { protected final NatsMessage poisonPill; MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval) { - this(singleReaderMode, -1, false, requestCleanupInterval); + this(singleReaderMode, -1, false, requestCleanupInterval, null); + } + + MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval, MessageQueue source) { + this(singleReaderMode, -1, false, requestCleanupInterval, source); } /** @@ -61,31 +67,40 @@ class MessageQueue { * @param requestCleanupInterval is used to figure the offerTimeoutMillis */ MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull, Duration requestCleanupInterval) { + this(singleReaderMode, publishHighwaterMark, discardWhenFull, requestCleanupInterval, null); + } + + MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) { this.publishHighwaterMark = publishHighwaterMark; this.queue = publishHighwaterMark > 0 ? new LinkedBlockingQueue<>(publishHighwaterMark) : new LinkedBlockingQueue<>(); this.discardWhenFull = discardWhenFull; this.running = new AtomicInteger(RUNNING); this.sizeInBytes = new AtomicLong(0); this.length = new AtomicLong(0); - this.offerTimeoutMillis = calculateOfferTimeoutMillis(requestCleanupInterval); + this.offerLockMillis = requestCleanupInterval.toMillis(); + this.offerTimeoutMillis = Math.max(1, requestCleanupInterval.toMillis() * 95 / 100); // The poisonPill is used to stop poll and accumulate when the queue is stopped this.poisonPill = new NatsMessage("_poison", null, EMPTY_BODY); - this.filterLock = new ReentrantLock(); + editLock = new ReentrantLock(); this.singleReaderMode = singleReaderMode; this.requestCleanupInterval = requestCleanupInterval; + + if (source != null) { + source.drainTo(this); + } } - MessageQueue(MessageQueue source) { - this(source.singleReaderMode, source.publishHighwaterMark, source.discardWhenFull, source.requestCleanupInterval); - source.queue.drainTo(queue); - length.set(queue.size()); - } - - private static long calculateOfferTimeoutMillis(Duration requestCleanupInterval) { - return Math.max(1, requestCleanupInterval.toMillis() * 95 / 100); + void drainTo(MessageQueue target) { + editLock.lock(); + try { + queue.drainTo(target.queue); + target.length.set(queue.size()); + } finally { + editLock.unlock(); + } } boolean isSingleReaderMode() { @@ -124,21 +139,36 @@ boolean push(NatsMessage msg) { } boolean push(NatsMessage msg, boolean internal) { - this.filterLock.lock(); + long start = System.currentTimeMillis(); + try { + // try to get the lock, but don't wait forever + // assuming that if we are waiting for the lock + // another push likely has the lock and + if (!editLock.tryLock(offerLockMillis, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size()); + } + } + catch (InterruptedException e) { + return false; + } + try { - // If we aren't running, then we need to obey the filter lock - // to avoid ordering problems if (!internal && this.discardWhenFull) { return this.queue.offer(msg); } - if (!this.offer(msg)) { - throw new IllegalStateException("Output queue is full " + queue.size()); + + long timeoutLeft = Math.max(100, offerTimeoutMillis - (System.currentTimeMillis() - start)); + + if (!this.queue.offer(msg, timeoutLeft, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size()); } this.sizeInBytes.getAndAdd(msg.getSizeInBytes()); this.length.incrementAndGet(); return true; + } catch (InterruptedException ie) { + return false; } finally { - this.filterLock.unlock(); + editLock.unlock(); } } @@ -154,14 +184,6 @@ void poisonTheQueue() { } } - boolean offer(NatsMessage msg) { - try { - return this.queue.offer(msg, offerTimeoutMillis, TimeUnit.MILLISECONDS); - } catch (InterruptedException ie) { - return false; - } - } - NatsMessage poll(Duration timeout) throws InterruptedException { NatsMessage msg = null; @@ -289,7 +311,7 @@ long sizeInBytes() { } void filter(Predicate p) { - this.filterLock.lock(); + editLock.lock(); try { if (this.isRunning()) { throw new IllegalStateException("Filter is only supported when the queue is paused"); @@ -307,7 +329,7 @@ void filter(Predicate p) { } this.queue.addAll(newQueue); } finally { - this.filterLock.unlock(); + editLock.unlock(); } } -} \ No newline at end of file +} diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 178907c76..8a1899637 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -45,6 +45,7 @@ class NatsConnection implements Connection { public static final double NANOS_PER_SECOND = 1_000_000_000.0; + private final Options options; private final StatisticsCollector statistics; @@ -52,8 +53,7 @@ class NatsConnection implements Connection { private boolean connecting; // you can only connect in one thread private boolean disconnecting; // you can only disconnect in one thread private boolean closing; // respect a close call regardless - private Exception exceptionDuringConnectChange; // an exception occurred in another thread while disconnecting or - // connecting + private Exception exceptionDuringConnectChange; // exception occurred in another thread while dis/connecting private final ReentrantLock closeSocketLock; private Status status; @@ -66,14 +66,13 @@ class NatsConnection implements Connection { private CompletableFuture reconnectWaiter; private final HashMap serverAuthErrors; - private final NatsConnectionReader reader; + private NatsConnectionReader reader; private NatsConnectionWriter writer; private final AtomicReference serverInfo; private final Map subscribers; - private final Map dispatchers; // use a concurrent map so we get more consistent iteration - // behavior + private final Map dispatchers; // use a concurrent map so we get more consistent iteration behavior private final Collection connectionListeners; private final Map responsesAwaiting; private final Map responsesRespondedTo; @@ -93,9 +92,9 @@ class NatsConnection implements Connection { private final AtomicReference lastError; private final AtomicReference> draining; private final AtomicBoolean blockPublishForDrain; + private final AtomicBoolean tryingToConnect; private final ExecutorService callbackRunner; - private final ExecutorService executor; private final ExecutorService connectExecutor; private final boolean advancedTracking; @@ -109,7 +108,7 @@ class NatsConnection implements Connection { NatsConnection(Options options) { trace = options.isTraceConnection(); - timeTraceLogger = options.getTimeTraceLogger();; + timeTraceLogger = options.getTimeTraceLogger(); timeTraceLogger.trace("creating connection object"); this.options = options; @@ -152,15 +151,16 @@ class NatsConnection implements Connection { this.pongQueue = new ConcurrentLinkedDeque<>(); this.draining = new AtomicReference<>(); this.blockPublishForDrain = new AtomicBoolean(); + this.tryingToConnect = new AtomicBoolean(); timeTraceLogger.trace("creating executors"); - this.callbackRunner = Executors.newSingleThreadExecutor(); this.executor = options.getExecutor(); + this.callbackRunner = Executors.newSingleThreadExecutor(); this.connectExecutor = Executors.newSingleThreadExecutor(); timeTraceLogger.trace("creating reader and writer"); this.reader = new NatsConnectionReader(this); - this.writer = new NatsConnectionWriter(this); + this.writer = new NatsConnectionWriter(this, null); this.needPing = new AtomicBoolean(true); @@ -175,6 +175,18 @@ class NatsConnection implements Connection { // Connect is only called after creation void connect(boolean reconnectOnConnect) throws InterruptedException, IOException { + if (!tryingToConnect.get()) { + try { + tryingToConnect.set(true); + connectImpl(reconnectOnConnect); + } + finally { + tryingToConnect.set(false); + } + } + } + + void connectImpl(boolean reconnectOnConnect) throws InterruptedException, IOException { if (options.getServers().isEmpty()) { throw new IllegalArgumentException("No servers provided in options"); } @@ -238,7 +250,7 @@ else if (cur.equals(first)) { if (!isConnected() && !isClosed()) { if (reconnectOnConnect) { timeTraceLogger.trace("trying to reconnect on connect"); - reconnect(); + reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior } else { timeTraceLogger.trace("connection failed, closing to cleanup"); @@ -246,12 +258,9 @@ else if (cur.equals(first)) { String err = connectError.get(); if (this.isAuthenticationError(err)) { - String msg = "Authentication error connecting to NATS server: " + err; - throw new AuthenticationException(msg); - } else { - String msg = "Unable to connect to NATS servers: " + failList; - throw new IOException(msg); + throw new AuthenticationException("Authentication error connecting to NATS server: " + err); } + throw new IOException("Unable to connect to NATS servers: " + failList); } } else if (trace) { @@ -263,21 +272,80 @@ else if (trace) { @Override public void forceReconnect() throws IOException, InterruptedException { + if (!tryingToConnect.get()) { + try { + tryingToConnect.set(true); + forceReconnectImpl(); + } + finally { + tryingToConnect.set(false); + } + } + } + + void forceReconnectImpl() throws IOException, InterruptedException { + NatsConnectionWriter oldWriter = writer; + closeSocketLock.lock(); try { updateStatus(Status.DISCONNECTED); + + // Close and reset the current data port and future + if (dataPortFuture != null) { + dataPortFuture.cancel(true); + dataPortFuture = null; + } + if (dataPort != null) { + try { + dataPort.close(); + } + catch (IOException ignore) { + } + finally { + dataPort = null; + } + } + + // stop i/o reader.stop(); writer.stop(); - writer = new NatsConnectionWriter(writer); + + // new reader/writer + reader = new NatsConnectionReader(this); + writer = new NatsConnectionWriter(this, writer); } finally { closeSocketLock.unlock(); } - reconnect(); + + try { + // calling connect just starts like a new connection versus reconnect + // but we have to manually resubscribe like reconnect once it is connected + // also, lets assume we never want to try the currently connected server + serverPool.connectFailed(currentServer); // we don't want to connect to the same server + reconnectImpl(); + writer.setReconnectMode(false); + } + catch (InterruptedException e) { + // if there is an exception close() will have been called already + Thread.currentThread().interrupt(); + } + } + + void reconnect() throws InterruptedException { + if (!tryingToConnect.get()) { + try { + tryingToConnect.set(true); + reconnectImpl(); + } + finally { + tryingToConnect.set(false); + } + } } // Reconnect can only be called when the connection is disconnected - void reconnect() throws InterruptedException { + void reconnectImpl() throws InterruptedException { if (isClosed()) { return; } @@ -368,12 +436,13 @@ else if (first.equals(cur)) { this.processException(exp); } - // When the flush returns we are done sending internal messages, so we can - // switch to the - // non-reconnect queue - this.writer.setReconnectMode(false); - processConnectionEvent(Events.RESUBSCRIBED); + + processConnectionEvent(Events.RECONNECTED); + + // When the flush returns we are done sending internal messages, + // so we can switch to the non-reconnect queue + this.writer.setReconnectMode(false); } long timeCheck(long endNanos, String message) throws TimeoutException { @@ -519,7 +588,12 @@ void tryToConnect(NatsUri cur, NatsUri resolved, long now) { this.timer.schedule(new TimerTask() { public void run() { if (isConnected()) { - softPing(); // The timer always uses the standard queue + try { + softPing(); // The timer always uses the standard queue + } + catch (Exception e) { + // it's running in a thread, there is no point throwing here + } } } }, pingMillis, pingMillis); @@ -553,10 +627,7 @@ public void run() { statusLock.unlock(); } timeTraceLogger.trace("status updated"); - } catch (RuntimeException exp) { // runtime exceptions, like illegalArgs - processException(exp); - throw exp; - } catch (Exception exp) { // every thing else + } catch (Exception exp) { processException(exp); try { this.closeSocket(false); @@ -625,6 +696,7 @@ void handleCommunicationIssue(Exception io) { this.closeSocket(true); } catch (InterruptedException e) { processException(e); + Thread.currentThread().interrupt(); } }); } @@ -634,10 +706,8 @@ void handleCommunicationIssue(Exception io) { void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException { // Ensure we close the socket exclusively within one thread. closeSocketLock.lock(); - try { boolean wasConnected; - statusLock.lock(); try { if (isDisconnectingOrClosed()) { @@ -675,7 +745,7 @@ void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException { } // Close socket is called when another connect attempt is possible - // Close is called when the connection should shutdown, period + // Close is called when the connection should shut down, period /** * {@inheritDoc} */ @@ -874,8 +944,7 @@ void publishInternal(String subject, String replyTo, Headers headers, byte[] dat throw new IllegalStateException("Connection is Draining"); // Ok to publish while waiting on subs } - Connection.Status stat = this.status; - if ((stat == Status.RECONNECTING || stat == Status.DISCONNECTED) + if ((status == Status.RECONNECTING || status == Status.DISCONNECTED) && !this.writer.canQueueDuringReconnect(npm)) { throw new IllegalStateException( "Unable to queue any more messages during reconnect, max buffer is " + options.getReconnectBufferSize()); @@ -1419,8 +1488,6 @@ public Duration RTT() throws IOException { // for a specific pong. Note, if no pong returns the wait will not return // without setting a timeout. CompletableFuture sendPing(boolean treatAsInternal) { - int max = this.options.getMaxPingsOut(); - if (!isConnectedOrConnecting()) { CompletableFuture retVal = new CompletableFuture<>(); retVal.complete(Boolean.FALSE); @@ -1434,6 +1501,7 @@ CompletableFuture sendPing(boolean treatAsInternal) { return retVal; } + int max = options.getMaxPingsOut(); if (max > 0 && pongQueue.size() + 1 > max) { handleCommunicationIssue(new IllegalStateException("Max outgoing Ping count exceeded.")); return null; @@ -1872,6 +1940,10 @@ boolean isConnected() { return this.status == Status.CONNECTED; } + boolean isDisconnected() { + return this.status == Status.DISCONNECTED; + } + boolean isConnectedOrConnecting() { statusLock.lock(); try { @@ -2107,7 +2179,8 @@ public CompletableFuture drain(Duration timeout) throws TimeoutExceptio try { this.close(false);// close the connection after the last flush } catch (InterruptedException e) { - this.processException(e); + processException(e); + Thread.currentThread().interrupt(); } tracker.complete(false); } diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index e421668d0..f7acda1ae 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -1,3 +1,4 @@ + // Copyright 2015-2018 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -39,7 +40,7 @@ class NatsConnectionWriter implements Runnable { private final ReentrantLock writerLock; private Future stopped; private Future dataPortFuture; - private DataPort dataPort = null; + private DataPort dataPort; private final AtomicBoolean running; private final AtomicBoolean reconnectMode; private final ReentrantLock startStopLock; @@ -51,12 +52,12 @@ class NatsConnectionWriter implements Runnable { private final MessageQueue reconnectOutgoing; private final long reconnectBufferSize; - NatsConnectionWriter(NatsConnection connection) { + NatsConnectionWriter(NatsConnection connection, NatsConnectionWriter sourceWriter) { this.connection = connection; writerLock = new ReentrantLock(); this.running = new AtomicBoolean(false); - this.reconnectMode = new AtomicBoolean(false); + this.reconnectMode = new AtomicBoolean(sourceWriter != null); this.startStopLock = new ReentrantLock(); this.stopped = new CompletableFuture<>(); ((CompletableFuture)this.stopped).complete(Boolean.TRUE); // we are stopped on creation @@ -69,34 +70,15 @@ class NatsConnectionWriter implements Runnable { outgoing = new MessageQueue(true, options.getMaxMessagesInOutgoingQueue(), options.isDiscardMessagesWhenOutgoingQueueFull(), - options.getRequestCleanupInterval()); + options.getRequestCleanupInterval(), + sourceWriter == null ? null : sourceWriter.outgoing); // The "reconnect" buffer contains internal messages, and we will keep it unlimited in size - reconnectOutgoing = new MessageQueue(true, options.getRequestCleanupInterval()); + reconnectOutgoing = new MessageQueue(true, options.getRequestCleanupInterval(), + sourceWriter == null ? null : sourceWriter.reconnectOutgoing); reconnectBufferSize = options.getReconnectBufferSize(); } - NatsConnectionWriter(NatsConnectionWriter sourceWriter) { - this.connection = sourceWriter.connection; - writerLock = new ReentrantLock(); - - this.running = new AtomicBoolean(false); - this.reconnectMode = new AtomicBoolean(false); - this.startStopLock = new ReentrantLock(); - this.stopped = new CompletableFuture<>(); - ((CompletableFuture)this.stopped).complete(Boolean.TRUE); // we are stopped on creation - - int sbl = sourceWriter.sendBufferLength.get(); - sendBufferLength = new AtomicInteger(); - sendBuffer = new byte[sbl]; - - outgoing = new MessageQueue(sourceWriter.outgoing); - - // The "reconnect" buffer contains internal messages, and we will keep it unlimited in size - reconnectOutgoing = new MessageQueue(sourceWriter.reconnectOutgoing); - reconnectBufferSize = sourceWriter.reconnectBufferSize; - } - // Should only be called if the current thread has exited. // Use the Future from stop() to determine if it is ok to call this. // This method resets that future so mistiming can result in badness. @@ -132,7 +114,6 @@ Future stop() { this.startStopLock.unlock(); } } - return this.stopped; } diff --git a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java index 5feb1a9f9..855123af9 100644 --- a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java +++ b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java @@ -42,10 +42,12 @@ public void run() { out.close(); } catch (IOException ignore) {} - try { - connection.forceReconnect(); - } - catch (InterruptedException | IOException ignore) {} + connection.getExecutor().submit(() -> { + try { + connection.forceReconnect(); + } + catch (IOException | InterruptedException ignore) {} + }); } } } @@ -80,6 +82,11 @@ public void write(byte[] src, int toWrite) throws IOException { public void close() throws IOException { try { writeWatcherTask.cancel(); + } + catch (Exception ignore) { + // don't want this to be passed along + } + try { writeWatcherTimer.cancel(); } catch (Exception ignore) { diff --git a/src/main/java/io/nats/client/support/NatsConstants.java b/src/main/java/io/nats/client/support/NatsConstants.java index 07b753096..c00882253 100644 --- a/src/main/java/io/nats/client/support/NatsConstants.java +++ b/src/main/java/io/nats/client/support/NatsConstants.java @@ -90,5 +90,8 @@ public interface NatsConstants { String INVALID_HEADER_STATUS_CODE = "Invalid header status code"; String SERIALIZED_HEADER_CANNOT_BE_NULL_OR_EMPTY = "Serialized header cannot be null or empty."; + // The trailing space is intentional as in "Output queue is full 5000" + String OUTPUT_QUEUE_IS_FULL = "Output queue is full "; + long NANOS_PER_MILLI = 1_000_000L; } diff --git a/src/test/java/io/nats/client/impl/ConnectionListenerTests.java b/src/test/java/io/nats/client/impl/ConnectionListenerTests.java index 4fb949dc7..00d9651a1 100644 --- a/src/test/java/io/nats/client/impl/ConnectionListenerTests.java +++ b/src/test/java/io/nats/client/impl/ConnectionListenerTests.java @@ -73,7 +73,7 @@ public void testDiscoveredServersCountAndListenerInOptions() throws Exception { @Test public void testDisconnectReconnectCount() throws Exception { int port; - Connection nc = null; + Connection nc; ListenerForTesting listener = new ListenerForTesting(); try (NatsTestServer ts = new NatsTestServer(false)) { Options options = new Options.Builder(). @@ -96,7 +96,7 @@ public void testDisconnectReconnectCount() throws Exception { try (NatsTestServer ts = new NatsTestServer(port, false)) { standardConnectionWait(nc); - assertEquals(1, listener.getEventCount(Events.RECONNECTED)); + assertEquals(2, listener.getEventCount(Events.RECONNECTED)); assertEquals(ts.getURI(), nc.getConnectedUrl()); standardCloseConnection(nc); } diff --git a/src/test/java/io/nats/client/impl/ListenerForTesting.java b/src/test/java/io/nats/client/impl/ListenerForTesting.java index f8d117327..48f14a0b1 100644 --- a/src/test/java/io/nats/client/impl/ListenerForTesting.java +++ b/src/test/java/io/nats/client/impl/ListenerForTesting.java @@ -100,9 +100,7 @@ private boolean waitForBooleanFuture(CompletableFuture future, long tim try { return future.get(timeout, units); } catch (TimeoutException | ExecutionException | InterruptedException e) { - if (printExceptions) { - e.printStackTrace(); - } + maybePrintException("waitForBooleanFuture", e); return false; } } @@ -111,13 +109,18 @@ private T waitForFuture(CompletableFuture future, long waitInMillis) { try { return future.get(waitInMillis, TimeUnit.MILLISECONDS); } catch (TimeoutException | ExecutionException | InterruptedException e) { - if (printExceptions) { - e.printStackTrace(); - } + maybePrintException("waitForFuture", e); return null; } } + private void maybePrintException(String label, Exception e) { + if (printExceptions) { + System.err.print("LFT " + label + ": "); + e.printStackTrace(); + } + } + public void prepForStatusChange(Events waitFor) { prepLock.lock(); try { @@ -145,9 +148,7 @@ public void exceptionOccurred(Connection conn, Exception exp) { if (verbose) { report("exceptionOccurred", "conn:" + conn.hashCode() + ", " + exp); } - if (printExceptions) { - exp.printStackTrace(); - } + maybePrintException("exceptionOccurred", exp); } } diff --git a/src/test/java/io/nats/client/impl/MessageQueueTests.java b/src/test/java/io/nats/client/impl/MessageQueueTests.java index fe6431ee3..f246e1317 100644 --- a/src/test/java/io/nats/client/impl/MessageQueueTests.java +++ b/src/test/java/io/nats/client/impl/MessageQueueTests.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL; import static org.junit.jupiter.api.Assertions.*; public class MessageQueueTests { @@ -584,7 +585,7 @@ public void testExceptionWhenQueueIsFull() { q.push(msg3); fail("Expected " + IllegalStateException.class.getSimpleName()); } catch (IllegalStateException e) { - assertEquals("Output queue is full 2", e.getMessage()); + assertEquals(OUTPUT_QUEUE_IS_FULL + "2", e.getMessage()); } } diff --git a/src/test/java/io/nats/client/impl/ReconnectTests.java b/src/test/java/io/nats/client/impl/ReconnectTests.java index 79dc523e8..8ed366697 100644 --- a/src/test/java/io/nats/client/impl/ReconnectTests.java +++ b/src/test/java/io/nats/client/impl/ReconnectTests.java @@ -32,6 +32,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL; import static io.nats.client.utils.TestBase.*; import static org.junit.jupiter.api.Assertions.*; @@ -744,6 +745,8 @@ public void testSocketDataPortTimeout() throws Exception { ListenerForTesting listener = new ListenerForTesting(); Options.Builder builder = Options.builder() .socketWriteTimeout(5000) + .pingInterval(Duration.ofSeconds(1)) + .maxMessagesInOutgoingQueue(100) .dataPortType(SocketDataPortBlockSimulator.class.getCanonicalName()) .connectionListener(listener) .errorListener(listener); @@ -757,7 +760,6 @@ public void testSocketDataPortTimeout() throws Exception { NatsRunnerUtils.getNatsLocalhostUri(port1), NatsRunnerUtils.getNatsLocalhostUri(port2) }; - //noinspection resource Connection nc = standardConnection(builder.servers(servers).build()); String subject = subject(); int connectedPort = nc.getServerInfo().getPort(); @@ -770,12 +772,13 @@ public void testSocketDataPortTimeout() throws Exception { } } catch (Exception e) { - if (e.getMessage().contains("Output queue is full")) { + if (e.getMessage().contains(OUTPUT_QUEUE_IS_FULL)) { gotOutputQueueIsFull.set(true); } } } assertNotEquals(connectedPort, nc.getServerInfo().getPort()); + nc.close(); })); assertTrue(gotOutputQueueIsFull.get()); diff --git a/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java b/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java index 99755ad64..032b753eb 100644 --- a/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java +++ b/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java @@ -74,22 +74,33 @@ public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws public static AtomicLong SIMULATE_SOCKET_BLOCK = new AtomicLong(); AtomicLong blocking = new AtomicLong(); public void write(byte[] src, int toWrite) throws IOException { - writeMustBeDoneBy = System.nanoTime() + writeTimeoutNanos; - blocking.set(SIMULATE_SOCKET_BLOCK.get()); - while (blocking.get() > 0) { - try { - Thread.sleep(100); - blocking.addAndGet(-100); + try { + writeMustBeDoneBy = System.nanoTime() + writeTimeoutNanos; + blocking.set(SIMULATE_SOCKET_BLOCK.get()); + while (blocking.get() > 0) { + try { + Thread.sleep(100); + blocking.addAndGet(-100); + } + catch (InterruptedException ignore) { + return; + } } - catch (InterruptedException ignore) {} + out.write(src, 0, toWrite); + } + finally { + writeMustBeDoneBy = Long.MAX_VALUE; } - out.write(src, 0, toWrite); - writeMustBeDoneBy = Long.MAX_VALUE; } public void close() throws IOException { try { writeWatcherTask.cancel(); + } + catch (Exception ignore) { + // don't want this to be passed along + } + try { writeWatcherTimer.cancel(); } catch (Exception ignore) {