From 69d2b6f4b588964f50b54dc2ce67e5c0d8e6fc63 Mon Sep 17 00:00:00 2001 From: Esta Nagy Date: Thu, 16 Jan 2025 23:11:09 +0100 Subject: [PATCH] Add Azure Event Hubs Emulator container to Azure module - Fix code review findings Signed-off-by: Esta Nagy --- .../AzureEventHubsEmulatorContainer.java | 35 ++++- .../AzureEventHubsEmulatorContainerTest.java | 134 +++++++++--------- 2 files changed, 93 insertions(+), 76 deletions(-) diff --git a/modules/azure/src/main/java/org/testcontainers/azure/AzureEventHubsEmulatorContainer.java b/modules/azure/src/main/java/org/testcontainers/azure/AzureEventHubsEmulatorContainer.java index 7149b73281a..02bf61bff05 100644 --- a/modules/azure/src/main/java/org/testcontainers/azure/AzureEventHubsEmulatorContainer.java +++ b/modules/azure/src/main/java/org/testcontainers/azure/AzureEventHubsEmulatorContainer.java @@ -32,25 +32,39 @@ public class AzureEventHubsEmulatorContainer extends GenericContainer { - IterableStream events = consumer.receiveFromPartition( - "0", - 1, - EventPosition.earliest(), - Duration.ofSeconds(2) - ); - Optional event = events.stream().findFirst(); - assertThat(event).isPresent(); - assertThat(event.get().getData().getBodyAsString()).isEqualTo("test"); - }); + .pollDelay(Duration.ofSeconds(5)) + .untilAsserted(() -> { + IterableStream events = consumer.receiveFromPartition( + "0", + 1, + EventPosition.earliest(), + Duration.ofSeconds(2) + ); + Optional event = events.stream().findFirst(); + assertThat(event).isPresent(); + assertThat(event.get().getData().getBodyAsString()).isEqualTo("test"); + }); } } @@ -101,18 +97,18 @@ public void testWithEventHubsClient() { public void testWithKafkaClient() throws Exception { // kafkaProperties { ImmutableMap commonProperties = ImmutableMap - .builder() - .put("bootstrap.servers", emulator.getBootstrapServers()) - .put("sasl.mechanism", "PLAIN") - .put("security.protocol", "SASL_PLAINTEXT") - .put( - "sasl.jaas.config", - String.format( - "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", - emulator.getConnectionString() + .builder() + .put("bootstrap.servers", emulator.getBootstrapServers()) + .put("sasl.mechanism", "PLAIN") + .put("security.protocol", "SASL_PLAINTEXT") + .put( + "sasl.jaas.config", + String.format( + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", + emulator.getConnectionString() + ) ) - ) - .build(); + .build(); // } Properties producerProperties = new Properties(); @@ -123,34 +119,34 @@ public void testWithKafkaClient() throws Exception { consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID()); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.putAll(commonProperties); + try ( - KafkaProducer producer = new KafkaProducer<>( - producerProperties, - new StringSerializer(), - new StringSerializer() - ); - KafkaConsumer consumer = new KafkaConsumer<>( - consumerProperties, - new StringDeserializer(), - new StringDeserializer() - ); + KafkaProducer producer = new KafkaProducer<>( + producerProperties, + new StringSerializer(), + new StringSerializer() + ); + KafkaConsumer consumer = new KafkaConsumer<>( + consumerProperties, + new StringDeserializer(), + new StringDeserializer() + ); ) { String topicName = "eh1"; consumer.subscribe(Collections.singletonList(topicName)); producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get(); - Awaitility - .await() - .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - - assertThat(records) - .hasSize(1) - .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) - .containsExactly(tuple(topicName, "testcontainers", "rulezzz")); - }); + await() + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + assertThat(records) + .hasSize(1) + .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) + .containsExactly(tuple(topicName, "testcontainers", "rulezzz")); + }); consumer.unsubscribe(); }