diff --git a/.travis/deploying.md b/.travis/deploying.md index 41b91a5ad..fd8490309 100644 --- a/.travis/deploying.md +++ b/.travis/deploying.md @@ -13,6 +13,13 @@ Travis doesn't support sonatype deploy correctly. There is an issue where the va The gradle close and release process will fail if there is more than one repository staged. You may need to manually drop repositories from staging during testing on a single version number. +## Before you Release + +1. Check that the Nats.java version is updated. +2. Check that the version in gradle.build is updated, including the jar file versions +3. Check dependency versions. +4. Check that the changelog.md is ready + ## Manually Deploying You can deploy manually by setting up your gradle.properties to have: diff --git a/CHANGELOG.md b/CHANGELOG.md index 0da479419..0a0aaf835 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ # Change Log +## Version 2.1.1 + +* [FIXED] Issue with version in Nats.java, also updated deploying.md with checklist +* [FIXED] Fixed issue during reconnect where buffered messages blocked protocol messages + ## Version 2.1.0 * [ADDED] Support for consumer or connection drain. (New API lead to version bump.) diff --git a/build.gradle b/build.gradle index 2880f157c..11a6d1d35 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ plugins { // Update version here, repeated check-ins not into master will have snapshot on them def versionMajor = 2 def versionMinor = 1 -def versionPatch = 0 +def versionPatch = 1 def versionModifier = "" def branch = System.getenv("TRAVIS_BRANCH"); @@ -70,7 +70,7 @@ osgiClasses { jar { manifest { attributes('Implementation-Title': 'Java Nats', - 'Implementation-Version': '2.0.1', + 'Implementation-Version': '2.1.1', 'Implementation-Vendor': 'nats.io') } exclude("io/nats/examples/**") @@ -112,7 +112,7 @@ task examplesJar(type: Jar) { classifier = 'examples' manifest { attributes('Implementation-Title': 'Java Nats Examples', - 'Implementation-Version': '2.0.1', + 'Implementation-Version': '2.1.1', 'Implementation-Vendor': 'nats.io') } from(sourceSets.main.output) { diff --git a/src/main/java/io/nats/client/Nats.java b/src/main/java/io/nats/client/Nats.java index 429ed5b75..d532ad7b5 100644 --- a/src/main/java/io/nats/client/Nats.java +++ b/src/main/java/io/nats/client/Nats.java @@ -72,7 +72,7 @@ public class Nats { /** * Current version of the library - {@value #CLIENT_VERSION} */ - public static final String CLIENT_VERSION = "2.0.0"; + public static final String CLIENT_VERSION = "2.1.1"; /** * Current language of the library - {@value #CLIENT_LANGUAGE} diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 0d5b54b2a..55b06b8b8 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -202,6 +202,8 @@ void reconnect() throws InterruptedException { return; } + this.writer.setReconnectMode(true); + while (!isConnected() && !isClosed() && !this.isClosing()) { Collection serversToTry = buildReconnectList(); @@ -243,8 +245,8 @@ void reconnect() throws InterruptedException { } this.subscribers.forEach((sid, sub) -> { - if (!sub.isDraining()) { - sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName()); + if (sub.getDispatcher() == null && !sub.isDraining()) { + sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true); } }); @@ -259,6 +261,8 @@ void reconnect() throws InterruptedException { } catch (Exception exp) { this.processException(exp); } + + this.writer.setReconnectMode(false); processConnectionEvent(Events.RESUBSCRIBED); } @@ -324,7 +328,7 @@ void tryToConnect(String serverURI) { this.timer.schedule(new TimerTask() { public void run() { if (isConnected()) { - sendPing(); + softPing(); // The timer always uses the standard queue } } }, pingMillis, pingMillis); @@ -700,7 +704,7 @@ void sendUnsub(NatsSubscription sub, int after) { protocolBuilder.append(String.valueOf(after)); } NatsMessage unsubMsg = new NatsMessage(protocolBuilder.toString()); - queueOutgoing(unsubMsg); + queueInternalOutgoing(unsubMsg); } // Assumes the null/empty checks were handled elsewhere @@ -718,11 +722,11 @@ NatsSubscription createSubscription(String subject, String queueName, NatsDispat sub = new NatsSubscription(sid, subject, queueName, this, dispatcher); subscribers.put(sid, sub); - sendSubscriptionMessage(sid, subject, queueName); + sendSubscriptionMessage(sid, subject, queueName, false); return sub; } - void sendSubscriptionMessage(CharSequence sid, String subject, String queueName) { + void sendSubscriptionMessage(CharSequence sid, String subject, String queueName, boolean treatAsInternal) { if (!isConnected()) { return;// We will setup sub on reconnect or ignore } @@ -740,7 +744,12 @@ void sendSubscriptionMessage(CharSequence sid, String subject, String queueName) protocolBuilder.append(" "); protocolBuilder.append(sid); NatsMessage subMsg = new NatsMessage(protocolBuilder.toString()); - queueOutgoing(subMsg); + + if (treatAsInternal) { + queueInternalOutgoing(subMsg); + } else { + queueOutgoing(subMsg); + } } String createInbox() { @@ -965,14 +974,22 @@ void sendConnect(String serverURI) { String connectOptions = this.options.buildProtocolConnectOptionsString(serverURI, info.isAuthRequired()); connectString.append(connectOptions); NatsMessage msg = new NatsMessage(connectString.toString()); - queueOutgoing(msg); + queueInternalOutgoing(msg); + } + + CompletableFuture sendPing() { + return this.sendPing(true); } + CompletableFuture softPing() { + return this.sendPing(false); + } + // Send a ping request and push a pong future on the queue. // futures are completed in order, keep this one if a thread wants to wait // for a specific pong. Note, if no pong returns the wait will not return // without setting a timeout. - CompletableFuture sendPing() { + CompletableFuture sendPing(boolean treatAsInternal) { int max = this.options.getMaxPingsOut(); if (!isConnectedOrConnecting()) { @@ -989,14 +1006,20 @@ CompletableFuture sendPing() { CompletableFuture pongFuture = new CompletableFuture<>(); NatsMessage msg = new NatsMessage(NatsConnection.OP_PING); pongQueue.add(pongFuture); - queueOutgoing(msg); + + if (treatAsInternal) { + queueInternalOutgoing(msg); + } else { + queueOutgoing(msg); + } + this.statistics.incrementPingCount(); return pongFuture; } void sendPong() { NatsMessage msg = new NatsMessage(NatsConnection.OP_PONG); - queueOutgoing(msg); + queueInternalOutgoing(msg); } // Called by the reader @@ -1086,6 +1109,13 @@ void queueOutgoing(NatsMessage msg) { this.writer.queue(msg); } + void queueInternalOutgoing(NatsMessage msg) { + if (msg.getControlLineLength() > this.options.getMaxControlLine()) { + throw new IllegalArgumentException("Control line is too long"); + } + this.writer.queueInternalMessage(msg); + } + void deliverMessage(NatsMessage msg) { this.statistics.incrementInMsgs(); this.statistics.incrementInBytes(msg.getSizeInBytes()); diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index b32295cbd..3a60f8b81 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -32,21 +32,25 @@ class NatsConnectionWriter implements Runnable { private CompletableFuture stopped; private Future dataPortFuture; private final AtomicBoolean running; + private final AtomicBoolean reconnectMode; private byte[] sendBuffer; private MessageQueue outgoing; + private MessageQueue reconnectOutgoing; NatsConnectionWriter(NatsConnection connection) { this.connection = connection; this.running = new AtomicBoolean(false); + this.reconnectMode = new AtomicBoolean(false); this.stopped = new CompletableFuture<>(); this.stopped.complete(Boolean.TRUE); // we are stopped on creation this.sendBuffer = new byte[connection.getOptions().getBufferSize()]; outgoing = new MessageQueue(true); + reconnectOutgoing = new MessageQueue(true); } // Should only be called if the current thread has exited. @@ -68,6 +72,7 @@ void start(Future dataPortFuture) { Future stop() { this.running.set(false); this.outgoing.pause(); + this.reconnectOutgoing.pause(); // Clear old ping/pong requests byte[] pingRequest = NatsConnection.OP_PING.getBytes(StandardCharsets.UTF_8); @@ -80,16 +85,24 @@ Future stop() { public void run() { Duration waitForMessage = Duration.ofMinutes(2); // This can be long since no one is sending + Duration reconnectWait = Duration.ofMillis(1); // This can be long since no one is sending long maxMessages = 1000; try { DataPort dataPort = this.dataPortFuture.get(); // Will wait for the future to complete NatsStatistics stats = this.connection.getNatsStatistics(); this.outgoing.resume(); + this.reconnectOutgoing.resume(); while (this.running.get()) { int sendPosition = 0; - NatsMessage msg = this.outgoing.accumulate(this.sendBuffer.length, maxMessages, waitForMessage); + NatsMessage msg = null; + + if (reconnectMode.get()) { + msg = this.reconnectOutgoing.accumulate(this.sendBuffer.length, maxMessages, reconnectWait); + } else { + msg = this.outgoing.accumulate(this.sendBuffer.length, maxMessages, waitForMessage); + } if (msg == null) { // Make sure we are still running continue; @@ -149,11 +162,23 @@ public void run() { } } + void setReconnectMode(boolean tf) { + reconnectMode.set(tf); + } + boolean canQueue(NatsMessage msg, long maxSize) { return (maxSize <= 0 || (outgoing.sizeInBytes() + msg.getSizeInBytes()) < maxSize); } void queue(NatsMessage msg) { - outgoing.push(msg); + this.outgoing.push(msg); + } + + void queueInternalMessage(NatsMessage msg) { + if (this.reconnectMode.get()) { + this.reconnectOutgoing.push(msg); + } else { + this.outgoing.push(msg); + } } } \ No newline at end of file diff --git a/src/main/java/io/nats/client/impl/NatsDispatcher.java b/src/main/java/io/nats/client/impl/NatsDispatcher.java index a1eb7c804..f85e07790 100644 --- a/src/main/java/io/nats/client/impl/NatsDispatcher.java +++ b/src/main/java/io/nats/client/impl/NatsDispatcher.java @@ -140,7 +140,7 @@ MessageQueue getMessageQueue() { void resendSubscriptions() { this.subscriptions.forEach((id, sub)->{ - this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName()); + this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true); }); } diff --git a/src/test/java/io/nats/client/AuthTests.java b/src/test/java/io/nats/client/AuthTests.java index 730835fbe..c054fa819 100644 --- a/src/test/java/io/nats/client/AuthTests.java +++ b/src/test/java/io/nats/client/AuthTests.java @@ -14,6 +14,7 @@ package io.nats.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -22,6 +23,7 @@ import org.junit.Test; +import io.nats.client.Connection.Status; import io.nats.client.ConnectionListener.Events; public class AuthTests { @@ -45,6 +47,58 @@ public void testUserPass() throws Exception { } } + @Test + public void testUserPassOnReconnect() throws Exception { + TestHandler handler = new TestHandler(); + int port = NatsTestServer.nextPort(); + Connection nc = null; + Subscription sub = null; + String[] customArgs = {"--user","stephen","--pass","password"}; + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + // See config file for user/pass + Options options = new Options.Builder(). + server(ts.getURI()). + maxReconnects(-1). + userInfo("stephen", "password"). + connectionListener(handler). + build(); + nc = Nats.connect(options); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + sub = nc.subscribe("test"); + nc.publish("test", null); + nc.flush(Duration.ofSeconds(5)); + Message msg = sub.nextMessage(Duration.ofSeconds(5)); + assertNotNull(msg); + handler.prepForStatusChange(Events.DISCONNECTED); + } + + try { + nc.flush(Duration.ofSeconds(1)); + } catch (Exception exp) { + } + + handler.waitForStatusChange(5, TimeUnit.SECONDS); + assertTrue("Reconnecting status", Connection.Status.RECONNECTING == nc.getStatus() || + Connection.Status.DISCONNECTED == nc.getStatus()); + handler.prepForStatusChange(Events.RESUBSCRIBED); + + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + handler.waitForStatusChange(5, TimeUnit.SECONDS); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + nc.publish("test", null); + nc.flush(Duration.ofSeconds(5)); + Message msg = sub.nextMessage(Duration.ofSeconds(5)); + assertNotNull(msg); + + nc.close(); + assertTrue("Closed Status", Connection.Status.CLOSED == nc.getStatus()); + } + } + @Test public void testUserBCryptPass() throws Exception { /* @@ -89,6 +143,58 @@ public void testUserPassInURL() throws Exception { } } + @Test + public void testUserPassInURLOnReconnect() throws Exception { + TestHandler handler = new TestHandler(); + int port = NatsTestServer.nextPort(); + Connection nc = null; + Subscription sub = null; + String[] customArgs = {"--user","stephen","--pass","password"}; + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + // See config file for user/pass + Options options = new Options.Builder(). + server("nats://stephen:password@localhost:"+ts.getPort()). + maxReconnects(-1). + connectionListener(handler). + build(); + nc = Nats.connect(options); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + sub = nc.subscribe("test"); + nc.publish("test", null); + nc.flush(Duration.ofSeconds(5)); + Message msg = sub.nextMessage(Duration.ofSeconds(5)); + assertNotNull(msg); + handler.prepForStatusChange(Events.DISCONNECTED); + } + + try { + nc.flush(Duration.ofSeconds(1)); + } catch (Exception exp) { + } + + handler.waitForStatusChange(5, TimeUnit.SECONDS); + Status status = nc.getStatus(); + assertTrue("Reconnecting status", Connection.Status.RECONNECTING == status || + Connection.Status.DISCONNECTED == status); + handler.prepForStatusChange(Events.RESUBSCRIBED); + + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + handler.waitForStatusChange(5, TimeUnit.SECONDS); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + nc.publish("test", null); + nc.flush(Duration.ofSeconds(5)); + Message msg = sub.nextMessage(Duration.ofSeconds(5)); + assertNotNull(msg); + + nc.close(); + assertTrue("Closed Status", Connection.Status.CLOSED == nc.getStatus()); + } + } + @Test public void testUserPassInURLClusteredWithDifferentUser() throws Exception { String[] customArgs1 = {"--user","stephen","--pass","password"}; diff --git a/src/test/java/io/nats/client/NatsTestServer.java b/src/test/java/io/nats/client/NatsTestServer.java index a87c1b6c9..b3e518bd4 100644 --- a/src/test/java/io/nats/client/NatsTestServer.java +++ b/src/test/java/io/nats/client/NatsTestServer.java @@ -85,6 +85,13 @@ public NatsTestServer(String[] customArgs, boolean debug) { start(); } + public NatsTestServer(String[] customArgs, int port, boolean debug) { + this.port = port; + this.debug = debug; + this.customArgs = customArgs; + start(); + } + public void start() { ArrayList cmd = new ArrayList(); diff --git a/src/test/java/io/nats/client/impl/ReconnectTests.java b/src/test/java/io/nats/client/impl/ReconnectTests.java index 45b1b17af..ce661ed68 100644 --- a/src/test/java/io/nats/client/impl/ReconnectTests.java +++ b/src/test/java/io/nats/client/impl/ReconnectTests.java @@ -132,7 +132,6 @@ public void testSimpleReconnect() throws Exception { //Includes test for subscri } } - @Test public void testSubscribeDuringReconnect() throws Exception { NatsConnection nc = null; @@ -192,6 +191,92 @@ public void testSubscribeDuringReconnect() throws Exception { } } + @Test + public void testReconnectBuffer() throws Exception { + NatsConnection nc = null; + TestHandler handler = new TestHandler(); + int port = NatsTestServer.nextPort(); + Subscription sub; + long start = 0; + long end = 0; + String[] customArgs = {"--user","stephen","--pass","password"}; + + handler.setPrintExceptions(true); + + try { + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + Options options = new Options.Builder(). + server(ts.getURI()). + maxReconnects(-1). + userInfo("stephen", "password"). + reconnectWait(Duration.ofMillis(1000)). + connectionListener(handler). + build(); + nc = (NatsConnection) Nats.connect(options); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + sub = nc.subscribe("subsubject"); + + final NatsConnection nnc = nc; + Dispatcher d = nc.createDispatcher((msg) -> { + nnc.publish(msg.getReplyTo(), msg.getData()); + }); + d.subscribe("dispatchSubject"); + nc.flush(Duration.ofMillis(1000)); + + Future inc = nc.request("dispatchSubject", "test".getBytes(StandardCharsets.UTF_8)); + Message msg = inc.get(); + assertNotNull(msg); + + nc.publish("subsubject", null); + msg = sub.nextMessage(Duration.ofMillis(100)); + assertNotNull(msg); + + handler.prepForStatusChange(Events.DISCONNECTED); + start = System.nanoTime(); + } + + flushAndWait(nc, handler); + checkReconnectingStatus(nc); + + // Send a message to the dispatcher and one to the subscriber + // These should be sent on reconnect + Future inc = nc.request("dispatchSubject", "test".getBytes(StandardCharsets.UTF_8)); + nc.publish("subsubject", null); + nc.publish("subsubject", null); + + handler.prepForStatusChange(Events.RESUBSCRIBED); + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + handler.waitForStatusChange(5000, TimeUnit.MILLISECONDS); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + end = System.nanoTime(); + + assertTrue("reconnect wait", 1_000_000 * (end-start) > 1000); + + // Check the message we sent to dispatcher + Message msg = inc.get(500, TimeUnit.MILLISECONDS); + assertNotNull(msg); + + // Check the two we sent to subscriber + msg = sub.nextMessage(Duration.ofMillis(500)); + assertNotNull(msg); + + msg = sub.nextMessage(Duration.ofMillis(500)); + assertNotNull(msg); + } + + assertEquals("reconnect count", 1, nc.getNatsStatistics().getReconnects()); + assertTrue("exception count", nc.getNatsStatistics().getExceptions() > 0); + } finally { + if (nc != null) { + nc.close(); + assertTrue("Closed Status", Connection.Status.CLOSED == nc.getStatus()); + } + } + } + @Test public void testMaxReconnects() throws Exception { Connection nc = null;