From 7834a459ea2a03c3150aef5b10e2435b363b1c05 Mon Sep 17 00:00:00 2001 From: Stephen Asbury Date: Wed, 12 Sep 2018 14:26:38 -0700 Subject: [PATCH 1/7] [FIXED] fixed version and added checklist for next time. Fixes #176 --- .travis/deploying.md | 6 ++++++ CHANGELOG.md | 4 ++++ build.gradle | 2 +- src/main/java/io/nats/client/Nats.java | 2 +- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/.travis/deploying.md b/.travis/deploying.md index 41b91a5ad..fed0a233a 100644 --- a/.travis/deploying.md +++ b/.travis/deploying.md @@ -13,6 +13,12 @@ Travis doesn't support sonatype deploy correctly. There is an issue where the va The gradle close and release process will fail if there is more than one repository staged. You may need to manually drop repositories from staging during testing on a single version number. +## Before you Release + +1. Check that the Nats.java version is updated. +2. Check that the version in gradle.build is updated. +3. Check that the changelog.md is ready + ## Manually Deploying You can deploy manually by setting up your gradle.properties to have: diff --git a/CHANGELOG.md b/CHANGELOG.md index 0da479419..596ed3c37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Change Log +## Version 2.1.1 + +* [FIXED] Issue with version in Nats.java, also updated deploying.md with checklist + ## Version 2.1.0 * [ADDED] Support for consumer or connection drain. (New API lead to version bump.) diff --git a/build.gradle b/build.gradle index 2880f157c..cb70c2645 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ plugins { // Update version here, repeated check-ins not into master will have snapshot on them def versionMajor = 2 def versionMinor = 1 -def versionPatch = 0 +def versionPatch = 1 def versionModifier = "" def branch = System.getenv("TRAVIS_BRANCH"); diff --git a/src/main/java/io/nats/client/Nats.java b/src/main/java/io/nats/client/Nats.java index 429ed5b75..d532ad7b5 100644 --- a/src/main/java/io/nats/client/Nats.java +++ b/src/main/java/io/nats/client/Nats.java @@ -72,7 +72,7 @@ public class Nats { /** * Current version of the library - {@value #CLIENT_VERSION} */ - public static final String CLIENT_VERSION = "2.0.0"; + public static final String CLIENT_VERSION = "2.1.1"; /** * Current language of the library - {@value #CLIENT_LANGUAGE} From 5ae8bdb9e3080fac47bbef467088458a7e373c44 Mon Sep 17 00:00:00 2001 From: Stephen Asbury Date: Thu, 13 Sep 2018 11:59:57 -0700 Subject: [PATCH 2/7] Added a couple tests for auth+reconnect to help replicate #177. --- src/test/java/io/nats/client/AuthTests.java | 104 ++++++++++++++++++ .../java/io/nats/client/NatsTestServer.java | 7 ++ 2 files changed, 111 insertions(+) diff --git a/src/test/java/io/nats/client/AuthTests.java b/src/test/java/io/nats/client/AuthTests.java index 730835fbe..bcbaeeb9b 100644 --- a/src/test/java/io/nats/client/AuthTests.java +++ b/src/test/java/io/nats/client/AuthTests.java @@ -14,6 +14,7 @@ package io.nats.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -45,6 +46,58 @@ public void testUserPass() throws Exception { } } + @Test + public void testUserPassOnReconnect() throws Exception { + TestHandler handler = new TestHandler(); + int port = NatsTestServer.nextPort(); + Connection nc = null; + Subscription sub = null; + String[] customArgs = {"--user","stephen","--pass","password"}; + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + // See config file for user/pass + Options options = new Options.Builder(). + server(ts.getURI()). + maxReconnects(-1). + userInfo("stephen", "password"). + connectionListener(handler). + build(); + nc = Nats.connect(options); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + sub = nc.subscribe("test"); + nc.publish("test", null); + nc.flush(Duration.ofSeconds(5)); + Message msg = sub.nextMessage(Duration.ofSeconds(5)); + assertNotNull(msg); + handler.prepForStatusChange(Events.DISCONNECTED); + } + + try { + nc.flush(Duration.ofSeconds(1)); + } catch (Exception exp) { + } + + handler.waitForStatusChange(5, TimeUnit.SECONDS); + assertTrue("Reconnecting status", Connection.Status.RECONNECTING == nc.getStatus() || + Connection.Status.DISCONNECTED == nc.getStatus()); + handler.prepForStatusChange(Events.RESUBSCRIBED); + + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + handler.waitForStatusChange(5, TimeUnit.SECONDS); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + nc.publish("test", null); + nc.flush(Duration.ofSeconds(5)); + Message msg = sub.nextMessage(Duration.ofSeconds(5)); + assertNotNull(msg); + + nc.close(); + assertTrue("Closed Status", Connection.Status.CLOSED == nc.getStatus()); + } + } + @Test public void testUserBCryptPass() throws Exception { /* @@ -89,6 +142,57 @@ public void testUserPassInURL() throws Exception { } } + @Test + public void testUserPassInURLOnReconnect() throws Exception { + TestHandler handler = new TestHandler(); + int port = NatsTestServer.nextPort(); + Connection nc = null; + Subscription sub = null; + String[] customArgs = {"--user","stephen","--pass","password"}; + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + // See config file for user/pass + Options options = new Options.Builder(). + server("nats://stephen:password@localhost:"+ts.getPort()). + maxReconnects(-1). + connectionListener(handler). + build(); + nc = Nats.connect(options); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + sub = nc.subscribe("test"); + nc.publish("test", null); + nc.flush(Duration.ofSeconds(5)); + Message msg = sub.nextMessage(Duration.ofSeconds(5)); + assertNotNull(msg); + handler.prepForStatusChange(Events.DISCONNECTED); + } + + try { + nc.flush(Duration.ofSeconds(1)); + } catch (Exception exp) { + } + + handler.waitForStatusChange(5, TimeUnit.SECONDS); + assertTrue("Reconnecting status", Connection.Status.RECONNECTING == nc.getStatus() || + Connection.Status.DISCONNECTED == nc.getStatus()); + handler.prepForStatusChange(Events.RESUBSCRIBED); + + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + handler.waitForStatusChange(5, TimeUnit.SECONDS); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + nc.publish("test", null); + nc.flush(Duration.ofSeconds(5)); + Message msg = sub.nextMessage(Duration.ofSeconds(5)); + assertNotNull(msg); + + nc.close(); + assertTrue("Closed Status", Connection.Status.CLOSED == nc.getStatus()); + } + } + @Test public void testUserPassInURLClusteredWithDifferentUser() throws Exception { String[] customArgs1 = {"--user","stephen","--pass","password"}; diff --git a/src/test/java/io/nats/client/NatsTestServer.java b/src/test/java/io/nats/client/NatsTestServer.java index a87c1b6c9..b3e518bd4 100644 --- a/src/test/java/io/nats/client/NatsTestServer.java +++ b/src/test/java/io/nats/client/NatsTestServer.java @@ -85,6 +85,13 @@ public NatsTestServer(String[] customArgs, boolean debug) { start(); } + public NatsTestServer(String[] customArgs, int port, boolean debug) { + this.port = port; + this.debug = debug; + this.customArgs = customArgs; + start(); + } + public void start() { ArrayList cmd = new ArrayList(); From 4fb2bb9ab8a84ab045f453e743fd26b7d72c0a2a Mon Sep 17 00:00:00 2001 From: Stephen Asbury Date: Tue, 18 Sep 2018 16:03:13 -0700 Subject: [PATCH 3/7] [FIXED] issue with reconnect buffer blocking protocol messages Now protocol messages have a fast path during reconnect During normal communication protocol messages share the queue Fixes #177 --- CHANGELOG.md | 1 + .../io/nats/client/impl/NatsConnection.java | 25 ++++-- .../client/impl/NatsConnectionWriter.java | 29 ++++++- src/test/java/io/nats/client/AuthTests.java | 6 +- .../io/nats/client/impl/ReconnectTests.java | 87 ++++++++++++++++++- 5 files changed, 137 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 596ed3c37..0a0aaf835 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ## Version 2.1.1 * [FIXED] Issue with version in Nats.java, also updated deploying.md with checklist +* [FIXED] Fixed issue during reconnect where buffered messages blocked protocol messages ## Version 2.1.0 diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 0d5b54b2a..4730e1a9a 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -202,6 +202,8 @@ void reconnect() throws InterruptedException { return; } + this.writer.setReconnectMode(true); + while (!isConnected() && !isClosed() && !this.isClosing()) { Collection serversToTry = buildReconnectList(); @@ -259,6 +261,8 @@ void reconnect() throws InterruptedException { } catch (Exception exp) { this.processException(exp); } + + this.writer.setReconnectMode(false); processConnectionEvent(Events.RESUBSCRIBED); } @@ -700,7 +704,7 @@ void sendUnsub(NatsSubscription sub, int after) { protocolBuilder.append(String.valueOf(after)); } NatsMessage unsubMsg = new NatsMessage(protocolBuilder.toString()); - queueOutgoing(unsubMsg); + queueProtocolOutgoing(unsubMsg); } // Assumes the null/empty checks were handled elsewhere @@ -740,7 +744,7 @@ void sendSubscriptionMessage(CharSequence sid, String subject, String queueName) protocolBuilder.append(" "); protocolBuilder.append(sid); NatsMessage subMsg = new NatsMessage(protocolBuilder.toString()); - queueOutgoing(subMsg); + queueProtocolOutgoing(subMsg); } String createInbox() { @@ -965,9 +969,9 @@ void sendConnect(String serverURI) { String connectOptions = this.options.buildProtocolConnectOptionsString(serverURI, info.isAuthRequired()); connectString.append(connectOptions); NatsMessage msg = new NatsMessage(connectString.toString()); - queueOutgoing(msg); + queueProtocolOutgoing(msg); } - + // Send a ping request and push a pong future on the queue. // futures are completed in order, keep this one if a thread wants to wait // for a specific pong. Note, if no pong returns the wait will not return @@ -989,14 +993,16 @@ CompletableFuture sendPing() { CompletableFuture pongFuture = new CompletableFuture<>(); NatsMessage msg = new NatsMessage(NatsConnection.OP_PING); pongQueue.add(pongFuture); - queueOutgoing(msg); + + queueProtocolOutgoing(msg); + this.statistics.incrementPingCount(); return pongFuture; } void sendPong() { NatsMessage msg = new NatsMessage(NatsConnection.OP_PONG); - queueOutgoing(msg); + queueProtocolOutgoing(msg); } // Called by the reader @@ -1086,6 +1092,13 @@ void queueOutgoing(NatsMessage msg) { this.writer.queue(msg); } + void queueProtocolOutgoing(NatsMessage msg) { + if (msg.getControlLineLength() > this.options.getMaxControlLine()) { + throw new IllegalArgumentException("Control line is too long"); + } + this.writer.queueProtocolMessage(msg); + } + void deliverMessage(NatsMessage msg) { this.statistics.incrementInMsgs(); this.statistics.incrementInBytes(msg.getSizeInBytes()); diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index b32295cbd..be21fadac 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -32,21 +32,25 @@ class NatsConnectionWriter implements Runnable { private CompletableFuture stopped; private Future dataPortFuture; private final AtomicBoolean running; + private final AtomicBoolean reconnectMode; private byte[] sendBuffer; private MessageQueue outgoing; + private MessageQueue reconnectOutgoing; NatsConnectionWriter(NatsConnection connection) { this.connection = connection; this.running = new AtomicBoolean(false); + this.reconnectMode = new AtomicBoolean(false); this.stopped = new CompletableFuture<>(); this.stopped.complete(Boolean.TRUE); // we are stopped on creation this.sendBuffer = new byte[connection.getOptions().getBufferSize()]; outgoing = new MessageQueue(true); + reconnectOutgoing = new MessageQueue(true); } // Should only be called if the current thread has exited. @@ -68,6 +72,7 @@ void start(Future dataPortFuture) { Future stop() { this.running.set(false); this.outgoing.pause(); + this.reconnectOutgoing.pause(); // Clear old ping/pong requests byte[] pingRequest = NatsConnection.OP_PING.getBytes(StandardCharsets.UTF_8); @@ -80,16 +85,24 @@ Future stop() { public void run() { Duration waitForMessage = Duration.ofMinutes(2); // This can be long since no one is sending + Duration reconnectWait = Duration.ofMillis(1); // This can be long since no one is sending long maxMessages = 1000; try { DataPort dataPort = this.dataPortFuture.get(); // Will wait for the future to complete NatsStatistics stats = this.connection.getNatsStatistics(); this.outgoing.resume(); + this.reconnectOutgoing.resume(); while (this.running.get()) { int sendPosition = 0; - NatsMessage msg = this.outgoing.accumulate(this.sendBuffer.length, maxMessages, waitForMessage); + NatsMessage msg = null; + + if (this.reconnectMode.get()) { + msg = this.reconnectOutgoing.accumulate(this.sendBuffer.length, maxMessages, reconnectWait); + } else { + msg = this.outgoing.accumulate(this.sendBuffer.length, maxMessages, waitForMessage); + } if (msg == null) { // Make sure we are still running continue; @@ -149,11 +162,23 @@ public void run() { } } + void setReconnectMode(boolean tf) { + reconnectMode.set(tf); + } + boolean canQueue(NatsMessage msg, long maxSize) { return (maxSize <= 0 || (outgoing.sizeInBytes() + msg.getSizeInBytes()) < maxSize); } void queue(NatsMessage msg) { - outgoing.push(msg); + this.outgoing.push(msg); + } + + void queueProtocolMessage(NatsMessage msg) { + if (this.reconnectMode.get()) { + this.reconnectOutgoing.push(msg); + } else { + this.outgoing.push(msg); + } } } \ No newline at end of file diff --git a/src/test/java/io/nats/client/AuthTests.java b/src/test/java/io/nats/client/AuthTests.java index bcbaeeb9b..c054fa819 100644 --- a/src/test/java/io/nats/client/AuthTests.java +++ b/src/test/java/io/nats/client/AuthTests.java @@ -23,6 +23,7 @@ import org.junit.Test; +import io.nats.client.Connection.Status; import io.nats.client.ConnectionListener.Events; public class AuthTests { @@ -174,8 +175,9 @@ public void testUserPassInURLOnReconnect() throws Exception { } handler.waitForStatusChange(5, TimeUnit.SECONDS); - assertTrue("Reconnecting status", Connection.Status.RECONNECTING == nc.getStatus() || - Connection.Status.DISCONNECTED == nc.getStatus()); + Status status = nc.getStatus(); + assertTrue("Reconnecting status", Connection.Status.RECONNECTING == status || + Connection.Status.DISCONNECTED == status); handler.prepForStatusChange(Events.RESUBSCRIBED); diff --git a/src/test/java/io/nats/client/impl/ReconnectTests.java b/src/test/java/io/nats/client/impl/ReconnectTests.java index 45b1b17af..ce661ed68 100644 --- a/src/test/java/io/nats/client/impl/ReconnectTests.java +++ b/src/test/java/io/nats/client/impl/ReconnectTests.java @@ -132,7 +132,6 @@ public void testSimpleReconnect() throws Exception { //Includes test for subscri } } - @Test public void testSubscribeDuringReconnect() throws Exception { NatsConnection nc = null; @@ -192,6 +191,92 @@ public void testSubscribeDuringReconnect() throws Exception { } } + @Test + public void testReconnectBuffer() throws Exception { + NatsConnection nc = null; + TestHandler handler = new TestHandler(); + int port = NatsTestServer.nextPort(); + Subscription sub; + long start = 0; + long end = 0; + String[] customArgs = {"--user","stephen","--pass","password"}; + + handler.setPrintExceptions(true); + + try { + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + Options options = new Options.Builder(). + server(ts.getURI()). + maxReconnects(-1). + userInfo("stephen", "password"). + reconnectWait(Duration.ofMillis(1000)). + connectionListener(handler). + build(); + nc = (NatsConnection) Nats.connect(options); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + sub = nc.subscribe("subsubject"); + + final NatsConnection nnc = nc; + Dispatcher d = nc.createDispatcher((msg) -> { + nnc.publish(msg.getReplyTo(), msg.getData()); + }); + d.subscribe("dispatchSubject"); + nc.flush(Duration.ofMillis(1000)); + + Future inc = nc.request("dispatchSubject", "test".getBytes(StandardCharsets.UTF_8)); + Message msg = inc.get(); + assertNotNull(msg); + + nc.publish("subsubject", null); + msg = sub.nextMessage(Duration.ofMillis(100)); + assertNotNull(msg); + + handler.prepForStatusChange(Events.DISCONNECTED); + start = System.nanoTime(); + } + + flushAndWait(nc, handler); + checkReconnectingStatus(nc); + + // Send a message to the dispatcher and one to the subscriber + // These should be sent on reconnect + Future inc = nc.request("dispatchSubject", "test".getBytes(StandardCharsets.UTF_8)); + nc.publish("subsubject", null); + nc.publish("subsubject", null); + + handler.prepForStatusChange(Events.RESUBSCRIBED); + + try (NatsTestServer ts = new NatsTestServer(customArgs, port, false)) { + handler.waitForStatusChange(5000, TimeUnit.MILLISECONDS); + assertTrue("Connected Status", Connection.Status.CONNECTED == nc.getStatus()); + + end = System.nanoTime(); + + assertTrue("reconnect wait", 1_000_000 * (end-start) > 1000); + + // Check the message we sent to dispatcher + Message msg = inc.get(500, TimeUnit.MILLISECONDS); + assertNotNull(msg); + + // Check the two we sent to subscriber + msg = sub.nextMessage(Duration.ofMillis(500)); + assertNotNull(msg); + + msg = sub.nextMessage(Duration.ofMillis(500)); + assertNotNull(msg); + } + + assertEquals("reconnect count", 1, nc.getNatsStatistics().getReconnects()); + assertTrue("exception count", nc.getNatsStatistics().getExceptions() > 0); + } finally { + if (nc != null) { + nc.close(); + assertTrue("Closed Status", Connection.Status.CLOSED == nc.getStatus()); + } + } + } + @Test public void testMaxReconnects() throws Exception { Connection nc = null; From b20ebd048da32cb0adc934b4f5092c4d0a1a629f Mon Sep 17 00:00:00 2001 From: Stephen Asbury Date: Wed, 19 Sep 2018 12:45:03 -0700 Subject: [PATCH 4/7] Cleaned up reconnect queue code, safer timing on re-sub and ping timer Removed double resub for dispatcher subscriptions, which fixed an issue with drain --- .../io/nats/client/impl/NatsConnection.java | 43 +++++++++++++------ .../client/impl/NatsConnectionWriter.java | 4 +- .../io/nats/client/impl/NatsDispatcher.java | 2 +- 3 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 4730e1a9a..b20a5a3d4 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -245,8 +245,8 @@ void reconnect() throws InterruptedException { } this.subscribers.forEach((sid, sub) -> { - if (!sub.isDraining()) { - sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName()); + if (sub.getDispatcher() == null && !sub.isDraining()) { + sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true); } }); @@ -328,7 +328,7 @@ void tryToConnect(String serverURI) { this.timer.schedule(new TimerTask() { public void run() { if (isConnected()) { - sendPing(); + softPing(); // The timer always uses the standard queue } } }, pingMillis, pingMillis); @@ -704,7 +704,7 @@ void sendUnsub(NatsSubscription sub, int after) { protocolBuilder.append(String.valueOf(after)); } NatsMessage unsubMsg = new NatsMessage(protocolBuilder.toString()); - queueProtocolOutgoing(unsubMsg); + queueInternalOutgoing(unsubMsg); } // Assumes the null/empty checks were handled elsewhere @@ -722,11 +722,11 @@ NatsSubscription createSubscription(String subject, String queueName, NatsDispat sub = new NatsSubscription(sid, subject, queueName, this, dispatcher); subscribers.put(sid, sub); - sendSubscriptionMessage(sid, subject, queueName); + sendSubscriptionMessage(sid, subject, queueName, false); return sub; } - void sendSubscriptionMessage(CharSequence sid, String subject, String queueName) { + void sendSubscriptionMessage(CharSequence sid, String subject, String queueName, boolean fastPath) { if (!isConnected()) { return;// We will setup sub on reconnect or ignore } @@ -744,7 +744,12 @@ void sendSubscriptionMessage(CharSequence sid, String subject, String queueName) protocolBuilder.append(" "); protocolBuilder.append(sid); NatsMessage subMsg = new NatsMessage(protocolBuilder.toString()); - queueProtocolOutgoing(subMsg); + + if (fastPath) { + queueInternalOutgoing(subMsg); + } else { + queueOutgoing(subMsg); + } } String createInbox() { @@ -969,14 +974,22 @@ void sendConnect(String serverURI) { String connectOptions = this.options.buildProtocolConnectOptionsString(serverURI, info.isAuthRequired()); connectString.append(connectOptions); NatsMessage msg = new NatsMessage(connectString.toString()); - queueProtocolOutgoing(msg); + queueInternalOutgoing(msg); + } + + CompletableFuture sendPing() { + return this.sendPing(true); + } + + CompletableFuture softPing() { + return this.sendPing(false); } // Send a ping request and push a pong future on the queue. // futures are completed in order, keep this one if a thread wants to wait // for a specific pong. Note, if no pong returns the wait will not return // without setting a timeout. - CompletableFuture sendPing() { + CompletableFuture sendPing(boolean fastPath) { int max = this.options.getMaxPingsOut(); if (!isConnectedOrConnecting()) { @@ -994,7 +1007,11 @@ CompletableFuture sendPing() { NatsMessage msg = new NatsMessage(NatsConnection.OP_PING); pongQueue.add(pongFuture); - queueProtocolOutgoing(msg); + if (fastPath) { + queueInternalOutgoing(msg); + } else { + queueOutgoing(msg); + } this.statistics.incrementPingCount(); return pongFuture; @@ -1002,7 +1019,7 @@ CompletableFuture sendPing() { void sendPong() { NatsMessage msg = new NatsMessage(NatsConnection.OP_PONG); - queueProtocolOutgoing(msg); + queueInternalOutgoing(msg); } // Called by the reader @@ -1092,11 +1109,11 @@ void queueOutgoing(NatsMessage msg) { this.writer.queue(msg); } - void queueProtocolOutgoing(NatsMessage msg) { + void queueInternalOutgoing(NatsMessage msg) { if (msg.getControlLineLength() > this.options.getMaxControlLine()) { throw new IllegalArgumentException("Control line is too long"); } - this.writer.queueProtocolMessage(msg); + this.writer.queueInternallMessage(msg); } void deliverMessage(NatsMessage msg) { diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index be21fadac..511065d41 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -98,7 +98,7 @@ public void run() { int sendPosition = 0; NatsMessage msg = null; - if (this.reconnectMode.get()) { + if (reconnectMode.get()) { msg = this.reconnectOutgoing.accumulate(this.sendBuffer.length, maxMessages, reconnectWait); } else { msg = this.outgoing.accumulate(this.sendBuffer.length, maxMessages, waitForMessage); @@ -174,7 +174,7 @@ void queue(NatsMessage msg) { this.outgoing.push(msg); } - void queueProtocolMessage(NatsMessage msg) { + void queueInternallMessage(NatsMessage msg) { if (this.reconnectMode.get()) { this.reconnectOutgoing.push(msg); } else { diff --git a/src/main/java/io/nats/client/impl/NatsDispatcher.java b/src/main/java/io/nats/client/impl/NatsDispatcher.java index a1eb7c804..f85e07790 100644 --- a/src/main/java/io/nats/client/impl/NatsDispatcher.java +++ b/src/main/java/io/nats/client/impl/NatsDispatcher.java @@ -140,7 +140,7 @@ MessageQueue getMessageQueue() { void resendSubscriptions() { this.subscriptions.forEach((id, sub)->{ - this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName()); + this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true); }); } From d04f654d4e2ba2bc39f43bf82085de322eb387a9 Mon Sep 17 00:00:00 2001 From: Stephen Asbury Date: Wed, 19 Sep 2018 12:46:38 -0700 Subject: [PATCH 5/7] Fixed versions in jars and also updated deploying instructions. --- .travis/deploying.md | 5 +++-- build.gradle | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.travis/deploying.md b/.travis/deploying.md index fed0a233a..fd8490309 100644 --- a/.travis/deploying.md +++ b/.travis/deploying.md @@ -16,8 +16,9 @@ The gradle close and release process will fail if there is more than one reposit ## Before you Release 1. Check that the Nats.java version is updated. -2. Check that the version in gradle.build is updated. -3. Check that the changelog.md is ready +2. Check that the version in gradle.build is updated, including the jar file versions +3. Check dependency versions. +4. Check that the changelog.md is ready ## Manually Deploying diff --git a/build.gradle b/build.gradle index cb70c2645..11a6d1d35 100644 --- a/build.gradle +++ b/build.gradle @@ -70,7 +70,7 @@ osgiClasses { jar { manifest { attributes('Implementation-Title': 'Java Nats', - 'Implementation-Version': '2.0.1', + 'Implementation-Version': '2.1.1', 'Implementation-Vendor': 'nats.io') } exclude("io/nats/examples/**") @@ -112,7 +112,7 @@ task examplesJar(type: Jar) { classifier = 'examples' manifest { attributes('Implementation-Title': 'Java Nats Examples', - 'Implementation-Version': '2.0.1', + 'Implementation-Version': '2.1.1', 'Implementation-Vendor': 'nats.io') } from(sourceSets.main.output) { From f15ea35d9200527e00445c33c7a2ca1e84c9608c Mon Sep 17 00:00:00 2001 From: Stephen Asbury Date: Wed, 19 Sep 2018 13:54:54 -0700 Subject: [PATCH 6/7] Renamed "fastpath" to "treat as internal" --- src/main/java/io/nats/client/impl/NatsConnection.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index b20a5a3d4..7ebca6404 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -726,7 +726,7 @@ NatsSubscription createSubscription(String subject, String queueName, NatsDispat return sub; } - void sendSubscriptionMessage(CharSequence sid, String subject, String queueName, boolean fastPath) { + void sendSubscriptionMessage(CharSequence sid, String subject, String queueName, boolean treatAsInternal) { if (!isConnected()) { return;// We will setup sub on reconnect or ignore } @@ -745,7 +745,7 @@ void sendSubscriptionMessage(CharSequence sid, String subject, String queueName, protocolBuilder.append(sid); NatsMessage subMsg = new NatsMessage(protocolBuilder.toString()); - if (fastPath) { + if (treatAsInternal) { queueInternalOutgoing(subMsg); } else { queueOutgoing(subMsg); @@ -989,7 +989,7 @@ CompletableFuture softPing() { // futures are completed in order, keep this one if a thread wants to wait // for a specific pong. Note, if no pong returns the wait will not return // without setting a timeout. - CompletableFuture sendPing(boolean fastPath) { + CompletableFuture sendPing(boolean treatAsInternal) { int max = this.options.getMaxPingsOut(); if (!isConnectedOrConnecting()) { @@ -1007,7 +1007,7 @@ CompletableFuture sendPing(boolean fastPath) { NatsMessage msg = new NatsMessage(NatsConnection.OP_PING); pongQueue.add(pongFuture); - if (fastPath) { + if (treatAsInternal) { queueInternalOutgoing(msg); } else { queueOutgoing(msg); From 1fd2cc4fab4e4b2c739dc4685970d6100aaff27c Mon Sep 17 00:00:00 2001 From: Stephen Asbury Date: Wed, 19 Sep 2018 14:00:46 -0700 Subject: [PATCH 7/7] Fixed typo in method name --- src/main/java/io/nats/client/impl/NatsConnection.java | 2 +- src/main/java/io/nats/client/impl/NatsConnectionWriter.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 7ebca6404..55b06b8b8 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -1113,7 +1113,7 @@ void queueInternalOutgoing(NatsMessage msg) { if (msg.getControlLineLength() > this.options.getMaxControlLine()) { throw new IllegalArgumentException("Control line is too long"); } - this.writer.queueInternallMessage(msg); + this.writer.queueInternalMessage(msg); } void deliverMessage(NatsMessage msg) { diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index 511065d41..3a60f8b81 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -174,7 +174,7 @@ void queue(NatsMessage msg) { this.outgoing.push(msg); } - void queueInternallMessage(NatsMessage msg) { + void queueInternalMessage(NatsMessage msg) { if (this.reconnectMode.get()) { this.reconnectOutgoing.push(msg); } else {