Skip to content

Commit

Permalink
Reduce S3 memory usage by clearing records already sent to S3
Browse files Browse the repository at this point in the history
Signed-off-by: Aindriu Lavelle <[email protected]>
  • Loading branch information
aindriu-aiven committed Nov 13, 2024
1 parent f719afd commit 77893fc
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.grouper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.connect.sink.SinkRecord;

public class GroupedSinkRecord {

private int numberOfRecords;
final private List<SinkRecord> sinkRecords;
final private String filename;
final private long recordCreationDate = System.currentTimeMillis();

public GroupedSinkRecord(final String filename) {
this.filename = filename;
sinkRecords = new ArrayList<>();
numberOfRecords = 0;
}

public GroupedSinkRecord(final String filename, final List<SinkRecord> sinkRecords) {
this.filename = filename;
this.sinkRecords = new ArrayList<>(sinkRecords);
numberOfRecords = sinkRecords.size();
}
public GroupedSinkRecord(final String filename, final SinkRecord sinkRecord) {
this.filename = filename;
this.sinkRecords = new ArrayList<>();
this.sinkRecords.add(sinkRecord);
numberOfRecords = 1;
}

public void addSinkRecord(final SinkRecord sinkRecord) {
this.sinkRecords.add(sinkRecord);
this.numberOfRecords++;
}

public List<SinkRecord> getSinkRecords() {
// Ensure access to the Sink Records can only be changed through the apis and not accidentally by another
// process.
return Collections.unmodifiableList(sinkRecords);
}

public void removeSinkRecords(final List<SinkRecord> sinkRecords) {
this.sinkRecords.removeAll(sinkRecords);
}

public int getNumberOfRecords() {
return numberOfRecords;
}

public String getFilename() {
return filename;
}

public long getRecordCreationDate() {
return recordCreationDate;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ public void clear() {
fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
// One entry per file, so the entire file can be removed to reduce memory overhead.
fileBuffers.remove(identifier);
}

@Override
public Map<String, List<SinkRecord>> records() {
return Collections.unmodifiableMap(fileBuffers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public void clear() {
fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
// One record per file, so remove the entry to reduce memory
fileBuffers.remove(identifier);
}

@Override
public Map<String, List<SinkRecord>> records() {
return Collections.unmodifiableMap(fileBuffers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.grouper;

public class PartitionOffset {

private Long offset;
private int partition;

public PartitionOffset(final int partition, final Long offset) {
this.offset = offset;
this.partition = partition;
}

public int getPartition() {
return partition;
}

public void setPartition(final int partition) {
this.partition = partition;
}

public Long getOffset() {
return offset;
}

public void setOffset(final Long offset) {
this.offset = offset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,18 @@ public interface RecordGrouper {
*/
void clear();

/**
* Clear processed records from memory
*
* @param records
* all records already processed to Sink
*/
void clearProcessedRecords(String identifier, List<SinkRecord> records);

/**
* Get all records associated with files, grouped by the file name.
*
* @return map of records assotiated with files
* @return map of records associated with files
*/
Map<String, List<SinkRecord>> records();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package io.aiven.kafka.connect.common.grouper;

import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -42,13 +42,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {

private final Template filenameTemplate;

private final Map<TopicPartitionKey, SinkRecord> currentHeadRecords = new HashMap<>();
private final Map<TopicPartitionKey, PartitionOffset> currentHeadRecords = new HashMap<>();

private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
private final Map<String, GroupedSinkRecord> fileBuffers = new HashMap<>();

private final Function<SinkRecord, Function<Parameter, String>> setTimestampBasedOnRecord;

private final Rotator<List<SinkRecord>> rotator;
private final Rotator<GroupedSinkRecord> rotator;

TopicPartitionKeyRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile,
final TimestampSource tsSource) {
Expand All @@ -64,7 +64,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
if (unlimited) {
return false;
} else {
return buffer == null || buffer.size() >= maxRecordsPerFile;
return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
}
};
}
Expand All @@ -73,15 +73,16 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
public void put(final SinkRecord record) {
Objects.requireNonNull(record, "record cannot be null");
final String recordKey = resolveRecordKeyFor(record);
fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
fileBuffers.computeIfAbsent(recordKey, ignored -> new GroupedSinkRecord(recordKey)).addSinkRecord(record);
}

protected String resolveRecordKeyFor(final SinkRecord record) {
final var key = recordKey(record);

final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()),
key);
final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record);
final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk,
ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
String objectKey = generateObjectKey(tpk, currentHeadRecord, record);
if (rotator.rotate(fileBuffers.get(objectKey))) {
// Create new file using this record as the head record.
Expand All @@ -102,14 +103,14 @@ private String recordKey(final SinkRecord record) {
return key;
}

public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord,
public String generateObjectKey(final TopicPartitionKey tpk, final PartitionOffset headRecord,
final SinkRecord currentRecord) {
final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
? String.format("%020d", headRecord.kafkaOffset())
: Long.toString(headRecord.kafkaOffset());
? String.format("%020d", headRecord.getOffset())
: Long.toString(headRecord.getOffset());
final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
? String.format("%010d", headRecord.kafkaPartition())
: Long.toString(headRecord.kafkaPartition());
? String.format("%010d", headRecord.getPartition())
: Long.toString(headRecord.getPartition());

return filenameTemplate.instance()
.bindVariable(FilenameTemplateVariable.TOPIC.name, tpk.topicPartition::topic)
Expand All @@ -123,8 +124,8 @@ public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord he
protected String generateNewRecordKey(final SinkRecord record) {
final var key = recordKey(record);
final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key);
currentHeadRecords.put(tpk, record);
return generateObjectKey(tpk, record, record);
currentHeadRecords.put(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
return generateObjectKey(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()), record);
}

@Override
Expand All @@ -133,9 +134,20 @@ public void clear() {
fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
final GroupedSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null);
if (Objects.isNull(grouperRecord)) {
return;
}
grouperRecord.removeSinkRecords(records);
}

@Override
public Map<String, List<SinkRecord>> records() {
return Collections.unmodifiableMap(fileBuffers);
return Collections.unmodifiableMap(fileBuffers.values()
.stream()
.collect(Collectors.toMap(GroupedSinkRecord::getFilename, GroupedSinkRecord::getSinkRecords)));
}

public static class TopicPartitionKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package io.aiven.kafka.connect.common.grouper;

import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
Expand All @@ -50,14 +50,14 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
DateTimeFormatter.ofPattern("dd"), "HH", DateTimeFormatter.ofPattern("HH"));

private final Template filenameTemplate;
// Offsets are a Long and Partitions are an Integer
private final Map<TopicPartition, PartitionOffset> currentHeadRecords = new HashMap<>();

private final Map<TopicPartition, SinkRecord> currentHeadRecords = new HashMap<>();

private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
private final Map<String, GroupedSinkRecord> fileBuffers = new HashMap<>();

private final Function<SinkRecord, Function<Parameter, String>> setTimestampBasedOnRecord;

private final Rotator<List<SinkRecord>> rotator;
private final Rotator<GroupedSinkRecord> rotator;

/**
* A constructor.
Expand All @@ -83,7 +83,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
if (unlimited) {
return false;
} else {
return buffer == null || buffer.size() >= maxRecordsPerFile;
return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
}
};
}
Expand All @@ -92,28 +92,30 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
public void put(final SinkRecord record) {
Objects.requireNonNull(record, "record cannot be null");
final String recordKey = resolveRecordKeyFor(record);
fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
fileBuffers.computeIfAbsent(recordKey, ignored -> new GroupedSinkRecord(recordKey)).addSinkRecord(record);
}

protected String resolveRecordKeyFor(final SinkRecord record) {
final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, ignored -> record);
final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition,
ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
String recordKey = generateRecordKey(topicPartition, currentHeadRecord, record);
if (rotator.rotate(fileBuffers.get(recordKey))) {
// Create new file using this record as the head record.
recordKey = generateNewRecordKey(record);
}

return recordKey;
}

private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord,
private String generateRecordKey(final TopicPartition topicPartition, final PartitionOffset headRecord,
final SinkRecord currentRecord) {
final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
? String.format("%020d", headRecord.kafkaOffset())
: Long.toString(headRecord.kafkaOffset());
? String.format("%020d", headRecord.getOffset())
: Long.toString(headRecord.getOffset());
final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
? String.format("%010d", headRecord.kafkaPartition())
: Long.toString(headRecord.kafkaPartition());
? String.format("%010d", headRecord.getPartition())
: Long.toString(headRecord.getPartition());

return filenameTemplate.instance()
.bindVariable(FilenameTemplateVariable.TOPIC.name, topicPartition::topic)
Expand All @@ -125,8 +127,9 @@ private String generateRecordKey(final TopicPartition topicPartition, final Sink

protected String generateNewRecordKey(final SinkRecord record) {
final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
currentHeadRecords.put(topicPartition, record);
return generateRecordKey(topicPartition, record, record);
currentHeadRecords.put(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
return generateRecordKey(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()),
record);
}

@Override
Expand All @@ -135,9 +138,20 @@ public void clear() {
fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
final GroupedSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null);
if (Objects.isNull(grouperRecord)) {
return;
}
grouperRecord.removeSinkRecords(records);
}

@Override
public Map<String, List<SinkRecord>> records() {
return Collections.unmodifiableMap(fileBuffers);
return Collections.unmodifiableMap(fileBuffers.values()
.stream()
.collect(Collectors.toMap(GroupedSinkRecord::getFilename, GroupedSinkRecord::getSinkRecords)));
}

}
Loading

0 comments on commit 77893fc

Please sign in to comment.