Skip to content

Commit

Permalink
chore : add tests (#637)
Browse files Browse the repository at this point in the history
* chore : add tests

* polish

* implement code review comments
  • Loading branch information
rajadilipkolli authored Jan 25, 2025
1 parent 4c8f499 commit 836420e
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void publishPersonWithGender(CapturedOutput output) {
}

@Test
void concurrentPublishing(CapturedOutput output) throws Exception {
void concurrentPublishing(CapturedOutput output) {
int numberOfRequests = 10;
for (int i = 0; i < numberOfRequests; i++) {
this.mockMvcTester
Expand Down Expand Up @@ -138,7 +138,7 @@ void publishPersonWithEmptyName() {
.hasStatus(HttpStatus.BAD_REQUEST)
.bodyJson()
.convertTo(ProblemDetail.class)
.satisfies(problemDetail -> assertBadRequestProblem(problemDetail));
.satisfies(this::assertBadRequestProblem);
}

@Test
Expand All @@ -153,7 +153,7 @@ void publishPersonWithNegativeAge() {
.hasStatus(HttpStatus.BAD_REQUEST)
.bodyJson()
.convertTo(ProblemDetail.class)
.satisfies(problemDetail -> assertBadRequestProblem(problemDetail));
.satisfies(this::assertBadRequestProblem);
}

private void assertBadRequestProblem(ProblemDetail problemDetail, String expectedDetail) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway
interface KafkaGateway {
public interface KafkaGateway {

@Gateway(requestChannel = "toKafka.input")
void sendToKafka(String payload, @Header(KafkaHeaders.TOPIC) String topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ KafkaContainer kafkaContainer() {

@Bean
DynamicPropertyRegistrar kafkaProperties(KafkaContainer kafkaContainer) {
return (properties) -> {
properties.add("spring.kafka.bootstrapServers", kafkaContainer::getBootstrapServers);
};
return properties -> properties.add("spring.kafka.bootstrapServers", kafkaContainer::getBootstrapServers);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,80 @@
package com.example.integration.kafkadsl;

import static org.assertj.core.api.Assertions.assertThat;

import com.example.integration.kafkadsl.config.KafkaAppProperties;
import com.example.integration.kafkadsl.config.KafkaGateway;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;

@SpringBootTest(classes = ContainerConfiguration.class)
@SpringBootTest(classes = {ContainerConfiguration.class})
class KafkaDslApplicationTests {

@Autowired
KafkaGateway kafkaGateway;

@Autowired
KafkaAppProperties kafkaAppProperties;

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

@Test
void sendMessageToKafka() {
String message = "test-message";
kafkaGateway.sendToKafka(message, kafkaAppProperties.topic());
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo(message);
}

@Test
void receiveMessageFromKafka() {
String message = "test-message";
kafkaTemplate.send(kafkaAppProperties.topic(), message);
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo(message);
}

@Test
void sendAndReceiveMultipleMessages() {
for (int i = 0; i < 10; i++) {
String message = "message" + i;
kafkaGateway.sendToKafka(message, kafkaAppProperties.topic());
}
for (int i = 0; i < 10; i++) {
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo("message" + i);
}
}

@Test
void sendMessageToNewTopic() {
String message = "new-topic-message";
kafkaGateway.sendToKafka(message, kafkaAppProperties.newTopic());
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo(message);
}

@Test
void receiveMessageFromNewTopic() {
String message = "new-topic-message";
kafkaTemplate.send(kafkaAppProperties.newTopic(), message);
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo(message);
}

@Test
void contextLoads() {}
void sendAndReceiveMultipleMessagesFromNewTopic() {
for (int i = 0; i < 10; i++) {
String message = "new-topic-message" + i;
kafkaGateway.sendToKafka(message, kafkaAppProperties.newTopic());
}
for (int i = 0; i < 10; i++) {
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo("new-topic-message" + i);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,34 @@ class KafkaSampleIntegrationTest {
private Receiver2 receiver2;

@Test
@Order(1)
@Order(101)
void sendAndReceiveMessage() throws Exception {
long initialCount = receiver2.getLatch().getCount();
this.mockMvcTester
.post()
.uri("/messages")
.content(this.objectMapper.writeValueAsString(new MessageDTO("test_1", "junitTest")))
.contentType(MediaType.APPLICATION_JSON)
.exchange()
.assertThat()
.hasStatusOk();

// 4 from topic1 and 3 from topic2 on startUp, plus 1 from test
await().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(initialCount - 1));
.untilAsserted(() -> {
long currentCount = receiver2.getLatch().getCount();
assertThat(currentCount)
.as(
"Expected message count to decrease by 1, initial: %d, current: %d",
initialCount, currentCount)
.isEqualTo(initialCount - 1);
});
assertThat(receiver2.getDeadLetterLatch().getCount()).isEqualTo(1);
}

@Test
@Order(2)
@Order(102)
void sendAndReceiveMessageInDeadLetter() throws Exception {
this.mockMvcTester
.post()
Expand All @@ -74,6 +82,7 @@ void sendAndReceiveMessageInDeadLetter() throws Exception {
}

@Test
@Order(51)
void topicsWithPartitionsCount() {
String expectedJson =
"""
Expand Down Expand Up @@ -122,6 +131,7 @@ void topicsWithPartitionsCount() {
}

@Test
@Order(1)
void getListOfContainers() {
String expectedJson =
"""
Expand All @@ -143,6 +153,7 @@ void getListOfContainers() {
}

@Test
@Order(2)
void stopAndStartContainers() throws Exception {
String expectedJson =
"""
Expand Down Expand Up @@ -178,6 +189,7 @@ void stopAndStartContainers() throws Exception {
}

@Test
@Order(3)
void invalidContainerOperation() throws Exception {
this.mockMvcTester
.post()
Expand Down Expand Up @@ -209,8 +221,15 @@ void invalidContainerOperation() throws Exception {
}

@Test
@Order(4)
void whenInvalidOperation_thenReturnsBadRequest() {
String invalidRequest = "{ \"containerId\": \"myListener\", \"operation\": \"INVALID\" }";
String invalidRequest =
"""
{
"containerId": "topic_2_Listener-dlt",
"operation": "INVALID"
}
""";

this.mockMvcTester
.post()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;

@TestConfiguration(proxyBeanMethods = false)
public class OrderListener {
Expand All @@ -18,7 +22,10 @@ public class OrderListener {
private final CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch dlqLatch = new CountDownLatch(1);

@RetryableTopic
@RetryableTopic(
attempts = "2",
backoff = @Backoff(delay = 1000, multiplier = 2.0),
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "order-created", groupId = "notification")
public void notify(OrderRecord event) {
log.info(
Expand All @@ -32,12 +39,13 @@ public void notify(OrderRecord event) {
}

@DltHandler
public void notifyDLT(OrderRecord event) {
public void notifyDLT(OrderRecord event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.error(
"Order processing failed, received in DLT - OrderId: {}, Status: {}, Items: {}",
+event.id(),
"Order processing failed, received in DLT - OrderId: {}, Status: {}, Items: {} from topic: {}",
event.id(),
event.status(),
event.orderItems());
event.orderItems(),
topic);
dlqLatch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,31 @@ void shouldTriggerOrderCreatedEvent(Scenario scenario) {
.toArriveAndVerify(event ->
assertThat(event.orderItems().getFirst().productCode()).isEqualTo("Coffee"));
}

@Test
void shouldCreateOrderWithMultipleItems(Scenario scenario) {
when(kafkaOperations.send(any(), any(), any())).then(invocation -> {
log.info(
"Sending message key {}, value {} to {}.",
invocation.getArguments()[1],
invocation.getArguments()[2],
invocation.getArguments()[0]);
return CompletableFuture.completedFuture(new SendResult<>(null, null));
});

scenario.stimulate(() -> orders.saveOrder(new OrderRequest(
null,
List.of(
new OrderItemRequest("Coffee", BigDecimal.TEN, 100),
new OrderItemRequest("Tea", BigDecimal.valueOf(5), 50)))))
.andWaitForEventOfType(OrderRecord.class)
.toArriveAndVerify(event -> {
assertThat(event.orderItems().get(0).productCode()).isEqualTo("Coffee");
assertThat(event.orderItems().get(1).productCode()).isEqualTo("Tea");
assertThat(event.orderItems().get(0).quantity()).isEqualTo(100);
assertThat(event.orderItems().get(1).quantity()).isEqualTo(50);
assertThat(event.orderItems().get(0).productPrice()).isEqualTo(BigDecimal.TEN);
assertThat(event.orderItems().get(1).productPrice()).isEqualTo(BigDecimal.valueOf(5));
});
}
}

0 comments on commit 836420e

Please sign in to comment.