Skip to content

Commit

Permalink
Simplification More Review (#948)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Aug 11, 2023
1 parent 19aa491 commit 4882a62
Show file tree
Hide file tree
Showing 45 changed files with 1,166 additions and 686 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ You can however set the deliver policy which will be used to start the subscript
| JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer. |
| JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer. |
| JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None. |
| JsSoOrderedRequiresMaxDeliver | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. |
| JsSoOrderedRequiresMaxDeliverOfOne | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. |
| JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. |
| JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied. |
| JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,31 @@ public static void main(String[] args) {
createOrReplaceStream(nc.jetStreamManagement(), STREAM, SUBJECT);

// get a stream context from the connection
StreamContext streamContext = nc.streamContext(STREAM);
StreamContext streamContext = nc.getStreamContext(STREAM);
System.out.println("S1. " + streamContext.getStreamInfo());

// get a stream context from the connection, supplying custom JetStreamOptions
streamContext = nc.streamContext(STREAM, JetStreamOptions.builder().build());
streamContext = nc.getStreamContext(STREAM, JetStreamOptions.builder().build());
System.out.println("S2. " + streamContext.getStreamInfo());

// get a stream context from the JetStream context
streamContext = js.streamContext(STREAM);
streamContext = js.getStreamContext(STREAM);
System.out.println("S3. " + streamContext.getStreamInfo());

// when you create a consumer from the stream context you get a ConsumerContext in return
ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
System.out.println("C1. " + consumerContext.getCachedConsumerInfo());

// get a ConsumerContext from the connection for a pre-existing consumer
consumerContext = nc.consumerContext(STREAM, CONSUMER_NAME);
consumerContext = nc.getConsumerContext(STREAM, CONSUMER_NAME);
System.out.println("C2. " + consumerContext.getCachedConsumerInfo());

// get a ConsumerContext from the connection for a pre-existing consumer, supplying custom JetStreamOptions
consumerContext = nc.consumerContext(STREAM, CONSUMER_NAME, JetStreamOptions.builder().build());
consumerContext = nc.getConsumerContext(STREAM, CONSUMER_NAME, JetStreamOptions.builder().build());
System.out.println("C3. " + consumerContext.getCachedConsumerInfo());

// get a ConsumerContext from the stream context for a pre-existing consumer
consumerContext = streamContext.consumerContext(CONSUMER_NAME);
consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);
System.out.println("C4. " + consumerContext.getCachedConsumerInfo());
}
catch (JetStreamApiException | IOException | InterruptedException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build());
}
catch (JetStreamApiException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build());
}
catch (JetStreamApiException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class IterableConsumerExample {
private static final String STREAM = "manually-stream";
private static final String SUBJECT = "manually-subject";
private static final String CONSUMER_NAME = "manually-consumer";
private static final String MESSAGE_TEXT = "manually";
private static final String STREAM = "iterable-stream";
private static final String SUBJECT = "iterable-subject";
private static final String CONSUMER_NAME = "iterable-consumer";
private static final String MESSAGE_TEXT = "iterable";
private static final int STOP_COUNT = 500;
private static final int REPORT_EVERY = 50;
private static final int JITTER = 20;

private static final String SERVER = "nats://localhost:4222";

Expand All @@ -46,7 +45,7 @@ public static void main(String[] args) {
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
}
catch (JetStreamApiException | IOException e) {
Expand All @@ -58,33 +57,44 @@ public static void main(String[] args) {
return;
}

System.out.println("Starting publish...");
Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10);
Thread pubThread = new Thread(publisher);
pubThread.start();

// set up the iterable consumer
Thread consumeThread = new Thread(() -> {
int count = 0;
long start = System.currentTimeMillis();
try (IterableConsumer consumer = consumerContext.consume()) {
try (IterableConsumer consumer = consumerContext.iterate()) {
System.out.println("Starting main loop.");
while (count < STOP_COUNT) {
Message msg = consumer.nextMessage(1000);
if (msg != null) {
msg.ack();
if (++count % REPORT_EVERY == 0) {
report("Main Loop Running", System.currentTimeMillis() - start, count);
report("Main loop running", System.currentTimeMillis() - start, count);
}
}
}
report("Main Loop Stopped", System.currentTimeMillis() - start, count);
report("Main loop stopped", System.currentTimeMillis() - start, count);

System.out.println("Pausing for effect...allow more messages come across.");
Thread.sleep(JITTER * 2); // allows more messages to come across
consumer.stop(1000);
// The consumer has at least 1 pull request active. When stop is called,
// no more pull requests will be made, but messages already requested
// will still come across the wire to the client.
consumer.stop();

System.out.println("Starting post-stop loop.");
Message msg = consumer.nextMessage(1000);
while (msg != null) {
msg.ack();
report("Post-stop loop running", System.currentTimeMillis() - start, ++count);
msg = consumer.nextMessage(1000);
while (!consumer.isFinished()) {
Message msg = consumer.nextMessage(1000);
if (msg != null) {
msg.ack();
if (++count % REPORT_EVERY == 0) {
report("Post-stop loop running", System.currentTimeMillis() - start, ++count);
}
}
}
report("Post-stop loop stopped", System.currentTimeMillis() - start, count);
}
catch (JetStreamStatusCheckedException | InterruptedException | IOException | JetStreamApiException e) {
// JetStreamStatusCheckedException:
Expand All @@ -105,13 +115,9 @@ public static void main(String[] args) {
report("Done", System.currentTimeMillis() - start, count);
});
consumeThread.start();

Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, JITTER);
Thread pubThread = new Thread(publisher);
pubThread.start();

consumeThread.join();
publisher.stopPublishing();

publisher.stopPublishing(); // otherwise it will complain when the connection goes away
pubThread.join();
}
catch (IOException | InterruptedException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static io.nats.examples.jetstream.simple.Utils.Publisher;
import static io.nats.examples.jetstream.simple.Utils.createOrReplaceStream;
import static io.nats.examples.jetstream.simple.Utils.publish;

/**
* This example will demonstrate simplified consume with a handler
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class MessageConsumerExample {
private static final String STREAM = "consume-handler-stream";
private static final String SUBJECT = "consume-handler-subject";
private static final String CONSUMER_NAME = "consume-handler-consumer";
private static final String MESSAGE_TEXT = "consume-handler";
private static final String STREAM = "consume-stream";
private static final String SUBJECT = "consume-subject";
private static final String CONSUMER_NAME = "consume-consumer";
private static final String MESSAGE_TEXT = "consume";
private static final int STOP_COUNT = 500;
private static final int REPORT_EVERY = 100;

Expand All @@ -43,17 +43,18 @@ public static void main(String[] args) {
JetStream js = nc.jetStream();
createOrReplaceStream(nc.jetStreamManagement(), STREAM, SUBJECT);

// publishing so there are lots of messages
System.out.println("Publishing...");
publish(js, SUBJECT, MESSAGE_TEXT, 2500);
System.out.println("Starting publish...");
Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10);
Thread pubThread = new Thread(publisher);
pubThread.start();

// get stream context, create consumer and get the consumer context
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.consumerContext(CONSUMER_NAME);
consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -78,13 +79,19 @@ public static void main(String[] args) {
};

// create the consumer then use it
try {
MessageConsumer consumer = consumerContext.consume(handler);
try (MessageConsumer consumer = consumerContext.consume(handler)) {
latch.await();
// once the consumer is stopped, the client will drain messages

// The consumer has at least 1 pull request active. When stop is called,
// no more pull requests will be made, but messages already requested
// will still come across the wire to the client.
System.out.println("Stop the consumer...");
consumer.stop(1000);
Thread.sleep(1000); // enough for messages to drain after stop
consumer.stop();

// wait until the consumer is finished
while (!consumer.isFinished()) {
Thread.sleep(10);
}
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -103,6 +110,9 @@ public static void main(String[] args) {
}

report("Final", start, atomicCount.get());

publisher.stopPublishing(); // otherwise it will complain when the connection goes away
pubThread.join();
}
catch (IOException | InterruptedException ioe) {
// IOException:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static void main(String[] args) {
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
}
catch (JetStreamApiException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void main(String[] args) {
// to allow ConsumerContexts to be created from the name.
StreamContext streamContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
}
catch (JetStreamApiException | IOException e) {
Expand Down Expand Up @@ -139,7 +139,7 @@ public HandlerConsumerHolder(int id, StreamContext sc, CountDownLatch latch) thr

@Override
public void stop() throws InterruptedException {
messageConsumer.stop(1000);
messageConsumer.stop();
}
}

Expand All @@ -150,7 +150,7 @@ static class IterableConsumerHolder extends ConsumerHolder {

public IterableConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException {
super(id, sc, latch);
iterableConsumer = consumerContext.consume();
iterableConsumer = consumerContext.iterate();
t = new Thread(() -> {
while (latch.getCount() > 0) {
try {
Expand Down Expand Up @@ -187,7 +187,7 @@ public ConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws Jet
this.id = id;
thisReceived = new AtomicInteger();
this.latch = latch;
consumerContext = sc.consumerContext(CONSUMER_NAME);
consumerContext = sc.getConsumerContext(CONSUMER_NAME);
}

public abstract void stop() throws InterruptedException;
Expand Down
13 changes: 10 additions & 3 deletions src/examples/java/io/nats/examples/jetstream/simple/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class Utils {

Expand Down Expand Up @@ -58,28 +59,34 @@ public static class Publisher implements Runnable {
private final String messageText;
private final int jitter;
private final AtomicBoolean keepGoing = new AtomicBoolean(true);
private int pubNo;
private final AtomicInteger pubCount;

public Publisher(JetStream js, String subject, String messageText, int jitter) {
this.js = js;
this.subject = subject;
this.messageText = messageText;
this.jitter = jitter;
pubCount = new AtomicInteger();
}

public void stopPublishing() {
keepGoing.set(false);
}

public int getPubCount() {
return pubCount.get();
}

@Override
public void run() {
try {
while (keepGoing.get()) {
//noinspection BusyWait
Thread.sleep(ThreadLocalRandom.current().nextLong(jitter));
js.publish(subject, (messageText + "-" + (++pubNo)).getBytes());
js.publish(subject, (messageText + "-" + (pubCount.incrementAndGet())).getBytes());
}
} catch (Exception e) {
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected BaseConsumeOptions(Builder b) {
idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
}

public long getExpiresIn() {
public long getExpiresInMillis() {
return expiresIn;
}

Expand Down
Loading

0 comments on commit 4882a62

Please sign in to comment.