Skip to content

Commit

Permalink
Add Azure Event Hubs Emulator container to Azure module
Browse files Browse the repository at this point in the history
- Fix code review findings

Signed-off-by: Esta Nagy <[email protected]>
  • Loading branch information
nagyesta committed Jan 22, 2025
1 parent 7071595 commit 69d2b6f
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,39 @@ public class AzureEventHubsEmulatorContainer extends GenericContainer<AzureEvent
"mcr.microsoft.com/azure-messaging/eventhubs-emulator"
);

private final AzuriteContainer azuriteContainer;
private AzuriteContainer azuriteContainer;

private boolean useKafka;

/**
* @param dockerImageName specified docker image name to run
*/
public AzureEventHubsEmulatorContainer(
final DockerImageName dockerImageName,
final AzuriteContainer azuriteContainer
) {
public AzureEventHubsEmulatorContainer(final String dockerImageName) {
this(DockerImageName.parse(dockerImageName));
}

/**
* @param dockerImageName specified docker image name to run
*/
public AzureEventHubsEmulatorContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
this.azuriteContainer = azuriteContainer;
dependsOn(this.azuriteContainer);
waitingFor(Wait.forLogMessage(".*Emulator Service is Successfully Up!.*", 1));
withExposedPorts(DEFAULT_AMQP_PORT);
}

/**
* * Sets the Azurite dependency needed by the Event Hubs Container,
*
* @param azuriteContainer The Azurite container used by Event HUbs as a dependency
* @return this
*/
public AzureEventHubsEmulatorContainer withAzuriteContainer(final AzuriteContainer azuriteContainer) {
this.azuriteContainer = azuriteContainer;
dependsOn(this.azuriteContainer);
return this;
}

/**
* Provide the broker configuration to the container.
*
Expand Down Expand Up @@ -83,6 +97,13 @@ public AzureEventHubsEmulatorContainer enableKafka() {

@Override
protected void configure() {
if (azuriteContainer == null) {
throw new IllegalStateException(
"The image " +
getDockerImageName() +
" requires an Azurite container. Please provide one with the withAzuriteContainer method!"
);
}
final String azuriteHost = azuriteContainer.getNetworkAliases().get(0);
withEnv("BLOB_SERVER", azuriteHost);
withEnv("METADATA_SERVER", azuriteHost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.time.Duration;
Expand All @@ -32,87 +30,85 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.waitAtMost;

public class AzureEventHubsEmulatorContainerTest {

@Rule
// network {
public Network network = Network.newNetwork();

// }

@Rule
// azuriteContainer {
public AzuriteContainer azuriteContainer = new AzuriteContainer("mcr.microsoft.com/azure-storage/azurite:3.33.0")
.withNetwork(network);

.withNetwork(network);
// }

@Rule
// emulatorContainer {
public AzureEventHubsEmulatorContainer emulator = new AzureEventHubsEmulatorContainer(
DockerImageName.parse("mcr.microsoft.com/azure-messaging/eventhubs-emulator:2.0.1"),
azuriteContainer
"mcr.microsoft.com/azure-messaging/eventhubs-emulator:2.0.1"
)
.acceptLicense()
.enableKafka() //optional
.withNetwork(network)
.withConfig(MountableFile.forClasspathResource("/eventhubs_config.json"));

.acceptLicense()
.enableKafka() //optional
.withNetwork(network)
.withConfig(MountableFile.forClasspathResource("/eventhubs_config.json"))
.withAzuriteContainer(azuriteContainer);
// }

@Test
public void testWithEventHubsClient() {
try (
// createProducerAndConsumer {
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(emulator.getConnectionString())
.fullyQualifiedNamespace("emulatorNs1")
.eventHubName("eh1")
.buildProducerClient();
EventHubConsumerClient consumer = new EventHubClientBuilder()
.connectionString(emulator.getConnectionString())
.fullyQualifiedNamespace("emulatorNs1")
.eventHubName("eh1")
.consumerGroup("cg1")
.buildConsumerClient()
// }
// createProducerAndConsumer {
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(emulator.getConnectionString())
.fullyQualifiedNamespace("emulatorNs1")
.eventHubName("eh1")
.buildProducerClient();
EventHubConsumerClient consumer = new EventHubClientBuilder()
.connectionString(emulator.getConnectionString())
.fullyQualifiedNamespace("emulatorNs1")
.eventHubName("eh1")
.consumerGroup("cg1")
.buildConsumerClient()
// }
) {
producer.send(Collections.singletonList(new EventData("test")));

waitAtMost(Duration.ofSeconds(30))
.pollDelay(Duration.ofSeconds(5))
.untilAsserted(() -> {
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(
"0",
1,
EventPosition.earliest(),
Duration.ofSeconds(2)
);
Optional<PartitionEvent> event = events.stream().findFirst();
assertThat(event).isPresent();
assertThat(event.get().getData().getBodyAsString()).isEqualTo("test");
});
.pollDelay(Duration.ofSeconds(5))
.untilAsserted(() -> {
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(
"0",
1,
EventPosition.earliest(),
Duration.ofSeconds(2)
);
Optional<PartitionEvent> event = events.stream().findFirst();
assertThat(event).isPresent();
assertThat(event.get().getData().getBodyAsString()).isEqualTo("test");
});
}
}

@Test
public void testWithKafkaClient() throws Exception {
// kafkaProperties {
ImmutableMap<String, String> commonProperties = ImmutableMap
.<String, String>builder()
.put("bootstrap.servers", emulator.getBootstrapServers())
.put("sasl.mechanism", "PLAIN")
.put("security.protocol", "SASL_PLAINTEXT")
.put(
"sasl.jaas.config",
String.format(
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";",
emulator.getConnectionString()
.<String, String>builder()
.put("bootstrap.servers", emulator.getBootstrapServers())
.put("sasl.mechanism", "PLAIN")
.put("security.protocol", "SASL_PLAINTEXT")
.put(
"sasl.jaas.config",
String.format(
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";",
emulator.getConnectionString()
)
)
)
.build();
.build();
// }

Properties producerProperties = new Properties();
Expand All @@ -123,34 +119,34 @@ public void testWithKafkaClient() throws Exception {
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID());
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.putAll(commonProperties);

try (
KafkaProducer<String, String> producer = new KafkaProducer<>(
producerProperties,
new StringSerializer(),
new StringSerializer()
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
consumerProperties,
new StringDeserializer(),
new StringDeserializer()
);
KafkaProducer<String, String> producer = new KafkaProducer<>(
producerProperties,
new StringSerializer(),
new StringSerializer()
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
consumerProperties,
new StringDeserializer(),
new StringDeserializer()
);
) {
String topicName = "eh1";
consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Awaitility
.await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
});
await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
});

consumer.unsubscribe();
}
Expand Down

0 comments on commit 69d2b6f

Please sign in to comment.