From 7c456903138a9d93c0c675c252e065e758b83e70 Mon Sep 17 00:00:00 2001
From: evgeny
Date: Tue, 12 Mar 2024 13:21:42 +0000
Subject: [PATCH] fix: remove message grouping for Ably Batch calls

The Kafka connector used to put every message bound for the same channel
into a single `BatchSpec`, meaning we treated those messages atomically
and sent them as one `ProtocolMessage`. This becomes a problem when the
total `ProtocolMessage` size grows too large. Every message is therefore
now placed in its own `BatchSpec`.
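For illustration, using the request shape from the fixtures in
AblyBatchClientTest (channel and payload values invented): three records
where msg1 and msg3 target the same channel were previously coalesced
per channel,

    [
        {channels: ['channel_1'], messages: [{data: 'msg1'}, {data: 'msg3'}]},
        {channels: ['channel_2'], messages: [{data: 'msg2'}]}
    ]

whereas each record now travels in its own single-message spec, preserving
record order:

    [
        {channels: ['channel_1'], messages: [{data: 'msg1'}]},
        {channels: ['channel_2'], messages: [{data: 'msg2'}]},
        {channels: ['channel_1'], messages: [{data: 'msg3'}]}
    ]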
---
 .../kafka/connect/batch/MessageGroup.java     |  87 --------------
 ...geGrouper.java => MessageTransformer.java} |  58 ++++------
 .../connect/batch/RecordMessagePair.java      |  50 +++++++++
 .../ably/kafka/connect/client/BatchSpec.java  |   5 +-
 .../client/DefaultAblyBatchClient.java        | 106 +++++++++---------
 .../kafka/connect/AblyBatchClientTest.java    |  49 ++++++--
 .../kafka/connect/MessageGroupingTest.java    |  94 ++++++----------
 7 files changed, 202 insertions(+), 247 deletions(-)
 delete mode 100644 src/main/java/com/ably/kafka/connect/batch/MessageGroup.java
 rename src/main/java/com/ably/kafka/connect/batch/{MessageGrouper.java => MessageTransformer.java} (61%)
 create mode 100644 src/main/java/com/ably/kafka/connect/batch/RecordMessagePair.java

diff --git a/src/main/java/com/ably/kafka/connect/batch/MessageGroup.java b/src/main/java/com/ably/kafka/connect/batch/MessageGroup.java
deleted file mode 100644
index 27d5599f..00000000
--- a/src/main/java/com/ably/kafka/connect/batch/MessageGroup.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package com.ably.kafka.connect.batch;
-
-import com.ably.kafka.connect.client.BatchSpec;
-import io.ably.lib.types.Message;
-import org.apache.kafka.connect.sink.SinkRecord;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * A grouping of Kafka Records by their outgoing Ably channel.
- */
-public class MessageGroup {
-
-    /**
-     * Pairing of SinkRecord to corresponding Ably Message.
-     * This is needed because we need to recover SinkRecord instances if Ably rejects a message.
-     */
-    static class RecordMessagePair {
-        public final SinkRecord kafkaRecord;
-        public final Message ablyMessage;
-
-        /**
-         * Construct a new record-message pairing.
-         */
-        RecordMessagePair(SinkRecord kafkaRecord, Message ablyMessage) {
-            this.kafkaRecord = kafkaRecord;
-            this.ablyMessage = ablyMessage;
-        }
-
-        /**
-         * Returns the incoming Kafka SinkRecord
-         */
-        public SinkRecord getKafkaRecord() {
-            return kafkaRecord;
-        }
-
-        /**
-         * Returns the outgoing Ably Message
-         */
-        public Message getAblyMessage() {
-            return ablyMessage;
-        }
-
-    }
-
-    private final Map<String, List<RecordMessagePair>> groupedRecords;
-
-    /**
-     * Grouping of records ready for submission using Ably batch client.
-     * Construct using MessageGrouper.group()
-     */
-    MessageGroup(Map<String, List<RecordMessagePair>> groupedRecords) {
-        this.groupedRecords = groupedRecords;
-    }
-
-    /**
-     * Generate a list of Ably BatchSpecs ready for publishing
-     *
-     * @return List of BatchSpec instances that can be passed to AblyRest client.
-     */
-    public List<BatchSpec> specs() {
-        return groupedRecords.entrySet().stream()
-            .map(entry -> new BatchSpec(Set.of(entry.getKey()),
-                entry.getValue().stream()
-                    .map(RecordMessagePair::getAblyMessage)
-                    .collect(Collectors.toList())))
-            .collect(Collectors.toList());
-    }
-
-    /**
-     * Return all SinkRecords that correspond to a particular channel
-     */
-    public List<SinkRecord> recordsForChannel(final String channelName) {
-        return groupedRecords.getOrDefault(channelName, Collections.emptyList()).stream()
-            .map(RecordMessagePair::getKafkaRecord)
-            .collect(Collectors.toList());
-    }
-
-    /**
-     * Return a set of all the channels in this grouping.
-     */
-    public Set<String> allChannels() {
-        return groupedRecords.keySet();
-    }
-}
diff --git a/src/main/java/com/ably/kafka/connect/batch/MessageGrouper.java b/src/main/java/com/ably/kafka/connect/batch/MessageTransformer.java
similarity index 61%
rename from src/main/java/com/ably/kafka/connect/batch/MessageGrouper.java
rename to src/main/java/com/ably/kafka/connect/batch/MessageTransformer.java
index 362ac51b..9e4a3a38 100644
--- a/src/main/java/com/ably/kafka/connect/batch/MessageGrouper.java
+++ b/src/main/java/com/ably/kafka/connect/batch/MessageTransformer.java
@@ -4,7 +4,6 @@
 import com.ably.kafka.connect.mapping.MessageConverter;
 import com.ably.kafka.connect.mapping.RecordMapping;
 import com.ably.kafka.connect.mapping.RecordMappingException;
-import com.google.common.collect.Lists;
 import io.ably.lib.types.Message;
 import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -12,28 +11,29 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
-public class MessageGrouper {
-    private static final Logger logger = LoggerFactory.getLogger(MessageGrouper.class);
+public class MessageTransformer {
+    private static final Logger logger = LoggerFactory.getLogger(MessageTransformer.class);
 
     private final RecordMapping channelMapping;
     private final RecordMapping messageNameMapping;
     private final ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure;
-    @Nullable private final ErrantRecordReporter dlqReporter;
+    @Nullable
+    private final ErrantRecordReporter dlqReporter;
 
     /**
-     * Construct a new message grouper, for generating Ably BatchSpecs and converting
+     * Construct a new message transformer, for generating Ably BatchSpecs and converting
      * records to messages as needed.
      *
-     * @param channelMapping The RecordMapping to use to generate Ably channel names
+     * @param channelMapping     The RecordMapping to use to generate Ably channel names
      * @param messageNameMapping The RecordMapping to use to generate Ably Message names
-     * @param actionOnFailure Action to perform when a message mapping attempt fails
-     * @param dlqReporter dead letter queue for reporting bad records, or null if not in use
+     * @param actionOnFailure    Action to perform when a message mapping attempt fails
+     * @param dlqReporter        dead letter queue for reporting bad records, or null if not in use
      */
-    public MessageGrouper(
+    public MessageTransformer(
         RecordMapping channelMapping,
         RecordMapping messageNameMapping,
         ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure,
@@ -45,36 +45,24 @@ public MessageGrouper(
     }
 
     /**
-     * Construct a message group for an incoming batch of Kafka records
+     * Construct Ably messages for an incoming batch of Kafka records
      *
-     * @param records Kafka sink records to group by channel and transform to Ably messages
-     * @return MessageGroup for outgoing message batch
+     * @param records Kafka sink records to transform to Ably messages
+     * @return List of record-message pairs carrying the transformed Ably messages
     * @throws FatalBatchProcessingException if a fatal error occurred processing records
     */
-    public MessageGroup group(List<SinkRecord> records) throws FatalBatchProcessingException {
-        final Map<String, List<MessageGroup.RecordMessagePair>> groupedRecords = new HashMap<>();
-        for (SinkRecord record : records) {
+    public List<RecordMessagePair> transform(List<SinkRecord> records) throws FatalBatchProcessingException {
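+        // Each record is transformed independently and in input order; a record whose
+        // mapping fails is handled per the configured action and filtered out below.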
+        return records.stream().map(record -> {
             try {
-                final String channel = channelMapping.map(record);
-                final String messageName = messageNameMapping.map(record);
-                final Message message = MessageConverter.toAblyMessage(messageName, record);
-
-                groupedRecords.compute(channel, (ch, recs) -> {
-                    final MessageGroup.RecordMessagePair pair = new MessageGroup.RecordMessagePair(record, message);
-                    if (recs != null) {
-                        recs.add(pair);
-                        return recs;
-                    } else {
-                        return Lists.newArrayList(pair);
-                    }
-                });
-
+                String channel = channelMapping.map(record);
+                String messageName = messageNameMapping.map(record);
+                Message message = MessageConverter.toAblyMessage(messageName, record);
+                return new RecordMessagePair(record, message, channel);
             } catch (RecordMappingException mappingError) {
                 handleMappingFailure(record, mappingError);
+                return null;
             }
-        }
-
-        return new MessageGroup(groupedRecords);
+        }).filter(Objects::nonNull).collect(Collectors.toList());
     }
@@ -82,7 +70,7 @@ public MessageGroup group(List<SinkRecord> records) throws FatalBatchProcessingE
      * Process a record that we're unable to forward to Ably due to a failed channel or
      * message name mapping according to the configured handling behaviour.
      *
-     * @param record The SinkRecord we weren't able to map
+     * @param record       The SinkRecord we weren't able to map
      * @param mappingError The error raised by the RecordMapping
      */
     private void handleMappingFailure(
diff --git a/src/main/java/com/ably/kafka/connect/batch/RecordMessagePair.java b/src/main/java/com/ably/kafka/connect/batch/RecordMessagePair.java
new file mode 100644
index 00000000..92928eae
--- /dev/null
+++ b/src/main/java/com/ably/kafka/connect/batch/RecordMessagePair.java
@@ -0,0 +1,50 @@
+package com.ably.kafka.connect.batch;
+
+import com.ably.kafka.connect.client.BatchSpec;
+import io.ably.lib.types.Message;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+/**
+ * A Kafka record paired with its outgoing Ably Message and target channel.
+ */
+public class RecordMessagePair {
+    private final SinkRecord kafkaRecord;
+    private final Message message;
+    private final String channelName;
+
+    /**
+     * Construct a new record-message pairing.
+     */
+    RecordMessagePair(SinkRecord kafkaRecord, Message message, String channelName) {
+        this.kafkaRecord = kafkaRecord;
+        this.message = message;
+        this.channelName = channelName;
+    }
+
+    /**
+     * Returns the incoming Kafka SinkRecord
+     */
+    public SinkRecord getKafkaRecord() {
+        return kafkaRecord;
+    }
+
+    /**
+     * Returns a single-message BatchSpec wrapping the outgoing Ably Message
+     */
+    public BatchSpec getBatchSpec() {
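+        // One spec per message: the channel and message are wrapped individually
+        // rather than being grouped with other messages for the same channel.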
+        return new BatchSpec(channelName, message);
+    }
+
+    /**
+     * Returns the Ably Message associated with the Kafka record
+     */
+    public Message getMessage() {
+        return message;
+    }
+
+    /**
+     * Returns the target Ably channel name
+     */
+    public String getChannelName() {
+        return channelName;
+    }
+}
diff --git a/src/main/java/com/ably/kafka/connect/client/BatchSpec.java b/src/main/java/com/ably/kafka/connect/client/BatchSpec.java
index ac8093c0..052013d4 100644
--- a/src/main/java/com/ably/kafka/connect/client/BatchSpec.java
+++ b/src/main/java/com/ably/kafka/connect/client/BatchSpec.java
@@ -2,8 +2,8 @@
 
 import io.ably.lib.types.Message;
 
+import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -18,6 +18,9 @@ public BatchSpec(Set<String> channels, List<Message> messages) {
         this.channels = channels;
         this.messages = messages;
     }
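+    /**
+     * Convenience constructor: a spec carrying a single message for a single channel.
+     */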
+    public BatchSpec(String channelName, Message message) {
+        this(Collections.singleton(channelName), Collections.singletonList(message));
+    }
     public Set<String> getChannels() {
         return channels;
     }
diff --git a/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java b/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java
index 95eeb3dd..c2b8ed1f 100644
--- a/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java
+++ b/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java
@@ -1,17 +1,17 @@
 package com.ably.kafka.connect.client;
 
-import com.ably.kafka.connect.batch.MessageGroup;
-import com.ably.kafka.connect.batch.MessageGrouper;
+import com.ably.kafka.connect.batch.MessageTransformer;
+import com.ably.kafka.connect.batch.RecordMessagePair;
 import com.ably.kafka.connect.config.ChannelSinkConnectorConfig;
 import com.ably.kafka.connect.mapping.RecordMappingFactory;
 import com.ably.kafka.connect.offset.OffsetRegistry;
-import com.google.common.collect.Lists;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import io.ably.lib.http.HttpCore;
 import io.ably.lib.http.HttpUtils;
 import io.ably.lib.types.AblyException;
+import io.ably.lib.types.ErrorInfo;
 import io.ably.lib.types.HttpPaginatedResponse;
 import io.ably.lib.types.Param;
 import org.apache.kafka.connect.sink.ErrantRecordReporter;
@@ -20,7 +20,11 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Ably Batch client based on REST API
@@ -32,7 +36,7 @@ public class DefaultAblyBatchClient implements AblyBatchClient {
     private final AblyRestProxy restClient;
     @Nullable private final ErrantRecordReporter dlqReporter;
     private final OffsetRegistry offsetRegistryService;
-    private final MessageGrouper messageGrouper;
+    private final MessageTransformer messageTransformer;
 
     public DefaultAblyBatchClient(
         final ChannelSinkConnectorConfig connectorConfig,
@@ -59,7 +63,7 @@ public DefaultAblyBatchClient(
         this.offsetRegistryService = offsetRegistryService;
 
         final RecordMappingFactory mappingFactory = new RecordMappingFactory(this.connectorConfig);
-        this.messageGrouper = new MessageGrouper(
+        this.messageTransformer = new MessageTransformer(
             mappingFactory.channelNameMapping(),
             mappingFactory.messageNameMapping(),
             this.connectorConfig.getFailedMappingAction(),
@@ -75,23 +79,29 @@ public DefaultAblyBatchClient(
 
     @Override
     public void publishBatch(List<SinkRecord> records) {
         if (!records.isEmpty()) {
-            final MessageGroup groupedMessages = this.messageGrouper.group(records);
+            final List<RecordMessagePair> recordMessagePairs = this.messageTransformer.transform(records);
+            final List<BatchSpec> batchSpecs = recordMessagePairs
+                .stream()
+                .map(RecordMessagePair::getBatchSpec)
+                .collect(Collectors.toList());
+
             try {
                 logger.debug("Ably Batch API call - Thread({})", Thread.currentThread().getName());
-                final HttpPaginatedResponse response = this.sendBatches(groupedMessages.specs());
+                final HttpPaginatedResponse response = this.sendBatches(batchSpecs);
                 if (isError(response)) {
-                    for (final String channelName : groupedMessages.allChannels()) {
+                    for (final RecordMessagePair recordMessagePair : recordMessagePairs) {
+                        final String channelName = recordMessagePair.getChannelName();
                         final AblyChannelPublishException error = new AblyChannelPublishException(
                             channelName, response.errorCode, response.statusCode, response.errorMessage
                         );
-                        sendMessagesToDlq(groupedMessages.recordsForChannel(channelName), error);
+                        sendMessagesToDlq(Collections.singletonList(recordMessagePair.getKafkaRecord()), error);
                     }
                 } else {
-                    handlePartialFailure(response, groupedMessages);
+                    handlePartialFailure(response, recordMessagePairs);
                     offsetRegistryService.updateTopicPartitionToOffsetMap(records);
                 }
             } catch (AblyException e) {
@@ -113,20 +123,38 @@ private boolean isError(final HttpPaginatedResponse response) {
      */
     private void handlePartialFailure(
         HttpPaginatedResponse response,
-        MessageGroup messageGroup) throws AblyException {
-
-        do {
-            JsonElement[] batchResponses = response.items();
-            for (JsonElement batchResponse : batchResponses) {
-                for (AblyChannelPublishException error : getFailedChannels(batchResponse)) {
-                    logger.debug("Submission to channel {} failed", error.channelName, error);
-                    final List<SinkRecord> failedRecords = messageGroup.recordsForChannel(error.channelName);
-                    sendMessagesToDlq(failedRecords, error);
-                }
-            }
+        List<RecordMessagePair> recordMessagePairs) throws AblyException {
+        Iterator<RecordMessagePair> recordMessagePairItr = recordMessagePairs.iterator();
+        Iterator<JsonElement> itemsItr = Arrays.stream(response.items()).iterator();
+
+        while (recordMessagePairItr.hasNext() && itemsItr.hasNext()) {
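+            // The Batch API returns one response item per submitted BatchSpec, in
+            // request order, so each item pairs positionally with one record.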
+            RecordMessagePair recordMessagePair = recordMessagePairItr.next();
+            JsonObject batchSpecResponse = itemsItr.next().getAsJsonObject();
+
+            int failureCount = batchSpecResponse.getAsJsonPrimitive("failureCount").getAsInt();
+
+            if (failureCount == 0) continue;
 
-            response = response.next();
-        } while (response != null);
+            JsonArray results = batchSpecResponse.getAsJsonArray("results");
+
+            if (results == null || results.size() != 1)
+                throw AblyException.fromErrorInfo(new ErrorInfo("Inconsistent batch response: results field should contain a single element", 500));
+
+            JsonObject result = results.iterator().next().getAsJsonObject();
+            String channelName = result.getAsJsonPrimitive("channel").getAsString();
+
+            if (result.has("error")) {
+                JsonObject error = result.getAsJsonObject("error");
+                int errorCode = error.getAsJsonPrimitive("code").getAsInt();
+                int statusCode = error.getAsJsonPrimitive("statusCode").getAsInt();
+                String errorMessage = error.getAsJsonPrimitive("message").getAsString();
+                AblyChannelPublishException publishError =
+                    new AblyChannelPublishException(channelName, errorCode, statusCode, errorMessage);
+                logger.debug("Submission to channel {} failed", channelName, publishError);
+                sendMessagesToDlq(Collections.singletonList(recordMessagePair.getKafkaRecord()), publishError);
+            }
+        }
     }
 
     /**
@@ -163,36 +191,4 @@ private HttpPaginatedResponse sendBatches(final List<BatchSpec> batches) throws
 
         return response;
     }
-
-    /**
-     * Function to parse the Ably response message and retrieve the list of failed channel ids.
-     *
-     * @param batchResponse BatchResponse response object
-     */
-    private List<AblyChannelPublishException> getFailedChannels(JsonElement batchResponse) {
-
-        final List<AblyChannelPublishException> failures = Lists.newArrayList();
-
-        final int failureCount = batchResponse.getAsJsonObject().
-            getAsJsonPrimitive("failureCount").getAsInt();
-
-        if (failureCount > 0) {
-            JsonArray results = batchResponse.getAsJsonObject().getAsJsonArray("results");
-            if (results != null) {
-                for (JsonElement resultElement : results) {
-                    final JsonObject result = resultElement.getAsJsonObject();
-                    final String channelName = result.getAsJsonPrimitive("channel").getAsString();
-                    if (result.has("error")) {
-                        final JsonObject error = result.getAsJsonObject("error");
-                        final int errorCode = error.getAsJsonPrimitive("code").getAsInt();
-                        final int statusCode = error.getAsJsonPrimitive("statusCode").getAsInt();
-                        final String errorMessage = error.getAsJsonPrimitive("message").getAsString();
-                        failures.add(new AblyChannelPublishException(channelName, errorCode, statusCode, errorMessage));
-                    }
-                }
-            }
-        }
-
-        return failures;
-    }
 }
diff --git a/src/test/java/com/ably/kafka/connect/AblyBatchClientTest.java b/src/test/java/com/ably/kafka/connect/AblyBatchClientTest.java
index 78aea1af..a7d19cf8 100644
--- a/src/test/java/com/ably/kafka/connect/AblyBatchClientTest.java
+++ b/src/test/java/com/ably/kafka/connect/AblyBatchClientTest.java
@@ -67,37 +67,59 @@ public void testSendsAblyBatchApiRequests() throws ChannelSinkConnectorConfig.Co
             "    {\n" +
             "        channels: ['channel_1'],\n" +
             "        messages: [\n" +
-            "            {data: 'msg1', name: 'static_name'},\n" +
-            "            {data: 'msg4', name: 'static_name'}\n" +
+            "            {data: 'msg1', name: 'static_name'}\n" +
+            "        ]\n" +
+            "    },\n" +
+            "    {\n" +
+            "        channels: ['channel_2'],\n" +
+            "        messages: [\n" +
+            "            {data: 'msg2', name: 'static_name'}\n" +
             "        ]\n" +
             "    },\n" +
             "    {\n" +
             "        channels: ['channel_2'],\n" +
             "        messages: [\n" +
-            "            {data: 'msg2', name: 'static_name'},\n" +
             "            {data: 'msg3', name: 'static_name'}\n" +
             "        ]\n" +
+            "    },\n" +
+            "    {\n" +
+            "        channels: ['channel_1'],\n" +
+            "        messages: [\n" +
+            "            {data: 'msg4', name: 'static_name'}\n" +
+            "        ]\n" +
             "    }\n" +
             "]\n",
             // response:
             201,
             0,
             "", // no error message
-            "[\n" +
+            "[\n" +
+            "    {\n" +
+            "        successCount: 1,\n" +
+            "        failureCount: 0,\n" +
+            "        results: [\n" +
+            "            {channel: \"channe_1\", messageId: \"1\"}\n" +
+            "        ]\n" +
+            "    },\n" +
+            "    {\n" +
+            "        successCount: 1,\n" +
+            "        failureCount: 0,\n" +
+            "        results: [\n" +
+            "            {channel: \"channe_2\", messageId: \"2\"}\n" +
+            "        ]\n" +
+            "    },\n" +
             "    {\n" +
-            "        successCount: 2,\n" +
+            "        successCount: 1,\n" +
             "        failureCount: 0,\n" +
             "        results: [\n" +
-            "            {channel: \"channe_1\", messageId: \"1\"},\n" +
-            "            {channel: \"channe_1\", messageId: \"2\"}\n" +
+            "            {channel: \"channe_2\", messageId: \"3\"}\n" +
             "        ]\n" +
             "    },\n" +
             "    {\n" +
-            "        successCount: 2,\n" +
+            "        successCount: 1,\n" +
             "        failureCount: 0,\n" +
             "        results: [\n" +
-            "            {channel: \"channe_2\", messageId: \"3\"},\n" +
-            "            {channel: \"channe_2\", messageId: \"4\"}\n" +
+            "            {channel: \"channe_1\", messageId: \"4\"}\n" +
             "        ]\n" +
             "    }\n" +
             "]\n"
@@ -149,7 +171,12 @@ public void testSendsEntireBatchToDlqOnFullFailure() throws ChannelSinkConnector
             "    {\n" +
             "        channels: ['static_channel'],\n" +
             "        messages: [\n" +
-            "            {data: 'msg1', name: 'static_name_2'},\n" +
+            "            {data: 'msg1', name: 'static_name_2'}\n" +
+            "        ]\n" +
+            "    },\n" +
+            "    {\n" +
+            "        channels: ['static_channel'],\n" +
+            "        messages: [\n" +
             "            {data: 'msg2', name: 'static_name_2'}\n" +
             "        ]\n" +
             "    }\n" +
diff --git a/src/test/java/com/ably/kafka/connect/MessageGroupingTest.java b/src/test/java/com/ably/kafka/connect/MessageGroupingTest.java
index e5c96286..1ed01467 100644
--- a/src/test/java/com/ably/kafka/connect/MessageGroupingTest.java
+++ b/src/test/java/com/ably/kafka/connect/MessageGroupingTest.java
@@ -2,8 +2,8 @@
 
 import com.ably.kafka.connect.batch.FatalBatchProcessingException;
-import com.ably.kafka.connect.batch.MessageGroup;
-import com.ably.kafka.connect.batch.MessageGrouper;
+import com.ably.kafka.connect.batch.MessageTransformer;
+import com.ably.kafka.connect.batch.RecordMessagePair;
 import com.ably.kafka.connect.client.BatchSpec;
 import com.ably.kafka.connect.config.ChannelSinkConnectorConfig;
 import com.ably.kafka.connect.mapping.RecordMappingException;
@@ -11,6 +11,7 @@
 import com.ably.kafka.connect.utils.CapturedDlqError;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
+import io.ably.lib.types.Message;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -18,28 +19,26 @@
 
 import javax.annotation.Nullable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
- * Unit tests for {@link MessageGroup} and {@link MessageGrouper}
+ * Unit tests for {@link RecordMessagePair} and {@link MessageTransformer}
  */
 public class MessageGroupingTest {
     /**
-     * Ensure that records are grouped by channel and the returned MessageGroup returns the appropriate
+     * Ensure that records are transformed in order and that the resulting pairs produce the appropriate
      * BatchSpec configuration, and the corresponding SinkRecords where needed.
      */
     @Test
     public void testGroupsByChannelName() throws ChannelSinkConnectorConfig.ConfigException {
         final List<CapturedDlqError> errors = Lists.newArrayList();
-        final MessageGrouper sut = grouper("channel_#{key}", "msgName", "skip", errors);
+        final MessageTransformer sut = transformer("channel_#{key}", "msgName", "skip", errors);
         final List<SinkRecord> records = List.of(
             new SinkRecord("topic1", 0, Schema.INT32_SCHEMA, 1, null, "msg1", 0),
             new SinkRecord("topic1", 0, Schema.INT32_SCHEMA, 2, null, "msg2", 0),
@@ -49,35 +48,20 @@ public void testGroupsByChannelName() throws ChannelSinkConnectorConfig.ConfigEx
             new SinkRecord("topic1", 0, Schema.INT32_SCHEMA, 1, null, "msg6", 0)
         );
 
-        final MessageGroup group = sut.group(records);
+        final List<RecordMessagePair> recordMessagePairs = sut.transform(records);
 
         // Not expecting any errors
         assertEquals(Collections.emptyList(), errors);
 
         // Check that records have been grouped correctly, and order has been preserved
-        final List<BatchSpec> specs = group.specs();
-        assertEquals(Map.of(
-            "channel_1", List.of("msgName/msg1", "msgName/msg6"),
-            "channel_2", List.of("msgName/msg2", "msgName/msg4"),
-            "channel_3", List.of("msgName/msg3", "msgName/msg5")
-        ), flatten(specs));
-
-        // allChannels() should return all channels we have records for
-        assertEquals(Set.of("channel_1", "channel_2", "channel_3"), group.allChannels());
-
-        // Check that recordsForChannel can select correct subsets of records
-        assertEquals(List.of(
-            new SinkRecord("topic1", 0, Schema.INT32_SCHEMA, 1, null, "msg1", 0),
-            new SinkRecord("topic1", 0, Schema.INT32_SCHEMA, 1, null, "msg6", 0)
-        ), group.recordsForChannel("channel_1"));
         assertEquals(List.of(
-            new SinkRecord("topic1", 0, Schema.INT32_SCHEMA, 2, null, "msg2", 0),
-            new SinkRecord("topic1", 0, Schema.INT32_SCHEMA, 2, null, "msg4", 0)
-        ), group.recordsForChannel("channel_2"));
-        assertEquals(List.of(
-            new SinkRecord("topic1", 0, Schema.INT32_SCHEMA, 3, null, "msg3", 0),
-            new SinkRecord("topic1", 0, Schema.INT32_SCHEMA, 3, null, "msg5", 0)
-        ), group.recordsForChannel("channel_3"));
+            Map.of("channel_1", "msgName/msg1"),
+            Map.of("channel_2", "msgName/msg2"),
+            Map.of("channel_3", "msgName/msg3"),
+            Map.of("channel_2", "msgName/msg4"),
+            Map.of("channel_3", "msgName/msg5"),
+            Map.of("channel_1", "msgName/msg6")
+        ), flatten(recordMessagePairs));
     }
 
@@ -93,29 +77,29 @@ public void testRespectsErrorHandlingConfiguration() throws ChannelSinkConnector
 
         // `skip` configuration should not submit errors to DLQ and only return valid record mappings
         final List<CapturedDlqError> skipErrors = Lists.newArrayList();
-        final MessageGrouper skipGrouper = grouper("test_channel", "name_#{key}", "skip", skipErrors);
-        final MessageGroup skipGroup = skipGrouper.group(records);
+        final MessageTransformer skipTransformer = transformer("test_channel", "name_#{key}", "skip", skipErrors);
+        final List<RecordMessagePair> recordMessagePairs = skipTransformer.transform(records);
         assertEquals(Collections.emptyList(), skipErrors);
-        assertEquals(Map.of(
-            "test_channel", List.of("name_10/msg1")
-        ), flatten(skipGroup.specs()));
+        assertEquals(List.of(
+            Map.of("test_channel", "name_10/msg1")
+        ), flatten(recordMessagePairs));
 
         // `stop` should kill the sink task during processing
         final List<CapturedDlqError> stopErrors = Lists.newArrayList();
-        final MessageGrouper stopGrouper = grouper("test_channel", "name_#{key}", "stop", stopErrors);
+        final MessageTransformer stopTransformer = transformer("test_channel", "name_#{key}", "stop", stopErrors);
         assertEquals(Collections.emptyList(), stopErrors);
-        assertThrows(FatalBatchProcessingException.class, () -> stopGrouper.group(records));
+        assertThrows(FatalBatchProcessingException.class, () -> stopTransformer.transform(records));
 
         // `dlq` should send failed records to the DLQ
         final List<CapturedDlqError> dlqErrors = Lists.newArrayList();
-        final MessageGrouper dlqGrouper = grouper("test_channel", "name_#{key}", "dlq", dlqErrors);
-        final MessageGroup dlqGroup = dlqGrouper.group(records);
+        final MessageTransformer dlqTransformer = transformer("test_channel", "name_#{key}", "dlq", dlqErrors);
+        final List<RecordMessagePair> dlqMessagePairs = dlqTransformer.transform(records);
         assertEquals(1, dlqErrors.size());
         assertEquals(records.get(1), dlqErrors.get(0).record);
         assertEquals(RecordMappingException.class, dlqErrors.get(0).error.getClass());
-        assertEquals(Map.of(
-            "test_channel", List.of("name_10/msg1")
-        ), flatten(dlqGroup.specs()));
+        assertEquals(List.of(
+            Map.of("test_channel", "name_10/msg1")
+        ), flatten(dlqMessagePairs));
     }
 
     /**
@@ -128,31 +112,25 @@ public void testDefaultsToStopIfDlqUnavailable() throws ChannelSinkConnectorConf
             new SinkRecord("topic3", 0, null, null, null, "msg1", 0)
         );
 
-        final MessageGrouper grouper = grouper("test_channel", "name_#{key}", "dlq", null);
-        assertThrows(FatalBatchProcessingException.class, () -> grouper.group(records));
+        final MessageTransformer transformer = transformer("test_channel", "name_#{key}", "dlq", null);
+        assertThrows(FatalBatchProcessingException.class, () -> transformer.transform(records));
     }
 
     /**
      * Helper for testing to extract the key information from BatchSpecs. This is because we can't
      * do direct value comparisons on BatchSpecs as they contain Messages, which are not values.
     */
-    private Map<String, List<String>> flatten(final List<BatchSpec> specs) {
-        final Map<String, List<String>> flatSpecs = new HashMap<>();
-        for (BatchSpec spec : specs) {
-            assertEquals(1, spec.getChannels().size());
-            final List<String> messages = spec.getMessages().stream()
-                .map(msg -> String.format("%s/%s", msg.name, msg.data.toString()))
-                .collect(Collectors.toList());
-            flatSpecs.put(spec.getChannels().iterator().next(), messages);
-        }
-
-        return flatSpecs;
+    private List<Map<String, String>> flatten(final List<RecordMessagePair> recordMessagePairs) {
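+        // Flatten each pair to a single {channel: "name/data"} map so both content
+        // and record ordering can be asserted with plain value equality.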
+        return recordMessagePairs.stream().map(record -> {
+            Message msg = record.getMessage();
+            return Map.of(record.getChannelName(), String.format("%s/%s", msg.name, msg.data.toString()));
+        }).collect(Collectors.toList());
     }
 
     /**
-     * Set up a MessageGrouper for unit testing
+     * Set up a MessageTransformer for unit testing
      */
-    private MessageGrouper grouper(
+    private MessageTransformer transformer(
         String channelPattern,
         String messagePattern,
         String failAction,
@@ -172,7 +150,7 @@ private MessageGrouper grouper(
             errorSink.add(new CapturedDlqError(sinkRecord, throwable));
             return Futures.immediateVoidFuture();
         };
-        return new MessageGrouper(
+        return new MessageTransformer(
             factory.channelNameMapping(),
             factory.messageNameMapping(),
             config.getFailedMappingAction(),