From 4882a62961f117dc4bfbbee40a2ce42c01a6cb6e Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Fri, 11 Aug 2023 16:19:21 -0400 Subject: [PATCH] Simplification More Review (#948) --- README.md | 2 +- .../jetstream/simple/ContextExample.java | 12 +- .../jetstream/simple/FetchBytesExample.java | 2 +- .../simple/FetchMessagesExample.java | 2 +- .../simple/IterableConsumerExample.java | 52 ++- .../simple/MessageConsumerExample.java | 40 +- .../jetstream/simple/NextExample.java | 2 +- .../jetstream/simple/QueueExample.java | 8 +- .../nats/examples/jetstream/simple/Utils.java | 13 +- .../io/nats/client/BaseConsumeOptions.java | 2 +- .../io/nats/client/BaseConsumerContext.java | 160 +++++++ .../io/nats/client/BaseMessageConsumer.java | 27 ++ src/main/java/io/nats/client/Connection.java | 16 +- .../java/io/nats/client/ConsumerContext.java | 113 +---- src/main/java/io/nats/client/JetStream.java | 10 +- .../java/io/nats/client/MessageConsumer.java | 20 +- ...Utils.java => OrderedConsumerContext.java} | 17 +- .../java/io/nats/client/StreamContext.java | 62 +-- ...java => OrderedConsumerConfiguration.java} | 64 +-- .../io/nats/client/impl/MessageManager.java | 2 +- .../io/nats/client/impl/NatsConnection.java | 16 +- .../nats/client/impl/NatsConsumerContext.java | 219 +++++++--- .../nats/client/impl/NatsFetchConsumer.java | 44 +- .../client/impl/NatsIterableConsumer.java | 12 +- .../io/nats/client/impl/NatsJetStream.java | 12 +- .../nats/client/impl/NatsJetStreamImpl.java | 19 +- .../impl/NatsJetStreamPullSubscription.java | 4 +- .../nats/client/impl/NatsMessageConsumer.java | 35 +- .../client/impl/NatsMessageConsumerBase.java | 63 +-- .../impl/NatsOrderedConsumerContext.java | 90 ++++ .../nats/client/impl/NatsStreamContext.java | 100 +---- .../client/impl/OrderedMessageManager.java | 29 +- ...er.java => OrderedPullMessageManager.java} | 14 +- ...Listener.java => PullManagerObserver.java} | 4 +- .../nats/client/impl/PullMessageManager.java | 16 +- .../impl/SimplifiedSubscriptionMaker.java | 30 +- .../java/io/nats/client/support/Encoding.java | 30 +- .../support/NatsJetStreamClientError.java | 2 + .../io/nats/client/support/Validator.java | 2 +- .../io/nats/client/impl/HeadersTests.java | 20 + .../client/impl/JetStreamManagementTests.java | 13 +- .../nats/client/impl/JetStreamPushTests.java | 1 + .../nats/client/impl/SimplificationTests.java | 393 ++++++++++++++---- .../java/io/nats/client/utils/TestBase.java | 56 +-- .../java/io/nats/service/ServiceTests.java | 2 +- 45 files changed, 1166 insertions(+), 686 deletions(-) create mode 100644 src/main/java/io/nats/client/BaseConsumerContext.java create mode 100644 src/main/java/io/nats/client/BaseMessageConsumer.java rename src/main/java/io/nats/client/{support/ConsumerUtils.java => OrderedConsumerContext.java} (65%) rename src/main/java/io/nats/client/api/{OrderedConsumerConfig.java => OrderedConsumerConfiguration.java} (62%) create mode 100644 src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java rename src/main/java/io/nats/client/impl/{PullOrderedMessageManager.java => OrderedPullMessageManager.java} (84%) rename src/main/java/io/nats/client/impl/{TrackPendingListener.java => PullManagerObserver.java} (84%) diff --git a/README.md b/README.md index 7e28dbddc..595a20ab6 100644 --- a/README.md +++ b/README.md @@ -764,7 +764,7 @@ You can however set the deliver policy which will be used to start the subscript | JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer. | | JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer. | | JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None. | -| JsSoOrderedRequiresMaxDeliver | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. | +| JsSoOrderedRequiresMaxDeliverOfOne | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. | | JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. | | JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied. | | JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied. | diff --git a/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java b/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java index 473d59fda..506db0fc5 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java @@ -38,15 +38,15 @@ public static void main(String[] args) { createOrReplaceStream(nc.jetStreamManagement(), STREAM, SUBJECT); // get a stream context from the connection - StreamContext streamContext = nc.streamContext(STREAM); + StreamContext streamContext = nc.getStreamContext(STREAM); System.out.println("S1. " + streamContext.getStreamInfo()); // get a stream context from the connection, supplying custom JetStreamOptions - streamContext = nc.streamContext(STREAM, JetStreamOptions.builder().build()); + streamContext = nc.getStreamContext(STREAM, JetStreamOptions.builder().build()); System.out.println("S2. " + streamContext.getStreamInfo()); // get a stream context from the JetStream context - streamContext = js.streamContext(STREAM); + streamContext = js.getStreamContext(STREAM); System.out.println("S3. " + streamContext.getStreamInfo()); // when you create a consumer from the stream context you get a ConsumerContext in return @@ -54,15 +54,15 @@ public static void main(String[] args) { System.out.println("C1. " + consumerContext.getCachedConsumerInfo()); // get a ConsumerContext from the connection for a pre-existing consumer - consumerContext = nc.consumerContext(STREAM, CONSUMER_NAME); + consumerContext = nc.getConsumerContext(STREAM, CONSUMER_NAME); System.out.println("C2. " + consumerContext.getCachedConsumerInfo()); // get a ConsumerContext from the connection for a pre-existing consumer, supplying custom JetStreamOptions - consumerContext = nc.consumerContext(STREAM, CONSUMER_NAME, JetStreamOptions.builder().build()); + consumerContext = nc.getConsumerContext(STREAM, CONSUMER_NAME, JetStreamOptions.builder().build()); System.out.println("C3. " + consumerContext.getCachedConsumerInfo()); // get a ConsumerContext from the stream context for a pre-existing consumer - consumerContext = streamContext.consumerContext(CONSUMER_NAME); + consumerContext = streamContext.getConsumerContext(CONSUMER_NAME); System.out.println("C4. " + consumerContext.getCachedConsumerInfo()); } catch (JetStreamApiException | IOException | InterruptedException ioe) { diff --git a/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java b/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java index 80903e07e..1696e600a 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/FetchBytesExample.java @@ -80,7 +80,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m StreamContext streamContext; ConsumerContext consumerContext; try { - streamContext = nc.streamContext(STREAM); + streamContext = nc.getStreamContext(STREAM); consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build()); } catch (JetStreamApiException | IOException e) { diff --git a/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java b/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java index 0eb6f884c..3578e0f30 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/FetchMessagesExample.java @@ -74,7 +74,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m StreamContext streamContext; ConsumerContext consumerContext; try { - streamContext = nc.streamContext(STREAM); + streamContext = nc.getStreamContext(STREAM); consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build()); } catch (JetStreamApiException | IOException e) { diff --git a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java index 43c7971f1..6276c60cd 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java @@ -26,13 +26,12 @@ * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class IterableConsumerExample { - private static final String STREAM = "manually-stream"; - private static final String SUBJECT = "manually-subject"; - private static final String CONSUMER_NAME = "manually-consumer"; - private static final String MESSAGE_TEXT = "manually"; + private static final String STREAM = "iterable-stream"; + private static final String SUBJECT = "iterable-subject"; + private static final String CONSUMER_NAME = "iterable-consumer"; + private static final String MESSAGE_TEXT = "iterable"; private static final int STOP_COUNT = 500; private static final int REPORT_EVERY = 50; - private static final int JITTER = 20; private static final String SERVER = "nats://localhost:4222"; @@ -46,7 +45,7 @@ public static void main(String[] args) { StreamContext streamContext; ConsumerContext consumerContext; try { - streamContext = nc.streamContext(STREAM); + streamContext = nc.getStreamContext(STREAM); consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); } catch (JetStreamApiException | IOException e) { @@ -58,33 +57,44 @@ public static void main(String[] args) { return; } + System.out.println("Starting publish..."); + Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10); + Thread pubThread = new Thread(publisher); + pubThread.start(); + + // set up the iterable consumer Thread consumeThread = new Thread(() -> { int count = 0; long start = System.currentTimeMillis(); - try (IterableConsumer consumer = consumerContext.consume()) { + try (IterableConsumer consumer = consumerContext.iterate()) { System.out.println("Starting main loop."); while (count < STOP_COUNT) { Message msg = consumer.nextMessage(1000); if (msg != null) { msg.ack(); if (++count % REPORT_EVERY == 0) { - report("Main Loop Running", System.currentTimeMillis() - start, count); + report("Main loop running", System.currentTimeMillis() - start, count); } } } - report("Main Loop Stopped", System.currentTimeMillis() - start, count); + report("Main loop stopped", System.currentTimeMillis() - start, count); - System.out.println("Pausing for effect...allow more messages come across."); - Thread.sleep(JITTER * 2); // allows more messages to come across - consumer.stop(1000); + // The consumer has at least 1 pull request active. When stop is called, + // no more pull requests will be made, but messages already requested + // will still come across the wire to the client. + consumer.stop(); System.out.println("Starting post-stop loop."); - Message msg = consumer.nextMessage(1000); - while (msg != null) { - msg.ack(); - report("Post-stop loop running", System.currentTimeMillis() - start, ++count); - msg = consumer.nextMessage(1000); + while (!consumer.isFinished()) { + Message msg = consumer.nextMessage(1000); + if (msg != null) { + msg.ack(); + if (++count % REPORT_EVERY == 0) { + report("Post-stop loop running", System.currentTimeMillis() - start, ++count); + } + } } + report("Post-stop loop stopped", System.currentTimeMillis() - start, count); } catch (JetStreamStatusCheckedException | InterruptedException | IOException | JetStreamApiException e) { // JetStreamStatusCheckedException: @@ -105,13 +115,9 @@ public static void main(String[] args) { report("Done", System.currentTimeMillis() - start, count); }); consumeThread.start(); - - Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, JITTER); - Thread pubThread = new Thread(publisher); - pubThread.start(); - consumeThread.join(); - publisher.stopPublishing(); + + publisher.stopPublishing(); // otherwise it will complain when the connection goes away pubThread.join(); } catch (IOException | InterruptedException ioe) { diff --git a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java index a4f36cda1..d751bb3e0 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java @@ -20,18 +20,18 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import static io.nats.examples.jetstream.simple.Utils.Publisher; import static io.nats.examples.jetstream.simple.Utils.createOrReplaceStream; -import static io.nats.examples.jetstream.simple.Utils.publish; /** * This example will demonstrate simplified consume with a handler * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class MessageConsumerExample { - private static final String STREAM = "consume-handler-stream"; - private static final String SUBJECT = "consume-handler-subject"; - private static final String CONSUMER_NAME = "consume-handler-consumer"; - private static final String MESSAGE_TEXT = "consume-handler"; + private static final String STREAM = "consume-stream"; + private static final String SUBJECT = "consume-subject"; + private static final String CONSUMER_NAME = "consume-consumer"; + private static final String MESSAGE_TEXT = "consume"; private static final int STOP_COUNT = 500; private static final int REPORT_EVERY = 100; @@ -43,17 +43,18 @@ public static void main(String[] args) { JetStream js = nc.jetStream(); createOrReplaceStream(nc.jetStreamManagement(), STREAM, SUBJECT); - // publishing so there are lots of messages - System.out.println("Publishing..."); - publish(js, SUBJECT, MESSAGE_TEXT, 2500); + System.out.println("Starting publish..."); + Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10); + Thread pubThread = new Thread(publisher); + pubThread.start(); // get stream context, create consumer and get the consumer context StreamContext streamContext; ConsumerContext consumerContext; try { - streamContext = nc.streamContext(STREAM); + streamContext = nc.getStreamContext(STREAM); streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); - consumerContext = streamContext.consumerContext(CONSUMER_NAME); + consumerContext = streamContext.getConsumerContext(CONSUMER_NAME); } catch (JetStreamApiException | IOException e) { // JetStreamApiException: @@ -78,13 +79,19 @@ public static void main(String[] args) { }; // create the consumer then use it - try { - MessageConsumer consumer = consumerContext.consume(handler); + try (MessageConsumer consumer = consumerContext.consume(handler)) { latch.await(); - // once the consumer is stopped, the client will drain messages + + // The consumer has at least 1 pull request active. When stop is called, + // no more pull requests will be made, but messages already requested + // will still come across the wire to the client. System.out.println("Stop the consumer..."); - consumer.stop(1000); - Thread.sleep(1000); // enough for messages to drain after stop + consumer.stop(); + + // wait until the consumer is finished + while (!consumer.isFinished()) { + Thread.sleep(10); + } } catch (JetStreamApiException | IOException e) { // JetStreamApiException: @@ -103,6 +110,9 @@ public static void main(String[] args) { } report("Final", start, atomicCount.get()); + + publisher.stopPublishing(); // otherwise it will complain when the connection goes away + pubThread.join(); } catch (IOException | InterruptedException ioe) { // IOException: diff --git a/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java b/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java index f2d105d78..a70cac466 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/NextExample.java @@ -41,7 +41,7 @@ public static void main(String[] args) { StreamContext streamContext; ConsumerContext consumerContext; try { - streamContext = nc.streamContext(STREAM); + streamContext = nc.getStreamContext(STREAM); consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); } catch (JetStreamApiException | IOException e) { diff --git a/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java b/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java index 539bba610..a50900c3b 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java @@ -51,7 +51,7 @@ public static void main(String[] args) { // to allow ConsumerContexts to be created from the name. StreamContext streamContext; try { - streamContext = nc.streamContext(STREAM); + streamContext = nc.getStreamContext(STREAM); streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); } catch (JetStreamApiException | IOException e) { @@ -139,7 +139,7 @@ public HandlerConsumerHolder(int id, StreamContext sc, CountDownLatch latch) thr @Override public void stop() throws InterruptedException { - messageConsumer.stop(1000); + messageConsumer.stop(); } } @@ -150,7 +150,7 @@ static class IterableConsumerHolder extends ConsumerHolder { public IterableConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException { super(id, sc, latch); - iterableConsumer = consumerContext.consume(); + iterableConsumer = consumerContext.iterate(); t = new Thread(() -> { while (latch.getCount() > 0) { try { @@ -187,7 +187,7 @@ public ConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws Jet this.id = id; thisReceived = new AtomicInteger(); this.latch = latch; - consumerContext = sc.consumerContext(CONSUMER_NAME); + consumerContext = sc.getConsumerContext(CONSUMER_NAME); } public abstract void stop() throws InterruptedException; diff --git a/src/examples/java/io/nats/examples/jetstream/simple/Utils.java b/src/examples/java/io/nats/examples/jetstream/simple/Utils.java index a5e53b9fb..bd3fa4743 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/Utils.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/Utils.java @@ -20,6 +20,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class Utils { @@ -58,28 +59,34 @@ public static class Publisher implements Runnable { private final String messageText; private final int jitter; private final AtomicBoolean keepGoing = new AtomicBoolean(true); - private int pubNo; + private final AtomicInteger pubCount; public Publisher(JetStream js, String subject, String messageText, int jitter) { this.js = js; this.subject = subject; this.messageText = messageText; this.jitter = jitter; + pubCount = new AtomicInteger(); } public void stopPublishing() { keepGoing.set(false); } + public int getPubCount() { + return pubCount.get(); + } + @Override public void run() { try { while (keepGoing.get()) { //noinspection BusyWait Thread.sleep(ThreadLocalRandom.current().nextLong(jitter)); - js.publish(subject, (messageText + "-" + (++pubNo)).getBytes()); + js.publish(subject, (messageText + "-" + (pubCount.incrementAndGet())).getBytes()); } - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException(e); } } diff --git a/src/main/java/io/nats/client/BaseConsumeOptions.java b/src/main/java/io/nats/client/BaseConsumeOptions.java index 4e00964df..f71c1c616 100644 --- a/src/main/java/io/nats/client/BaseConsumeOptions.java +++ b/src/main/java/io/nats/client/BaseConsumeOptions.java @@ -51,7 +51,7 @@ protected BaseConsumeOptions(Builder b) { idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100); } - public long getExpiresIn() { + public long getExpiresInMillis() { return expiresIn; } diff --git a/src/main/java/io/nats/client/BaseConsumerContext.java b/src/main/java/io/nats/client/BaseConsumerContext.java new file mode 100644 index 000000000..48961b27a --- /dev/null +++ b/src/main/java/io/nats/client/BaseConsumerContext.java @@ -0,0 +1,160 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +import java.io.IOException; +import java.time.Duration; + +/** + * The Consumer Context provides a convenient interface around a defined JetStream Consumer + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + */ +public interface BaseConsumerContext { + + /** + * Read the next message with max wait set to {@value BaseConsumeOptions#DEFAULT_EXPIRES_IN_MILLIS} ms + * @return the next message or null if the max wait expires + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws InterruptedException if one is thrown, in order to propagate it up + * @throws JetStreamStatusCheckedException an exception representing a status that requires attention, + * such as the consumer was deleted on the server in the middle of use. + * @throws JetStreamApiException the request had an error related to the data + */ + Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException; + + /** + * Read the next message with provided max wait + * @param maxWait duration of max wait + * @return the next message or null if the max wait expires + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws InterruptedException if one is thrown, in order to propagate it up + * @throws JetStreamStatusCheckedException an exception representing a status that requires attention, + * such as the consumer was deleted on the server in the middle of use. + * @throws JetStreamApiException the request had an error related to the data + */ + Message next(Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException; + + /** + * Read the next message with provided max wait + * @param maxWaitMillis the max wait value in milliseconds + * @return the next message or null if the max wait expires + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws InterruptedException if one is thrown, in order to propagate it up + * @throws JetStreamStatusCheckedException an exception representing a status that requires attention, + * such as the consumer was deleted on the server in the middle of use. + * @throws JetStreamApiException the request had an error related to the data + */ + Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException; + + /** + * Start a one use Fetch Consumer using all defaults other than the number of messages. See {@link FetchConsumer} + * @param maxMessages the maximum number of message to consume + * @return the FetchConsumer instance + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + FetchConsumer fetchMessages(int maxMessages) throws IOException, JetStreamApiException; + + /** + * Start a one use Fetch Consumer using all defaults other than the number of bytes. See {@link FetchConsumer} + * @param maxBytes the maximum number of bytes to consume + * @return the FetchConsumer instance + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiException; + + /** + * Start a one use Fetch Consumer with complete custom consume options. See {@link FetchConsumer} + * @param fetchConsumeOptions the custom fetch consume options. See {@link FetchConsumeOptions} + * @return the FetchConsumer instance + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException; + + /** + * Start a long-running IterableConsumer with default ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions} + * IterableConsumer require the developer call nextMessage. + * @return the IterableConsumer instance + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + IterableConsumer iterate() throws IOException, JetStreamApiException; + + /** + * Start a long-running IterableConsumer with custom ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions} + * IterableConsumer requires the developer call nextMessage. + * @param consumeOptions the custom consume options + * @return the IterableConsumer instance + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; + + /** + * Start a long-running MessageConsumer with default ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} + * @param handler the MessageHandler used for receiving messages. + * @return the MessageConsumer instance + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException; + + /** + * Start a long-running MessageConsumer with default ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} + * + * @param dispatcher The dispatcher to handle this subscription + * @param handler the MessageHandler used for receiving messages. + * @return the MessageConsumer instance + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + MessageConsumer consume(Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException; + + /** + * Start a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} + * + * @param consumeOptions the custom consume options + * @param handler the MessageHandler used for receiving messages. + * @return the MessageConsumer instance + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + MessageConsumer consume(ConsumeOptions consumeOptions, MessageHandler handler) throws IOException, JetStreamApiException; + + /** + * Start a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} + * + * @param consumeOptions the custom consume options + * @param dispatcher The dispatcher to handle this subscription + * @param handler the MessageHandler used for receiving messages. + * @return the MessageConsumer instance + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException; +} diff --git a/src/main/java/io/nats/client/BaseMessageConsumer.java b/src/main/java/io/nats/client/BaseMessageConsumer.java new file mode 100644 index 000000000..7e34329fb --- /dev/null +++ b/src/main/java/io/nats/client/BaseMessageConsumer.java @@ -0,0 +1,27 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +/** + * The BaseMessageConsumer interface is the core interface replacing + * a subscription for a simplified consumer. + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + */ +public interface BaseMessageConsumer extends AutoCloseable { + /** + * Stop the MessageConsumer from asking for any more messages from the server. + * The consumer will finish all pull request already in progress, but will not start any new ones. + */ + void stop(); +} diff --git a/src/main/java/io/nats/client/Connection.java b/src/main/java/io/nats/client/Connection.java index f6dad4682..c23ac46ce 100644 --- a/src/main/java/io/nats/client/Connection.java +++ b/src/main/java/io/nats/client/Connection.java @@ -553,17 +553,17 @@ enum Status { Duration RTT() throws IOException; /** - * Gets a stream context. + * Get a stream context for a specific stream. * @param streamName the stream for the context * @return a StreamContext instance. * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - StreamContext streamContext(String streamName) throws IOException, JetStreamApiException; + StreamContext getStreamContext(String streamName) throws IOException, JetStreamApiException; /** - * Gets a stream context. + * Get a stream context for a specific stream. * @param streamName the stream for the context * @param options JetStream options. * @return a StreamContext instance. @@ -571,10 +571,10 @@ enum Status { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - StreamContext streamContext(String streamName, JetStreamOptions options) throws IOException, JetStreamApiException; + StreamContext getStreamContext(String streamName, JetStreamOptions options) throws IOException, JetStreamApiException; /** - * Create a consumer context for a specific named stream and specific named consumer. + * Get a consumer context for a specific named stream and specific named consumer. * Verifies that the stream and consumer exist. * EXPERIMENTAL API SUBJECT TO CHANGE * @param streamName the name of the stream @@ -584,10 +584,10 @@ enum Status { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - ConsumerContext consumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException; + ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException; /** - * Create a consumer context for a specific named stream and specific named consumer. + * Get a consumer context for a specific named stream and specific named consumer. * Verifies that the stream and consumer exist. * EXPERIMENTAL API SUBJECT TO CHANGE * @param streamName the name of the stream @@ -598,7 +598,7 @@ enum Status { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - ConsumerContext consumerContext(String streamName, String consumerName, JetStreamOptions options) throws IOException, JetStreamApiException; + ConsumerContext getConsumerContext(String streamName, String consumerName, JetStreamOptions options) throws IOException, JetStreamApiException; /** * Gets a context for publishing and subscribing to subjects backed by Jetstream streams diff --git a/src/main/java/io/nats/client/ConsumerContext.java b/src/main/java/io/nats/client/ConsumerContext.java index f6fb5fce7..dcbb2b8c0 100644 --- a/src/main/java/io/nats/client/ConsumerContext.java +++ b/src/main/java/io/nats/client/ConsumerContext.java @@ -16,13 +16,12 @@ import io.nats.client.api.ConsumerInfo; import java.io.IOException; -import java.time.Duration; /** * The Consumer Context provides a convenient interface around a defined JetStream Consumer * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ -public interface ConsumerContext { +public interface ConsumerContext extends BaseConsumerContext { /** * Gets the consumer name that was used to create the context. * @return the consumer name @@ -44,114 +43,4 @@ public interface ConsumerContext { * @return consumer information */ ConsumerInfo getCachedConsumerInfo(); - - /** - * Read the next message with max wait set to {@value BaseConsumeOptions#DEFAULT_EXPIRES_IN_MILLIS} ms - * @return the next message or null if the max wait expires - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws InterruptedException if one is thrown, in order to propagate it up - * @throws JetStreamStatusCheckedException an exception representing a status that requires attention, - * such as the consumer was deleted on the server in the middle of use. - * @throws JetStreamApiException the request had an error related to the data - */ - Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException; - - /** - * Read the next message with provide max wait - * @param maxWait duration of max wait - * @return the next message or null if the max wait expires - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws InterruptedException if one is thrown, in order to propagate it up - * @throws JetStreamStatusCheckedException an exception representing a status that requires attention, - * such as the consumer was deleted on the server in the middle of use. - * @throws JetStreamApiException the request had an error related to the data - */ - Message next(Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException; - - /** - * Read the next message with provide max wait - * @param maxWaitMillis the max wait value in milliseconds - * @return the next message or null if the max wait expires - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws InterruptedException if one is thrown, in order to propagate it up - * @throws JetStreamStatusCheckedException an exception representing a status that requires attention, - * such as the consumer was deleted on the server in the middle of use. - * @throws JetStreamApiException the request had an error related to the data - */ - Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException; - - /** - * Create a one use Fetch Consumer using all defaults other than the number of messages. See {@link FetchConsumer} - * @param maxMessages the maximum number of message to consume - * @return the FetchConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - FetchConsumer fetchMessages(int maxMessages) throws IOException, JetStreamApiException; - - /** - * Create a one use Fetch Consumer using all defaults other than the number of bytes. See {@link FetchConsumer} - * @param maxBytes the maximum number of bytes to consume - * @return the FetchConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiException; - - /** - * Create a one use Fetch Consumer with complete custom consume options. See {@link FetchConsumer} - * @param fetchConsumeOptions the custom fetch consume options. See {@link FetchConsumeOptions} - * @return the FetchConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException; - - /** - * Create a long-running IterableConsumer with default ConsumeOptions. See {@link ConsumeOptions} - * IterableConsumer require the developer call nextMessage. See {@link IterableConsumer} - * @return the IterableConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - IterableConsumer consume() throws IOException, JetStreamApiException; - - /** - * Create a long-running IterableConsumer with custom ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions} - * IterableConsumer requires the developer call nextMessage. - * @param consumeOptions the custom consume options - * @return the IterableConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - IterableConsumer consume(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; - - /** - * Create a long-running MessageConsumer with default ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} - * @param handler the MessageHandler used for receiving messages. - * @return the MessageConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException; - - /** - * Create a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} - * @param handler the MessageHandler used for receiving messages. - * @param consumeOptions the custom consume options - * @return the MessageConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - MessageConsumer consume(MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/JetStream.java b/src/main/java/io/nats/client/JetStream.java index a02588e2a..b196ddb18 100644 --- a/src/main/java/io/nats/client/JetStream.java +++ b/src/main/java/io/nats/client/JetStream.java @@ -456,7 +456,6 @@ public interface JetStream { /** * Create a subscription to the specified subject in the mode of pull, with additional options - * * @param subject The subject to subscribe to * @param options pull subscription options * @return The subscription @@ -468,7 +467,6 @@ public interface JetStream { /** * Create an asynchronous subscription to the specified subject in the mode of pull, with additional options - * * @param subject The subject to subscribe to * @param dispatcher The dispatcher to handle this subscription * @param handler The target for the messages @@ -481,7 +479,7 @@ public interface JetStream { JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException; /** - * Create a stream context for a specific named stream. Verifies that the stream exists. + * Get a stream context for a specific named stream. Verifies that the stream exists. * EXPERIMENTAL API SUBJECT TO CHANGE * @param streamName the name of the stream * @return a StreamContext object @@ -489,10 +487,10 @@ public interface JetStream { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - StreamContext streamContext(String streamName) throws IOException, JetStreamApiException; + StreamContext getStreamContext(String streamName) throws IOException, JetStreamApiException; /** - * Create a consumer context for a specific named stream and specific named consumer. + * Get a consumer context for a specific named stream and specific named consumer. * Verifies that the stream and consumer exist. * EXPERIMENTAL API SUBJECT TO CHANGE * @param streamName the name of the stream @@ -502,5 +500,5 @@ public interface JetStream { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - ConsumerContext consumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException; + ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/MessageConsumer.java b/src/main/java/io/nats/client/MessageConsumer.java index a98c50832..5f87aa43d 100644 --- a/src/main/java/io/nats/client/MessageConsumer.java +++ b/src/main/java/io/nats/client/MessageConsumer.java @@ -41,11 +41,19 @@ public interface MessageConsumer extends AutoCloseable { /** * Stop the MessageConsumer from asking for any more messages from the server. - * There still may be messages available and coming across the wire. - * @param timeout The time to wait for the stop to succeed, pass 0 to wait - * forever. Stop involves moving messages to and from the server - * so a very short timeout is not recommended. - * @throws InterruptedException if one is thrown, in order to propagate it up + * The consumer will finish all pull request already in progress, but will not start any new ones. */ - void stop(long timeout) throws InterruptedException; + void stop(); + + /** + * Stopped indicates whether consuming has been stopped. Can be stopped without being finished. + * @return the stopped flag + */ + boolean isStopped(); + + /** + * Finish indicates all messages have been received from the server. Can be finished without being stopped. + * @return the finished flag + */ + boolean isFinished(); } diff --git a/src/main/java/io/nats/client/support/ConsumerUtils.java b/src/main/java/io/nats/client/OrderedConsumerContext.java similarity index 65% rename from src/main/java/io/nats/client/support/ConsumerUtils.java rename to src/main/java/io/nats/client/OrderedConsumerContext.java index 369df22a6..3063c90e9 100644 --- a/src/main/java/io/nats/client/support/ConsumerUtils.java +++ b/src/main/java/io/nats/client/OrderedConsumerContext.java @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at: @@ -11,14 +11,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package io.nats.client.support; +package io.nats.client; -import io.nats.client.NUID; - -public abstract class ConsumerUtils { - private ConsumerUtils() {} /* ensures cannot be constructed */ - - public static String generateConsumerName() { - return NUID.nextGlobalSequence(); - } +/** + * The Consumer Context provides a convenient interface around a defined JetStream Consumer + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + */ +public interface OrderedConsumerContext extends BaseConsumerContext { } diff --git a/src/main/java/io/nats/client/StreamContext.java b/src/main/java/io/nats/client/StreamContext.java index 3b65fe6f1..af9ac137c 100644 --- a/src/main/java/io/nats/client/StreamContext.java +++ b/src/main/java/io/nats/client/StreamContext.java @@ -20,7 +20,7 @@ /** * The Stream Context provide a set of operations for managing the stream - * and its contents and managing consumers. + * and its contents and for managing consumers. * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public interface StreamContext { @@ -73,7 +73,7 @@ public interface StreamContext { PurgeResponse purge(PurgeOptions options) throws IOException, JetStreamApiException; /** - * Create a consumer context for on the context's stream and specific named consumer. + * Get a consumer context for the context's stream and specific named consumer. * Verifies that the consumer exists. * @param consumerName the name of the consumer * @return a ConsumerContext object @@ -81,7 +81,7 @@ public interface StreamContext { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - ConsumerContext consumerContext(String consumerName) throws IOException, JetStreamApiException; + ConsumerContext getConsumerContext(String consumerName) throws IOException, JetStreamApiException; /** * Management function to create or update a consumer on this stream. @@ -93,6 +93,16 @@ public interface StreamContext { */ ConsumerContext createOrUpdateConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException; + /** + * Create an ordered consumer context for the context's stream. + * @param config the configuration for the ordered consumer + * @return an OrderedConsumerContext object + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + OrderedConsumerContext createOrderedConsumer(OrderedConsumerConfiguration config) throws IOException, JetStreamApiException; + /** * Management function to deletes a consumer. * @param consumerName the name of the consumer. @@ -194,50 +204,4 @@ public interface StreamContext { * @return true if the delete succeeded */ boolean deleteMessage(long seq, boolean erase) throws IOException, JetStreamApiException; - - /** - * Create a long-running IterableConsumer. - * IterableConsumer requires the developer call nextMessage. - * @param config the configuration for the ordered consumer - * @return the IterableConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - IterableConsumer orderedConsume(OrderedConsumerConfig config) throws IOException, JetStreamApiException; - - /** - * Create a long-running IterableConsumer with custom ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions} - * IterableConsumer requires the developer call nextMessage. - * @param config the configuration for the ordered consumer - * @param consumeOptions the custom consume options - * @return the IterableConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - IterableConsumer orderedConsume(OrderedConsumerConfig config, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; - - /** - * Create a long-running MessageConsumer. - * @param config the configuration for the ordered consumer - * @param handler the MessageHandler used for receiving messages. - * @return the MessageConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - MessageConsumer orderedConsume(OrderedConsumerConfig config, MessageHandler handler) throws IOException, JetStreamApiException; - - /** - * Create a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} - * @param config the configuration for the ordered consumer - * @param handler the MessageHandler used for receiving messages. - * @param consumeOptions the custom consume options - * @return the MessageConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - MessageConsumer orderedConsume(OrderedConsumerConfig config, MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/api/OrderedConsumerConfig.java b/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java similarity index 62% rename from src/main/java/io/nats/client/api/OrderedConsumerConfig.java rename to src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java index 4abe21c17..d877faba3 100644 --- a/src/main/java/io/nats/client/api/OrderedConsumerConfig.java +++ b/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java @@ -15,69 +15,75 @@ import java.time.ZonedDateTime; -import static io.nats.client.support.Validator.emptyAsNull; +import static io.nats.client.support.Validator.emptyOrNullAs; -public class OrderedConsumerConfig { +public class OrderedConsumerConfiguration { + + public static String DEFAULT_FILTER_SUBJECT = ">"; + + private String filterSubject; private DeliverPolicy deliverPolicy; private Long startSequence; private ZonedDateTime startTime; - private String filterSubject; private ReplayPolicy replayPolicy; private Boolean headersOnly; /** - * OrderedConsumerConfig creation works like a builder. + * OrderedConsumerConfiguration creation works like a builder. * The builder supports chaining and will create a default set of options if * no methods are calls, including setting the filter subject to ">" */ - public OrderedConsumerConfig() {} + public OrderedConsumerConfiguration() { + startSequence = ConsumerConfiguration.LONG_UNSET; + filterSubject = DEFAULT_FILTER_SUBJECT; + } /** - * Sets the delivery policy of the OrderedConsumerConfig. - * @param deliverPolicy the delivery policy. + * Sets the filter subject of the OrderedConsumerConfiguration. + * @param filterSubject the filter subject * @return Builder */ - public OrderedConsumerConfig deliverPolicy(DeliverPolicy deliverPolicy) { - this.deliverPolicy = deliverPolicy; + public OrderedConsumerConfiguration filterSubject(String filterSubject) { + this.filterSubject = emptyOrNullAs(filterSubject, DEFAULT_FILTER_SUBJECT); return this; } /** - * Sets the start sequence of the OrderedConsumerConfig. - * @param startSequence the start sequence + * Sets the delivery policy of the OrderedConsumerConfiguration. + * @param deliverPolicy the delivery policy. * @return Builder */ - public OrderedConsumerConfig startSequence(long startSequence) { - this.startSequence = startSequence < 1 ? null : startSequence; + public OrderedConsumerConfiguration deliverPolicy(DeliverPolicy deliverPolicy) { + this.deliverPolicy = deliverPolicy; return this; } /** - * Sets the start time of the OrderedConsumerConfig. - * @param startTime the start time + * Sets the start sequence of the OrderedConsumerConfiguration. + * @param startSequence the start sequence * @return Builder */ - public OrderedConsumerConfig startTime(ZonedDateTime startTime) { - this.startTime = startTime; + public OrderedConsumerConfiguration startSequence(long startSequence) { + this.startSequence = startSequence < 1 ? ConsumerConfiguration.LONG_UNSET : startSequence; return this; } /** - * Sets the filter subject of the OrderedConsumerConfig. - * @param filterSubject the filter subject + * Sets the start time of the OrderedConsumerConfiguration. + * @param startTime the start time * @return Builder */ - public OrderedConsumerConfig filterSubject(String filterSubject) { - this.filterSubject = emptyAsNull(filterSubject); + public OrderedConsumerConfiguration startTime(ZonedDateTime startTime) { + this.startTime = startTime; return this; } /** - * Sets the replay policy of the OrderedConsumerConfig. + * Sets the replay policy of the OrderedConsumerConfiguration. * @param replayPolicy the replay policy. * @return Builder */ - public OrderedConsumerConfig replayPolicy(ReplayPolicy replayPolicy) { + public OrderedConsumerConfiguration replayPolicy(ReplayPolicy replayPolicy) { this.replayPolicy = replayPolicy; return this; } @@ -88,11 +94,15 @@ public OrderedConsumerConfig replayPolicy(ReplayPolicy replayPolicy) { * @param headersOnly the flag * @return Builder */ - public OrderedConsumerConfig headersOnly(Boolean headersOnly) { - this.headersOnly = headersOnly; + public OrderedConsumerConfiguration headersOnly(Boolean headersOnly) { + this.headersOnly = headersOnly != null && headersOnly ? true : null; return this; } + public String getFilterSubject() { + return filterSubject; + } + public DeliverPolicy getDeliverPolicy() { return deliverPolicy; } @@ -105,10 +115,6 @@ public ZonedDateTime getStartTime() { return startTime; } - public String getFilterSubject() { - return filterSubject; - } - public ReplayPolicy getReplayPolicy() { return replayPolicy; } diff --git a/src/main/java/io/nats/client/impl/MessageManager.java b/src/main/java/io/nats/client/impl/MessageManager.java index 207588a5c..bb2767a0b 100644 --- a/src/main/java/io/nats/client/impl/MessageManager.java +++ b/src/main/java/io/nats/client/impl/MessageManager.java @@ -76,7 +76,7 @@ protected void shutdown() { shutdownHeartbeatTimer(); } - protected void startPullRequest(String pullSubject, PullRequestOptions pullRequestOptions, boolean raiseStatusWarnings, TrackPendingListener trackPendingListener) { + protected void startPullRequest(String pullSubject, PullRequestOptions pullRequestOptions, boolean raiseStatusWarnings, PullManagerObserver pullManagerObserver) { // does nothing - only implemented for pulls, but in base class since instance is referenced as MessageManager, not subclass } diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 88905d14a..dd2a34df2 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -2127,36 +2127,36 @@ void lenientFlushBuffer() { * {@inheritDoc} */ @Override - public StreamContext streamContext(String streamName) throws IOException, JetStreamApiException { + public StreamContext getStreamContext(String streamName) throws IOException, JetStreamApiException { Validator.validateStreamName(streamName, true); ensureNotClosing(); - return new NatsStreamContext(this, null, streamName); + return new NatsStreamContext(streamName, null, this, null); } /** * {@inheritDoc} */ @Override - public StreamContext streamContext(String streamName, JetStreamOptions options) throws IOException, JetStreamApiException { + public StreamContext getStreamContext(String streamName, JetStreamOptions options) throws IOException, JetStreamApiException { Validator.validateStreamName(streamName, true); ensureNotClosing(); - return new NatsStreamContext(this, options, streamName); + return new NatsStreamContext(streamName, null, this, options); } /** * {@inheritDoc} */ @Override - public ConsumerContext consumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException { - return streamContext(streamName).consumerContext(consumerName); + public ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException { + return getStreamContext(streamName).getConsumerContext(consumerName); } /** * {@inheritDoc} */ @Override - public ConsumerContext consumerContext(String streamName, String consumerName, JetStreamOptions options) throws IOException, JetStreamApiException { - return streamContext(streamName, options).consumerContext(consumerName); + public ConsumerContext getConsumerContext(String streamName, String consumerName, JetStreamOptions options) throws IOException, JetStreamApiException { + return getStreamContext(streamName, options).getConsumerContext(consumerName); } /** diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index e584527b4..455356c88 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -14,7 +14,9 @@ package io.nats.client.impl; import io.nats.client.*; +import io.nats.client.api.ConsumerConfiguration; import io.nats.client.api.ConsumerInfo; +import io.nats.client.api.OrderedConsumerConfiguration; import io.nats.client.support.Validator; import java.io.IOException; @@ -28,16 +30,101 @@ /** * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ -public class NatsConsumerContext implements ConsumerContext { +public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscriptionMaker { + private final Object stateLock; + private final NatsStreamContext streamCtx; + private final boolean ordered; + private final String consumerName; + private final ConsumerConfiguration originalOrderedCc; + private final String subscribeSubject; + private final PullSubscribeOptions unorderedBindPso; - private final NatsStreamContext streamContext; - private final PullSubscribeOptions bindPso; - private ConsumerInfo lastConsumerInfo; + private ConsumerInfo cachedConsumerInfo; + private NatsMessageConsumerBase lastConsumer; + private long highestSeq; + private Dispatcher defaultDispatcher; - NatsConsumerContext(NatsStreamContext streamContext, ConsumerInfo ci) throws IOException { - this.streamContext = streamContext; - bindPso = PullSubscribeOptions.bind(streamContext.streamName, ci.getName()); - lastConsumerInfo = ci; + NatsConsumerContext(NatsStreamContext sc, ConsumerInfo ci) { + stateLock = new Object(); + streamCtx = sc; + ordered = false; + consumerName = ci.getName(); + originalOrderedCc = null; + subscribeSubject = null; + unorderedBindPso = PullSubscribeOptions.bind(sc.streamName, consumerName); + cachedConsumerInfo = ci; + } + + NatsConsumerContext(NatsStreamContext sc, OrderedConsumerConfiguration config) { + stateLock = new Object(); + streamCtx = sc; + ordered = true; + consumerName = null; + originalOrderedCc = ConsumerConfiguration.builder() + .filterSubject(config.getFilterSubject()) + .deliverPolicy(config.getDeliverPolicy()) + .startSequence(config.getStartSequence()) + .startTime(config.getStartTime()) + .replayPolicy(config.getReplayPolicy()) + .headersOnly(config.getHeadersOnly()) + .build(); + subscribeSubject = originalOrderedCc.getFilterSubject(); + unorderedBindPso = null; + } + + static class OrderedPullSubscribeOptionsBuilder extends PullSubscribeOptions.Builder { + OrderedPullSubscribeOptionsBuilder(String streamName, ConsumerConfiguration cc) { + stream(streamName); + configuration(cc); + ordered = true; + } + } + + public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Dispatcher userDispatcher) throws IOException, JetStreamApiException { + PullSubscribeOptions pso; + if (ordered) { + if (lastConsumer != null) { + highestSeq = Math.max(highestSeq, lastConsumer.pmm.lastStreamSeq); + } + ConsumerConfiguration cc = lastConsumer == null + ? originalOrderedCc + : streamCtx.js.nextOrderedConsumerConfiguration(originalOrderedCc, highestSeq, null); + pso = new OrderedPullSubscribeOptionsBuilder(streamCtx.streamName, cc).build(); + } + else { + pso = unorderedBindPso; + } + + if (messageHandler == null) { + return (NatsJetStreamPullSubscription) streamCtx.js.subscribe(subscribeSubject, pso); + } + + Dispatcher d = userDispatcher; + if (d == null) { + if (defaultDispatcher == null) { + defaultDispatcher = streamCtx.js.conn.createDispatcher(); + } + d = defaultDispatcher; + } + return (NatsJetStreamPullSubscription) streamCtx.js.subscribe(subscribeSubject, d, messageHandler, pso); + } + + private void checkState() throws IOException { + if (lastConsumer != null) { + if (ordered) { + if (!lastConsumer.finished) { + throw new IOException("The ordered consumer is already receiving messages. Ordered Consumer does not allow multiple instances at time."); + } + } + if (lastConsumer.finished && !lastConsumer.stopped) { + lastConsumer.lenientClose(); // finished, might as well make sure the sub is closed. + } + } + } + + private NatsMessageConsumerBase trackConsume(NatsMessageConsumerBase con) { + lastConsumer = con; + return con; } /** @@ -45,7 +132,7 @@ public class NatsConsumerContext implements ConsumerContext { */ @Override public String getConsumerName() { - return lastConsumerInfo.getName(); + return consumerName; } /** @@ -53,13 +140,18 @@ public String getConsumerName() { */ @Override public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException { - lastConsumerInfo = streamContext.jsm.getConsumerInfo(streamContext.streamName, lastConsumerInfo.getName()); - return lastConsumerInfo; + if (consumerName != null) { + cachedConsumerInfo = streamCtx.jsm.getConsumerInfo(streamCtx.streamName, cachedConsumerInfo.getName()); + } + return cachedConsumerInfo; } + /** + * {@inheritDoc} + */ @Override public ConsumerInfo getCachedConsumerInfo() { - return lastConsumerInfo; + return cachedConsumerInfo; } /** @@ -67,7 +159,7 @@ public ConsumerInfo getCachedConsumerInfo() { */ @Override public Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { - return new NextSub(DEFAULT_EXPIRES_IN_MILLIS, streamContext.js, bindPso).next(); + return next(DEFAULT_EXPIRES_IN_MILLIS); } /** @@ -75,10 +167,7 @@ public Message next() throws IOException, InterruptedException, JetStreamStatusC */ @Override public Message next(Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { - if (maxWait == null) { - return new NextSub(DEFAULT_EXPIRES_IN_MILLIS, streamContext.js, bindPso).next(); - } - return next(maxWait.toMillis()); + return maxWait == null ? next(DEFAULT_EXPIRES_IN_MILLIS) : next(maxWait.toMillis()); } /** @@ -86,36 +175,33 @@ public Message next(Duration maxWait) throws IOException, InterruptedException, */ @Override public Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { - if (maxWaitMillis < MIN_EXPIRES_MILLS) { - throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds."); - } - return new NextSub(maxWaitMillis, streamContext.js, bindPso).next(); - } - - static class NextSub { - private final long maxWaitMillis; - private final NatsJetStreamPullSubscription sub; + NatsMessageConsumerBase con; + synchronized (stateLock) { + checkState(); + if (maxWaitMillis < MIN_EXPIRES_MILLS) { + throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds."); + } - public NextSub(long maxWaitMillis, NatsJetStream js, PullSubscribeOptions pso) throws JetStreamApiException, IOException { - sub = new SimplifiedSubscriptionMaker(js, pso).makeSubscription(null); - this.maxWaitMillis = maxWaitMillis; - sub._pull(PullRequestOptions.builder(1).expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT).build(), false, null); + //noinspection resource I close it manually down below + con = new NatsMessageConsumerBase(cachedConsumerInfo); + con.initSub(subscribe(null, null)); + con.sub._pull(PullRequestOptions.builder(1) + .expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT) + .build(), false, null); + trackConsume(con); } - Message next() throws JetStreamStatusCheckedException, InterruptedException { + // intentionally outside of lock + try { + return con.sub.nextMessage(maxWaitMillis); + } + finally { try { - return sub.nextMessage(maxWaitMillis); - } - catch (JetStreamStatusException e) { - throw new JetStreamStatusCheckedException(e); + con.finished = true; + con.close(); } - finally { - try { - sub.unsubscribe(); - } - catch (Exception ignore) { - // ignored - } + catch (Exception e) { + // from close/autocloseable, but we know it doesn't actually throw } } } @@ -141,25 +227,31 @@ public FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiEx */ @Override public FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException { - Validator.required(fetchConsumeOptions, "Fetch Consume Options"); - return new NatsFetchConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), fetchConsumeOptions, lastConsumerInfo); + synchronized (stateLock) { + checkState(); + Validator.required(fetchConsumeOptions, "Fetch Consume Options"); + return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo, fetchConsumeOptions)); + } } /** * {@inheritDoc} */ @Override - public IterableConsumer consume() throws IOException, JetStreamApiException { - return new NatsIterableConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), DEFAULT_CONSUME_OPTIONS, lastConsumerInfo); + public IterableConsumer iterate() throws IOException, JetStreamApiException { + return iterate(DEFAULT_CONSUME_OPTIONS); } /** * {@inheritDoc} */ @Override - public IterableConsumer consume(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { - Validator.required(consumeOptions, "Consume Options"); - return new NatsIterableConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), consumeOptions, lastConsumerInfo); + public IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { + synchronized (stateLock) { + checkState(); + Validator.required(consumeOptions, "Consume Options"); + return (IterableConsumer) trackConsume(new NatsIterableConsumer(this, cachedConsumerInfo, consumeOptions)); + } } /** @@ -167,18 +259,35 @@ public IterableConsumer consume(ConsumeOptions consumeOptions) throws IOExceptio */ @Override public MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException { - Validator.required(handler, "Message Handler"); - return new NatsMessageConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), handler, DEFAULT_CONSUME_OPTIONS, lastConsumerInfo); + return consume(DEFAULT_CONSUME_OPTIONS, null, handler); } /** * {@inheritDoc} */ @Override - public MessageConsumer consume(MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { - Validator.required(handler, "Message Handler"); - Validator.required(consumeOptions, "Consume Options"); - return new NatsMessageConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), handler, consumeOptions, lastConsumerInfo); + public MessageConsumer consume(Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException { + return consume(DEFAULT_CONSUME_OPTIONS, dispatcher, handler); } + /** + * {@inheritDoc} + */ + @Override + public MessageConsumer consume(ConsumeOptions consumeOptions, MessageHandler handler) throws IOException, JetStreamApiException { + return consume(consumeOptions, null, handler); + } + + /** + * {@inheritDoc} + */ + @Override + public MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher userDispatcher, MessageHandler handler) throws IOException, JetStreamApiException { + synchronized (stateLock) { + checkState(); + Validator.required(handler, "Message Handler"); + Validator.required(consumeOptions, "Consume Options"); + return trackConsume(new NatsMessageConsumer(this, cachedConsumerInfo, consumeOptions, userDispatcher, handler)); + } + } } diff --git a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java index 17a18d11d..805049d61 100644 --- a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java @@ -23,17 +23,19 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer private final String pullSubject; private long startNanos; - public NatsFetchConsumer(SimplifiedSubscriptionMaker subscriptionMaker, - FetchConsumeOptions fetchConsumeOptions, - ConsumerInfo lastConsumerInfo) throws IOException, JetStreamApiException { - super(lastConsumerInfo); - initSub(subscriptionMaker.makeSubscription(null)); - maxWaitNanos = fetchConsumeOptions.getExpiresIn() * 1_000_000; + NatsFetchConsumer(SimplifiedSubscriptionMaker subscriptionMaker, + ConsumerInfo cachedConsumerInfo, + FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException + { + super(cachedConsumerInfo); + + maxWaitNanos = fetchConsumeOptions.getExpiresInMillis() * 1_000_000; PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages()) .maxBytes(fetchConsumeOptions.getMaxBytes()) - .expiresIn(fetchConsumeOptions.getExpiresIn()) + .expiresIn(fetchConsumeOptions.getExpiresInMillis()) .idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat()) .build(); + initSub(subscriptionMaker.subscribe(null, null)); pullSubject = sub._pull(pro, false, null); startNanos = -1; } @@ -41,16 +43,34 @@ public NatsFetchConsumer(SimplifiedSubscriptionMaker subscriptionMaker, @Override public Message nextMessage() throws InterruptedException, JetStreamStatusCheckedException { try { - if (startNanos == -1) { - startNanos = System.nanoTime(); + if (finished) { + return null; } - long timeLeftMillis = (maxWaitNanos - (System.nanoTime() - startNanos)) / 1_000_000; - // if the manager thinks it has received everything in the pull, it means // that all the messages are already in the internal queue and there is // no waiting necessary - if (timeLeftMillis < 1 | pmm.pendingMessages < 1 || (pmm.trackingBytes && pmm.pendingBytes < 1)) { + if (pmm.noMorePending()) { + Message m = sub._nextUnmanagedNoWait(pullSubject); + if (m == null) { + // if there are no messages in the internal cache AND there are no more pending, + // they all have been read and we can go ahead and close the subscription. + finished = true; + lenientClose(); + } + return m; + } + + // by not starting the timer until the first call, it gives a little buffer around + // the next message to account for latency of incoming messages + if (startNanos == -1) { + startNanos = System.nanoTime(); + } + long timeLeftMillis = (maxWaitNanos - (System.nanoTime() - startNanos)) / 1_000_000; + + // if the timer has run out, don't allow waiting + // this might happen once, but it should already be noMorePending + if (timeLeftMillis < 1) { return sub._nextUnmanagedNoWait(pullSubject); // null means don't wait } diff --git a/src/main/java/io/nats/client/impl/NatsIterableConsumer.java b/src/main/java/io/nats/client/impl/NatsIterableConsumer.java index 4d6cea4d3..01d8c7b7a 100644 --- a/src/main/java/io/nats/client/impl/NatsIterableConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsIterableConsumer.java @@ -19,10 +19,10 @@ import java.io.IOException; import java.time.Duration; -public class NatsIterableConsumer extends NatsMessageConsumer implements IterableConsumer { +class NatsIterableConsumer extends NatsMessageConsumer implements IterableConsumer { - public NatsIterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker, ConsumeOptions opts, ConsumerInfo lastConsumerInfo) throws IOException, JetStreamApiException { - super(subscriptionMaker, null, opts, lastConsumerInfo); + NatsIterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker, ConsumerInfo cachedConsumerInfo, ConsumeOptions opts) throws IOException, JetStreamApiException { + super(subscriptionMaker, cachedConsumerInfo, opts, null, null); } /** @@ -31,7 +31,11 @@ public NatsIterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker, Consu @Override public Message nextMessage(Duration timeout) throws InterruptedException, JetStreamStatusCheckedException { try { - return sub.nextMessage(timeout); + Message msg = sub.nextMessage(timeout); + if (msg != null && stopped && pmm.noMorePending()) { + finished = true; + } + return msg; } catch (JetStreamStatusException e) { throw new JetStreamStatusCheckedException(e); diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index 5936d70ac..6649ca64e 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -232,7 +232,7 @@ MessageManager createMessageManager( MessageManagerFactory _pullMessageManagerFactory = (mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new PullMessageManager(mmConn, mmSo, mmSyncMode); MessageManagerFactory _pullOrderedMessageManagerFactory = - (mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new PullOrderedMessageManager(mmConn, mmJs, mmStream, mmSo, mmCc, mmSyncMode); + (mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new OrderedPullMessageManager(mmConn, mmJs, mmStream, mmSo, mmCc, mmSyncMode); JetStreamSubscription createSubscription(String subject, String queueName, @@ -325,7 +325,7 @@ JetStreamSubscription createSubscription(String subject, // because modifications are not allowed during create subscription ConsumerConfigurationComparer userCCC = new ConsumerConfigurationComparer(userCC); List changes = userCCC.getChanges(serverCC); - if (changes.size() > 0) { + if (!changes.isEmpty()) { throw JsSubExistingConsumerCannotBeModified.instance("Changed fields: " + changes); } @@ -637,7 +637,7 @@ private boolean isSubjectRequired(SubscribeOptions options) { * {@inheritDoc} */ @Override - public StreamContext streamContext(String streamName) throws IOException, JetStreamApiException { + public StreamContext getStreamContext(String streamName) throws IOException, JetStreamApiException { Validator.validateStreamName(streamName, true); return getNatsStreamContext(streamName); } @@ -646,13 +646,13 @@ public StreamContext streamContext(String streamName) throws IOException, JetStr * {@inheritDoc} */ @Override - public ConsumerContext consumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException { + public ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException { Validator.validateStreamName(streamName, true); Validator.required(consumerName, "Consumer Name"); - return getNatsStreamContext(streamName).consumerContext(consumerName); + return getNatsStreamContext(streamName).getConsumerContext(consumerName); } private NatsStreamContext getNatsStreamContext(String streamName) throws IOException, JetStreamApiException { - return new NatsStreamContext(this, conn, jso, streamName); + return new NatsStreamContext(streamName, this, conn, jso); } } diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java index 681e44293..c7708776d 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java @@ -16,6 +16,7 @@ import io.nats.client.JetStreamApiException; import io.nats.client.JetStreamOptions; import io.nats.client.Message; +import io.nats.client.NUID; import io.nats.client.api.*; import io.nats.client.support.NatsJetStreamConstants; @@ -24,7 +25,6 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import static io.nats.client.support.ConsumerUtils.generateConsumerName; import static io.nats.client.support.NatsJetStreamClientError.JsConsumerCreate290NotAvailable; import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction; @@ -150,6 +150,23 @@ List _getStreamNames(String subjectFilter) throws IOException, JetStream // ---------------------------------------------------------------------------------------------------- // General Utils // ---------------------------------------------------------------------------------------------------- + String generateConsumerName() { + return NUID.nextGlobalSequence(); + } + + ConsumerConfiguration nextOrderedConsumerConfiguration( + ConsumerConfiguration originalCc, + long lastStreamSeq, + String newDeliverSubject) + { + return ConsumerConfiguration.builder(originalCc) + .deliverPolicy(DeliverPolicy.ByStartSequence) + .deliverSubject(newDeliverSubject) + .startSequence(Math.max(1, lastStreamSeq + 1)) + .startTime(null) // clear start time in case it was originally set + .build(); + } + ConsumerInfo lookupConsumerInfo(String streamName, String consumerName) throws IOException, JetStreamApiException { try { return _getConsumerInfo(streamName, consumerName); diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java b/src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java index 5ab2bec1a..f122543fb 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java @@ -58,10 +58,10 @@ public void pull(PullRequestOptions pullRequestOptions) { _pull(pullRequestOptions, true, null); } - protected String _pull(PullRequestOptions pullRequestOptions, boolean raiseStatusWarnings, TrackPendingListener trackPendingListener) { + protected String _pull(PullRequestOptions pullRequestOptions, boolean raiseStatusWarnings, PullManagerObserver pullManagerObserver) { String publishSubject = js.prependPrefix(String.format(JSAPI_CONSUMER_MSG_NEXT, stream, consumerName)); String pullSubject = getSubject().replace("*", Long.toString(this.pullSubjectIdHolder.incrementAndGet())); - manager.startPullRequest(pullSubject, pullRequestOptions, raiseStatusWarnings, trackPendingListener); + manager.startPullRequest(pullSubject, pullRequestOptions, raiseStatusWarnings, pullManagerObserver); connection.publish(publishSubject, pullSubject, pullRequestOptions.serialize()); connection.lenientFlushBuffer(); return pullSubject; diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java index 4852210ee..d499c1c5b 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java @@ -13,27 +13,23 @@ package io.nats.client.impl; -import io.nats.client.ConsumeOptions; -import io.nats.client.JetStreamApiException; -import io.nats.client.MessageHandler; -import io.nats.client.PullRequestOptions; +import io.nats.client.*; import io.nats.client.api.ConsumerInfo; import java.io.IOException; -class NatsMessageConsumer extends NatsMessageConsumerBase implements TrackPendingListener { +class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManagerObserver { protected final PullRequestOptions rePullPro; protected final int thresholdMessages; protected final long thresholdBytes; - protected final SimplifiedSubscriptionMaker subscriptionMaker; NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker, - MessageHandler messageHandler, + ConsumerInfo cachedConsumerInfo, ConsumeOptions opts, - ConsumerInfo lastConsumerInfo) throws IOException, JetStreamApiException { - super(lastConsumerInfo); - this.subscriptionMaker = subscriptionMaker; - initSub(subscriptionMaker.makeSubscription(messageHandler)); + Dispatcher userDispatcher, + final MessageHandler userMessageHandler) throws IOException, JetStreamApiException + { + super(cachedConsumerInfo); int bm = opts.getBatchSize(); long bb = opts.getBatchBytes(); @@ -42,26 +38,31 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements TrackPendin long rePullBytes = bb == 0 ? 0 : Math.max(1, bb * opts.getThresholdPercent() / 100); rePullPro = PullRequestOptions.builder(rePullMessages) .maxBytes(rePullBytes) - .expiresIn(opts.getExpiresIn()) + .expiresIn(opts.getExpiresInMillis()) .idleHeartbeat(opts.getIdleHeartbeat()) .build(); thresholdMessages = bm - rePullMessages; thresholdBytes = bb == 0 ? Integer.MIN_VALUE : bb - rePullBytes; + MessageHandler mh = userMessageHandler == null ? null : msg -> { + userMessageHandler.onMessage(msg); + if (stopped && pmm.noMorePending()) { + finished = true; + } + }; + initSub(subscriptionMaker.subscribe(mh, userDispatcher)); sub._pull(PullRequestOptions.builder(bm) .maxBytes(bb) - .expiresIn(opts.getExpiresIn()) + .expiresIn(opts.getExpiresInMillis()) .idleHeartbeat(opts.getIdleHeartbeat()) .build(), false, this); } @Override - public void track(int pendingMessages, long pendingBytes, boolean trackingBytes) { - if (!stopped && - (pmm.pendingMessages <= thresholdMessages - || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))) + public void pendingUpdated() { + if (!stopped && (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))) { sub._pull(rePullPro, false, this); } diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java index 25b865b36..a5ea48955 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java @@ -18,37 +18,47 @@ import io.nats.client.api.ConsumerInfo; import java.io.IOException; -import java.time.Duration; class NatsMessageConsumerBase implements MessageConsumer { protected NatsJetStreamPullSubscription sub; protected PullMessageManager pmm; - protected final Object subLock; protected boolean stopped; - protected ConsumerInfo lastConsumerInfo; + protected boolean finished; + protected ConsumerInfo cachedConsumerInfo; - NatsMessageConsumerBase(ConsumerInfo lastConsumerInfo) { - subLock = new Object(); - this.lastConsumerInfo = lastConsumerInfo; + NatsMessageConsumerBase(ConsumerInfo cachedConsumerInfo) { + this.cachedConsumerInfo = cachedConsumerInfo; } - protected void initSub(NatsJetStreamPullSubscription sub) throws JetStreamApiException, IOException { + void initSub(NatsJetStreamPullSubscription sub) { this.sub = sub; - if (lastConsumerInfo == null) { - lastConsumerInfo = sub.getConsumerInfo(); - } pmm = (PullMessageManager)sub.manager; } + /** + * {@inheritDoc} + */ + public boolean isStopped() { + return stopped; + } + + /** + * {@inheritDoc} + */ + public boolean isFinished() { + return finished; + } + /** * {@inheritDoc} */ @Override public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException { - synchronized (subLock) { - lastConsumerInfo = sub.getConsumerInfo(); - return lastConsumerInfo; + // don't look up consumer info if it was never set - this check is for ordered consumer + if (cachedConsumerInfo != null) { + cachedConsumerInfo = sub.getConsumerInfo(); } + return cachedConsumerInfo; } /** @@ -56,35 +66,26 @@ public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException */ @Override public ConsumerInfo getCachedConsumerInfo() { - return lastConsumerInfo; + return cachedConsumerInfo; } /** * {@inheritDoc} */ @Override - public void stop(long timeout) throws InterruptedException { - synchronized (subLock) { - if (!stopped) { - try { - if (sub.getNatsDispatcher() != null) { - sub.getDispatcher().drain(Duration.ofMillis(timeout)); - } - else { - sub.drain(Duration.ofMillis(timeout)); - } - } - finally { - stopped = true; - } - } - } + public void stop() { + stopped = true; } @Override public void close() throws Exception { + lenientClose(); + } + + protected void lenientClose() { try { - if (!stopped && sub.isActive()) { + if (!stopped || sub.isActive()) { + stopped = true; if (sub.getNatsDispatcher() != null) { sub.getDispatcher().unsubscribe(sub); } diff --git a/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java new file mode 100644 index 000000000..5eafbb800 --- /dev/null +++ b/src/main/java/io/nats/client/impl/NatsOrderedConsumerContext.java @@ -0,0 +1,90 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import io.nats.client.*; +import io.nats.client.api.OrderedConsumerConfiguration; + +import java.io.IOException; +import java.time.Duration; + +/** + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + */ +public class NatsOrderedConsumerContext implements OrderedConsumerContext { + NatsConsumerContext impl; + NatsOrderedConsumerContext(NatsStreamContext streamContext, OrderedConsumerConfiguration config) { + impl = new NatsConsumerContext(streamContext, config); + } + + @Override + public Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { + return impl.next(); + } + + @Override + public Message next(Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { + return impl.next(maxWait); + } + + @Override + public Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { + return impl.next(maxWaitMillis); + } + + @Override + public FetchConsumer fetchMessages(int maxMessages) throws IOException, JetStreamApiException { + return impl.fetchMessages(maxMessages); + } + + @Override + public FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiException { + return impl.fetchBytes(maxBytes); + } + + @Override + public FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException { + return impl.fetch(fetchConsumeOptions); + } + + @Override + public IterableConsumer iterate() throws IOException, JetStreamApiException { + return impl.iterate(); + } + + @Override + public IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { + return impl.iterate(consumeOptions); + } + + @Override + public MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException { + return impl.consume(handler); + } + + @Override + public MessageConsumer consume(Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException { + return impl.consume(dispatcher, handler); + } + + @Override + public MessageConsumer consume(ConsumeOptions consumeOptions, MessageHandler handler) throws IOException, JetStreamApiException { + return impl.consume(consumeOptions, handler); + } + + @Override + public MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException { + return impl.consume(consumeOptions, dispatcher, handler); + } +} diff --git a/src/main/java/io/nats/client/impl/NatsStreamContext.java b/src/main/java/io/nats/client/impl/NatsStreamContext.java index 3d2de7b19..875949224 100644 --- a/src/main/java/io/nats/client/impl/NatsStreamContext.java +++ b/src/main/java/io/nats/client/impl/NatsStreamContext.java @@ -15,34 +15,23 @@ import io.nats.client.*; import io.nats.client.api.*; -import io.nats.client.support.Validator; import java.io.IOException; import java.util.List; -import static io.nats.client.ConsumeOptions.DEFAULT_CONSUME_OPTIONS; -import static io.nats.client.support.ConsumerUtils.generateConsumerName; - /** * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ class NatsStreamContext implements StreamContext { - final NatsJetStreamManagement jsm; - final NatsJetStream js; final String streamName; - - NatsStreamContext(NatsConnection connection, JetStreamOptions jsOptions, String streamName) throws IOException, JetStreamApiException { - jsm = new NatsJetStreamManagement(connection, jsOptions); - js = new NatsJetStream(connection, jsOptions); - this.streamName = streamName; - jsm.getStreamInfo(streamName); // this is just verifying that the stream exists - } + final NatsJetStream js; + final NatsJetStreamManagement jsm; // for when this is contructed from the NatsJetStream itself - NatsStreamContext(NatsJetStream js, NatsConnection connection, JetStreamOptions jsOptions, String streamName) throws IOException, JetStreamApiException { - jsm = new NatsJetStreamManagement(connection, jsOptions); - this.js = js; + NatsStreamContext(String streamName, NatsJetStream js, NatsConnection connection, JetStreamOptions jsOptions) throws IOException, JetStreamApiException { this.streamName = streamName; + this.js = js == null ? new NatsJetStream(connection, jsOptions) : js; + jsm = new NatsJetStreamManagement(connection, jsOptions); jsm.getStreamInfo(streamName); // this is just verifying that the stream exists } @@ -90,7 +79,7 @@ public PurgeResponse purge(PurgeOptions options) throws IOException, JetStreamAp * {@inheritDoc} */ @Override - public ConsumerContext consumerContext(String consumerName) throws IOException, JetStreamApiException { + public ConsumerContext getConsumerContext(String consumerName) throws IOException, JetStreamApiException { return new NatsConsumerContext(this, jsm.getConsumerInfo(streamName, consumerName)); } @@ -102,6 +91,14 @@ public ConsumerContext createOrUpdateConsumer(ConsumerConfiguration config) thro return new NatsConsumerContext(this, jsm.addOrUpdateConsumer(streamName, config)); } + /** + * {@inheritDoc} + */ + @Override + public OrderedConsumerContext createOrderedConsumer(OrderedConsumerConfiguration config) throws IOException, JetStreamApiException { + return new NatsOrderedConsumerContext(this, config); + } + /** * {@inheritDoc} */ @@ -181,73 +178,4 @@ public boolean deleteMessage(long seq) throws IOException, JetStreamApiException public boolean deleteMessage(long seq, boolean erase) throws IOException, JetStreamApiException { return jsm.deleteMessage(streamName, seq, erase); } - - static class OrderedPullSubscribeOptionsBuilder extends PullSubscribeOptions.Builder { - public OrderedPullSubscribeOptionsBuilder(String stream, ConsumerConfiguration config) { - stream(stream).configuration(config); - ordered = true; - } - } - - @Override - public IterableConsumer orderedConsume(OrderedConsumerConfig config) throws IOException, JetStreamApiException { - return orderedConsume(config, DEFAULT_CONSUME_OPTIONS); - } - - @Override - public IterableConsumer orderedConsume(OrderedConsumerConfig config, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { - Validator.required(config, "Ordered Consumer Config"); - Validator.required(consumeOptions, "Consume Options"); - ConsumerConfiguration cc = getBackingConsumerConfiguration(config); - PullSubscribeOptions pso = new OrderedPullSubscribeOptionsBuilder(streamName, cc).build(); - return new NatsIterableConsumer(new SimplifiedSubscriptionMaker(js, pso, cc.getFilterSubject()), consumeOptions, null); - } - - @Override - public MessageConsumer orderedConsume(OrderedConsumerConfig config, MessageHandler handler) throws IOException, JetStreamApiException { - return orderedConsume(config, handler, DEFAULT_CONSUME_OPTIONS); - } - - @Override - public MessageConsumer orderedConsume(OrderedConsumerConfig config, MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { - Validator.required(config, "Ordered Consumer Config"); - Validator.required(handler, "Message Handler"); - Validator.required(consumeOptions, "Consume Options"); - ConsumerConfiguration cc = getBackingConsumerConfiguration(config); - PullSubscribeOptions pso = new OrderedPullSubscribeOptionsBuilder(streamName, cc).build(); - return new NatsMessageConsumer(new SimplifiedSubscriptionMaker(js, pso, cc.getFilterSubject()), handler, consumeOptions, null); - } - - private ConsumerConfiguration getBackingConsumerConfiguration(OrderedConsumerConfig config) { - ConsumerConfiguration.Builder b = ConsumerConfiguration.builder().name(generateConsumerName()); - - if (config.getDeliverPolicy() != null) { - b.deliverPolicy(config.getDeliverPolicy()); - } - - if (config.getStartSequence() != null) { - b.startSequence(config.getStartSequence()); - } - - if (config.getStartTime() != null) { - b.startTime(config.getStartTime()); - } - - if (config.getFilterSubject() == null) { - b.filterSubject(">"); - } - else { - b.filterSubject(config.getFilterSubject()); - } - - if (config.getReplayPolicy() != null) { - b.replayPolicy(config.getReplayPolicy()); - } - - if (config.getHeadersOnly() != null && config.getHeadersOnly()) { - b.headersOnly(true); - } - - return b.build(); - } } diff --git a/src/main/java/io/nats/client/impl/OrderedMessageManager.java b/src/main/java/io/nats/client/impl/OrderedMessageManager.java index 52e550349..9acba9fc9 100644 --- a/src/main/java/io/nats/client/impl/OrderedMessageManager.java +++ b/src/main/java/io/nats/client/impl/OrderedMessageManager.java @@ -16,7 +16,6 @@ import io.nats.client.Message; import io.nats.client.SubscribeOptions; import io.nats.client.api.ConsumerConfiguration; -import io.nats.client.api.DeliverPolicy; import java.util.concurrent.atomic.AtomicReference; @@ -28,13 +27,15 @@ class OrderedMessageManager extends PushMessageManager { protected long expectedExternalConsumerSeq; protected final AtomicReference targetSid; - protected OrderedMessageManager(NatsConnection conn, - NatsJetStream js, - String stream, - SubscribeOptions so, - ConsumerConfiguration originalCc, - boolean queueMode, - boolean syncMode) { + protected OrderedMessageManager( + NatsConnection conn, + NatsJetStream js, + String stream, + SubscribeOptions so, + ConsumerConfiguration originalCc, + boolean queueMode, + boolean syncMode) + { super(conn, js, stream, so, originalCc, queueMode, syncMode); expectedExternalConsumerSeq = 1; // always starts at 1 targetSid = new AtomicReference<>(); @@ -82,22 +83,16 @@ private void handleErrorCondition() { // 3. make a new consumer using the same deliver subject but // with a new starting point - ConsumerConfiguration userCC = ConsumerConfiguration.builder(originalCc) - .deliverPolicy(DeliverPolicy.ByStartSequence) - .deliverSubject(newDeliverSubject) - .startSequence(Math.max(1, lastStreamSeq + 1)) - .startTime(null) // clear start time in case it was originally set - .build(); + ConsumerConfiguration userCC = js.nextOrderedConsumerConfiguration(originalCc, lastStreamSeq, newDeliverSubject); js._createConsumerUnsubscribeOnException(stream, userCC, sub); // 4. restart the manager. startup(sub); } catch (Exception e) { - IllegalStateException ise = new IllegalStateException("Ordered subscription fatal error.", e); - js.conn.processException(ise); + js.conn.processException(e); if (syncMode) { - throw ise; + throw new RuntimeException("Ordered subscription fatal error.", e); } } } diff --git a/src/main/java/io/nats/client/impl/PullOrderedMessageManager.java b/src/main/java/io/nats/client/impl/OrderedPullMessageManager.java similarity index 84% rename from src/main/java/io/nats/client/impl/PullOrderedMessageManager.java rename to src/main/java/io/nats/client/impl/OrderedPullMessageManager.java index a469010d3..656b0d465 100644 --- a/src/main/java/io/nats/client/impl/PullOrderedMessageManager.java +++ b/src/main/java/io/nats/client/impl/OrderedPullMessageManager.java @@ -16,15 +16,13 @@ import io.nats.client.Message; import io.nats.client.SubscribeOptions; import io.nats.client.api.ConsumerConfiguration; -import io.nats.client.api.DeliverPolicy; -import io.nats.client.support.ConsumerUtils; import java.util.concurrent.atomic.AtomicReference; import static io.nats.client.impl.MessageManager.ManageResult.MESSAGE; import static io.nats.client.impl.MessageManager.ManageResult.STATUS_HANDLED; -class PullOrderedMessageManager extends PullMessageManager { +class OrderedPullMessageManager extends PullMessageManager { protected final ConsumerConfiguration originalCc; protected final NatsJetStream js; @@ -32,7 +30,7 @@ class PullOrderedMessageManager extends PullMessageManager { protected long expectedExternalConsumerSeq; protected final AtomicReference targetSid; - protected PullOrderedMessageManager(NatsConnection conn, + protected OrderedPullMessageManager(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration originalCc, boolean syncMode) { @@ -86,13 +84,7 @@ private void handleErrorCondition() { // 3. make a new consumer using the same deliver subject but // with a new starting point - ConsumerConfiguration userCC = ConsumerConfiguration.builder(originalCc) - .name(ConsumerUtils.generateConsumerName()) - .deliverPolicy(DeliverPolicy.ByStartSequence) - .deliverSubject(newDeliverSubject) - .startSequence(Math.max(1, lastStreamSeq + 1)) - .startTime(null) // clear start time in case it was originally set - .build(); + ConsumerConfiguration userCC = js.nextOrderedConsumerConfiguration(originalCc, lastStreamSeq, newDeliverSubject); js._createConsumerUnsubscribeOnException(stream, userCC, sub); // 4. restart the manager. diff --git a/src/main/java/io/nats/client/impl/TrackPendingListener.java b/src/main/java/io/nats/client/impl/PullManagerObserver.java similarity index 84% rename from src/main/java/io/nats/client/impl/TrackPendingListener.java rename to src/main/java/io/nats/client/impl/PullManagerObserver.java index ec3a55573..838d52927 100644 --- a/src/main/java/io/nats/client/impl/TrackPendingListener.java +++ b/src/main/java/io/nats/client/impl/PullManagerObserver.java @@ -13,6 +13,6 @@ package io.nats.client.impl; -interface TrackPendingListener { - void track(int pendingMessages, long pendingBytes, boolean trackingBytes); +interface PullManagerObserver { + void pendingUpdated(); } diff --git a/src/main/java/io/nats/client/impl/PullMessageManager.java b/src/main/java/io/nats/client/impl/PullMessageManager.java index 8a7a6d2ea..374993672 100644 --- a/src/main/java/io/nats/client/impl/PullMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullMessageManager.java @@ -29,7 +29,7 @@ class PullMessageManager extends MessageManager { protected long pendingBytes; protected boolean trackingBytes; protected boolean raiseStatusWarnings; - protected TrackPendingListener trackPendingListener; + protected PullManagerObserver pullManagerObserver; protected PullMessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) { super(conn, so, syncMode); @@ -45,10 +45,10 @@ protected void startup(NatsJetStreamSubscription sub) { } @Override - protected void startPullRequest(String pullSubject, PullRequestOptions pro, boolean raiseStatusWarnings, TrackPendingListener trackPendingListener) { + protected void startPullRequest(String pullSubject, PullRequestOptions pro, boolean raiseStatusWarnings, PullManagerObserver pullManagerObserver) { synchronized (stateChangeLock) { this.raiseStatusWarnings = raiseStatusWarnings; - this.trackPendingListener = trackPendingListener; + this.pullManagerObserver = pullManagerObserver; pendingMessages += pro.getBatchSize(); pendingBytes += pro.getMaxBytes(); trackingBytes = (pendingBytes > 0); @@ -78,8 +78,8 @@ private void trackPending(int m, long b) { shutdownHeartbeatTimer(); } } - if (trackPendingListener != null) { - trackPendingListener.track(pendingMessages, pendingBytes, trackingBytes); + if (pullManagerObserver != null) { + pullManagerObserver.pendingUpdated(); } } } @@ -155,8 +155,12 @@ protected ManageResult manageStatus(Message msg) { break; } - // fall through, all others are errors + // all others are errors conn.executeCallback((c, el) -> el.pullStatusError(c, sub, status)); return STATUS_ERROR; } + + protected boolean noMorePending() { + return pendingMessages < 1 || (trackingBytes && pendingBytes < 1); + } } diff --git a/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java b/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java index ff883fe18..7790215ae 100644 --- a/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java +++ b/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java @@ -16,35 +16,9 @@ import io.nats.client.Dispatcher; import io.nats.client.JetStreamApiException; import io.nats.client.MessageHandler; -import io.nats.client.PullSubscribeOptions; import java.io.IOException; -public class SimplifiedSubscriptionMaker { - private final NatsJetStream js; - private final PullSubscribeOptions pso; - private final String subscribeSubject; - @SuppressWarnings("FieldCanBeLocal") // can't be local because I want to keep its reference as part of the state - private Dispatcher dispatcher; - - public SimplifiedSubscriptionMaker(NatsJetStream js, PullSubscribeOptions pso) { - this.js = js; - this.pso = pso; - this.subscribeSubject = null; - } - - public SimplifiedSubscriptionMaker(NatsJetStream js, PullSubscribeOptions pso, String subscribeSubject) { - this.js = js; - this.pso = pso; - this.subscribeSubject = subscribeSubject; - } - - public NatsJetStreamPullSubscription makeSubscription(MessageHandler messageHandler) throws IOException, JetStreamApiException { - if (messageHandler == null) { - return (NatsJetStreamPullSubscription)js.subscribe(subscribeSubject, pso); - } - - dispatcher = js.conn.createDispatcher(); - return (NatsJetStreamPullSubscription)js.subscribe(subscribeSubject, dispatcher, messageHandler, pso); - } +interface SimplifiedSubscriptionMaker { + NatsJetStreamPullSubscription subscribe(MessageHandler optionalMessageHandler, Dispatcher optionalDispatcher) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/support/Encoding.java b/src/main/java/io/nats/client/support/Encoding.java index 0e3687532..ab348bab5 100644 --- a/src/main/java/io/nats/client/support/Encoding.java +++ b/src/main/java/io/nats/client/support/Encoding.java @@ -23,29 +23,57 @@ public abstract class Encoding { private Encoding() {} /* ensures cannot be constructed */ /** - * @deprecated prefere base64UrlEncode + * base64 url encode a byte array to a byte array + * @param input the input byte array to encode + * @return the encoded byte array + * @deprecated prefer base64UrlEncode */ @Deprecated public static byte[] base64Encode(byte[] input) { return Base64.getUrlEncoder().withoutPadding().encode(input); } + /** + * base64 url encode a byte array to a byte array + * @param input the input byte array to encode + * @return the encoded byte array + */ public static byte[] base64UrlEncode(byte[] input) { return Base64.getUrlEncoder().withoutPadding().encode(input); } + /** + * base64 url encode a byte array to a string + * @param input the input byte array to encode + * @return the encoded string + */ public static String toBase64Url(byte[] input) { return new String(base64UrlEncode(input)); } + /** + * base64 url encode a string to a string + * @param input the input string to encode + * @return the encoded string + */ public static String toBase64Url(String input) { return new String(base64UrlEncode(input.getBytes(StandardCharsets.US_ASCII))); } + /** + * base64 url decode a byte array + * @param input the input byte array to decode + * @return the decoded byte array + */ public static byte[] base64UrlDecode(byte[] input) { return Base64.getUrlDecoder().decode(input); } + /** + * get a string from a base64 url encoded byte array + * @param input the input string to decode + * @return the decoded string + */ public static String fromBase64Url(String input) { return new String(base64UrlDecode(input.getBytes(StandardCharsets.US_ASCII))); } diff --git a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java index ae43d7860..f563eff4c 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java @@ -73,6 +73,8 @@ public class NatsJetStreamClientError { @Deprecated // More comprehensive name public static final NatsJetStreamClientError JsConsumerCantUseNameBefore290 = new NatsJetStreamClientError(CON, 90301, "Name field not valid against pre v2.9.0 servers."); + + @Deprecated // More comprehensive name public static final NatsJetStreamClientError JsSoOrderedRequiresMaxDeliver = new NatsJetStreamClientError(SO, 90109, "Max Deliver is limited to 1 with an ordered consumer."); private final String id; diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java index 1ef86a433..c4aeb9745 100644 --- a/src/main/java/io/nats/client/support/Validator.java +++ b/src/main/java/io/nats/client/support/Validator.java @@ -133,7 +133,7 @@ public static String required(String s, String label) { public static T required(T o, String label) { if (o == null) { - throw new IllegalArgumentException(label + " cannot be null or empty."); + throw new IllegalArgumentException(label + " cannot be null."); } return o; } diff --git a/src/test/java/io/nats/client/impl/HeadersTests.java b/src/test/java/io/nats/client/impl/HeadersTests.java index 5f3d3d484..44d7cfc1c 100644 --- a/src/test/java/io/nats/client/impl/HeadersTests.java +++ b/src/test/java/io/nats/client/impl/HeadersTests.java @@ -753,4 +753,24 @@ public void testTokenSamePoint() { public void testToString() { assertNotNull(new Status(1, "msg").toString()); // COVERAGE } + + @Test + public void put_map_works() { + Map> map = new HashMap<>(); + map.put(KEY1, Collections.singletonList(VAL1)); + map.put(KEY2, Arrays.asList(VAL2, VAL3)); + Headers h = new Headers(); + h.put(map); + + assertEquals(2, h.size()); + assertTrue(h.containsKey(KEY1)); + assertTrue(h.containsKey(KEY2)); + assertEquals(1, h.get(KEY1).size()); + assertEquals(2, h.get(KEY2).size()); + assertTrue(h.get(KEY1).contains(VAL1)); + assertEquals(VAL1, h.getFirst(KEY1)); + assertTrue(h.get(KEY2).contains(VAL2)); + assertTrue(h.get(KEY2).contains(VAL3)); + assertEquals(VAL2, h.getFirst(KEY2)); + } } diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index b56cae592..22c202a30 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -261,11 +261,16 @@ public void testAddUpdateStreamInvalids() throws Exception { .build(); assertThrows(JetStreamApiException.class, () -> jsm.updateStream(scMaxCon)); - // cannot change RetentionPolicy StreamConfiguration scReten = getTestStreamConfigurationBuilder() - .retentionPolicy(RetentionPolicy.Interest) - .build(); - assertThrows(JetStreamApiException.class, () -> jsm.updateStream(scReten)); + .retentionPolicy(RetentionPolicy.Interest) + .build(); + if (nc.getServerInfo().isOlderThanVersion("2.10")) { + // cannot change RetentionPolicy + assertThrows(JetStreamApiException.class, () -> jsm.updateStream(scReten)); + } + else { + jsm.updateStream(scReten); + } jsm.deleteStream(STREAM); diff --git a/src/test/java/io/nats/client/impl/JetStreamPushTests.java b/src/test/java/io/nats/client/impl/JetStreamPushTests.java index 428f4228f..3c5df505d 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPushTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPushTests.java @@ -316,6 +316,7 @@ private void assertCantPullOnPushSub(JetStreamSubscription sub) { assertThrows(IllegalStateException.class, () -> sub.fetch(1, Duration.ofSeconds(1))); assertThrows(IllegalStateException.class, () -> sub.iterate(1, 1000)); assertThrows(IllegalStateException.class, () -> sub.iterate(1, Duration.ofSeconds(1))); + assertThrows(IllegalStateException.class, () -> sub.reader(1, 2)); } @Test diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index 904485482..60c614481 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.time.Duration; +import java.time.ZonedDateTime; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -29,36 +31,36 @@ public class SimplificationTests extends JetStreamTestBase { - private boolean runTest(ServerInfo si) { + private boolean mustBeAtLeast291(ServerInfo si) { return si.isSameOrNewerThanVersion("2.9.1"); } @Test public void testStreamContext() throws Exception { - runInJsServer(this::runTest, nc -> { + runInJsServer(this::mustBeAtLeast291, nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); - assertThrows(JetStreamApiException.class, () -> nc.streamContext(STREAM)); - assertThrows(JetStreamApiException.class, () -> nc.streamContext(STREAM, JetStreamOptions.DEFAULT_JS_OPTIONS)); - assertThrows(JetStreamApiException.class, () -> js.streamContext(STREAM)); + assertThrows(JetStreamApiException.class, () -> nc.getStreamContext(STREAM)); + assertThrows(JetStreamApiException.class, () -> nc.getStreamContext(STREAM, JetStreamOptions.DEFAULT_JS_OPTIONS)); + assertThrows(JetStreamApiException.class, () -> js.getStreamContext(STREAM)); createMemoryStream(jsm, STREAM, SUBJECT); - StreamContext streamContext = nc.streamContext(STREAM); + StreamContext streamContext = nc.getStreamContext(STREAM); assertEquals(STREAM, streamContext.getStreamName()); _testStreamContext(js, streamContext); jsm.deleteStream(STREAM); createMemoryStream(jsm, STREAM, SUBJECT); - streamContext = js.streamContext(STREAM); + streamContext = js.getStreamContext(STREAM); assertEquals(STREAM, streamContext.getStreamName()); _testStreamContext(js, streamContext); }); } private static void _testStreamContext(JetStream js, StreamContext streamContext) throws IOException, JetStreamApiException { - assertThrows(JetStreamApiException.class, () -> streamContext.consumerContext(DURABLE)); + assertThrows(JetStreamApiException.class, () -> streamContext.getConsumerContext(DURABLE)); assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(DURABLE)); ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build(); @@ -75,10 +77,10 @@ private static void _testStreamContext(JetStream js, StreamContext streamContext assertEquals(1, streamContext.getConsumerNames().size()); assertEquals(1, streamContext.getConsumers().size()); - assertNotNull(streamContext.consumerContext(DURABLE)); + assertNotNull(streamContext.getConsumerContext(DURABLE)); streamContext.deleteConsumer(DURABLE); - assertThrows(JetStreamApiException.class, () -> streamContext.consumerContext(DURABLE)); + assertThrows(JetStreamApiException.class, () -> streamContext.getConsumerContext(DURABLE)); assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(DURABLE)); // coverage @@ -119,7 +121,7 @@ private static void _testStreamContext(JetStream js, StreamContext streamContext @Test public void testFetch() throws Exception { - runInJsServer(this::runTest, nc -> { + runInJsServer(this::mustBeAtLeast291, nc -> { createDefaultTestStream(nc); JetStream js = nc.jetStream(); for (int x = 1; x <= 20; x++) { @@ -140,11 +142,6 @@ public void testFetch() throws Exception { // 1D. simple-consumer-40msgs was created in 1C and has no messages available _testFetch("1D", nc, 40, 0, 40); - // don't test bytes before 2.9.1 - if (nc.getServerInfo().isOlderThanVersion("2.9.1")) { - return; - } - // 2. Different max bytes sizes demonstrate expiration behavior // - each test message is approximately 100 bytes @@ -170,7 +167,7 @@ private static void _testFetch(String label, Connection nc, int maxMessages, int jsm.addOrUpdateConsumer(STREAM, cc); // Consumer[Context] - ConsumerContext consumerContext = js.consumerContext(STREAM, name); + ConsumerContext consumerContext = js.getConsumerContext(STREAM, name); // Custom consume options FetchConsumeOptions.Builder builder = FetchConsumeOptions.builder().expiresIn(2000); @@ -228,7 +225,7 @@ private static String generateConsumerName(int maxMessages, int maxBytes) { @Test public void testIterableConsumer() throws Exception { - runInJsServer(this::runTest, nc -> { + runInJsServer(this::mustBeAtLeast291, nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); createDefaultTestStream(jsm); @@ -239,33 +236,34 @@ public void testIterableConsumer() throws Exception { jsm.addOrUpdateConsumer(STREAM, cc); // Consumer[Context] - ConsumerContext consumerContext = js.consumerContext(STREAM, DURABLE); + ConsumerContext consumerContext = js.getConsumerContext(STREAM, DURABLE); int stopCount = 500; // create the consumer then use it - try (IterableConsumer consumer = consumerContext.consume()) { + try (IterableConsumer consumer = consumerContext.iterate()) { _testIterable(js, stopCount, consumer); } // coverage - IterableConsumer consumer = consumerContext.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS); + IterableConsumer consumer = consumerContext.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS); consumer.close(); - assertThrows(IllegalArgumentException.class, () -> consumerContext.consume((ConsumeOptions) null)); + assertThrows(IllegalArgumentException.class, () -> consumerContext.iterate((ConsumeOptions) null)); }); } @Test public void testOrderedIterableConsumerBasic() throws Exception { - runInJsServer(this::runTest, nc -> { + runInJsServer(this::mustBeAtLeast291, nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); - - createDefaultTestStream(jsm); JetStream js = nc.jetStream(); - StreamContext sc = nc.streamContext(STREAM); + createDefaultTestStream(jsm); + + StreamContext sc = nc.getStreamContext(STREAM); int stopCount = 500; - OrderedConsumerConfig occ = new OrderedConsumerConfig().filterSubject(SUBJECT); - try (IterableConsumer consumer = sc.orderedConsume(occ)) { + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(SUBJECT); + OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); + try (IterableConsumer consumer = ctx.iterate()) { _testIterable(js, stopCount, consumer); } }); @@ -284,7 +282,7 @@ private static void _testIterable(JetStream js, int stopCount, IterableConsumer } Thread.sleep(50); // allows more messages to come across - consumer.stop(200); + consumer.stop(); Message msg = consumer.nextMessage(1000); while (msg != null) { @@ -312,7 +310,7 @@ private static void _testIterable(JetStream js, int stopCount, IterableConsumer @Test public void testConsumeWithHandler() throws Exception { - runInJsServer(this::runTest, nc -> { + runInJsServer(this::mustBeAtLeast291, nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); createDefaultTestStream(jsm); @@ -324,7 +322,7 @@ public void testConsumeWithHandler() throws Exception { jsm.addOrUpdateConsumer(STREAM, cc); // Consumer[Context] - ConsumerContext consumerContext = js.consumerContext(STREAM, NAME); + ConsumerContext consumerContext = js.getConsumerContext(STREAM, NAME); int stopCount = 500; @@ -339,7 +337,7 @@ public void testConsumeWithHandler() throws Exception { try (MessageConsumer consumer = consumerContext.consume(handler)) { latch.await(); - consumer.stop(200); + consumer.stop(); assertTrue(atomicCount.get() > 500); } }); @@ -347,11 +345,11 @@ public void testConsumeWithHandler() throws Exception { @Test public void testNext() throws Exception { - runInJsServer(this::runTest, nc -> { + runInJsServer(this::mustBeAtLeast291, nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); + JetStream js = nc.jetStream(); createDefaultTestStream(jsm); - JetStream js = nc.jetStream(); jsPublish(js, SUBJECT, 4); // Pre define a consumer @@ -359,7 +357,7 @@ public void testNext() throws Exception { jsm.addOrUpdateConsumer(STREAM, cc); // Consumer[Context] - ConsumerContext consumerContext = js.consumerContext(STREAM, NAME); + ConsumerContext consumerContext = js.getConsumerContext(STREAM, NAME); assertThrows(IllegalArgumentException.class, () -> consumerContext.next(1)); assertNotNull(consumerContext.next(1000)); @@ -372,7 +370,7 @@ public void testNext() throws Exception { @Test public void testCoverage() throws Exception { - runInJsServer(this::runTest, nc -> { + runInJsServer(this::mustBeAtLeast291, nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); createDefaultTestStream(jsm); @@ -385,22 +383,22 @@ public void testCoverage() throws Exception { jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name(4)).build()); // Stream[Context] - StreamContext sctx1 = nc.streamContext(STREAM); - nc.streamContext(STREAM, JetStreamOptions.DEFAULT_JS_OPTIONS); - js.streamContext(STREAM); + StreamContext sctx1 = nc.getStreamContext(STREAM); + nc.getStreamContext(STREAM, JetStreamOptions.DEFAULT_JS_OPTIONS); + js.getStreamContext(STREAM); // Consumer[Context] - ConsumerContext cctx1 = nc.consumerContext(STREAM, name(1)); - ConsumerContext cctx2 = nc.consumerContext(STREAM, name(2), JetStreamOptions.DEFAULT_JS_OPTIONS); - ConsumerContext cctx3 = js.consumerContext(STREAM, name(3)); - ConsumerContext cctx4 = sctx1.consumerContext(name(4)); + ConsumerContext cctx1 = nc.getConsumerContext(STREAM, name(1)); + ConsumerContext cctx2 = nc.getConsumerContext(STREAM, name(2), JetStreamOptions.DEFAULT_JS_OPTIONS); + ConsumerContext cctx3 = js.getConsumerContext(STREAM, name(3)); + ConsumerContext cctx4 = sctx1.getConsumerContext(name(4)); ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(5)).build()); ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(6)).build()); - closeConsumer(cctx1.consume(), name(1), true); - closeConsumer(cctx2.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(2), true); + closeConsumer(cctx1.iterate(), name(1), true); + closeConsumer(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(2), true); closeConsumer(cctx3.consume(m -> {}), name(3), true); - closeConsumer(cctx4.consume(m -> {}, ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(4), true); + closeConsumer(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), name(4), true); closeConsumer(cctx5.fetchMessages(1), name(5), false); closeConsumer(cctx6.fetchBytes(1000), name(6), false); }); @@ -410,7 +408,7 @@ private void closeConsumer(MessageConsumer con, String name, boolean doStop) thr ConsumerInfo ci = con.getConsumerInfo(); assertEquals(name, ci.getName()); if (doStop) { - con.stop(100); + con.stop(); } } @@ -418,7 +416,7 @@ private void closeConsumer(MessageConsumer con, String name, boolean doStop) thr public void testFetchConsumeOptionsBuilder() { FetchConsumeOptions fco = FetchConsumeOptions.builder().build(); assertEquals(DEFAULT_MESSAGE_COUNT, fco.getMaxMessages()); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, fco.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, fco.getExpiresInMillis()); assertEquals(DEFAULT_THRESHOLD_PERCENT, fco.getThresholdPercent()); assertEquals(0, fco.getMaxBytes()); assertEquals(DEFAULT_EXPIRES_IN_MILLIS * MAX_IDLE_HEARTBEAT_PERCENT / 100, fco.getIdleHeartbeat()); @@ -448,7 +446,7 @@ public void testFetchConsumeOptionsBuilder() { public void testConsumeOptionsBuilder() { ConsumeOptions co = ConsumeOptions.builder().build(); assertEquals(DEFAULT_MESSAGE_COUNT, co.getBatchSize()); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresInMillis()); assertEquals(DEFAULT_THRESHOLD_PERCENT, co.getThresholdPercent()); assertEquals(0, co.getBatchBytes()); assertEquals(DEFAULT_EXPIRES_IN_MILLIS * MAX_IDLE_HEARTBEAT_PERCENT / 100, co.getIdleHeartbeat()); @@ -487,20 +485,20 @@ public void testConsumeOptionsBuilder() { assertEquals(100, co.getThresholdPercent()); co = ConsumeOptions.builder().expiresIn(0).build(); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresInMillis()); co = ConsumeOptions.builder().expiresIn(-1).build(); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresInMillis()); co = ConsumeOptions.builder().expiresIn(-999).build(); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresInMillis()); assertThrows(IllegalArgumentException.class, () -> ConsumeOptions.builder().expiresIn(MIN_EXPIRES_MILLS - 1).build()); } - public static class PullOrderedTestDropSimulator extends PullOrderedMessageManager { - public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { + public static class OrderedPullTestDropSimulator extends OrderedPullMessageManager { + public OrderedPullTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { super(conn, js, stream, so, serverCC, syncMode); } @@ -518,51 +516,109 @@ protected Boolean beforeQueueProcessorImpl(NatsMessage msg) { } } + public static class OrderedPullNextTestDropSimulator extends OrderedPullMessageManager { + public OrderedPullNextTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { + super(conn, js, stream, so, serverCC, syncMode); + } + + // these have to be static or the test keeps repeating + static boolean ss2 = true; + static boolean ss5 = true; + + @Override + protected Boolean beforeQueueProcessorImpl(NatsMessage msg) { + if (msg.isJetStream()) { + long ss = msg.metaData().streamSequence(); + if (ss == 2 && ss2) { + ss2 = false; + return false; + } + if (ss == 5 && ss5) { + ss5 = false; + return false; + } + } + + return super.beforeQueueProcessorImpl(msg); + } + } + @Test - public void testOrderedIterable() throws Exception { - runInJsServer(this::runTest, nc -> { + public void testOrderedActives() throws Exception { + runInJsServer(this::mustBeAtLeast291, nc -> { // Setup JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - String subject = subject(111); - createMemoryStream(jsm, stream(111), subject); + String stream = stream("soa"); + String subject = subject("uoa"); + createMemoryStream(jsm, stream, subject); - StreamContext sc = js.streamContext(stream(111)); + StreamContext sc = js.getStreamContext(stream); - // Get this in place before any subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + jsPublish(js, subject, 101, 6); - // Published messages will be intercepted by the OrderedTestDropSimulator - new Thread(() -> { - try { - Thread.sleep(1000); // give the consumer time to get setup before publishing - jsPublish(js, subject, 101, 6); - } - catch (Exception e) { - throw new RuntimeException(e); + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(subject); + // Get this in place before subscriptions are made + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullNextTestDropSimulator::new; + testOrderedActiveNext(sc, occ); + + // Get this in place before subscriptions are made + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new; + testOrderedActiveFetch(sc, occ); + testOrderedActiveIterable(sc, occ); + }); + } + + private static void testOrderedActiveNext(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { + OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); + // Loop through the messages to make sure I get stream sequence 1 to 6 + int expectedStreamSeq = 1; + while (expectedStreamSeq <= 6) { + Message m = ctx.next(1000); + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(1, m.metaData().consumerSequence()); + ++expectedStreamSeq; + } + } + } + + private static void testOrderedActiveFetch(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { + OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); + try (FetchConsumer fcon = ctx.fetchMessages(6)) { + // Loop through the messages to make sure I get stream sequence 1 to 6 + int expectedStreamSeq = 1; + while (expectedStreamSeq <= 6) { + Message m = fcon.nextMessage(); + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); + ++expectedStreamSeq; } - }).start(); + } + } + } - OrderedConsumerConfig occ = new OrderedConsumerConfig().filterSubject(subject); - try (IterableConsumer icon = sc.orderedConsume(occ)) { - // Loop through the messages to make sure I get stream sequence 1 to 6 - int expectedStreamSeq = 1; - while (expectedStreamSeq <= 6) { - Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage - if (m != null) { - assertEquals(expectedStreamSeq, m.metaData().streamSequence()); - assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); - ++expectedStreamSeq; - } + private static void testOrderedActiveIterable(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { + OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); + try (IterableConsumer icon = ctx.iterate()) { + // Loop through the messages to make sure I get stream sequence 1 to 6 + int expectedStreamSeq = 1; + while (expectedStreamSeq <= 6) { + Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); + ++expectedStreamSeq; } } - }); + } } @Test public void testOrderedConsume() throws Exception { - runInJsServer(this::runTest, nc -> { + runInJsServer(this::mustBeAtLeast291, nc -> { // Setup JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); @@ -570,10 +626,10 @@ public void testOrderedConsume() throws Exception { String subject = subject(222); createMemoryStream(jsm, stream(222), subject); - StreamContext sc = js.streamContext(stream(222)); + StreamContext sc = js.getStreamContext(stream(222)); - // Get this in place before any subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + // Get this in place before subscriptions are made + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new; CountDownLatch msgLatch = new CountDownLatch(6); AtomicInteger received = new AtomicInteger(); @@ -586,8 +642,9 @@ public void testOrderedConsume() throws Exception { msgLatch.countDown(); }; - OrderedConsumerConfig occ = new OrderedConsumerConfig().filterSubject(subject); - try (MessageConsumer mcon = sc.orderedConsume(occ, handler)) { + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(subject); + OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); + try (MessageConsumer mcon = ctx.consume(handler)) { jsPublish(js, subject, 201, 6); // wait for the messages @@ -604,4 +661,164 @@ public void testOrderedConsume() throws Exception { } }); } + + @Test + public void testOrderedMultipleWays() throws Exception { + runInJsServer(this::mustBeAtLeast291, nc -> { + // Setup + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + String subject = subject(333); + createMemoryStream(jsm, stream(333), subject); + + StreamContext sc = js.getStreamContext(stream(333)); + + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(subject); + OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); + + // can't do others while doing next + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + // make sure there is enough time to call other methods. + assertNull(ctx.next(2000)); + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + latch.countDown(); + } + }).start(); + + Thread.sleep(100); // make sure there is enough time for the thread to start and get into the next method + validateCantCallOtherMethods(ctx); + + //noinspection ResultOfMethodCallIgnored + latch.await(3000, TimeUnit.MILLISECONDS); + + for (int x = 0 ; x < 10_000; x++) { + js.publish(subject, ("multiple" + x).getBytes()); + } + + // can do others now + Message m = ctx.next(1000); + assertNotNull(m); + assertEquals(1, m.metaData().streamSequence()); + + // can't do others while doing next + int seq = 2; + try (FetchConsumer fc = ctx.fetchMessages(5)) { + while (seq <= 6) { + m = fc.nextMessage(); + assertNotNull(m); + assertEquals(seq, m.metaData().streamSequence()); + assertFalse(fc.isFinished()); + validateCantCallOtherMethods(ctx); + seq++; + } + + m = fc.nextMessage(); + assertNull(m); + assertTrue(fc.isFinished()); + } + + // can do others now + m = ctx.next(1000); + assertNotNull(m); + assertEquals(seq++, m.metaData().streamSequence()); + + // can't do others while doing iterate + ConsumeOptions copts = ConsumeOptions.builder().batchSize(10).build(); + try (IterableConsumer ic = ctx.iterate(copts)) { + ic.stop(); + m = ic.nextMessage(1000); + while (m != null) { + assertEquals(seq, m.metaData().streamSequence()); + if (!ic.isFinished()) { + validateCantCallOtherMethods(ctx); + } + ++seq; + m = ic.nextMessage(1000); + } + } + + // can do others now + m = ctx.next(1000); + assertNotNull(m); + assertEquals(seq++, m.metaData().streamSequence()); + + int last = Math.min(seq + 10, 9999); + try (FetchConsumer fc = ctx.fetchMessages(last - seq)) { + while (seq < last) { + fc.stop(); + m = fc.nextMessage(); + assertNotNull(m); + assertEquals(seq, m.metaData().streamSequence()); + assertFalse(fc.isFinished()); + validateCantCallOtherMethods(ctx); + seq++; + } + } + }); + } + + private static void validateCantCallOtherMethods(OrderedConsumerContext ctx) { + assertThrows(IOException.class, () -> ctx.next(1000)); + assertThrows(IOException.class, () -> ctx.fetchMessages(1)); + assertThrows(IOException.class, () -> ctx.consume(null)); + } + + @Test + public void testOrderedConsumerBuilder() { + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration(); + assertEquals(">", occ.getFilterSubject()); + assertNull(occ.getDeliverPolicy()); + assertEquals(ConsumerConfiguration.LONG_UNSET, occ.getStartSequence()); + assertNull(occ.getStartTime()); + assertNull(occ.getReplayPolicy()); + assertNull(occ.getHeadersOnly()); + + // nulls + occ = new OrderedConsumerConfiguration() + .filterSubject(null) + .deliverPolicy(null) + .replayPolicy(null) + .headersOnly(null); + assertEquals(">", occ.getFilterSubject()); + assertNull(occ.getDeliverPolicy()); + assertEquals(ConsumerConfiguration.LONG_UNSET, occ.getStartSequence()); + assertNull(occ.getStartTime()); + assertNull(occ.getReplayPolicy()); + assertNull(occ.getHeadersOnly()); + + // values that set to default + occ = new OrderedConsumerConfiguration() + .filterSubject("") + .startSequence(-42) + .headersOnly(false); + assertEquals(">", occ.getFilterSubject()); + assertNull(occ.getDeliverPolicy()); + assertEquals(ConsumerConfiguration.LONG_UNSET, occ.getStartSequence()); + assertNull(occ.getStartTime()); + assertNull(occ.getReplayPolicy()); + assertNull(occ.getHeadersOnly()); + + // values + ZonedDateTime zdt = ZonedDateTime.now(); + occ = new OrderedConsumerConfiguration() + .filterSubject("fs") + .deliverPolicy(DeliverPolicy.All) + .startSequence(42) + .startTime(zdt) + .replayPolicy(ReplayPolicy.Original) + .headersOnly(true); + assertEquals("fs", occ.getFilterSubject()); + assertEquals(DeliverPolicy.All, occ.getDeliverPolicy()); + assertEquals(42, occ.getStartSequence()); + assertEquals(zdt, occ.getStartTime()); + assertEquals(ReplayPolicy.Original, occ.getReplayPolicy()); + assertTrue(occ.getHeadersOnly()); + } } diff --git a/src/test/java/io/nats/client/utils/TestBase.java b/src/test/java/io/nats/client/utils/TestBase.java index 24f6dd742..7f7d1ebe1 100644 --- a/src/test/java/io/nats/client/utils/TestBase.java +++ b/src/test/java/io/nats/client/utils/TestBase.java @@ -237,64 +237,64 @@ private static void cleanupJs(Connection c) public static final String KEY = "key"; public static final String DATA = "data"; - public static String stream(int seq) { - return STREAM + "-" + seq; + public static String stream(Object variant) { + return STREAM + "-" + variant; } - public static String mirror(int seq) { - return MIRROR + "-" + seq; + public static String mirror(Object variant) { + return MIRROR + "-" + variant; } - public static String source(int seq) { - return SOURCE + "-" + seq; + public static String source(Object variant) { + return SOURCE + "-" + variant; } - public static String subject(int seq) { - return SUBJECT + "-" + seq; + public static String subject(Object variant) { + return SUBJECT + "-" + variant; } public static String subjectDot(String field) { return SUBJECT + DOT + field; } - public static String queue(int seq) { - return QUEUE + "-" + seq; + public static String queue(Object variant) { + return QUEUE + "-" + variant; } - public static String durable(int seq) { - return DURABLE + "-" + seq; + public static String durable(Object variant) { + return DURABLE + "-" + variant; } - public static String durable(String vary, int seq) { - return DURABLE + "-" + vary + "-" + seq; + public static String durable(String vary, Object variant) { + return DURABLE + "-" + vary + "-" + variant; } - public static String name(int seq) { - return NAME + "-" + seq; + public static String name(Object variant) { + return NAME + "-" + variant; } - public static String deliver(int seq) { - return DELIVER + "-" + seq; + public static String deliver(Object variant) { + return DELIVER + "-" + variant; } - public static String bucket(int seq) { - return BUCKET + "-" + seq; + public static String bucket(Object variant) { + return BUCKET + "-" + variant; } - public static String key(int seq) { - return KEY + "-" + seq; + public static String key(Object variant) { + return KEY + "-" + variant; } - public static String messageId(int seq) { - return MESSAGE_ID + "-" + seq; + public static String messageId(Object variant) { + return MESSAGE_ID + "-" + variant; } - public static String data(int seq) { - return DATA + "-" + seq; + public static String data(Object variant) { + return DATA + "-" + variant; } - public static byte[] dataBytes(int seq) { - return data(seq).getBytes(StandardCharsets.US_ASCII); + public static byte[] dataBytes(Object variant) { + return data(variant).getBytes(StandardCharsets.US_ASCII); } public static NatsMessage getDataMessage(String data) { diff --git a/src/test/java/io/nats/service/ServiceTests.java b/src/test/java/io/nats/service/ServiceTests.java index 102d9ec29..006fac662 100644 --- a/src/test/java/io/nats/service/ServiceTests.java +++ b/src/test/java/io/nats/service/ServiceTests.java @@ -553,7 +553,7 @@ public void testServiceBuilderConstruction() { IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> Service.builder().name(NAME).version("1.0.0").addServiceEndpoint(se).build()); - assertTrue(iae.getMessage().contains("Connection cannot be null or empty")); + assertTrue(iae.getMessage().contains("Connection cannot be null")); iae = assertThrows(IllegalArgumentException.class, () -> Service.builder().connection(conn).version("1.0.0").addServiceEndpoint(se).build());