Skip to content

Commit

Permalink
fix: remove message grouping for Ably Batch calls
Browse files Browse the repository at this point in the history
The Kafka connector put every message sent to a single channel into one `BatchSpec`, meaning we treated them atomically and sent them as a single `ProtocolMessage`.

This causes a problem when the total `ProtocolMessage` size becomes too large.

That is why we now put every message in its own `BatchSpec`.
  • Loading branch information
ttypic committed Mar 12, 2024
1 parent fcdcd2d commit b59bb2b
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 232 deletions.
87 changes: 0 additions & 87 deletions src/main/java/com/ably/kafka/connect/batch/MessageGroup.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
package com.ably.kafka.connect.batch;

import com.ably.kafka.connect.client.BatchSpec;
import com.ably.kafka.connect.config.ChannelSinkConnectorConfig;
import com.ably.kafka.connect.mapping.MessageConverter;
import com.ably.kafka.connect.mapping.RecordMapping;
import com.ably.kafka.connect.mapping.RecordMappingException;
import com.google.common.collect.Lists;
import io.ably.lib.types.Message;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class MessageGrouper {
private static final Logger logger = LoggerFactory.getLogger(MessageGrouper.class);
public class MessageTransformer {
private static final Logger logger = LoggerFactory.getLogger(MessageTransformer.class);

private final RecordMapping channelMapping;
private final RecordMapping messageNameMapping;
private final ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure;
@Nullable private final ErrantRecordReporter dlqReporter;
@Nullable
private final ErrantRecordReporter dlqReporter;

/**
* Construct a new message grouper, for generating Ably BatchSpecs and converting
* Construct a new message transformer, for generating Ably BatchSpecs and converting
* records to messages as needed.
*
* @param channelMapping The RecordMapping to use to generate Ably channel names
* @param channelMapping The RecordMapping to use to generate Ably channel names
* @param messageNameMapping The RecordMapping to use to generate Ably Message names
* @param actionOnFailure Action to perform when a message mapping attempt fails
* @param dlqReporter dead letter queue for reporting bad records, or null if not in use
* @param actionOnFailure Action to perform when a message mapping attempt fails
* @param dlqReporter dead letter queue for reporting bad records, or null if not in use
*/
public MessageGrouper(
public MessageTransformer(
RecordMapping channelMapping,
RecordMapping messageNameMapping,
ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure,
Expand All @@ -45,44 +47,32 @@ public MessageGrouper(
}

/**
* Construct a message group for an incoming batch of Kafka records
* Construct Ably messages for an incoming batch of Kafka records
*
* @param records Kafka sink records to group by channel and transform to Ably messages
* @return MessageGroup for outgoing message batch
* @param records Kafka sink records to transform to Ably messages
* @return List of Kafka sink records with transformed Ably messages
* @throws FatalBatchProcessingException if a fatal error occurred processing records
*/
public MessageGroup group(List<SinkRecord> records) throws FatalBatchProcessingException {
final Map<String, List<MessageGroup.RecordMessagePair>> groupedRecords = new HashMap<>();
for (SinkRecord record : records) {
public List<RecordMessagePair> transform(List<SinkRecord> records) throws FatalBatchProcessingException {
return records.stream().map(record -> {
try {
final String channel = channelMapping.map(record);
final String messageName = messageNameMapping.map(record);
final Message message = MessageConverter.toAblyMessage(messageName, record);

groupedRecords.compute(channel, (ch, recs) -> {
final MessageGroup.RecordMessagePair pair = new MessageGroup.RecordMessagePair(record, message);
if (recs != null) {
recs.add(pair);
return recs;
} else {
return Lists.newArrayList(pair);
}
});

String channel = channelMapping.map(record);
String messageName = messageNameMapping.map(record);
Message message = MessageConverter.toAblyMessage(messageName, record);
return new RecordMessagePair(record, new BatchSpec(Collections.singleton(channel), Collections.singletonList(message)));
} catch (RecordMappingException mappingError) {
handleMappingFailure(record, mappingError);
return null;
}
}

return new MessageGroup(groupedRecords);
}).filter(Objects::nonNull).collect(Collectors.toList());
}


/**
* Process a record that we're unable to forward to Ably due to a failed channel or
* message name mapping according to the configured handling behaviour.
*
* @param record The SinkRecord we weren't able to map
* @param record The SinkRecord we weren't able to map
* @param mappingError The error raised by the RecordMapping
*/
private void handleMappingFailure(
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/ably/kafka/connect/batch/RecordMessagePair.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.ably.kafka.connect.batch;

import com.ably.kafka.connect.client.BatchSpec;
import io.ably.lib.types.Message;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Pairs an incoming Kafka {@link SinkRecord} with the outgoing Ably
 * {@link BatchSpec} generated from it, so that delivery outcomes can be
 * reported back against the originating record.
 *
 * <p>Instances are immutable.
 */
public class RecordMessagePair {
    private final SinkRecord kafkaRecord;
    private final BatchSpec batchSpec;

    /**
     * Construct a new record–batch-spec pairing.
     *
     * @param kafkaRecord the incoming Kafka sink record
     * @param batchSpec   the Ably batch spec produced from that record
     */
    RecordMessagePair(SinkRecord kafkaRecord, BatchSpec batchSpec) {
        this.kafkaRecord = kafkaRecord;
        this.batchSpec = batchSpec;
    }

    /**
     * Returns the incoming Kafka SinkRecord.
     */
    public SinkRecord getKafkaRecord() {
        return kafkaRecord;
    }

    /**
     * Returns the outgoing Ably BatchSpec built from this record.
     */
    public BatchSpec getBatchSpec() {
        return batchSpec;
    }
}
4 changes: 4 additions & 0 deletions src/main/java/com/ably/kafka/connect/client/BatchSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.ably.lib.types.Message;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand All @@ -18,6 +19,9 @@ public BatchSpec(Set<String> channels, List<Message> messages) {
this.channels = channels;
this.messages = messages;
}
public BatchSpec(String channel, Message message) {
this(Collections.singleton(channel), Collections.singletonList(message));
}
public Set<String> getChannels() {
return channels;
}
Expand Down
Loading

0 comments on commit b59bb2b

Please sign in to comment.