diff --git a/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java b/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java index 473d59fda..3f1e97479 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/ContextExample.java @@ -62,7 +62,7 @@ public static void main(String[] args) { System.out.println("C3. " + consumerContext.getCachedConsumerInfo()); // get a ConsumerContext from the stream context for a pre-existing consumer - consumerContext = streamContext.consumerContext(CONSUMER_NAME); + consumerContext = streamContext.createConsumerContext(CONSUMER_NAME); System.out.println("C4. " + consumerContext.getCachedConsumerInfo()); } catch (JetStreamApiException | IOException | InterruptedException ioe) { diff --git a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java index 79cb990d0..3eabd5760 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java @@ -61,7 +61,7 @@ public static void main(String[] args) { Thread consumeThread = new Thread(() -> { int count = 0; long start = System.currentTimeMillis(); - try (IterableConsumer consumer = consumerContext.startIterate()) { + try (IterableConsumer consumer = consumerContext.iterate()) { System.out.println("Starting main loop."); while (count < STOP_COUNT) { Message msg = consumer.nextMessage(1000); diff --git a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java index 949482f4b..8021cb34f 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java @@ -53,7 +53,7 @@ public static void main(String[] args) { try { streamContext = nc.streamContext(STREAM); streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build()); - consumerContext = streamContext.consumerContext(CONSUMER_NAME); + consumerContext = streamContext.createConsumerContext(CONSUMER_NAME); } catch (JetStreamApiException | IOException e) { // JetStreamApiException: @@ -79,7 +79,7 @@ public static void main(String[] args) { // create the consumer then use it try { - MessageConsumer consumer = consumerContext.startConsume(handler); + MessageConsumer consumer = consumerContext.consume(handler); latch.await(); // once the consumer is stopped, the client will drain messages System.out.println("Stop the consumer..."); diff --git a/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java b/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java index 0ca78d153..b4fc3a1ce 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/QueueExample.java @@ -128,7 +128,7 @@ static class HandlerConsumerHolder extends ConsumerHolder { public HandlerConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException { super(id, sc, latch); - messageConsumer = consumerContext.startConsume(msg -> { + messageConsumer = consumerContext.consume(msg -> { thisReceived.incrementAndGet(); latch.countDown(); String data = new String(msg.getData(), StandardCharsets.US_ASCII); @@ -150,7 +150,7 @@ static class IterableConsumerHolder extends ConsumerHolder { public IterableConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException { super(id, sc, latch); - iterableConsumer = consumerContext.startIterate(); + iterableConsumer = consumerContext.iterate(); t = new Thread(() -> { while (latch.getCount() > 0) { try { @@ -187,7 +187,7 @@ public ConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws Jet this.id = id; thisReceived = new AtomicInteger(); this.latch = latch; - consumerContext = sc.consumerContext(CONSUMER_NAME); + consumerContext = sc.createConsumerContext(CONSUMER_NAME); } public abstract void stop() throws InterruptedException; diff --git a/src/main/java/io/nats/client/BaseConsumeOptions.java b/src/main/java/io/nats/client/BaseConsumeOptions.java index 4e00964df..f71c1c616 100644 --- a/src/main/java/io/nats/client/BaseConsumeOptions.java +++ b/src/main/java/io/nats/client/BaseConsumeOptions.java @@ -51,7 +51,7 @@ protected BaseConsumeOptions(Builder b) { idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100); } - public long getExpiresIn() { + public long getExpiresInMillis() { return expiresIn; } diff --git a/src/main/java/io/nats/client/ConsumerContext.java b/src/main/java/io/nats/client/ConsumerContext.java index 06bb991e1..3bdaf9acc 100644 --- a/src/main/java/io/nats/client/ConsumerContext.java +++ b/src/main/java/io/nats/client/ConsumerContext.java @@ -121,7 +121,7 @@ public interface ConsumerContext { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - IterableConsumer startIterate() throws IOException, JetStreamApiException; + IterableConsumer iterate() throws IOException, JetStreamApiException; /** * Start a long-running IterableConsumer with custom ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions} @@ -132,7 +132,7 @@ public interface ConsumerContext { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - IterableConsumer startIterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; + IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; /** * Start a long-running MessageConsumer with default ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} @@ -142,7 +142,7 @@ public interface ConsumerContext { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - MessageConsumer startConsume(MessageHandler handler) throws IOException, JetStreamApiException; + MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException; /** * Start a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} @@ -153,5 +153,5 @@ public interface ConsumerContext { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - MessageConsumer startConsume(MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; + MessageConsumer consume(MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/MessageConsumer.java b/src/main/java/io/nats/client/MessageConsumer.java index a98c50832..595923f41 100644 --- a/src/main/java/io/nats/client/MessageConsumer.java +++ b/src/main/java/io/nats/client/MessageConsumer.java @@ -48,4 +48,8 @@ public interface MessageConsumer extends AutoCloseable { * @throws InterruptedException if one is thrown, in order to propagate it up */ void stop(long timeout) throws InterruptedException; + + boolean isStopped(); + + boolean isFinished(); } diff --git a/src/main/java/io/nats/client/StreamContext.java b/src/main/java/io/nats/client/StreamContext.java index b32a70589..3517b3040 100644 --- a/src/main/java/io/nats/client/StreamContext.java +++ b/src/main/java/io/nats/client/StreamContext.java @@ -73,7 +73,7 @@ public interface StreamContext { PurgeResponse purge(PurgeOptions options) throws IOException, JetStreamApiException; /** - * Create a consumer context for on the context's stream and specific named consumer. + * Create a consumer context for the context's stream and specific named consumer. * Verifies that the consumer exists. * @param consumerName the name of the consumer * @return a ConsumerContext object @@ -81,7 +81,17 @@ public interface StreamContext { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - ConsumerContext consumerContext(String consumerName) throws IOException, JetStreamApiException; + ConsumerContext createConsumerContext(String consumerName) throws IOException, JetStreamApiException; + + /** + * Create an ordered consumer context for the context's stream. + * @param config the configuration for the ordered consumer + * @return a ConsumerContext object + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + ConsumerContext createOrderedConsumer(OrderedConsumerConfiguration config) throws IOException, JetStreamApiException; /** * Management function to create or update a consumer on this stream. @@ -194,50 +204,4 @@ public interface StreamContext { * @return true if the delete succeeded */ boolean deleteMessage(long seq, boolean erase) throws IOException, JetStreamApiException; - - /** - * Start a long-running Ordered IterableConsumer. See {@link IterableConsumer} and {@link ConsumeOptions} - * IterableConsumer requires the developer call nextMessage. - * @param config the configuration for the ordered consumer - * @return the IterableConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - IterableConsumer startOrderedIterate(OrderedConsumerConfiguration config) throws IOException, JetStreamApiException; - - /** - * Start a long-running Ordered IterableConsumer with custom ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions} - * IterableConsumer requires the developer call nextMessage. - * @param config the configuration for the ordered consumer - * @param consumeOptions the custom consume options - * @return the IterableConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - IterableConsumer startOrderedIterate(OrderedConsumerConfiguration config, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; - - /** - * Start a long-running Ordered MessageConsumer. See {@link MessageConsumer} and {@link ConsumeOptions} - * @param config the configuration for the ordered consumer - * @param handler the MessageHandler used for receiving messages. - * @return the MessageConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - MessageConsumer startOrderedConsume(OrderedConsumerConfiguration config, MessageHandler handler) throws IOException, JetStreamApiException; - - /** - * Start a long-running Ordered MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions} - * @param config the configuration for the ordered consumer - * @param handler the MessageHandler used for receiving messages. - * @param consumeOptions the custom consume options - * @return the MessageConsumer instance - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - MessageConsumer startOrderedConsume(OrderedConsumerConfiguration config, MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 0642e4294..199e8e448 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -2148,7 +2148,7 @@ public StreamContext streamContext(String streamName, JetStreamOptions options) */ @Override public ConsumerContext consumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException { - return streamContext(streamName).consumerContext(consumerName); + return streamContext(streamName).createConsumerContext(consumerName); } /** @@ -2156,7 +2156,7 @@ public ConsumerContext consumerContext(String streamName, String consumerName) t */ @Override public ConsumerContext consumerContext(String streamName, String consumerName, JetStreamOptions options) throws IOException, JetStreamApiException { - return streamContext(streamName, options).consumerContext(consumerName); + return streamContext(streamName, options).createConsumerContext(consumerName); } /** diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index e6d5351f6..6064d7961 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -14,7 +14,9 @@ package io.nats.client.impl; import io.nats.client.*; +import io.nats.client.api.ConsumerConfiguration; import io.nats.client.api.ConsumerInfo; +import io.nats.client.api.OrderedConsumerConfiguration; import io.nats.client.support.Validator; import java.io.IOException; @@ -24,20 +26,108 @@ import static io.nats.client.BaseConsumeOptions.MIN_EXPIRES_MILLS; import static io.nats.client.ConsumeOptions.DEFAULT_CONSUME_OPTIONS; import static io.nats.client.impl.NatsJetStreamSubscription.EXPIRE_ADJUSTMENT; +import static io.nats.client.support.ConsumerUtils.nextOrderedConsumerConfiguration; /** * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ -public class NatsConsumerContext implements ConsumerContext { +public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscriptionMaker { + private enum ConsumeType { + next, fetch, iterate, consume + } + private final Object stateLock; private final NatsStreamContext streamContext; - private final PullSubscribeOptions bindPso; - private ConsumerInfo lastConsumerInfo; + private final boolean ordered; + private final String consumerName; + private final ConsumerConfiguration originalOrderedCc; + private final String subscribeSubject; + private final PullSubscribeOptions unorderedBindPso; + + private ConsumerInfo cachedConsumerInfo; + private NatsMessageConsumerBase lastConsumer; + private long highestSeq; + private ConsumeType lastConsumerType; + private Dispatcher dispatcher; + + NatsConsumerContext(NatsStreamContext streamContext, ConsumerInfo ci) { + stateLock = new Object(); + this.streamContext = streamContext; + ordered = false; + consumerName = ci.getName(); + originalOrderedCc = null; + subscribeSubject = null; + unorderedBindPso = PullSubscribeOptions.bind(streamContext.streamName, consumerName); + cachedConsumerInfo = ci; + } - NatsConsumerContext(NatsStreamContext streamContext, ConsumerInfo ci) throws IOException { + NatsConsumerContext(NatsStreamContext streamContext, OrderedConsumerConfiguration config) { + stateLock = new Object(); this.streamContext = streamContext; - bindPso = PullSubscribeOptions.bind(streamContext.streamName, ci.getName()); - lastConsumerInfo = ci; + ordered = true; + consumerName = null; + originalOrderedCc = ConsumerConfiguration.builder() + .filterSubject(config.getFilterSubject()) + .deliverPolicy(config.getDeliverPolicy()) + .startSequence(config.getStartSequence()) + .startTime(config.getStartTime()) + .replayPolicy(config.getReplayPolicy()) + .headersOnly(config.getHeadersOnly()) + .build(); + subscribeSubject = originalOrderedCc.getFilterSubject(); + unorderedBindPso = null; + } + + static class OrderedPullSubscribeOptionsBuilder extends PullSubscribeOptions.Builder { + public OrderedPullSubscribeOptionsBuilder(String streamName, ConsumerConfiguration cc) { + stream(streamName); + configuration(cc); + ordered = true; + } + } + + public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler) throws IOException, JetStreamApiException { + PullSubscribeOptions pso; + if (ordered) { + if (lastConsumer != null) { + highestSeq = Math.max(highestSeq, lastConsumer.pmm.lastStreamSeq); + } + ConsumerConfiguration cc = lastConsumer == null + ? originalOrderedCc + : nextOrderedConsumerConfiguration(originalOrderedCc, highestSeq, null); + pso = new OrderedPullSubscribeOptionsBuilder(streamContext.streamName, cc).build(); + } + else { + pso = unorderedBindPso; + } + + if (messageHandler == null) { + return (NatsJetStreamPullSubscription)streamContext.js.subscribe(subscribeSubject, pso); + } + + if (dispatcher == null) { + dispatcher = streamContext.js.conn.createDispatcher(); + } + return (NatsJetStreamPullSubscription)streamContext.js.subscribe(subscribeSubject, dispatcher, messageHandler, pso); + } + + private void checkState() throws IOException { + if (lastConsumer != null) { + if (ordered) { + if (!lastConsumer.finished) { + throw new IOException(lastConsumerType + "is already running. Ordered Consumer does not allow multiple instances at time."); + } + } + if (lastConsumer.finished) { + lastConsumer.lenientClose(); // finished, might as well make sure the sub is closed. + } + } + } + + private NatsMessageConsumerBase trackConsume(ConsumeType ct, NatsMessageConsumerBase con) { + lastConsumerType = ct; + lastConsumer = con; + return con; } /** @@ -45,7 +135,7 @@ public class NatsConsumerContext implements ConsumerContext { */ @Override public String getConsumerName() { - return lastConsumerInfo.getName(); + return consumerName; } /** @@ -53,13 +143,18 @@ public String getConsumerName() { */ @Override public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException { - lastConsumerInfo = streamContext.jsm.getConsumerInfo(streamContext.streamName, lastConsumerInfo.getName()); - return lastConsumerInfo; + if (consumerName != null) { + cachedConsumerInfo = streamContext.jsm.getConsumerInfo(streamContext.streamName, cachedConsumerInfo.getName()); + } + return cachedConsumerInfo; } + /** + * {@inheritDoc} + */ @Override public ConsumerInfo getCachedConsumerInfo() { - return lastConsumerInfo; + return cachedConsumerInfo; } /** @@ -67,7 +162,7 @@ public ConsumerInfo getCachedConsumerInfo() { */ @Override public Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { - return new NextSub(DEFAULT_EXPIRES_IN_MILLIS, streamContext.js, bindPso).next(); + return next(DEFAULT_EXPIRES_IN_MILLIS); } /** @@ -75,10 +170,7 @@ public Message next() throws IOException, InterruptedException, JetStreamStatusC */ @Override public Message next(Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { - if (maxWait == null) { - return new NextSub(DEFAULT_EXPIRES_IN_MILLIS, streamContext.js, bindPso).next(); - } - return next(maxWait.toMillis()); + return maxWait == null ? next(DEFAULT_EXPIRES_IN_MILLIS) : next(maxWait.toMillis()); } /** @@ -86,36 +178,24 @@ public Message next(Duration maxWait) throws IOException, InterruptedException, */ @Override public Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException { - if (maxWaitMillis < MIN_EXPIRES_MILLS) { - throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds."); - } - return new NextSub(maxWaitMillis, streamContext.js, bindPso).next(); - } + synchronized (stateLock) { + checkState(); + if (maxWaitMillis < MIN_EXPIRES_MILLS) { + throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds."); + } - static class NextSub { - private final long maxWaitMillis; - private final NatsJetStreamPullSubscription sub; + NatsMessageConsumerBase c = new NatsMessageConsumerBase(cachedConsumerInfo, subscribe(null)); + trackConsume(ConsumeType.next, c); + c.sub._pull(PullRequestOptions.builder(1).expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT).build(), false, null); - public NextSub(long maxWaitMillis, NatsJetStream js, PullSubscribeOptions pso) throws JetStreamApiException, IOException { - sub = new SimplifiedSubscriptionMaker(js, pso).makeSubscription(null); - this.maxWaitMillis = maxWaitMillis; - sub._pull(PullRequestOptions.builder(1).expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT).build(), false, null); - } - - Message next() throws JetStreamStatusCheckedException, InterruptedException { try { - return sub.nextMessage(maxWaitMillis); + return c.sub.nextMessage(maxWaitMillis); } catch (JetStreamStatusException e) { throw new JetStreamStatusCheckedException(e); } finally { - try { - sub.unsubscribe(); - } - catch (Exception ignore) { - // ignored - } + c.lenientClose(); } } } @@ -141,44 +221,51 @@ public FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiEx */ @Override public FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException { - Validator.required(fetchConsumeOptions, "Fetch Consume Options"); - return new NatsFetchConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), lastConsumerInfo, fetchConsumeOptions); + synchronized (stateLock) { + checkState(); + Validator.required(fetchConsumeOptions, "Fetch Consume Options"); + return (FetchConsumer)trackConsume(ConsumeType.fetch, new NatsFetchConsumer(this, cachedConsumerInfo, fetchConsumeOptions)); + } } /** * {@inheritDoc} */ @Override - public IterableConsumer startIterate() throws IOException, JetStreamApiException { - return new NatsIterableConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), lastConsumerInfo, DEFAULT_CONSUME_OPTIONS); + public IterableConsumer iterate() throws IOException, JetStreamApiException { + return iterate(DEFAULT_CONSUME_OPTIONS); } /** * {@inheritDoc} */ @Override - public IterableConsumer startIterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { - Validator.required(consumeOptions, "Consume Options"); - return new NatsIterableConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), lastConsumerInfo, consumeOptions); + public IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { + synchronized (stateLock) { + checkState(); + Validator.required(consumeOptions, "Consume Options"); + return (IterableConsumer) trackConsume(ConsumeType.iterate, new NatsIterableConsumer(this, cachedConsumerInfo, consumeOptions)); + } } /** * {@inheritDoc} */ @Override - public MessageConsumer startConsume(MessageHandler handler) throws IOException, JetStreamApiException { - Validator.required(handler, "Message Handler"); - return new NatsMessageConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), lastConsumerInfo, handler, DEFAULT_CONSUME_OPTIONS); + public MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException { + return consume(handler, DEFAULT_CONSUME_OPTIONS); } /** * {@inheritDoc} */ @Override - public MessageConsumer startConsume(MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { - Validator.required(handler, "Message Handler"); - Validator.required(consumeOptions, "Consume Options"); - return new NatsMessageConsumer(new SimplifiedSubscriptionMaker(streamContext.js, bindPso), lastConsumerInfo, handler, consumeOptions); + public MessageConsumer consume(MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { + synchronized (stateLock) { + checkState(); + Validator.required(handler, "Message Handler"); + Validator.required(consumeOptions, "Consume Options"); + return trackConsume(ConsumeType.consume, new NatsMessageConsumer(this, cachedConsumerInfo, handler, consumeOptions)); + } } - } diff --git a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java index 2ecefd420..8c7146161 100644 --- a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java @@ -24,14 +24,13 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer private long startNanos; NatsFetchConsumer(SimplifiedSubscriptionMaker subscriptionMaker, - ConsumerInfo lastConsumerInfo, - FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException { - super(lastConsumerInfo); - initSub(subscriptionMaker.makeSubscription(null)); - maxWaitNanos = fetchConsumeOptions.getExpiresIn() * 1_000_000; + ConsumerInfo cachedConsumerInfo, + FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException { + super(cachedConsumerInfo, subscriptionMaker.subscribe(null)); + maxWaitNanos = fetchConsumeOptions.getExpiresInMillis() * 1_000_000; PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages()) .maxBytes(fetchConsumeOptions.getMaxBytes()) - .expiresIn(fetchConsumeOptions.getExpiresIn()) + .expiresIn(fetchConsumeOptions.getExpiresInMillis()) .idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat()) .build(); pullSubject = sub._pull(pro, false, null); @@ -41,16 +40,34 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer @Override public Message nextMessage() throws InterruptedException, JetStreamStatusCheckedException { try { - if (startNanos == -1) { - startNanos = System.nanoTime(); + if (finished) { + return null; } - long timeLeftMillis = (maxWaitNanos - (System.nanoTime() - startNanos)) / 1_000_000; - // if the manager thinks it has received everything in the pull, it means // that all the messages are already in the internal queue and there is // no waiting necessary - if (timeLeftMillis < 1 | pmm.pendingMessages < 1 || (pmm.trackingBytes && pmm.pendingBytes < 1)) { + if (noMorePending()) { + Message m = sub._nextUnmanagedNoWait(pullSubject); + if (m == null) { + // if there are no messages in the internal cache AND there are no more pending, + // they all have been read and we can go ahead and close the subscription. + finished = true; + lenientClose(); + } + return m; + } + + // by not starting the timer until the first call, it gives a little buffer around + // the next message to account for latency of incoming messages + if (startNanos == -1) { + startNanos = System.nanoTime(); + } + long timeLeftMillis = (maxWaitNanos - (System.nanoTime() - startNanos)) / 1_000_000; + + // if the timer has run out, don't allow waiting + // this might happen once, but it should already be noMorePending + if (timeLeftMillis < 1) { return sub._nextUnmanagedNoWait(pullSubject); // null means don't wait } diff --git a/src/main/java/io/nats/client/impl/NatsIterableConsumer.java b/src/main/java/io/nats/client/impl/NatsIterableConsumer.java index 54de313cf..2c928cd37 100644 --- a/src/main/java/io/nats/client/impl/NatsIterableConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsIterableConsumer.java @@ -21,8 +21,8 @@ class NatsIterableConsumer extends NatsMessageConsumer implements IterableConsumer { - NatsIterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker, ConsumerInfo lastConsumerInfo, ConsumeOptions opts) throws IOException, JetStreamApiException { - super(subscriptionMaker, lastConsumerInfo, null, opts); + NatsIterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker, ConsumerInfo cachedConsumerInfo, ConsumeOptions opts) throws IOException, JetStreamApiException { + super(subscriptionMaker, cachedConsumerInfo, null, opts); } /** diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index 49e0e3b6f..2ac309f9d 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -649,7 +649,7 @@ public StreamContext streamContext(String streamName) throws IOException, JetStr public ConsumerContext consumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException { Validator.validateStreamName(streamName, true); Validator.required(consumerName, "Consumer Name"); - return getNatsStreamContext(streamName).consumerContext(consumerName); + return getNatsStreamContext(streamName).createConsumerContext(consumerName); } private NatsStreamContext getNatsStreamContext(String streamName) throws IOException, JetStreamApiException { diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java index cffb76d33..0e9092aea 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java @@ -25,15 +25,12 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements TrackPendin protected final PullRequestOptions rePullPro; protected final int thresholdMessages; protected final long thresholdBytes; - protected final SimplifiedSubscriptionMaker subscriptionMaker; NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker, - ConsumerInfo lastConsumerInfo, + ConsumerInfo cachedConsumerInfo, MessageHandler messageHandler, ConsumeOptions opts) throws IOException, JetStreamApiException { - super(lastConsumerInfo); - this.subscriptionMaker = subscriptionMaker; - initSub(subscriptionMaker.makeSubscription(messageHandler)); + super(cachedConsumerInfo, subscriptionMaker.subscribe(messageHandler)); int bm = opts.getBatchSize(); long bb = opts.getBatchBytes(); @@ -42,7 +39,7 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements TrackPendin long rePullBytes = bb == 0 ? 0 : Math.max(1, bb * opts.getThresholdPercent() / 100); rePullPro = PullRequestOptions.builder(rePullMessages) .maxBytes(rePullBytes) - .expiresIn(opts.getExpiresIn()) + .expiresIn(opts.getExpiresInMillis()) .idleHeartbeat(opts.getIdleHeartbeat()) .build(); @@ -51,7 +48,7 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements TrackPendin sub._pull(PullRequestOptions.builder(bm) .maxBytes(bb) - .expiresIn(opts.getExpiresIn()) + .expiresIn(opts.getExpiresInMillis()) .idleHeartbeat(opts.getIdleHeartbeat()) .build(), false, this); @@ -59,9 +56,7 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements TrackPendin @Override public void track(int pendingMessages, long pendingBytes, boolean trackingBytes) { - if (!stopped && - (pmm.pendingMessages <= thresholdMessages - || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))) + if (!stopped && (pendingMessages <= thresholdMessages || (trackingBytes && pendingBytes <= thresholdBytes))) { sub._pull(rePullPro, false, this); } diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java index f0c33a180..2746b4754 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java @@ -24,27 +24,37 @@ class NatsMessageConsumerBase implements MessageConsumer { protected NatsJetStreamPullSubscription sub; protected PullMessageManager pmm; protected boolean stopped; - protected ConsumerInfo lastConsumerInfo; + protected boolean finished; + protected ConsumerInfo cachedConsumerInfo; - NatsMessageConsumerBase(ConsumerInfo lastConsumerInfo) { - this.lastConsumerInfo = lastConsumerInfo; - } - - protected void initSub(NatsJetStreamPullSubscription sub) throws JetStreamApiException, IOException { + NatsMessageConsumerBase(ConsumerInfo cachedConsumerInfo, NatsJetStreamPullSubscription sub) { + this.cachedConsumerInfo = cachedConsumerInfo; this.sub = sub; - if (lastConsumerInfo == null) { - lastConsumerInfo = sub.getConsumerInfo(); - } pmm = (PullMessageManager)sub.manager; } + boolean noMorePending() { + return pmm.pendingMessages < 1 || (pmm.trackingBytes && pmm.pendingBytes < 1); + } + + public boolean isStopped() { + return stopped; + } + + public boolean isFinished() { + return finished; + } + /** * {@inheritDoc} */ @Override public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException { - lastConsumerInfo = sub.getConsumerInfo(); - return lastConsumerInfo; + // don't loo+k up consumer info if it was never set - this check is for ordered consumer + if (cachedConsumerInfo != null) { + cachedConsumerInfo = sub.getConsumerInfo(); + } + return cachedConsumerInfo; } /** @@ -52,7 +62,7 @@ public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException */ @Override public ConsumerInfo getCachedConsumerInfo() { - return lastConsumerInfo; + return cachedConsumerInfo; } /** @@ -71,12 +81,17 @@ public void stop(long timeout) throws InterruptedException { } finally { stopped = true; + finished = true; } } } @Override public void close() throws Exception { + lenientClose(); + } + + protected void lenientClose() { try { if (!stopped && sub.isActive()) { if (sub.getNatsDispatcher() != null) { @@ -90,5 +105,9 @@ public void close() throws Exception { catch (Throwable ignore) { // nothing to do } + finally { + stopped = true; + finished = true; + } } } diff --git a/src/main/java/io/nats/client/impl/NatsStreamContext.java b/src/main/java/io/nats/client/impl/NatsStreamContext.java index ae85b14e1..e3678ce42 100644 --- a/src/main/java/io/nats/client/impl/NatsStreamContext.java +++ b/src/main/java/io/nats/client/impl/NatsStreamContext.java @@ -20,9 +20,6 @@ import java.io.IOException; import java.util.List; -import static io.nats.client.ConsumeOptions.DEFAULT_CONSUME_OPTIONS; -import static io.nats.client.support.ConsumerUtils.generateConsumerName; - /** * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ @@ -83,10 +80,16 @@ public PurgeResponse purge(PurgeOptions options) throws IOException, JetStreamAp * {@inheritDoc} */ @Override - public ConsumerContext consumerContext(String consumerName) throws IOException, JetStreamApiException { + public ConsumerContext createConsumerContext(String consumerName) throws IOException, JetStreamApiException { return new NatsConsumerContext(this, jsm.getConsumerInfo(streamName, consumerName)); } + @Override + public ConsumerContext createOrderedConsumer(OrderedConsumerConfiguration config) throws IOException, JetStreamApiException { + Validator.required(config, "Ordered Consumer Config"); + return new NatsConsumerContext(this, config); + } + /** * {@inheritDoc} */ @@ -174,53 +177,4 @@ public boolean deleteMessage(long seq) throws IOException, JetStreamApiException public boolean deleteMessage(long seq, boolean erase) throws IOException, JetStreamApiException { return jsm.deleteMessage(streamName, seq, erase); } - - static class OrderedPullSubscribeOptionsBuilder extends PullSubscribeOptions.Builder { - public OrderedPullSubscribeOptionsBuilder(String streamName, ConsumerConfiguration cc) { - stream(streamName); - configuration(cc); - ordered = true; - } - } - - @Override - public IterableConsumer startOrderedIterate(OrderedConsumerConfiguration config) throws IOException, JetStreamApiException { - return startOrderedIterate(config, DEFAULT_CONSUME_OPTIONS); - } - - @Override - public IterableConsumer startOrderedIterate(OrderedConsumerConfiguration config, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { - Validator.required(config, "Ordered Consumer Config"); - Validator.required(consumeOptions, "Consume Options"); - ConsumerConfiguration cc = getBackingConsumerConfiguration(config); - PullSubscribeOptions pso = new OrderedPullSubscribeOptionsBuilder(streamName, cc).build(); - return new NatsIterableConsumer(new SimplifiedSubscriptionMaker(js, pso, cc.getFilterSubject()), null, consumeOptions); - } - - @Override - public MessageConsumer startOrderedConsume(OrderedConsumerConfiguration config, MessageHandler handler) throws IOException, JetStreamApiException { - return startOrderedConsume(config, handler, DEFAULT_CONSUME_OPTIONS); - } - - @Override - public MessageConsumer startOrderedConsume(OrderedConsumerConfiguration config, MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException { - Validator.required(config, "Ordered Consumer Config"); - Validator.required(handler, "Message Handler"); - Validator.required(consumeOptions, "Consume Options"); - ConsumerConfiguration cc = getBackingConsumerConfiguration(config); - PullSubscribeOptions pso = new OrderedPullSubscribeOptionsBuilder(streamName, cc).build(); - return new NatsMessageConsumer(new SimplifiedSubscriptionMaker(js, pso, cc.getFilterSubject()), null, handler, consumeOptions); - } - - private ConsumerConfiguration getBackingConsumerConfiguration(OrderedConsumerConfiguration config) { - return ConsumerConfiguration.builder() - .name(generateConsumerName()) - .filterSubject(config.getFilterSubject()) - .deliverPolicy(config.getDeliverPolicy()) - .startSequence(config.getStartSequence()) - .startTime(config.getStartTime()) - .replayPolicy(config.getReplayPolicy()) - .headersOnly(config.getHeadersOnly()) - .build(); - } } diff --git a/src/main/java/io/nats/client/impl/OrderedMessageManager.java b/src/main/java/io/nats/client/impl/OrderedMessageManager.java index 4860b1f42..687868986 100644 --- a/src/main/java/io/nats/client/impl/OrderedMessageManager.java +++ b/src/main/java/io/nats/client/impl/OrderedMessageManager.java @@ -16,25 +16,27 @@ import io.nats.client.Message; import io.nats.client.SubscribeOptions; import io.nats.client.api.ConsumerConfiguration; -import io.nats.client.api.DeliverPolicy; import java.util.concurrent.atomic.AtomicReference; import static io.nats.client.impl.MessageManager.ManageResult.MESSAGE; import static io.nats.client.impl.MessageManager.ManageResult.STATUS_HANDLED; +import static io.nats.client.support.ConsumerUtils.nextOrderedConsumerConfiguration; class OrderedMessageManager extends PushMessageManager { protected long expectedExternalConsumerSeq; protected final AtomicReference targetSid; - protected OrderedMessageManager(NatsConnection conn, - NatsJetStream js, - String stream, - SubscribeOptions so, - ConsumerConfiguration originalCc, - boolean queueMode, - boolean syncMode) { + protected OrderedMessageManager( + NatsConnection conn, + NatsJetStream js, + String stream, + SubscribeOptions so, + ConsumerConfiguration originalCc, + boolean queueMode, + boolean syncMode) + { super(conn, js, stream, so, originalCc, queueMode, syncMode); expectedExternalConsumerSeq = 1; // always starts at 1 targetSid = new AtomicReference<>(); @@ -82,12 +84,7 @@ private void handleErrorCondition() { // 3. make a new consumer using the same deliver subject but // with a new starting point - ConsumerConfiguration userCC = ConsumerConfiguration.builder(originalCc) - .deliverPolicy(DeliverPolicy.ByStartSequence) - .deliverSubject(newDeliverSubject) - .startSequence(Math.max(1, lastStreamSeq + 1)) - .startTime(null) // clear start time in case it was originally set - .build(); + ConsumerConfiguration userCC = nextOrderedConsumerConfiguration(originalCc, lastStreamSeq, newDeliverSubject); js._createConsumerUnsubscribeOnException(stream, userCC, sub); // 4. restart the manager. diff --git a/src/main/java/io/nats/client/impl/PullOrderedMessageManager.java b/src/main/java/io/nats/client/impl/PullOrderedMessageManager.java index a469010d3..2ca41e884 100644 --- a/src/main/java/io/nats/client/impl/PullOrderedMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullOrderedMessageManager.java @@ -16,13 +16,12 @@ import io.nats.client.Message; import io.nats.client.SubscribeOptions; import io.nats.client.api.ConsumerConfiguration; -import io.nats.client.api.DeliverPolicy; -import io.nats.client.support.ConsumerUtils; import java.util.concurrent.atomic.AtomicReference; import static io.nats.client.impl.MessageManager.ManageResult.MESSAGE; import static io.nats.client.impl.MessageManager.ManageResult.STATUS_HANDLED; +import static io.nats.client.support.ConsumerUtils.nextOrderedConsumerConfiguration; class PullOrderedMessageManager extends PullMessageManager { @@ -86,13 +85,7 @@ private void handleErrorCondition() { // 3. make a new consumer using the same deliver subject but // with a new starting point - ConsumerConfiguration userCC = ConsumerConfiguration.builder(originalCc) - .name(ConsumerUtils.generateConsumerName()) - .deliverPolicy(DeliverPolicy.ByStartSequence) - .deliverSubject(newDeliverSubject) - .startSequence(Math.max(1, lastStreamSeq + 1)) - .startTime(null) // clear start time in case it was originally set - .build(); + ConsumerConfiguration userCC = nextOrderedConsumerConfiguration(originalCc, lastStreamSeq, newDeliverSubject); js._createConsumerUnsubscribeOnException(stream, userCC, sub); // 4. restart the manager. diff --git a/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java b/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java index ff883fe18..b2bf03217 100644 --- a/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java +++ b/src/main/java/io/nats/client/impl/SimplifiedSubscriptionMaker.java @@ -13,38 +13,11 @@ package io.nats.client.impl; -import io.nats.client.Dispatcher; import io.nats.client.JetStreamApiException; import io.nats.client.MessageHandler; -import io.nats.client.PullSubscribeOptions; import java.io.IOException; -public class SimplifiedSubscriptionMaker { - private final NatsJetStream js; - private final PullSubscribeOptions pso; - private final String subscribeSubject; - @SuppressWarnings("FieldCanBeLocal") // can't be local because I want to keep its reference as part of the state - private Dispatcher dispatcher; - - public SimplifiedSubscriptionMaker(NatsJetStream js, PullSubscribeOptions pso) { - this.js = js; - this.pso = pso; - this.subscribeSubject = null; - } - - public SimplifiedSubscriptionMaker(NatsJetStream js, PullSubscribeOptions pso, String subscribeSubject) { - this.js = js; - this.pso = pso; - this.subscribeSubject = subscribeSubject; - } - - public NatsJetStreamPullSubscription makeSubscription(MessageHandler messageHandler) throws IOException, JetStreamApiException { - if (messageHandler == null) { - return (NatsJetStreamPullSubscription)js.subscribe(subscribeSubject, pso); - } - - dispatcher = js.conn.createDispatcher(); - return (NatsJetStreamPullSubscription)js.subscribe(subscribeSubject, dispatcher, messageHandler, pso); - } +public interface SimplifiedSubscriptionMaker { + NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/support/ConsumerUtils.java b/src/main/java/io/nats/client/support/ConsumerUtils.java index 369df22a6..af15a5622 100644 --- a/src/main/java/io/nats/client/support/ConsumerUtils.java +++ b/src/main/java/io/nats/client/support/ConsumerUtils.java @@ -14,6 +14,8 @@ package io.nats.client.support; import io.nats.client.NUID; +import io.nats.client.api.ConsumerConfiguration; +import io.nats.client.api.DeliverPolicy; public abstract class ConsumerUtils { private ConsumerUtils() {} /* ensures cannot be constructed */ @@ -21,4 +23,17 @@ private ConsumerUtils() {} /* ensures cannot be constructed */ public static String generateConsumerName() { return NUID.nextGlobalSequence(); } + + public static ConsumerConfiguration nextOrderedConsumerConfiguration( + ConsumerConfiguration originalCc, + long lastStreamSeq, + String newDeliverSubject) + { + return ConsumerConfiguration.builder(originalCc) + .deliverPolicy(DeliverPolicy.ByStartSequence) + .deliverSubject(newDeliverSubject) + .startSequence(Math.max(1, lastStreamSeq + 1)) + .startTime(null) // clear start time in case it was originally set + .build(); + } } diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index eb2e4d98d..215bccc58 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -58,7 +58,7 @@ public void testStreamContext() throws Exception { } private static void _testStreamContext(JetStream js, StreamContext streamContext) throws IOException, JetStreamApiException { - assertThrows(JetStreamApiException.class, () -> streamContext.consumerContext(DURABLE)); + assertThrows(JetStreamApiException.class, () -> streamContext.createConsumerContext(DURABLE)); assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(DURABLE)); ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build(); @@ -75,10 +75,10 @@ private static void _testStreamContext(JetStream js, StreamContext streamContext assertEquals(1, streamContext.getConsumerNames().size()); assertEquals(1, streamContext.getConsumers().size()); - assertNotNull(streamContext.consumerContext(DURABLE)); + assertNotNull(streamContext.createConsumerContext(DURABLE)); streamContext.deleteConsumer(DURABLE); - assertThrows(JetStreamApiException.class, () -> streamContext.consumerContext(DURABLE)); + assertThrows(JetStreamApiException.class, () -> streamContext.createConsumerContext(DURABLE)); assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(DURABLE)); // coverage @@ -238,14 +238,14 @@ public void testIterableConsumer() throws Exception { int stopCount = 500; // create the consumer then use it - try (IterableConsumer consumer = consumerContext.startIterate()) { + try (IterableConsumer consumer = consumerContext.iterate()) { _testIterable(js, stopCount, consumer); } // coverage - IterableConsumer consumer = consumerContext.startIterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS); + IterableConsumer consumer = consumerContext.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS); consumer.close(); - assertThrows(IllegalArgumentException.class, () -> consumerContext.startIterate((ConsumeOptions) null)); + assertThrows(IllegalArgumentException.class, () -> consumerContext.iterate((ConsumeOptions) null)); }); } @@ -260,7 +260,8 @@ public void testOrderedIterableConsumerBasic() throws Exception { int stopCount = 500; OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(SUBJECT); - try (IterableConsumer consumer = sc.startOrderedIterate(occ)) { + ConsumerContext cc = sc.createOrderedConsumer(occ); + try (IterableConsumer consumer = cc.iterate()) { _testIterable(js, stopCount, consumer); } }); @@ -332,7 +333,7 @@ public void testConsumeWithHandler() throws Exception { } }; - try (MessageConsumer consumer = consumerContext.startConsume(handler)) { + try (MessageConsumer consumer = consumerContext.consume(handler)) { latch.await(); consumer.stop(200); assertTrue(atomicCount.get() > 500); @@ -388,14 +389,14 @@ public void testCoverage() throws Exception { ConsumerContext cctx1 = nc.consumerContext(STREAM, name(1)); ConsumerContext cctx2 = nc.consumerContext(STREAM, name(2), JetStreamOptions.DEFAULT_JS_OPTIONS); ConsumerContext cctx3 = js.consumerContext(STREAM, name(3)); - ConsumerContext cctx4 = sctx1.consumerContext(name(4)); + ConsumerContext cctx4 = sctx1.createConsumerContext(name(4)); ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(5)).build()); ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(6)).build()); - closeConsumer(cctx1.startIterate(), name(1), true); - closeConsumer(cctx2.startIterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(2), true); - closeConsumer(cctx3.startConsume(m -> {}), name(3), true); - closeConsumer(cctx4.startConsume(m -> {}, ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(4), true); + closeConsumer(cctx1.iterate(), name(1), true); + closeConsumer(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(2), true); + closeConsumer(cctx3.consume(m -> {}), name(3), true); + closeConsumer(cctx4.consume(m -> {}, ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(4), true); closeConsumer(cctx5.fetchMessages(1), name(5), false); closeConsumer(cctx6.fetchBytes(1000), name(6), false); }); @@ -413,7 +414,7 @@ private void closeConsumer(MessageConsumer con, String name, boolean doStop) thr public void testFetchConsumeOptionsBuilder() { FetchConsumeOptions fco = FetchConsumeOptions.builder().build(); assertEquals(DEFAULT_MESSAGE_COUNT, fco.getMaxMessages()); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, fco.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, fco.getExpiresInMillis()); assertEquals(DEFAULT_THRESHOLD_PERCENT, fco.getThresholdPercent()); assertEquals(0, fco.getMaxBytes()); assertEquals(DEFAULT_EXPIRES_IN_MILLIS * MAX_IDLE_HEARTBEAT_PERCENT / 100, fco.getIdleHeartbeat()); @@ -443,7 +444,7 @@ public void testFetchConsumeOptionsBuilder() { public void testConsumeOptionsBuilder() { ConsumeOptions co = ConsumeOptions.builder().build(); assertEquals(DEFAULT_MESSAGE_COUNT, co.getBatchSize()); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresInMillis()); assertEquals(DEFAULT_THRESHOLD_PERCENT, co.getThresholdPercent()); assertEquals(0, co.getBatchBytes()); assertEquals(DEFAULT_EXPIRES_IN_MILLIS * MAX_IDLE_HEARTBEAT_PERCENT / 100, co.getIdleHeartbeat()); @@ -482,13 +483,13 @@ public void testConsumeOptionsBuilder() { assertEquals(100, co.getThresholdPercent()); co = ConsumeOptions.builder().expiresIn(0).build(); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresInMillis()); co = ConsumeOptions.builder().expiresIn(-1).build(); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresInMillis()); co = ConsumeOptions.builder().expiresIn(-999).build(); - assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresIn()); + assertEquals(DEFAULT_EXPIRES_IN_MILLIS, co.getExpiresInMillis()); assertThrows(IllegalArgumentException.class, () -> ConsumeOptions.builder().expiresIn(MIN_EXPIRES_MILLS - 1).build()); @@ -513,46 +514,104 @@ protected Boolean beforeQueueProcessorImpl(NatsMessage msg) { } } + public static class PullOrderedNextTestDropSimulator extends PullOrderedMessageManager { + public PullOrderedNextTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { + super(conn, js, stream, so, serverCC, syncMode); + } + + // these have to be static or the test keeps repeating + static boolean ss2 = true; + static boolean ss5 = true; + + @Override + protected Boolean beforeQueueProcessorImpl(NatsMessage msg) { + if (msg.isJetStream()) { + long ss = msg.metaData().streamSequence(); + if (ss == 2 && ss2) { + ss2 = false; + return false; + } + if (ss == 5 && ss5) { + ss5 = false; + return false; + } + } + + return super.beforeQueueProcessorImpl(msg); + } + } + @Test - public void testOrderedIterable() throws Exception { + public void testOrderedActives() throws Exception { runInJsServer(this::mustBeAtLeast291, nc -> { // Setup JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - String subject = subject(111); - createMemoryStream(jsm, stream(111), subject); + String stream = stream("soa"); + String subject = subject("uoa"); + createMemoryStream(jsm, stream, subject); + + StreamContext sc = js.streamContext(stream); - StreamContext sc = js.streamContext(stream(111)); + jsPublish(js, subject, 101, 6); + + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(subject); + // Get this in place before subscriptions are made + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; + testOrderedActiveNext(sc, occ); - // Get this in place before any subscriptions are made + // Get this in place before subscriptions are made ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + testOrderedActiveFetch(sc, occ); + testOrderedActiveIterable(sc, occ); + }); + } - // Published messages will be intercepted by the OrderedTestDropSimulator - new Thread(() -> { - try { - Thread.sleep(1000); // give the consumer time to get setup before publishing - jsPublish(js, subject, 101, 6); - } - catch (Exception e) { - throw new RuntimeException(e); + private static void testOrderedActiveNext(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { + ConsumerContext cc = sc.createOrderedConsumer(occ); + // Loop through the messages to make sure I get stream sequence 1 to 6 + int expectedStreamSeq = 1; + while (expectedStreamSeq <= 6) { + Message m = cc.next(1000); + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(1, m.metaData().consumerSequence()); + ++expectedStreamSeq; + } + } + } + + private static void testOrderedActiveFetch(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { + ConsumerContext cc = sc.createOrderedConsumer(occ); + try (FetchConsumer fcon = cc.fetchMessages(6)) { + // Loop through the messages to make sure I get stream sequence 1 to 6 + int expectedStreamSeq = 1; + while (expectedStreamSeq <= 6) { + Message m = fcon.nextMessage(); + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); + ++expectedStreamSeq; } - }).start(); + } + } + } - OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(subject); - try (IterableConsumer icon = sc.startOrderedIterate(occ)) { - // Loop through the messages to make sure I get stream sequence 1 to 6 - int expectedStreamSeq = 1; - while (expectedStreamSeq <= 6) { - Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage - if (m != null) { - assertEquals(expectedStreamSeq, m.metaData().streamSequence()); - assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); - ++expectedStreamSeq; - } + private static void testOrderedActiveIterable(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { + ConsumerContext cc = sc.createOrderedConsumer(occ); + try (IterableConsumer icon = cc.iterate()) { + // Loop through the messages to make sure I get stream sequence 1 to 6 + int expectedStreamSeq = 1; + while (expectedStreamSeq <= 6) { + Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); + ++expectedStreamSeq; } } - }); + } } @Test @@ -567,7 +626,7 @@ public void testOrderedConsume() throws Exception { StreamContext sc = js.streamContext(stream(222)); - // Get this in place before any subscriptions are made + // Get this in place before subscriptions are made ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; CountDownLatch msgLatch = new CountDownLatch(6); @@ -582,7 +641,8 @@ public void testOrderedConsume() throws Exception { }; OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(subject); - try (MessageConsumer mcon = sc.startOrderedConsume(occ, handler)) { + ConsumerContext cc = sc.createOrderedConsumer(occ); + try (MessageConsumer mcon = cc.consume(handler)) { jsPublish(js, subject, 201, 6); // wait for the messages @@ -599,4 +659,83 @@ public void testOrderedConsume() throws Exception { } }); } + + @Test + public void testOrderedMultipleWays() throws Exception { + runInJsServer(this::mustBeAtLeast291, nc -> { + // Setup + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + String subject = subject(333); + createMemoryStream(jsm, stream(333), subject); + + StreamContext sc = js.streamContext(stream(333)); + + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(subject); + ConsumerContext ctx = sc.createOrderedConsumer(occ); + + assertNull(ctx.getConsumerName()); + assertNull(ctx.getConsumerInfo()); + assertNull(ctx.getCachedConsumerInfo()); + + for (int x = 0 ; x < 10_000; x++) { + js.publish(subject, ("multiple" + x).getBytes()); + } + + Message m = ctx.next(1000); + assertNotNull(m); + assertEquals(1, m.metaData().streamSequence()); + + int seq = 2; + try (FetchConsumer fc = ctx.fetchMessages(5)) { + while (seq <= 6) { + m = fc.nextMessage(); + assertNotNull(m); + assertEquals(seq, m.metaData().streamSequence()); + assertFalse(fc.isFinished()); + assertThrows(IOException.class, () -> ctx.next(1000)); + assertThrows(IOException.class, () -> ctx.consume(null)); + seq++; + } + + m = fc.nextMessage(); + assertNull(m); + assertTrue(fc.isFinished()); + } + + ConsumeOptions copts = ConsumeOptions.builder().batchSize(10).build(); + try (IterableConsumer ic = ctx.iterate(copts)) { + m = ic.nextMessage(1000); + ic.stop(100); + while (m != null) { + assertEquals(seq, m.metaData().streamSequence()); + if (!ic.isFinished()) { + assertThrows(IOException.class, () -> ctx.next(1000)); + assertThrows(IOException.class, () -> ctx.fetchMessages(1)); + } + ++seq; + m = ic.nextMessage(1000); + } + } + + m = ctx.next(1000); + assertNotNull(m); + assertEquals(seq++, m.metaData().streamSequence()); + + int last = Math.min(seq + 10, 9999); + int f = last - seq; + try (FetchConsumer fc = ctx.fetchMessages(f)) { + while (seq < last) { + m = fc.nextMessage(); + assertNotNull(m); + assertEquals(seq, m.metaData().streamSequence()); + assertFalse(fc.isFinished()); + assertThrows(IOException.class, () -> ctx.next(1000)); + assertThrows(IOException.class, () -> ctx.consume(null)); + seq++; + } + } + }); + } } diff --git a/src/test/java/io/nats/client/utils/TestBase.java b/src/test/java/io/nats/client/utils/TestBase.java index 24f6dd742..7f7d1ebe1 100644 --- a/src/test/java/io/nats/client/utils/TestBase.java +++ b/src/test/java/io/nats/client/utils/TestBase.java @@ -237,64 +237,64 @@ private static void cleanupJs(Connection c) public static final String KEY = "key"; public static final String DATA = "data"; - public static String stream(int seq) { - return STREAM + "-" + seq; + public static String stream(Object variant) { + return STREAM + "-" + variant; } - public static String mirror(int seq) { - return MIRROR + "-" + seq; + public static String mirror(Object variant) { + return MIRROR + "-" + variant; } - public static String source(int seq) { - return SOURCE + "-" + seq; + public static String source(Object variant) { + return SOURCE + "-" + variant; } - public static String subject(int seq) { - return SUBJECT + "-" + seq; + public static String subject(Object variant) { + return SUBJECT + "-" + variant; } public static String subjectDot(String field) { return SUBJECT + DOT + field; } - public static String queue(int seq) { - return QUEUE + "-" + seq; + public static String queue(Object variant) { + return QUEUE + "-" + variant; } - public static String durable(int seq) { - return DURABLE + "-" + seq; + public static String durable(Object variant) { + return DURABLE + "-" + variant; } - public static String durable(String vary, int seq) { - return DURABLE + "-" + vary + "-" + seq; + public static String durable(String vary, Object variant) { + return DURABLE + "-" + vary + "-" + variant; } - public static String name(int seq) { - return NAME + "-" + seq; + public static String name(Object variant) { + return NAME + "-" + variant; } - public static String deliver(int seq) { - return DELIVER + "-" + seq; + public static String deliver(Object variant) { + return DELIVER + "-" + variant; } - public static String bucket(int seq) { - return BUCKET + "-" + seq; + public static String bucket(Object variant) { + return BUCKET + "-" + variant; } - public static String key(int seq) { - return KEY + "-" + seq; + public static String key(Object variant) { + return KEY + "-" + variant; } - public static String messageId(int seq) { - return MESSAGE_ID + "-" + seq; + public static String messageId(Object variant) { + return MESSAGE_ID + "-" + variant; } - public static String data(int seq) { - return DATA + "-" + seq; + public static String data(Object variant) { + return DATA + "-" + variant; } - public static byte[] dataBytes(int seq) { - return data(seq).getBytes(StandardCharsets.US_ASCII); + public static byte[] dataBytes(Object variant) { + return data(variant).getBytes(StandardCharsets.US_ASCII); } public static NatsMessage getDataMessage(String data) {