From c41d96009c6de083040d2861a3f4f3fd5485d766 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Mon, 4 Jul 2022 14:30:05 -0400 Subject: [PATCH] =?UTF-8?q?revert=20ConsumerConfiguration=20changes=20wher?= =?UTF-8?q?e=20some=20fields=20were=20downgrade=E2=80=A6=20(#685)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 12 +-- build.gradle | 2 +- .../java/io/nats/examples/ExampleArgs.java | 10 +++ src/examples/java/io/nats/examples/README.md | 4 +- .../nats/examples/jetstream/NatsJsUtils.java | 26 +++++-- .../io/nats/client/JetStreamSubscription.java | 6 +- .../client/api/ConsumerConfiguration.java | 77 ++++++++++--------- .../api/ConsumerConfigurationTests.java | 14 ++-- 8 files changed, 86 insertions(+), 65 deletions(-) diff --git a/README.md b/README.md index 2b53cd735..44f1d6707 100644 --- a/README.md +++ b/README.md @@ -77,9 +77,9 @@ The java-nats client is provided in a single jar file, with a single external de ### Downloading the Jar -You can download the latest jar at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.4/jnats-2.15.4.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.4/jnats-2.15.4.jar). +You can download the latest jar at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.5/jnats-2.15.5.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.5/jnats-2.15.5.jar). -The examples are available at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.4/jnats-2.15.4-examples.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.4/jnats-2.15.4-examples.jar). +The examples are available at [https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.5/jnats-2.15.5-examples.jar](https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.5/jnats-2.15.5-examples.jar). To use NKeys, you will need the ed25519 library, which can be downloaded at [https://repo1.maven.org/maven2/net/i2p/crypto/eddsa/0.3.0/eddsa-0.3.0.jar](https://repo1.maven.org/maven2/net/i2p/crypto/eddsa/0.3.0/eddsa-0.3.0.jar). @@ -89,7 +89,7 @@ The NATS client is available in the Maven central repository, and can be importe ```groovy dependencies { - implementation 'io.nats:jnats:2.15.4' + implementation 'io.nats:jnats:2.15.5' } ``` @@ -115,7 +115,7 @@ repositories { } dependencies { - implementation 'io.nats:jnats:2.15.4-SNAPSHOT' + implementation 'io.nats:jnats:2.15.5-SNAPSHOT' } ``` @@ -127,7 +127,7 @@ The NATS client is available on the Maven central repository, and can be importe io.nats jnats - 2.15.4 + 2.15.5 ``` @@ -161,7 +161,7 @@ If you need a snapshot version, you must enable snapshots and change your depend io.nats jnats - 2.15.4-SNAPSHOT + 2.15.5-SNAPSHOT ``` diff --git a/build.gradle b/build.gradle index 8b3b8a6c2..677266b2f 100644 --- a/build.gradle +++ b/build.gradle @@ -17,7 +17,7 @@ plugins { // src/main/java/io/nats/client/Nats.java // src/main/java/io/nats/client/package-info.java // CHANGELOG.md -def jarVersion = "2.15.4" +def jarVersion = "2.15.5" def isRelease = System.getenv("BUILD_EVENT") == "release" diff --git a/src/examples/java/io/nats/examples/ExampleArgs.java b/src/examples/java/io/nats/examples/ExampleArgs.java index b86139f27..a388fc2f0 100644 --- a/src/examples/java/io/nats/examples/ExampleArgs.java +++ b/src/examples/java/io/nats/examples/ExampleArgs.java @@ -26,6 +26,7 @@ public enum Trail {MESSAGE, COUNT, QUEUE_AND_COUNT} public String queue; public String message; public int msgCount = Integer.MIN_VALUE; + public int msgSize = Integer.MIN_VALUE; private boolean msgCountUnlimitedFlag; public int subCount = Integer.MIN_VALUE; public String stream; @@ -140,6 +141,9 @@ private void handleKeyedArg(String key, String value) { case "-mcnt": msgCount = Integer.parseInt(value); break; + case "-msize": + msgSize = Integer.parseInt(value); + break; case "-scnt": subCount = Integer.parseInt(value); break; @@ -188,6 +192,7 @@ public void displayBanner() { _banner("durable", durable); _banner("deliver", deliverSubject); _banner("msgCount", msgCount, msgCountUnlimitedFlag); + _banner("msgSize", msgSize); _banner("subCount", subCount); _banner("pullSize", pullSize); _banner("Headers", headers == null || headers.size() == 0 ? Integer.MIN_VALUE : headers.size()); @@ -256,6 +261,11 @@ public Builder defaultMsgCount(int msgCount, boolean unlimitedFlag) { return this; } + public Builder defaultMsgSize(int msgSize) { + ea.msgSize = msgSize; + return this; + } + public Builder defaultSubCount(int subCount) { ea.subCount = subCount; return this; diff --git a/src/examples/java/io/nats/examples/README.md b/src/examples/java/io/nats/examples/README.md index 375e82e27..341c173f7 100644 --- a/src/examples/java/io/nats/examples/README.md +++ b/src/examples/java/io/nats/examples/README.md @@ -90,7 +90,7 @@ You can also just insert some code before the arguments are processed to set the In the examples, the usage will show `java -cp ...` Make sure you add both the client library and examples into the classpath. For example: ```bash -java -cp build/libs/jnats-2.15.4-SNAPSHOT.jar:build/libs/jnats-2.15.4-SNAPSHOT-examples.jar io.nats.examples.NatsPub nats://localhost:4222 test "hello world" +java -cp build/libs/jnats-2.15.5-SNAPSHOT.jar:build/libs/jnats-2.15.5-SNAPSHOT-examples.jar io.nats.examples.NatsPub nats://localhost:4222 test "hello world" ``` ### Some examples depend on others @@ -108,7 +108,7 @@ java -Djavax.net.ssl.keyStore=src/test/resources/keystore.jks -Djavax.net.ssl.ke To run with the completely unverified client: ```bash -java -cp build/libs/jnats-2.15.4-SNAPSHOT.jar:build/libs/jnats-2.15.4-SNAPSHOT-examples.jar io.nats.examples.NatsSub opentls://localhost:4443 test 3 +java -cp build/libs/jnats-2.15.5-SNAPSHOT.jar:build/libs/jnats-2.15.5-SNAPSHOT-examples.jar io.nats.examples.NatsSub opentls://localhost:4443 test 3 ``` There are a set tls configuration for the server in the test files that can be used to run the NATS server. diff --git a/src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java b/src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java index f15767235..4d8166ec9 100644 --- a/src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java +++ b/src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java @@ -150,29 +150,41 @@ public static StreamInfo createStreamOrUpdateSubjects(Connection nc, String stre // PUBLISH // ---------------------------------------------------------------------------------------------------- public static void publish(Connection nc, String subject, int count) throws IOException, JetStreamApiException { - publish(nc.jetStream(), subject, "data", count, false); + publish(nc.jetStream(), subject, "data", count, -1, false); } public static void publish(JetStream js, String subject, int count) throws IOException, JetStreamApiException { - publish(js, subject, "data", count, false); + publish(js, subject, "data", count, -1, false); } public static void publish(JetStream js, String subject, String prefix, int count) throws IOException, JetStreamApiException { - publish(js, subject, prefix, count, true); + publish(js, subject, prefix, count, -1, true); } public static void publish(JetStream js, String subject, String prefix, int count, boolean verbose) throws IOException, JetStreamApiException { + publish(js, subject, prefix, count, -1, verbose); + } + + public static void publish(JetStream js, String subject, String prefix, int count, int msgSize, boolean verbose) throws IOException, JetStreamApiException { if (verbose) { System.out.print("Publish ->"); } for (int x = 1; x <= count; x++) { - String data = prefix + x; + String text = prefix + x; if (verbose) { - System.out.print(" " + data); + System.out.print(" " + text); + } + + byte[] data = text.getBytes(StandardCharsets.US_ASCII); + if (msgSize > data.length) { + byte[] larger = new byte[msgSize]; + System.arraycopy(data, 0, larger, 0, data.length); + data = larger; } - Message msg = NatsMessage.builder() + Message msg = + NatsMessage.builder() .subject(subject) - .data(data.getBytes(StandardCharsets.US_ASCII)) + .data(data) .build(); js.publish(msg); } diff --git a/src/main/java/io/nats/client/JetStreamSubscription.java b/src/main/java/io/nats/client/JetStreamSubscription.java index b60acb725..c21952844 100644 --- a/src/main/java/io/nats/client/JetStreamSubscription.java +++ b/src/main/java/io/nats/client/JetStreamSubscription.java @@ -170,7 +170,7 @@ public interface JetStreamSubscription extends Subscription { * @return the message iterator * @throws IllegalStateException if not a pull subscription. */ - Iterator iterate(final int batchSize, Duration maxWait); + Iterator iterate(int batchSize, Duration maxWait); /** * Prepares an iterator. This uses pullExpiresIn under the covers, @@ -187,7 +187,7 @@ public interface JetStreamSubscription extends Subscription { * @return the message iterator * @throws IllegalStateException if not a pull subscription. */ - Iterator iterate(final int batchSize, long maxWaitMillis); + Iterator iterate(int batchSize, long maxWaitMillis); /** * Prepares a reader. A reader looks like a push sync subscription, @@ -204,7 +204,7 @@ public interface JetStreamSubscription extends Subscription { * @return the message iterator * @throws IllegalStateException if not a pull subscription. */ - JetStreamReader reader(final int batchSize, int repullAt); + JetStreamReader reader(int batchSize, int repullAt); /** * Gets information about the consumer behind this subscription. diff --git a/src/main/java/io/nats/client/api/ConsumerConfiguration.java b/src/main/java/io/nats/client/api/ConsumerConfiguration.java index 7c992fe78..0a438ba7b 100644 --- a/src/main/java/io/nats/client/api/ConsumerConfiguration.java +++ b/src/main/java/io/nats/client/api/ConsumerConfiguration.java @@ -52,7 +52,7 @@ public class ConsumerConfiguration implements JsonSerializable { public static final long DURATION_UNSET_LONG = 0; public static final long DURATION_MIN_LONG = 1; public static final int STANDARD_MIN = 0; - public static final int MAX_DELIVER_MIN = 1; + public static final long MAX_DELIVER_MIN = 1; public static final long MIN_IDLE_HEARTBEAT_NANOS = MIN_IDLE_HEARTBEAT.toNanos(); public static final long MIN_IDLE_HEARTBEAT_MILLIS = MIN_IDLE_HEARTBEAT.toMillis(); @@ -72,12 +72,12 @@ public class ConsumerConfiguration implements JsonSerializable { protected final Duration maxExpires; protected final Duration inactiveThreshold; protected final Long startSeq; // server side this is unsigned - protected final Integer maxDeliver; + protected final Long maxDeliver; protected final Long rateLimit; // server side this is unsigned - protected final Integer maxAckPending; - protected final Integer maxPullWaiting; - protected final Integer maxBatch; - protected final Integer maxBytes; + protected final Long maxAckPending; + protected final Long maxPullWaiting; + protected final Long maxBatch; + protected final Long maxBytes; protected final Boolean flowControl; protected final Boolean headersOnly; protected final List backoff; @@ -134,12 +134,12 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) { inactiveThreshold = JsonUtils.readNanos(json, INACTIVE_THRESHOLD_RE); startSeq = JsonUtils.readLong(json, OPT_START_SEQ_RE); - maxDeliver = JsonUtils.readInteger(json, MAX_DELIVER_RE); + maxDeliver = JsonUtils.readLong(json, MAX_DELIVER_RE); rateLimit = JsonUtils.readLong(json, RATE_LIMIT_BPS_RE); - maxAckPending = JsonUtils.readInteger(json, MAX_ACK_PENDING_RE); - maxPullWaiting = JsonUtils.readInteger(json, MAX_WAITING_RE); - maxBatch = JsonUtils.readInteger(json, MAX_BATCH_RE); - maxBytes = JsonUtils.readInteger(json, MAX_BYTES_RE); + maxAckPending = JsonUtils.readLong(json, MAX_ACK_PENDING_RE); + maxPullWaiting = JsonUtils.readLong(json, MAX_WAITING_RE); + maxBatch = JsonUtils.readLong(json, MAX_BATCH_RE); + maxBytes = JsonUtils.readLong(json, MAX_BYTES_RE); flowControl = JsonUtils.readBoolean(json, FLOW_CONTROL_RE, null); headersOnly = JsonUtils.readBoolean(json, HEADERS_ONLY_RE, null); @@ -290,7 +290,7 @@ public Duration getAckWait() { * Gets the max delivery amount of this consumer configuration. * @return the max delivery amount. */ - public int getMaxDeliver() { + public long getMaxDeliver() { return getOrUnset(maxDeliver); } @@ -322,7 +322,7 @@ public long getRateLimit() { * Gets the maximum ack pending configuration. * @return maximum ack pending. */ - public int getMaxAckPending() { + public long getMaxAckPending() { return getOrUnset(maxAckPending); } @@ -357,7 +357,7 @@ public boolean isFlowControl() { * Get the number of pulls that can be outstanding on a pull consumer * @return the max pull waiting */ - public int getMaxPullWaiting() { + public long getMaxPullWaiting() { return getOrUnset(maxPullWaiting); } @@ -373,7 +373,7 @@ public boolean isHeadersOnly() { * Get the max batch size for the server to allow on pull requests. * @return the max batch size */ - public int getMaxBatch() { + public long getMaxBatch() { return getOrUnset(maxBatch); } @@ -381,7 +381,7 @@ public int getMaxBatch() { * Get the max bytes size for the server to allow on pull requests. * @return the max byte size */ - public int getMaxBytes() { + public long getMaxBytes() { return getOrUnset(maxBytes); } @@ -548,12 +548,12 @@ public static class Builder { private Duration inactiveThreshold; private Long startSeq; - private Integer maxDeliver; + private Long maxDeliver; private Long rateLimit; - private Integer maxAckPending; - private Integer maxPullWaiting; - private Integer maxBatch; - private Integer maxBytes; + private Long maxAckPending; + private Long maxPullWaiting; + private Long maxBatch; + private Long maxBytes; private Boolean flowControl; private Boolean headersOnly; @@ -712,7 +712,7 @@ public Builder ackWait(long timeoutMillis) { * @return Builder */ public Builder maxDeliver(Long maxDeliver) { - this.maxDeliver = normalizeToInt(maxDeliver, MAX_DELIVER_MIN); + this.maxDeliver = normalize(maxDeliver, MAX_DELIVER_MIN); return this; } @@ -722,7 +722,7 @@ public Builder maxDeliver(Long maxDeliver) { * @return Builder */ public Builder maxDeliver(long maxDeliver) { - this.maxDeliver = normalizeToInt(maxDeliver, MAX_DELIVER_MIN); + this.maxDeliver = normalize(maxDeliver, MAX_DELIVER_MIN); return this; } @@ -782,7 +782,7 @@ public Builder rateLimit(long bitsPerSecond) { * @return Builder */ public Builder maxAckPending(Long maxAckPending) { - this.maxAckPending = normalizeToInt(maxAckPending, STANDARD_MIN); + this.maxAckPending = normalize(maxAckPending, STANDARD_MIN); return this; } @@ -792,7 +792,7 @@ public Builder maxAckPending(Long maxAckPending) { * @return Builder */ public Builder maxAckPending(long maxAckPending) { - this.maxAckPending = normalizeToInt(maxAckPending, STANDARD_MIN); + this.maxAckPending = normalize(maxAckPending, STANDARD_MIN); return this; } @@ -905,7 +905,7 @@ public Builder inactiveThreshold(long inactiveThreshold) { * @return Builder */ public Builder maxPullWaiting(Long maxPullWaiting) { - this.maxPullWaiting = normalizeToInt(maxPullWaiting, STANDARD_MIN); + this.maxPullWaiting = normalize(maxPullWaiting, STANDARD_MIN); return this; } @@ -915,7 +915,7 @@ public Builder maxPullWaiting(Long maxPullWaiting) { * @return Builder */ public Builder maxPullWaiting(long maxPullWaiting) { - this.maxPullWaiting = normalizeToInt(maxPullWaiting, STANDARD_MIN); + this.maxPullWaiting = normalize(maxPullWaiting, STANDARD_MIN); return this; } @@ -925,7 +925,7 @@ public Builder maxPullWaiting(long maxPullWaiting) { * @return Builder */ public Builder maxBatch(Long maxBatch) { - this.maxBatch = normalizeToInt(maxBatch, STANDARD_MIN); + this.maxBatch = normalize(maxBatch, STANDARD_MIN); return this; } @@ -935,7 +935,7 @@ public Builder maxBatch(Long maxBatch) { * @return Builder */ public Builder maxBatch(long maxBatch) { - this.maxBatch = normalizeToInt(maxBatch, STANDARD_MIN); + this.maxBatch = normalize(maxBatch, STANDARD_MIN); return this; } @@ -945,7 +945,7 @@ public Builder maxBatch(long maxBatch) { * @return Builder */ public Builder maxBytes(Long maxBytes) { - this.maxBytes = normalizeToInt(maxBytes, STANDARD_MIN); + this.maxBytes = normalize(maxBytes, STANDARD_MIN); return this; } @@ -955,7 +955,7 @@ public Builder maxBytes(Long maxBytes) { * @return Builder */ public Builder maxBytes(long maxBytes) { - this.maxBytes = normalizeToInt(maxBytes, STANDARD_MIN); + this.maxBytes = normalize(maxBytes, STANDARD_MIN); return this; } @@ -1067,6 +1067,11 @@ protected static int getOrUnset(Integer val) return val == null ? INTEGER_UNSET : val; } + protected static long getOrUnset(Long val) + { + return val == null ? LONG_UNSET : val; + } + protected static long getOrUnsetUlong(Long val) { return val == null || val < 0 ? ULONG_UNSET : val; @@ -1077,20 +1082,16 @@ protected static Duration getOrUnset(Duration val) return val == null ? DURATION_UNSET : val; } - protected static Integer normalizeToInt(Long l, int min) { + protected static Long normalize(Long l, long min) { if (l == null) { return null; } if (l < min) { - return INTEGER_UNSET; - } - - if (l > Integer.MAX_VALUE) { - return Integer.MAX_VALUE; + return LONG_UNSET; } - return l.intValue(); + return l; } protected static Long normalizeUlong(Long u) diff --git a/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java b/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java index 1f8d9d641..fd11c25c9 100644 --- a/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java +++ b/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java @@ -306,14 +306,12 @@ public void testUtilityMethods() { assertEquals(DURATION_UNSET, ConsumerConfiguration.getOrUnset((Duration)null)); //noinspection ConstantConditions - assertNull(ConsumerConfiguration.normalizeToInt(null, STANDARD_MIN)); - assertEquals(0, ConsumerConfiguration.normalizeToInt(0L, STANDARD_MIN)); - assertEquals(1, ConsumerConfiguration.normalizeToInt(1L, STANDARD_MIN)); - assertEquals(INTEGER_UNSET, ConsumerConfiguration.normalizeToInt((long)INTEGER_UNSET, STANDARD_MIN)); - assertEquals(INTEGER_UNSET, ConsumerConfiguration.normalizeToInt(LONG_UNSET, STANDARD_MIN)); - assertEquals(INTEGER_UNSET, ConsumerConfiguration.normalizeToInt(Long.MIN_VALUE, STANDARD_MIN)); - assertEquals(Integer.MAX_VALUE, ConsumerConfiguration.normalizeToInt((long)Integer.MAX_VALUE, STANDARD_MIN)); - assertEquals(Integer.MAX_VALUE, ConsumerConfiguration.normalizeToInt(Long.MAX_VALUE, STANDARD_MIN)); + assertNull(ConsumerConfiguration.normalize(null, STANDARD_MIN)); + assertEquals(0, ConsumerConfiguration.normalize(0L, STANDARD_MIN)); + assertEquals(1, ConsumerConfiguration.normalize(1L, STANDARD_MIN)); + assertEquals(LONG_UNSET, ConsumerConfiguration.normalize(LONG_UNSET, STANDARD_MIN)); + assertEquals(LONG_UNSET, ConsumerConfiguration.normalize(Long.MIN_VALUE, STANDARD_MIN)); + assertEquals(Long.MAX_VALUE, ConsumerConfiguration.normalize(Long.MAX_VALUE, STANDARD_MIN)); //noinspection ConstantConditions assertNull(ConsumerConfiguration.normalizeUlong(null));