Skip to content

Commit

Permalink
Merge pull request #185 from nats-io/v2.1.2
Browse files Browse the repository at this point in the history
V2.1.2
  • Loading branch information
sasbury authored Oct 22, 2018
2 parents 061d76b + 98a8928 commit ced0af2
Show file tree
Hide file tree
Showing 23 changed files with 307 additions and 156 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ sudo: required
jdk:
- oraclejdk8
- oraclejdk9
- oraclejdk10
- openjdk8
- openjdk9
- openjdk10
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@

# Change Log

## Version 2.1.2

* [FIXED] #181 - issue with default pending limits on consumers not matching doc
* [FIXED] #179 - added version variable for jars in build.gradle to make it easier to change
* [ADDED] Support for UTF8 subjects

## Version 2.1.1

* [FIXED] Issue with version in Nats.java, also updated deploying.md with checklist
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ Version 2.1 uses a simplified versioning scheme. Any issues will be fixed in the

Previous versions are still available in the repo.

### UTF-8 Subjects

The client protocol spec doesn't explicitly state the encoding on subjects. Some clients use ASCII and some use UTF-8 which matches ASCII for a-Z and 0-9. Until 2.1.2 the 2.0+ version of the Java client used ASCII for performance reasons. As of 2.1.2 you can choose to support UTF-8 subjects via the Options. Keep in mind that there is a small performance penalty for UTF-8 encoding and decoding in benchmarks, but depending on your application this cost may be negligible. Also, keep in mind that not all clients support UTF-8 and test accordingly.

## Installation

