Skip to content

Commit

Permalink
Added tests for publishing and subscribing for receiving many messages
Browse files Browse the repository at this point in the history
  • Loading branch information
corcoja committed Jan 25, 2022
1 parent 746476c commit ba8ffd8
Showing 1 changed file with 116 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@
import java.nio.file.Paths;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -64,12 +69,13 @@ void cleanUpServer() {
}

@Test
void testPubSubPushConsumer() throws Exception {
void testPubSubPushConsumerOneMessage() throws Exception {
List<String> testSubjects = List.of("test1", "test2", "test3", "test4", "test4");
String testMessage = "Test message!";

natsServer.start();

// Create connection and the transceiver object
final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl);
PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer,
new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(),
Expand All @@ -80,10 +86,12 @@ void testPubSubPushConsumer() throws Exception {

AtomicBoolean testMatch = new AtomicBoolean(false);

// Publish one message
assertDoesNotThrow(() -> pubSubTransceiver.publish(
testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())),
testMessage));

// Subscribe to a push-based consumer
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() {
@Override
public void newMessageReceived(PubSubTunnel tunnel, String subject, String message) {
Expand All @@ -92,7 +100,8 @@ public void newMessageReceived(PubSubTunnel tunnel, String subject, String messa
}
}));

Awaitility.await().atMost(Duration.ofSeconds(2)).with().pollInterval(Duration.ofMillis(500))
// Wait until the subscription has received the published message
Awaitility.await().atMost(Duration.ofSeconds(4)).with().pollInterval(Duration.ofMillis(500))
.until(() -> testMatch.get());

assertTrue(testMatch.get());
Expand All @@ -102,12 +111,13 @@ public void newMessageReceived(PubSubTunnel tunnel, String subject, String messa
}

@Test
void testPubSubPullConsumer() throws Exception {
void testPubSubPullConsumerOneMessage() throws Exception {
List<String> testSubjects = List.of("test1", "test2", "test3", "test4", "test4");
String testMessage = "Test message!";

natsServer.start();

// Create connection and the transceiver object
final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl);
PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer,
new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(),
Expand All @@ -117,10 +127,12 @@ void testPubSubPullConsumer() throws Exception {

AtomicBoolean testMatch = new AtomicBoolean(false);

// Publish one message
assertDoesNotThrow(() -> pubSubTransceiver.publish(
testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())),
testMessage));

// Subscribe to a pull-based consumer
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() {
@Override
public void newMessageReceived(PubSubTunnel tunnel, String subject, String message) {
Expand All @@ -129,12 +141,112 @@ public void newMessageReceived(PubSubTunnel tunnel, String subject, String messa
}
}));

Awaitility.await().atMost(Duration.ofSeconds(2)).with().pollInterval(Duration.ofMillis(500))
// Wait until the subscription has received the published message
Awaitility.await().atMost(Duration.ofSeconds(4)).with().pollInterval(Duration.ofMillis(500))
.until(() -> testMatch.get());

assertTrue(testMatch.get());

assertDoesNotThrow(() -> connectionPrimer.close());
natsServer.stop();
}

@Test
void testPubSubPushConsumerManyMessages() throws Exception {
List<String> testSubjects = List.of("test1", "test2", "test3", "test4", "test4");
List<String> testMessages =
List.of("Test message!", "Hey", "there", "how", "are", "you? :)", "fu covid");

// Generate random messages that will be published
List<SimpleEntry<String, String>> messages = testMessages.stream()
.map(message -> new SimpleEntry<>(
testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())), message))
.collect(Collectors.toList());
ArrayList<Boolean> matches = new ArrayList<>(Collections.nCopies(messages.size(), false));

natsServer.start();

// Create connection and the transceiver object
final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl);
PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer,
new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(),
new ConsumerConfiguration.Builder().durable("test-consumer-push")
.deliverSubject("delivery.test").build(),
new PubSubConfiguration.Builder().automaticallyCreateStream(true)
.automaticallyCreateConsumer(true).build());

// Publish many message
for (SimpleEntry<String, String> message : messages) {
assertDoesNotThrow(() -> pubSubTransceiver.publish(message.getKey(), message.getValue()));
}

// Subscribe to a push-based consumer
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() {
@Override
public void newMessageReceived(PubSubTunnel tunnel, String subject, String message) {
IntStream.range(0, matches.size())
.filter(idx -> messages.get(idx).getKey().equals(subject)
&& messages.get(idx).getValue().equals(message))
.findFirst().ifPresent(idx -> matches.set(idx, true));
}
}));

// Wait until the subscription has received all the published message
Awaitility.await().atMost(Duration.ofSeconds(8)).with().pollInterval(Duration.ofMillis(500))
.until(() -> matches.stream().allMatch(Boolean::valueOf));

assertTrue(matches.stream().allMatch(Boolean::valueOf));

assertDoesNotThrow(() -> connectionPrimer.close());
natsServer.stop();
}

@Test
void testPubSubPullConsumerManyMessages() throws Exception {
List<String> testSubjects = List.of("test1", "test2", "test3", "test4", "test4");
List<String> testMessages =
List.of("Test message!", "Hey", "there", "how", "are", "you? :)", "fu covid");

// Generate random messages that will be published
List<SimpleEntry<String, String>> messages = testMessages.stream()
.map(message -> new SimpleEntry<>(
testSubjects.get(ThreadLocalRandom.current().nextInt(0, testSubjects.size())), message))
.collect(Collectors.toList());
ArrayList<Boolean> matches = new ArrayList<>(Collections.nCopies(messages.size(), false));

natsServer.start();

// Create connection and the transceiver object
final ConnectionPrimer connectionPrimer = new ConnectionPrimer(serverUrl);
PubSubTransceiver pubSubTransceiver = new PubSubTransceiver(connectionPrimer,
new StreamConfiguration.Builder().name("test").subjects(testSubjects).build(),
new ConsumerConfiguration.Builder().durable("test-consumer-pull").build(),
new PubSubConfiguration.Builder().automaticallyCreateStream(true)
.automaticallyCreateConsumer(true).build());

// Publish many message
for (SimpleEntry<String, String> message : messages) {
assertDoesNotThrow(() -> pubSubTransceiver.publish(message.getKey(), message.getValue()));
}

// Subscribe to a pull-based consumer
assertDoesNotThrow(() -> pubSubTransceiver.subscribe(new SubHandler() {
@Override
public void newMessageReceived(PubSubTunnel tunnel, String subject, String message) {
IntStream.range(0, matches.size())
.filter(idx -> messages.get(idx).getKey().equals(subject)
&& messages.get(idx).getValue().equals(message))
.findFirst().ifPresent(idx -> matches.set(idx, true));
}
}));

// Wait until the subscription has received all the published message
Awaitility.await().atMost(Duration.ofSeconds(8)).with().pollInterval(Duration.ofMillis(500))
.until(() -> matches.stream().allMatch(Boolean::valueOf));

assertTrue(matches.stream().allMatch(Boolean::valueOf));

assertDoesNotThrow(() -> connectionPrimer.close());
natsServer.stop();
}
}

0 comments on commit ba8ffd8

Please sign in to comment.