diff --git a/src/test/java/io/boomerang/eventing/nats/jetstream/PubSubTransceiverTest.java b/src/test/java/io/boomerang/eventing/nats/jetstream/PubSubTransceiverTest.java index b9fb984..100f135 100644 --- a/src/test/java/io/boomerang/eventing/nats/jetstream/PubSubTransceiverTest.java +++ b/src/test/java/io/boomerang/eventing/nats/jetstream/PubSubTransceiverTest.java @@ -9,11 +9,16 @@ import java.nio.file.Paths; import java.text.MessageFormat; import java.time.Duration; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -64,12 +69,13 @@ void cleanUpServer() { } @Test - void testPubSubPushConsumer() throws Exception { + void testPubSubPushConsumerOneMessage() throws Exception { List testSubjects = List.of("test1", "test2", "test3", "test4", "test4"); String testMessage = "Test message!"; natsServer.start(); + // Create connection and the transceiver object final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl); PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer, new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(), @@ -80,10 +86,12 @@ void testPubSubPushConsumer() throws Exception { AtomicBoolean testMatch = new AtomicBoolean(false); + // Publish one message assertDoesNotThrow(() -> pubSubTransceiver.publish( testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())), testMessage)); + // Subscribe to a push-based consumer assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() { @Override public void newMessageReceived(PubSubTunnel tunnel, String subject, String message) { @@ -92,7 +100,8 @@ public void newMessageReceived(PubSubTunnel tunnel, String subject, String messa } })); - Awaitility.await().atMost(Duration.ofSeconds(2)).with().pollInterval(Duration.ofMillis(500)) + // Wait until the subscription has received the published message + Awaitility.await().atMost(Duration.ofSeconds(4)).with().pollInterval(Duration.ofMillis(500)) .until(() -> testMatch.get()); assertTrue(testMatch.get()); @@ -102,12 +111,13 @@ public void newMessageReceived(PubSubTunnel tunnel, String subject, String messa } @Test - void testPubSubPullConsumer() throws Exception { + void testPubSubPullConsumerOneMessage() throws Exception { List testSubjects = List.of("test1", "test2", "test3", "test4", "test4"); String testMessage = "Test message!"; natsServer.start(); + // Create connection and the transceiver object final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl); PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer, new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(), @@ -117,10 +127,12 @@ void testPubSubPullConsumer() throws Exception { AtomicBoolean testMatch = new AtomicBoolean(false); + // Publish one message assertDoesNotThrow(() -> pubSubTransceiver.publish( testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())), testMessage)); + // Subscribe to a pull-based consumer assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() { @Override public void newMessageReceived(PubSubTunnel tunnel, String subject, String message) { @@ -129,7 +141,8 @@ public void newMessageReceived(PubSubTunnel tunnel, String subject, String messa } })); - Awaitility.await().atMost(Duration.ofSeconds(2)).with().pollInterval(Duration.ofMillis(500)) + // Wait until the subscription has received the published message + Awaitility.await().atMost(Duration.ofSeconds(4)).with().pollInterval(Duration.ofMillis(500)) .until(() -> testMatch.get()); assertTrue(testMatch.get()); @@ -137,4 +150,103 @@ public void newMessageReceived(PubSubTunnel tunnel, String subject, String messa assertDoesNotThrow(() -> connectionPrimer.close()); natsServer.stop(); } + + @Test + void testPubSubPushConsumerManyMessages() throws Exception { + List testSubjects = List.of("test1", "test2", "test3", "test4", "test4"); + List testMessages = + List.of("Test message!", "Hey", "there", "how", "are", "you? :)", "fu covid"); + + // Generate random messages that will be published + List> messages = testMessages.stream() + .map(message -> new SimpleEntry<>( + testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())), message)) + .collect(Collectors.toList()); + ArrayList matches = new ArrayList<>(Collections.nCopies(messages.size(), false)); + + natsServer.start(); + + // Create connection and the transceiver object + final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl); + PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer, + new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(), + new ConsumerConfiguration.Builder().durable("test-consumer-push") + .deliverSubject("delivery.test").build(), + new PubSubConfiguration.Builder().automaticallyCreateStream(true) + .automaticallyCreateConsumer(true).build()); + + // Publish many message + for (SimpleEntry message : messages) { + assertDoesNotThrow(() -> pubSubTransceiver.publish(message.getKey(), message.getValue())); + } + + // Subscribe to a push-based consumer + assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() { + @Override + public void newMessageReceived(PubSubTunnel tunnel, String subject, String message) { + IntStream.range(0, matches.size()) + .filter(idx -> messages.get(idx).getKey().equals(subject) + && messages.get(idx).getValue().equals(message)) + .findFirst().ifPresent(idx -> matches.set(idx, true)); + } + })); + + // Wait until the subscription has received all the published message + Awaitility.await().atMost(Duration.ofSeconds(8)).with().pollInterval(Duration.ofMillis(500)) + .until(() -> matches.stream().allMatch(Boolean::valueOf)); + + assertTrue(matches.stream().allMatch(Boolean::valueOf)); + + assertDoesNotThrow(() -> connectionPrimer.close()); + natsServer.stop(); + } + + @Test + void testPubSubPullConsumerManyMessages() throws Exception { + List testSubjects = List.of("test1", "test2", "test3", "test4", "test4"); + List testMessages = + List.of("Test message!", "Hey", "there", "how", "are", "you? :)", "fu covid"); + + // Generate random messages that will be published + List> messages = testMessages.stream() + .map(message -> new SimpleEntry<>( + testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())), message)) + .collect(Collectors.toList()); + ArrayList matches = new ArrayList<>(Collections.nCopies(messages.size(), false)); + + natsServer.start(); + + // Create connection and the transceiver object + final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl); + PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer, + new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(), + new ConsumerConfiguration.Builder().durable("test-consumer-pull").build(), + new PubSubConfiguration.Builder().automaticallyCreateStream(true) + .automaticallyCreateConsumer(true).build()); + + // Publish many message + for (SimpleEntry message : messages) { + assertDoesNotThrow(() -> pubSubTransceiver.publish(message.getKey(), message.getValue())); + } + + // Subscribe to a pull-based consumer + assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() { + @Override + public void newMessageReceived(PubSubTunnel tunnel, String subject, String message) { + IntStream.range(0, matches.size()) + .filter(idx -> messages.get(idx).getKey().equals(subject) + && messages.get(idx).getValue().equals(message)) + .findFirst().ifPresent(idx -> matches.set(idx, true)); + } + })); + + // Wait until the subscription has received all the published message + Awaitility.await().atMost(Duration.ofSeconds(8)).with().pollInterval(Duration.ofMillis(500)) + .until(() -> matches.stream().allMatch(Boolean::valueOf)); + + assertTrue(matches.stream().allMatch(Boolean::valueOf)); + + assertDoesNotThrow(() -> connectionPrimer.close()); + natsServer.stop(); + } }