From dde7c17a2909718e6ad605025c4934910b21edd1 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Wed, 14 Jun 2023 19:26:27 -0400 Subject: [PATCH] Simplification Tuning (#926) --- .../nats/examples/jetstream/NatsJsUtils.java | 4 +++ .../jetstream/simple/ContextExample.java | 2 +- .../jetstream/simple/FetchBytesExample.java | 8 ++--- .../simple/FetchMessagesExample.java | 8 ++--- .../simple/IterableConsumerExample.java | 8 ++--- .../simple/MessageConsumerExample.java | 18 +++-------- .../jetstream/simple/NextExample.java | 2 +- .../java/io/nats/client/MessageConsumer.java | 8 ++--- .../java/io/nats/client/StreamContext.java | 4 +-- .../nats/client/impl/NatsMessageConsumer.java | 2 +- .../client/impl/NatsMessageConsumerBase.java | 31 ++++++++++++------- .../nats/client/impl/NatsStreamContext.java | 2 +- .../nats/client/impl/SimplificationTests.java | 10 +++--- 13 files changed, 50 insertions(+), 57 deletions(-) diff --git a/src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java b/src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java index 46ba351a7..4932e0513 100644 --- a/src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java +++ b/src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java @@ -419,6 +419,10 @@ public static int count408s(List messages) { return count; } + public static void createCleanMemStream(Connection nc, String stream, String... subs) throws IOException, JetStreamApiException { + createCleanMemStream(nc.jetStreamManagement(), stream, subs); + } + public static void createCleanMemStream(JetStreamManagement jsm, String stream, String... subs) throws IOException, JetStreamApiException { try { jsm.deleteStream(stream); diff --git a/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java b/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java index 5b8b20f3a..473d59fda 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java @@ -50,7 +50,7 @@ public static void main(String[] args) { System.out.println("S3. " + streamContext.getStreamInfo()); // when you create a consumer from the stream context you get a ConsumerContext in return - ConsumerContext consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); + ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); System.out.println("C1. " + consumerContext.getCachedConsumerInfo()); // get a ConsumerContext from the connection for a pre-existing consumer diff --git a/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java b/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java index 91d45c04b..c74b021bd 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java @@ -81,7 +81,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m ConsumerContext consumerContext; try { streamContext = nc.streamContext(STREAM); - consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(consumerName).build()); + consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build()); } catch (JetStreamApiException | IOException e) { // JetStreamApiException: @@ -108,7 +108,8 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m // create the consumer then use it int receivedMessages = 0; long receivedBytes = 0; - try (FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions)){ + try { + FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions); Message msg = consumer.nextMessage(); while (msg != null) { msg.ack(); @@ -136,9 +137,6 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m System.err.println("Exception should be handled properly, just exiting here."); System.exit(-1); } - catch (Exception e) { - // For FetchConsumer since it is AutoCloseable - } long elapsed = System.currentTimeMillis() - start; printSummary(receivedMessages, receivedBytes, elapsed); diff --git a/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java b/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java index ed0a52afb..f5350b014 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java @@ -75,7 +75,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m ConsumerContext consumerContext; try { streamContext = nc.streamContext(STREAM); - consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(consumerName).build()); + consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build()); } catch (JetStreamApiException | IOException e) { // JetStreamApiException: @@ -97,7 +97,8 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m // create the consumer then use it int receivedMessages = 0; - try (FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions)){ + try { + FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions); Message msg = consumer.nextMessage(); while (msg != null) { msg.ack(); @@ -121,9 +122,6 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m System.err.println("Exception should be handled properly, just exiting here."); System.exit(-1); } - catch (Exception e) { - // For FetchConsumer since it is AutoCloseable - } long elapsed = System.currentTimeMillis() - start; printSummary(receivedMessages, elapsed); diff --git a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java index dd965e704..c9ece052d 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java @@ -47,7 +47,7 @@ public static void main(String[] args) { ConsumerContext consumerContext; try { streamContext = nc.streamContext(STREAM); - consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); + consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); } catch (JetStreamApiException | IOException e) { // JetStreamApiException: @@ -61,7 +61,8 @@ public static void main(String[] args) { Thread consumeThread = new Thread(() -> { int count = 0; long start = System.nanoTime(); - try (IterableConsumer consumer = consumerContext.consume()){ + try { + IterableConsumer consumer = consumerContext.consume(); System.out.println("Starting main loop."); while (count < STOP_COUNT) { Message msg = consumer.nextMessage(1000); @@ -95,9 +96,6 @@ public static void main(String[] args) { // developer interrupted this thread? return; } - catch (Exception e) { - // For IterableConsumer since it is AutoCloseable - } report("Done", System.nanoTime() - start, count); }); consumeThread.start(); diff --git a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java index 2e1ada6a6..3af7f37b6 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java @@ -17,7 +17,7 @@ import io.nats.client.api.ConsumerConfiguration; import java.io.IOException; -import java.util.concurrent.*; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import static io.nats.examples.jetstream.simple.Utils.createOrReplaceStream; @@ -52,7 +52,7 @@ public static void main(String[] args) { ConsumerContext consumerContext; try { streamContext = nc.streamContext(STREAM); - streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); + streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); consumerContext = streamContext.consumerContext(CONSUMER_NAME); } catch (JetStreamApiException | IOException e) { @@ -78,17 +78,12 @@ public static void main(String[] args) { }; // create the consumer then use it - try (MessageConsumer consumer = consumerContext.consume(handler)){ + try { + MessageConsumer consumer = consumerContext.consume(handler); latch.await(); // once the consumer is stopped, the client will drain messages System.out.println("Stop the consumer..."); - CompletableFuture stopFuture = consumer.stop(1000); - try { - stopFuture.get(1, TimeUnit.SECONDS); - } - catch (ExecutionException | TimeoutException e) { - // from the future.get - } + consumer.stop(1000); } catch (JetStreamApiException | IOException e) { // JetStreamApiException: @@ -98,9 +93,6 @@ public static void main(String[] args) { // likely a connection problem return; } - catch (Exception e) { - // For IterableConsumer since it is AutoCloseable - } report("Final", start, atomicCount.get()); } diff --git a/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java b/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java index b0684e8cb..69002c287 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java @@ -42,7 +42,7 @@ public static void main(String[] args) { ConsumerContext consumerContext; try { streamContext = nc.streamContext(STREAM); - consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); + consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); } catch (JetStreamApiException | IOException e) { // JetStreamApiException: diff --git a/src/main/java/io/nats/client/MessageConsumer.java b/src/main/java/io/nats/client/MessageConsumer.java index 59d30a12b..a152ce427 100644 --- a/src/main/java/io/nats/client/MessageConsumer.java +++ b/src/main/java/io/nats/client/MessageConsumer.java @@ -16,14 +16,13 @@ import io.nats.client.api.ConsumerInfo; import java.io.IOException; -import java.util.concurrent.CompletableFuture; /** * The MessageConsumer interface is the core interface replacing * a subscription for a simplified consumer. * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ -public interface MessageConsumer extends AutoCloseable { +public interface MessageConsumer { /** * Gets information about the consumer behind this subscription. * @return consumer information @@ -35,12 +34,11 @@ public interface MessageConsumer extends AutoCloseable { /** * Stop the MessageConsumer from asking for any more messages from the server. - * Messages do not immediately stop + * There still may be messages available and coming across the wire. * @param timeout The time to wait for the stop to succeed, pass 0 to wait * forever. Stop involves moving messages to and from the server * so a very short timeout is not recommended. - * @return A future so you could wait for the stop to know when there are no more messages. * @throws InterruptedException if one is thrown, in order to propagate it up */ - CompletableFuture stop(long timeout) throws InterruptedException; + void stop(long timeout) throws InterruptedException; } diff --git a/src/main/java/io/nats/client/StreamContext.java b/src/main/java/io/nats/client/StreamContext.java index 1ff30e7a5..aa95cf023 100644 --- a/src/main/java/io/nats/client/StreamContext.java +++ b/src/main/java/io/nats/client/StreamContext.java @@ -84,14 +84,14 @@ public interface StreamContext { ConsumerContext consumerContext(String consumerName) throws IOException, JetStreamApiException; /** - * Management function to creates a consumer on this stream. + * Management function to create or update a consumer on this stream. * @param config the consumer configuration to use. * @return a ConsumerContext object * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - ConsumerContext addConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException; + ConsumerContext createOrUpdateConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException; /** * Management function to deletes a consumer. diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java index 7740baf77..3a6f89b0e 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java @@ -54,7 +54,7 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements TrackPendin @Override public void track(int pendingMessages, long pendingBytes, boolean trackingBytes) { - if (drainFuture == null && + if (!stopped && (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))) { diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java index 0cc2bcf52..88160286f 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java @@ -19,13 +19,12 @@ import java.io.IOException; import java.time.Duration; -import java.util.concurrent.CompletableFuture; class NatsMessageConsumerBase implements MessageConsumer { protected NatsJetStreamPullSubscription sub; protected PullMessageManager pmm; protected final Object subLock; - protected CompletableFuture drainFuture; + protected boolean stopped; NatsMessageConsumerBase() { subLock = new Object(); @@ -50,24 +49,28 @@ public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException * {@inheritDoc} */ @Override - public CompletableFuture stop(long timeout) throws InterruptedException { + public void stop(long timeout) throws InterruptedException { synchronized (subLock) { - if (drainFuture == null) { - if (sub.getNatsDispatcher() != null) { - drainFuture = sub.getDispatcher().drain(Duration.ofMillis(timeout)); + if (!stopped) { + try { + if (sub.getNatsDispatcher() != null) { + sub.getDispatcher().drain(Duration.ofMillis(timeout)); + } + else { + sub.drain(Duration.ofMillis(timeout)); + } } - else { - drainFuture = sub.drain(Duration.ofMillis(timeout)); + finally { + stopped = true; } } - return drainFuture; } } @Override - public void close() throws Exception { - synchronized (subLock) { - if (drainFuture == null && sub.isActive()) { + protected void finalize() throws Throwable { + try { + if (!stopped && sub.isActive()) { if (sub.getNatsDispatcher() != null) { sub.getDispatcher().unsubscribe(sub); } @@ -76,5 +79,9 @@ public void close() throws Exception { } } } + catch (Throwable ignore) { + // nothing to do + } + super.finalize(); } } diff --git a/src/main/java/io/nats/client/impl/NatsStreamContext.java b/src/main/java/io/nats/client/impl/NatsStreamContext.java index fbd9e9c9e..5a29332c2 100644 --- a/src/main/java/io/nats/client/impl/NatsStreamContext.java +++ b/src/main/java/io/nats/client/impl/NatsStreamContext.java @@ -84,7 +84,7 @@ public ConsumerContext consumerContext(String consumerName) throws IOException, * {@inheritDoc} */ @Override - public ConsumerContext addConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException { + public ConsumerContext createOrUpdateConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException { return new NatsConsumerContext(this, jsm.addOrUpdateConsumer(streamName, config)); } diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index c00866793..61b7610a4 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.time.Duration; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static io.nats.client.BaseConsumeOptions.*; @@ -60,7 +59,7 @@ private static void _testStreamContext(JetStream js, StreamContext streamContext assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(DURABLE)); ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build(); - ConsumerContext consumerContext = streamContext.addConsumer(cc); + ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(cc); ConsumerInfo ci = consumerContext.getConsumerInfo(); assertEquals(STREAM, ci.getStreamName()); assertEquals(DURABLE, ci.getName()); @@ -367,8 +366,8 @@ public void testCoverage() throws Exception { ConsumerContext cctx2 = nc.consumerContext(STREAM, name(2), JetStreamOptions.DEFAULT_JS_OPTIONS); ConsumerContext cctx3 = js.consumerContext(STREAM, name(3)); ConsumerContext cctx4 = sctx1.consumerContext(name(4)); - ConsumerContext cctx5 = sctx1.addConsumer(ConsumerConfiguration.builder().durable(name(5)).build()); - ConsumerContext cctx6 = sctx1.addConsumer(ConsumerConfiguration.builder().durable(name(6)).build()); + ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(5)).build()); + ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(6)).build()); closeConsumer(cctx1.consume(), name(1), true); closeConsumer(cctx2.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(2), true); @@ -383,9 +382,8 @@ private void closeConsumer(MessageConsumer con, String name, boolean doStop) thr ConsumerInfo ci = con.getConsumerInfo(); assertEquals(name, ci.getName()); if (doStop) { - assertTrue(con.stop(100).get(100, TimeUnit.MILLISECONDS)); + con.stop(100); } - con.close(); } @Test