The java-nats client is provided in a single jar file, with no external dependencies. See [Building From Source](#building-from-source) for details on building the library.
Expand Down
7 changes: 4 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ plugins {
// Update version here, repeated check-ins not into master will have snapshot on them
def versionMajor = 2
def versionMinor = 1
def versionPatch = 1
def versionPatch = 2
def versionModifier = ""
def jarVersion = "2.1.2"
def branch = System.getenv("TRAVIS_BRANCH");

def getVersionName = { ->
Expand Down Expand Up @@ -70,7 +71,7 @@ osgiClasses {
jar {
manifest {
attributes('Implementation-Title': 'Java Nats',
'Implementation-Version': '2.1.1',
'Implementation-Version': jarVersion,
'Implementation-Vendor': 'nats.io')
}
exclude("io/nats/examples/**")
Expand Down Expand Up @@ -112,7 +113,7 @@ task examplesJar(type: Jar) {
classifier = 'examples'
manifest {
attributes('Implementation-Title': 'Java Nats Examples',
'Implementation-Version': '2.1.1',
'Implementation-Version': jarVersion,
'Implementation-Vendor': 'nats.io')
}
from(sourceSets.main.output) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ public void execute(Options connectOptions) throws InterruptedException {
}
try {
Subscription sub = subConnect.subscribe(subject);
subConnect.flush(Duration.ZERO);
subConnect.flush(Duration.ofSeconds(5));
subReady.complete(null);
go.get();

int count = 0;
while(count < this.getMessageCount()) {
Message msg = sub.nextMessage(Duration.ZERO);
Message msg = sub.nextMessage(Duration.ofSeconds(5));

if (msg != null){
measurements.add(System.nanoTime() - start.get());
Expand Down
156 changes: 89 additions & 67 deletions src/examples/java/io/nats/examples/autobench/NatsAutoBench.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,56 @@
*/
public class NatsAutoBench {
static final String usageString =
"\nUsage: java NatsAutoBench [server]"
+ "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n";
"\nUsage: java NatsAutoBench [serverURL] [help] [utf8] [tiny|small|med]"
+ "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
+ "\n\ntiny, small and med reduce the number of messages used for tests, which can help on slower machines\n";

public static void main(String args[]) {
String server;

if (args.length == 1) {
server = args[0];
} else if (args.length == 0) {
server = Options.DEFAULT_URL;
} else {
usage();
return;
String server = Options.DEFAULT_URL;
boolean utf8 = false;
int baseMsgs = 100_000;
int latencyMsgs = 5_000;

if (args.length > 0) {
for (String s : args) {
if (s.equals("utf8")) {
utf8 = true;
}else if (s.equals("med")) {
baseMsgs = 50_000;
latencyMsgs = 2_500;
} else if (s.equals("small")) {
baseMsgs = 5_000;
latencyMsgs = 250;
} else if (s.equals("tiny")) {
baseMsgs = 1_000;
latencyMsgs = 50;
} else if (s.equals("nano")) {
baseMsgs = 10;
latencyMsgs = 5;
} else if (s.equals("help")) {
usage();
return;
} else {
server = s;
}
}
}

if ("help".equals(server)) {
usage();
}
System.out.printf("Connecting to gnatsd at %s\n", server);

try {
Options connectOptions = new Options.Builder().
server(server).
connectionTimeout(Duration.ofMillis(1000)).
noReconnect().
build();

List<AutoBenchmark> tests = buildTestList();
Options.Builder builder = new Options.Builder().
server(server).
connectionTimeout(Duration.ofSeconds(1)).
noReconnect();

if (utf8) {
System.out.printf("Enabling UTF-8 subjects\n");
builder.supportUTF8Subjects();
}

Options connectOptions = builder.build();
List<AutoBenchmark> tests = buildTestList(baseMsgs, latencyMsgs);

System.out.println("Running warmup");
runWarmup(connectOptions);
Expand All @@ -72,7 +95,7 @@ public static void main(String args[]) {
// Ask for GC and wait a moment between tests
System.gc();
try {
Thread.sleep(100);
Thread.sleep(500);
} catch (Exception exp) {
// ignore
}
Expand Down Expand Up @@ -111,57 +134,56 @@ public static void runWarmup(Options connectOptions) throws Exception {
}
}

public static List<AutoBenchmark> buildTestList() {
public static List<AutoBenchmark> buildTestList(int baseMsgs, int latencyMsgs) {
ArrayList<AutoBenchmark> tests = new ArrayList<>();

/**/
tests.add(new PubBenchmark("PubOnly 0b", 10_000_000, 0));
tests.add(new PubBenchmark("PubOnly 8b", 10_000_000, 8));
tests.add(new PubBenchmark("PubOnly 32b", 10_000_000, 32));
tests.add(new PubBenchmark("PubOnly 256b", 10_000_000, 256));
tests.add(new PubBenchmark("PubOnly 512b", 10_000_000, 512));
tests.add(new PubBenchmark("PubOnly 1k", 1_000_000, 1024));
tests.add(new PubBenchmark("PubOnly 4k", 500_000, 4*1024));
tests.add(new PubBenchmark("PubOnly 8k", 100_000, 8*1024));

tests.add(new PubSubBenchmark("PubSub 0b", 10_000_000, 0));
tests.add(new PubSubBenchmark("PubSub 8b", 10_000_000, 8));
tests.add(new PubSubBenchmark("PubSub 32b", 10_000_000, 32));
tests.add(new PubSubBenchmark("PubSub 256b", 10_000_000, 256));
tests.add(new PubSubBenchmark("PubSub 512b", 5_000_000, 512));
tests.add(new PubSubBenchmark("PubSub 1k", 1_000_000, 1024));
tests.add(new PubSubBenchmark("PubSub 4k", 100_000, 4*1024));
tests.add(new PubSubBenchmark("PubSub 8k", 100_000, 8*1024));
tests.add(new PubBenchmark("PubOnly 0b", 100 * baseMsgs, 0));
tests.add(new PubBenchmark("PubOnly 8b", 100 * baseMsgs, 8));
tests.add(new PubBenchmark("PubOnly 32b", 100 * baseMsgs, 32));
tests.add(new PubBenchmark("PubOnly 256b", 100 * baseMsgs, 256));
tests.add(new PubBenchmark("PubOnly 512b", 100 * baseMsgs, 512));
tests.add(new PubBenchmark("PubOnly 1k", 10 * baseMsgs, 1024));
tests.add(new PubBenchmark("PubOnly 4k", 5 * baseMsgs, 4*1024));
tests.add(new PubBenchmark("PubOnly 8k", baseMsgs, 8*1024));

tests.add(new PubSubBenchmark("PubSub 0b", 100 * baseMsgs, 0));
tests.add(new PubSubBenchmark("PubSub 8b", 100 * baseMsgs, 8));
tests.add(new PubSubBenchmark("PubSub 32b", 100 * baseMsgs, 32));
tests.add(new PubSubBenchmark("PubSub 256b", 100 * baseMsgs, 256));
tests.add(new PubSubBenchmark("PubSub 512b", 50 * baseMsgs, 512));
tests.add(new PubSubBenchmark("PubSub 1k", 10 * baseMsgs, 1024));
tests.add(new PubSubBenchmark("PubSub 4k", baseMsgs, 4*1024));
tests.add(new PubSubBenchmark("PubSub 8k", baseMsgs, 8*1024));

tests.add(new PubDispatchBenchmark("PubDispatch 0b", 100 * baseMsgs, 0));
tests.add(new PubDispatchBenchmark("PubDispatch 8b", 100 * baseMsgs, 8));
tests.add(new PubDispatchBenchmark("PubDispatch 32b", 100 * baseMsgs, 32));
tests.add(new PubDispatchBenchmark("PubDispatch 256b", 100 * baseMsgs, 256));
tests.add(new PubDispatchBenchmark("PubDispatch 512b", 50 * baseMsgs, 512));
tests.add(new PubDispatchBenchmark("PubDispatch 1k", 10 * baseMsgs, 1024));
tests.add(new PubDispatchBenchmark("PubDispatch 4k", baseMsgs, 4*1024));
tests.add(new PubDispatchBenchmark("PubDispatch 8k", baseMsgs, 8*1024));

tests.add(new PubDispatchBenchmark("PubDispatch 0b", 10_000_000, 0));
tests.add(new PubDispatchBenchmark("PubDispatch 8b", 10_000_000, 8));
tests.add(new PubDispatchBenchmark("PubDispatch 32b", 10_000_000, 32));
tests.add(new PubDispatchBenchmark("PubDispatch 256b", 10_000_000, 256));
tests.add(new PubDispatchBenchmark("PubDispatch 512b", 5_000_000, 512));
tests.add(new PubDispatchBenchmark("PubDispatch 1k", 1_000_000, 1024));
tests.add(new PubDispatchBenchmark("PubDispatch 4k", 100_000, 4*1024));
tests.add(new PubDispatchBenchmark("PubDispatch 8k", 100_000, 8*1024));

// Request reply is a 4 message trip, and runs the full loop before sending another message
// so we run fewer because the client cannot batch any socket calls to the server together
tests.add(new ReqReplyBenchmark("ReqReply 0b", 20_000, 0));
tests.add(new ReqReplyBenchmark("ReqReply 8b", 20_000, 8));
tests.add(new ReqReplyBenchmark("ReqReply 32b", 10_000, 32));
tests.add(new ReqReplyBenchmark("ReqReply 256b", 10_000, 256));
tests.add(new ReqReplyBenchmark("ReqReply 512b", 10_000, 512));
tests.add(new ReqReplyBenchmark("ReqReply 1k", 10_000, 1024));
tests.add(new ReqReplyBenchmark("ReqReply 4k", 10_000, 4*1024));
tests.add(new ReqReplyBenchmark("ReqReply 8k", 10_000, 8*1024));

int latencyTests = 5_000;
tests.add(new LatencyBenchmark("Latency 0b", latencyTests, 0));
tests.add(new LatencyBenchmark("Latency 8b", latencyTests, 8));
tests.add(new LatencyBenchmark("Latency 32b", latencyTests, 32));
tests.add(new LatencyBenchmark("Latency 256b", latencyTests, 256));
tests.add(new LatencyBenchmark("Latency 512b", latencyTests, 512));
tests.add(new LatencyBenchmark("Latency 1k", latencyTests, 1024));
tests.add(new LatencyBenchmark("Latency 4k", latencyTests, 4 * 1024));
tests.add(new LatencyBenchmark("Latency 8k", latencyTests, 8 * 1024));
tests.add(new ReqReplyBenchmark("ReqReply 0b", baseMsgs / 5, 0));
tests.add(new ReqReplyBenchmark("ReqReply 8b", baseMsgs / 5, 8));
tests.add(new ReqReplyBenchmark("ReqReply 32b", baseMsgs / 10, 32));
tests.add(new ReqReplyBenchmark("ReqReply 256b", baseMsgs / 10, 256));
tests.add(new ReqReplyBenchmark("ReqReply 512b", baseMsgs / 10, 512));
tests.add(new ReqReplyBenchmark("ReqReply 1k", baseMsgs / 10, 1024));
tests.add(new ReqReplyBenchmark("ReqReply 4k", baseMsgs / 10, 4*1024));
tests.add(new ReqReplyBenchmark("ReqReply 8k", baseMsgs / 10, 8*1024));

tests.add(new LatencyBenchmark("Latency 0b", latencyMsgs, 0));
tests.add(new LatencyBenchmark("Latency 8b", latencyMsgs, 8));
tests.add(new LatencyBenchmark("Latency 32b", latencyMsgs, 32));
tests.add(new LatencyBenchmark("Latency 256b", latencyMsgs, 256));
tests.add(new LatencyBenchmark("Latency 512b", latencyMsgs, 512));
tests.add(new LatencyBenchmark("Latency 1k", latencyMsgs, 1024));
tests.add(new LatencyBenchmark("Latency 4k", latencyMsgs, 4 * 1024));
tests.add(new LatencyBenchmark("Latency 8k", latencyMsgs, 8 * 1024));
/**/

return tests;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void execute(Options connectOptions) throws InterruptedException {
for(int i = 0; i < this.getMessageCount(); i++) {
nc.publish(subject, payload);
}
try {nc.flush(Duration.ZERO);}catch(Exception e){}
try {nc.flush(Duration.ofSeconds(5));}catch(Exception e){}
this.endTiming();
} finally {
nc.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void executeWithLimiter(Options connectOptions) throws InterruptedException {
}
});
d.subscribe(subject);
subConnect.flush(Duration.ZERO);
subConnect.flush(Duration.ofSeconds(5));
subReady.complete(null);

// For simplicity the test doesn't have a connection listener so just loop
Expand Down Expand Up @@ -103,7 +103,7 @@ void executeWithLimiter(Options connectOptions) throws InterruptedException {
pubConnect.publish(subject, payload);
this.adjustAndSleep(pubConnect);
}
try {pubConnect.flush(Duration.ZERO);}catch(Exception e){}
try {pubConnect.flush(Duration.ofSeconds(5));}catch(Exception e){}

pubDone.complete(null);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@ void executeWithLimiter(Options connectOptions) throws InterruptedException {
}
try {
Subscription sub = subConnect.subscribe(subject);
subConnect.flush(Duration.ZERO);
subConnect.flush(Duration.ofSeconds(5));
subReady.complete(null);

while(count < this.getMessageCount()) {
Message msg = sub.nextMessage(Duration.ZERO);
Message msg = sub.nextMessage(Duration.ofSeconds(5));

if (msg != null){
count++;
} else {
throw new Exception("No messages within timeout.");
}
}

Expand Down Expand Up @@ -88,7 +90,7 @@ void executeWithLimiter(Options connectOptions) throws InterruptedException {
pubConnect.publish(subject, payload);
adjustAndSleep(pubConnect);
}
try {pubConnect.flush(Duration.ZERO);}catch(Exception e){}
try {pubConnect.flush(Duration.ofSeconds(5));}catch(Exception e){}

pubDone.complete(null);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ public void execute(Options connectOptions) throws InterruptedException {
}
try {
Subscription sub = replyConnect.subscribe(subject);
replyConnect.flush(Duration.ZERO);
replyConnect.flush(Duration.ofSeconds(5));
replyReady.complete(null);

int count = 0;
while(count < this.getMessageCount()) {
Message msg = sub.nextMessage(Duration.ZERO);
Message msg = sub.nextMessage(Duration.ofSeconds(5));

if (msg != null){
replyConnect.publish(msg.getReplyTo(), payload);
count++;
}
}
replyDone.complete(null);
replyConnect.flush(Duration.ZERO);
replyConnect.flush(Duration.ofSeconds(5));

} catch (Exception exp) {
this.setException(exp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@ void adjustAndSleep(Connection nc) throws InterruptedException {
delay = delay * 1000; // we are doing this every 1000 messages

long nanos = (long)(delay * 1e9);

LockSupport.parkNanos(nanos);

// Flush small messages regularly
if (this.getMessageSize() < 64 && count != 0 && count % 100_000 == 0) {
try {nc.flush(Duration.ZERO);}catch(Exception e){}
try {nc.flush(Duration.ofSeconds(5));}catch(Exception e){}
}
}
}
4 changes: 2 additions & 2 deletions src/examples/java/io/nats/examples/benchmark/NatsBench.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void run() {
nc.publish(subject, payload);
sent.incrementAndGet();
}
nc.flush(Duration.ZERO);
nc.flush(Duration.ofSeconds(5));
long end = System.nanoTime();

bench.addPubSample(new Sample(numMsgs, size, start.get(), end, nc.getStatistics()));
Expand Down Expand Up @@ -265,7 +265,7 @@ void adjustAndSleep(Connection nc) throws InterruptedException {

// Flush small messages regularly
if (this.size < 64 && count != 0 && count % 100_000 == 0) {
try {nc.flush(Duration.ZERO);}catch(Exception e){}
try {nc.flush(Duration.ofSeconds(5));}catch(Exception e){}
}
}
}
Expand Down
Loading

0 comments on commit ced0af2

Please sign in to comment.