Skip to content

Commit

Permalink
Merge pull request #178 from nats-io/v2.1.1
Browse files Browse the repository at this point in the history
V2.1.1
  • Loading branch information
sasbury authored Sep 19, 2018
2 parents 64ebc2a + 1fd2cc4 commit 061d76b
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 19 deletions.
7 changes: 7 additions & 0 deletions .travis/deploying.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ Travis doesn't support sonatype deploy correctly. There is an issue where the va

The gradle close and release process will fail if there is more than one repository staged. You may need to manually drop repositories from staging during testing on a single version number.

## Before you Release

1. Check that the Nats.java version is updated.
2. Check that the version in gradle.build is updated, including the jar file versions
3. Check dependency versions.
4. Check that the changelog.md is ready

## Manually Deploying

You can deploy manually by setting up your gradle.properties to have:
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@

# Change Log

## Version 2.1.1

* [FIXED] Issue with version in Nats.java, also updated deploying.md with checklist
* [FIXED] Fixed issue during reconnect where buffered messages blocked protocol messages

## Version 2.1.0

* [ADDED] Support for consumer or connection drain. (New API lead to version bump.)
Expand Down
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
// Update version here, repeated check-ins not into master will have snapshot on them
def versionMajor = 2
def versionMinor = 1
def versionPatch = 0
def versionPatch = 1
def versionModifier = ""
def branch = System.getenv("TRAVIS_BRANCH");

