Skip to content

Commit

Permalink
simplified ordered consumer from context
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Aug 3, 2023
1 parent ce2dba9 commit ded756a
Show file tree
Hide file tree
Showing 22 changed files with 486 additions and 329 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static void main(String[] args) {
System.out.println("C3. " + consumerContext.getCachedConsumerInfo());

// get a ConsumerContext from the stream context for a pre-existing consumer
consumerContext = streamContext.consumerContext(CONSUMER_NAME);
consumerContext = streamContext.createConsumerContext(CONSUMER_NAME);
System.out.println("C4. " + consumerContext.getCachedConsumerInfo());
}
catch (JetStreamApiException | IOException | InterruptedException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void main(String[] args) {
Thread consumeThread = new Thread(() -> {
int count = 0;
long start = System.currentTimeMillis();
try (IterableConsumer consumer = consumerContext.startIterate()) {
try (IterableConsumer consumer = consumerContext.iterate()) {
System.out.println("Starting main loop.");
while (count < STOP_COUNT) {
Message msg = consumer.nextMessage(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static void main(String[] args) {
try {
streamContext = nc.streamContext(STREAM);
streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.consumerContext(CONSUMER_NAME);
consumerContext = streamContext.createConsumerContext(CONSUMER_NAME);
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -79,7 +79,7 @@ public static void main(String[] args) {

// create the consumer then use it
try {
MessageConsumer consumer = consumerContext.startConsume(handler);
MessageConsumer consumer = consumerContext.consume(handler);
latch.await();
// once the consumer is stopped, the client will drain messages
System.out.println("Stop the consumer...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ static class HandlerConsumerHolder extends ConsumerHolder {

public HandlerConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException {
super(id, sc, latch);
messageConsumer = consumerContext.startConsume(msg -> {
messageConsumer = consumerContext.consume(msg -> {
thisReceived.incrementAndGet();
latch.countDown();
String data = new String(msg.getData(), StandardCharsets.US_ASCII);
Expand All @@ -150,7 +150,7 @@ static class IterableConsumerHolder extends ConsumerHolder {

public IterableConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException {
super(id, sc, latch);
iterableConsumer = consumerContext.startIterate();
iterableConsumer = consumerContext.iterate();
t = new Thread(() -> {
while (latch.getCount() > 0) {
try {
Expand Down Expand Up @@ -187,7 +187,7 @@ public ConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws Jet
this.id = id;
thisReceived = new AtomicInteger();
this.latch = latch;
consumerContext = sc.consumerContext(CONSUMER_NAME);
consumerContext = sc.createConsumerContext(CONSUMER_NAME);
}

public abstract void stop() throws InterruptedException;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected BaseConsumeOptions(Builder b) {
idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
}

public long getExpiresIn() {
public long getExpiresInMillis() {
return expiresIn;
}

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/nats/client/ConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public interface ConsumerContext {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
IterableConsumer startIterate() throws IOException, JetStreamApiException;
IterableConsumer iterate() throws IOException, JetStreamApiException;

/**
* Start a long-running IterableConsumer with custom ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions}
Expand All @@ -132,7 +132,7 @@ public interface ConsumerContext {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
IterableConsumer startIterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException;
IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException;

/**
* Start a long-running MessageConsumer with default ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions}
Expand All @@ -142,7 +142,7 @@ public interface ConsumerContext {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageConsumer startConsume(MessageHandler handler) throws IOException, JetStreamApiException;
MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException;

/**
* Start a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions}
Expand All @@ -153,5 +153,5 @@ public interface ConsumerContext {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageConsumer startConsume(MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException;
MessageConsumer consume(MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException;
}
4 changes: 4 additions & 0 deletions src/main/java/io/nats/client/MessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ public interface MessageConsumer extends AutoCloseable {
* @throws InterruptedException if one is thrown, in order to propagate it up
*/
void stop(long timeout) throws InterruptedException;

boolean isStopped();

boolean isFinished();
}
60 changes: 12 additions & 48 deletions src/main/java/io/nats/client/StreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,25 @@ public interface StreamContext {
PurgeResponse purge(PurgeOptions options) throws IOException, JetStreamApiException;

/**
* Create a consumer context for on the context's stream and specific named consumer.
* Create a consumer context for the context's stream and specific named consumer.
* Verifies that the consumer exists.
* @param consumerName the name of the consumer
* @return a ConsumerContext object
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
ConsumerContext consumerContext(String consumerName) throws IOException, JetStreamApiException;
ConsumerContext createConsumerContext(String consumerName) throws IOException, JetStreamApiException;

/**
* Create an ordered consumer context for the context's stream.
* @param config the configuration for the ordered consumer
* @return a ConsumerContext object
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
ConsumerContext createOrderedConsumer(OrderedConsumerConfiguration config) throws IOException, JetStreamApiException;

/**
* Management function to create or update a consumer on this stream.
Expand Down Expand Up @@ -194,50 +204,4 @@ public interface StreamContext {
* @return true if the delete succeeded
*/
boolean deleteMessage(long seq, boolean erase) throws IOException, JetStreamApiException;

/**
* Start a long-running Ordered IterableConsumer. See {@link IterableConsumer} and {@link ConsumeOptions}
* IterableConsumer requires the developer call nextMessage.
* @param config the configuration for the ordered consumer
* @return the IterableConsumer instance
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
IterableConsumer startOrderedIterate(OrderedConsumerConfiguration config) throws IOException, JetStreamApiException;

/**
* Start a long-running Ordered IterableConsumer with custom ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions}
* IterableConsumer requires the developer call nextMessage.
* @param config the configuration for the ordered consumer
* @param consumeOptions the custom consume options
* @return the IterableConsumer instance
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
IterableConsumer startOrderedIterate(OrderedConsumerConfiguration config, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException;

/**
* Start a long-running Ordered MessageConsumer. See {@link MessageConsumer} and {@link ConsumeOptions}
* @param config the configuration for the ordered consumer
* @param handler the MessageHandler used for receiving messages.
* @return the MessageConsumer instance
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageConsumer startOrderedConsume(OrderedConsumerConfiguration config, MessageHandler handler) throws IOException, JetStreamApiException;

/**
* Start a long-running Ordered MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions}
* @param config the configuration for the ordered consumer
* @param handler the MessageHandler used for receiving messages.
* @param consumeOptions the custom consume options
* @return the MessageConsumer instance
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageConsumer startOrderedConsume(OrderedConsumerConfiguration config, MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException;
}
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -2148,15 +2148,15 @@ public StreamContext streamContext(String streamName, JetStreamOptions options)
*/
@Override
public ConsumerContext consumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException {
return streamContext(streamName).consumerContext(consumerName);
return streamContext(streamName).createConsumerContext(consumerName);
}

/**
* {@inheritDoc}
*/
@Override
public ConsumerContext consumerContext(String streamName, String consumerName, JetStreamOptions options) throws IOException, JetStreamApiException {
return streamContext(streamName, options).consumerContext(consumerName);
return streamContext(streamName, options).createConsumerContext(consumerName);
}

/**
Expand Down
Loading

0 comments on commit ded756a

Please sign in to comment.