Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4650] fix: remove message grouping for Ably Batch calls #180

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 0 additions & 87 deletions src/main/java/com/ably/kafka/connect/batch/MessageGroup.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,36 @@
import com.ably.kafka.connect.mapping.MessageConverter;
import com.ably.kafka.connect.mapping.RecordMapping;
import com.ably.kafka.connect.mapping.RecordMappingException;
import com.google.common.collect.Lists;
import io.ably.lib.types.Message;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class MessageGrouper {
private static final Logger logger = LoggerFactory.getLogger(MessageGrouper.class);
public class MessageTransformer {
private static final Logger logger = LoggerFactory.getLogger(MessageTransformer.class);

private final RecordMapping channelMapping;
private final RecordMapping messageNameMapping;
private final ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure;
@Nullable private final ErrantRecordReporter dlqReporter;
@Nullable
private final ErrantRecordReporter dlqReporter;

/**
* Construct a new message grouper, for generating Ably BatchSpecs and converting
* Construct a new message transformer, for generating Ably BatchSpecs and converting
* records to messages as needed.
*
* @param channelMapping The RecordMapping to use to generate Ably channel names
* @param channelMapping The RecordMapping to use to generate Ably channel names
* @param messageNameMapping The RecordMapping to use to generate Ably Message names
* @param actionOnFailure Action to perform when a message mapping attempt fails
* @param dlqReporter dead letter queue for reporting bad records, or null if not in use
* @param actionOnFailure Action to perform when a message mapping attempt fails
* @param dlqReporter dead letter queue for reporting bad records, or null if not in use
*/
public MessageGrouper(
public MessageTransformer(
RecordMapping channelMapping,
RecordMapping messageNameMapping,
ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure,
Expand All @@ -45,44 +45,32 @@ public MessageGrouper(
}

/**
* Construct a message group for an incoming batch of Kafka records
* Construct Ably messages for an incoming batch of Kafka records
*
* @param records Kafka sink records to group by channel and transform to Ably messages
* @return MessageGroup for outgoing message batch
* @param records Kafka sink records to transform to Ably messages
* @return List of Kafka sink records with transformed Ably messages
* @throws FatalBatchProcessingException if a fatal error occurred processing records
*/
public MessageGroup group(List<SinkRecord> records) throws FatalBatchProcessingException {
final Map<String, List<MessageGroup.RecordMessagePair>> groupedRecords = new HashMap<>();
for (SinkRecord record : records) {
public List<RecordMessagePair> transform(List<SinkRecord> records) throws FatalBatchProcessingException {
return records.stream().map(record -> {
try {
final String channel = channelMapping.map(record);
final String messageName = messageNameMapping.map(record);
final Message message = MessageConverter.toAblyMessage(messageName, record);

groupedRecords.compute(channel, (ch, recs) -> {
final MessageGroup.RecordMessagePair pair = new MessageGroup.RecordMessagePair(record, message);
if (recs != null) {
recs.add(pair);
return recs;
} else {
return Lists.newArrayList(pair);
}
});

String channel = channelMapping.map(record);
String messageName = messageNameMapping.map(record);
Message message = MessageConverter.toAblyMessage(messageName, record);
return new RecordMessagePair(record, message, channel);
} catch (RecordMappingException mappingError) {
handleMappingFailure(record, mappingError);
return null;
}
}

return new MessageGroup(groupedRecords);
}).filter(Objects::nonNull).collect(Collectors.toList());
}


/**
* Process a record that we're unable to forward to Ably due to a failed channel or
* message name mapping according to the configured handling behaviour.
*
* @param record The SinkRecord we weren't able to map
* @param record The SinkRecord we weren't able to map
* @param mappingError The error raised by the RecordMapping
*/
private void handleMappingFailure(
Expand Down
50 changes: 50 additions & 0 deletions src/main/java/com/ably/kafka/connect/batch/RecordMessagePair.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.ably.kafka.connect.batch;

import com.ably.kafka.connect.client.BatchSpec;
import io.ably.lib.types.Message;
import org.apache.kafka.connect.sink.SinkRecord;

/**
* Kafka Records with outgoing Ably Batch Spec.
*/
public class RecordMessagePair {
private final SinkRecord kafkaRecord;
private final Message message;
private final String channelName;
/**
* Construct a new record-message pairing.
*/
RecordMessagePair(SinkRecord kafkaRecord, Message message, String channelName) {
this.kafkaRecord = kafkaRecord;
this.message = message;
this.channelName = channelName;
}

/**
* Returns the incoming Kafka SinkRecord
*/
public SinkRecord getKafkaRecord() {
return kafkaRecord;
}

/**
* Returns the outgoing Ably Message
*/
public BatchSpec getBatchSpec() {
return new BatchSpec(channelName, message);
}

/**
* Returns Ably message associated with Kafka Record
*/
public Message getMessage() {
return message;
}

/**
* Returns channel name
*/
public String getChannelName() {
return channelName;
}
}
5 changes: 4 additions & 1 deletion src/main/java/com/ably/kafka/connect/client/BatchSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.ably.lib.types.Message;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
Expand All @@ -18,6 +18,9 @@ public BatchSpec(Set<String> channels, List<Message> messages) {
this.channels = channels;
this.messages = messages;
}
public BatchSpec(String channelName, Message message) {
this(Collections.singleton(channelName), Collections.singletonList(message));
}
public Set<String> getChannels() {
return channels;
}
Expand Down
Loading
Loading