Expand Down Expand Up @@ -70,7 +70,7 @@ osgiClasses {
jar {
manifest {
attributes('Implementation-Title': 'Java Nats',
'Implementation-Version': '2.0.1',
'Implementation-Version': '2.1.1',
'Implementation-Vendor': 'nats.io')
}
exclude("io/nats/examples/**")
Expand Down Expand Up @@ -112,7 +112,7 @@ task examplesJar(type: Jar) {
classifier = 'examples'
manifest {
attributes('Implementation-Title': 'Java Nats Examples',
'Implementation-Version': '2.0.1',
'Implementation-Version': '2.1.1',
'Implementation-Vendor': 'nats.io')
}
from(sourceSets.main.output) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/Nats.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class Nats {
/**
* Current version of the library - {@value #CLIENT_VERSION}
*/
public static final String CLIENT_VERSION = "2.0.0";
public static final String CLIENT_VERSION = "2.1.1";

/**
* Current language of the library - {@value #CLIENT_LANGUAGE}
Expand Down
52 changes: 41 additions & 11 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ void reconnect() throws InterruptedException {
return;
}

this.writer.setReconnectMode(true);

while (!isConnected() && !isClosed() && !this.isClosing()) {
Collection<String> serversToTry = buildReconnectList();

Expand Down Expand Up @@ -243,8 +245,8 @@ void reconnect() throws InterruptedException {
}

this.subscribers.forEach((sid, sub) -> {
if (!sub.isDraining()) {
sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName());
if (sub.getDispatcher() == null && !sub.isDraining()) {
sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true);
}
});

Expand All @@ -259,6 +261,8 @@ void reconnect() throws InterruptedException {
} catch (Exception exp) {
this.processException(exp);
}

this.writer.setReconnectMode(false);

processConnectionEvent(Events.RESUBSCRIBED);
}
Expand Down Expand Up @@ -324,7 +328,7 @@ void tryToConnect(String serverURI) {
this.timer.schedule(new TimerTask() {
public void run() {
if (isConnected()) {
sendPing();
softPing(); // The timer always uses the standard queue
}
}
}, pingMillis, pingMillis);
Expand Down Expand Up @@ -700,7 +704,7 @@ void sendUnsub(NatsSubscription sub, int after) {
protocolBuilder.append(String.valueOf(after));
}
NatsMessage unsubMsg = new NatsMessage(protocolBuilder.toString());
queueOutgoing(unsubMsg);
queueInternalOutgoing(unsubMsg);
}

// Assumes the null/empty checks were handled elsewhere
Expand All @@ -718,11 +722,11 @@ NatsSubscription createSubscription(String subject, String queueName, NatsDispat
sub = new NatsSubscription(sid, subject, queueName, this, dispatcher);
subscribers.put(sid, sub);

sendSubscriptionMessage(sid, subject, queueName);
sendSubscriptionMessage(sid, subject, queueName, false);
return sub;
}

void sendSubscriptionMessage(CharSequence sid, String subject, String queueName) {
void sendSubscriptionMessage(CharSequence sid, String subject, String queueName, boolean treatAsInternal) {
if (!isConnected()) {
return;// We will setup sub on reconnect or ignore
}
Expand All @@ -740,7 +744,12 @@ void sendSubscriptionMessage(CharSequence sid, String subject, String queueName)
protocolBuilder.append(" ");
protocolBuilder.append(sid);
NatsMessage subMsg = new NatsMessage(protocolBuilder.toString());
queueOutgoing(subMsg);

if (treatAsInternal) {
queueInternalOutgoing(subMsg);
} else {
queueOutgoing(subMsg);
}
}

String createInbox() {
Expand Down Expand Up @@ -965,14 +974,22 @@ void sendConnect(String serverURI) {
String connectOptions = this.options.buildProtocolConnectOptionsString(serverURI, info.isAuthRequired());
connectString.append(connectOptions);
NatsMessage msg = new NatsMessage(connectString.toString());
queueOutgoing(msg);
queueInternalOutgoing(msg);
}

CompletableFuture<Boolean> sendPing() {
return this.sendPing(true);
}

CompletableFuture<Boolean> softPing() {
return this.sendPing(false);
}

// Send a ping request and push a pong future on the queue.
// futures are completed in order, keep this one if a thread wants to wait
// for a specific pong. Note, if no pong returns the wait will not return
// without setting a timeout.
CompletableFuture<Boolean> sendPing() {
CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
int max = this.options.getMaxPingsOut();

if (!isConnectedOrConnecting()) {
Expand All @@ -989,14 +1006,20 @@ CompletableFuture<Boolean> sendPing() {
CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
NatsMessage msg = new NatsMessage(NatsConnection.OP_PING);
pongQueue.add(pongFuture);
queueOutgoing(msg);

if (treatAsInternal) {
queueInternalOutgoing(msg);
} else {
queueOutgoing(msg);
}

this.statistics.incrementPingCount();
return pongFuture;
}

void sendPong() {
NatsMessage msg = new NatsMessage(NatsConnection.OP_PONG);
queueOutgoing(msg);
queueInternalOutgoing(msg);
}

// Called by the reader
Expand Down Expand Up @@ -1086,6 +1109,13 @@ void queueOutgoing(NatsMessage msg) {
this.writer.queue(msg);
}

void queueInternalOutgoing(NatsMessage msg) {
if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
throw new IllegalArgumentException("Control line is too long");
}
this.writer.queueInternalMessage(msg);
}

void deliverMessage(NatsMessage msg) {
this.statistics.incrementInMsgs();
this.statistics.incrementInBytes(msg.getSizeInBytes());
Expand Down
29 changes: 27 additions & 2 deletions src/main/java/io/nats/client/impl/NatsConnectionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,25 @@ class NatsConnectionWriter implements Runnable {
private CompletableFuture<Boolean> stopped;
private Future<DataPort> dataPortFuture;
private final AtomicBoolean running;
private final AtomicBoolean reconnectMode;

private byte[] sendBuffer;

private MessageQueue outgoing;
private MessageQueue reconnectOutgoing;

NatsConnectionWriter(NatsConnection connection) {
this.connection = connection;

this.running = new AtomicBoolean(false);
this.reconnectMode = new AtomicBoolean(false);
this.stopped = new CompletableFuture<>();
this.stopped.complete(Boolean.TRUE); // we are stopped on creation

this.sendBuffer = new byte[connection.getOptions().getBufferSize()];

outgoing = new MessageQueue(true);
reconnectOutgoing = new MessageQueue(true);
}

// Should only be called if the current thread has exited.
Expand All @@ -68,6 +72,7 @@ void start(Future<DataPort> dataPortFuture) {
Future<Boolean> stop() {
this.running.set(false);
this.outgoing.pause();
this.reconnectOutgoing.pause();

// Clear old ping/pong requests
byte[] pingRequest = NatsConnection.OP_PING.getBytes(StandardCharsets.UTF_8);
Expand All @@ -80,16 +85,24 @@ Future<Boolean> stop() {

public void run() {
Duration waitForMessage = Duration.ofMinutes(2); // This can be long since no one is sending
Duration reconnectWait = Duration.ofMillis(1); // This can be long since no one is sending
long maxMessages = 1000;

try {
DataPort dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
NatsStatistics stats = this.connection.getNatsStatistics();
this.outgoing.resume();
this.reconnectOutgoing.resume();

while (this.running.get()) {
int sendPosition = 0;
NatsMessage msg = this.outgoing.accumulate(this.sendBuffer.length, maxMessages, waitForMessage);
NatsMessage msg = null;

if (reconnectMode.get()) {
msg = this.reconnectOutgoing.accumulate(this.sendBuffer.length, maxMessages, reconnectWait);
} else {
msg = this.outgoing.accumulate(this.sendBuffer.length, maxMessages, waitForMessage);
}

if (msg == null) { // Make sure we are still running
continue;
Expand Down Expand Up @@ -149,11 +162,23 @@ public void run() {
}
}

void setReconnectMode(boolean tf) {
reconnectMode.set(tf);
}

boolean canQueue(NatsMessage msg, long maxSize) {
return (maxSize <= 0 || (outgoing.sizeInBytes() + msg.getSizeInBytes()) < maxSize);
}

void queue(NatsMessage msg) {
outgoing.push(msg);
this.outgoing.push(msg);
}

void queueInternalMessage(NatsMessage msg) {
if (this.reconnectMode.get()) {
this.reconnectOutgoing.push(msg);
} else {
this.outgoing.push(msg);
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ MessageQueue getMessageQueue() {

void resendSubscriptions() {
this.subscriptions.forEach((id, sub)->{
this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName());
this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true);
});
}

Expand Down
Loading

0 comments on commit 061d76b

Please sign in to comment.