From b4e06fd9e0a7857fb5aa0134be2442d335ebe6fc Mon Sep 17 00:00:00 2001
From: Aindriu Lavelle
Date: Tue, 29 Oct 2024 11:40:01 +0000
Subject: [PATCH] Reduce S3 memory usage by clearing records already sent to
 S3

Signed-off-by: Aindriu Lavelle
---
 .../KeyAndTopicPartitionRecordGrouper.java    |  6 ++
 .../common/grouper/KeyRecordGrouper.java      |  6 ++
 .../common/grouper/PartitionOffset.java       | 44 +++++++++++
 .../connect/common/grouper/RecordGrouper.java | 10 ++-
 .../common/grouper/SinkRecordsBatch.java      | 77 +++++++++++++++++++
 .../TopicPartitionKeyRecordGrouper.java       | 42 ++++++----
 .../grouper/TopicPartitionRecordGrouper.java  | 46 +++++++----
 .../io/aiven/kafka/connect/s3/S3SinkTask.java | 58 +++++---------
 8 files changed, 218 insertions(+), 71 deletions(-)
 create mode 100644 commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java
 create mode 100644 commons/src/main/java/io/aiven/kafka/connect/common/grouper/SinkRecordsBatch.java

diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java
index 43ce6c2f..b2402385 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java
@@ -103,6 +103,12 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        // One entry per file, so the entire file can be removed to reduce memory overhead.
+        fileBuffers.remove(identifier);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
         return Collections.unmodifiableMap(fileBuffers);
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java
index b9af899e..5ba409b8 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java
@@ -90,6 +90,12 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        // One record per file, so remove the entry to reduce memory
+        fileBuffers.remove(identifier);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
         return Collections.unmodifiableMap(fileBuffers);
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java
new file mode 100644
index 00000000..568407c8
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.grouper;
+
+public class PartitionOffset {
+
+    private Long offset;
+    private int partition;
+
+    public PartitionOffset(final int partition, final Long offset) {
+        this.offset = offset;
+        this.partition = partition;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public void setPartition(final int partition) {
+        this.partition = partition;
+    }
+
+    public Long getOffset() {
+        return offset;
+    }
+
+    public void setOffset(final Long offset) {
+        this.offset = offset;
+    }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java
index 1e0e6c18..2126e0a6 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java
@@ -38,10 +38,18 @@ public interface RecordGrouper {
      */
    void clear();
 
+    /**
+     * Clear the processed records for the group identified by {@code identifier} from memory.
+     *
+     * @param records
+     *            all records already written to the sink
+     */
+    void clearProcessedRecords(String identifier, List<SinkRecord> records);
+
     /**
      * Get all records associated with files, grouped by the file name.
      *
-     * @return map of records assotiated with files
+     * @return map of records associated with files
      */
     Map<String, List<SinkRecord>> records();
 
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SinkRecordsBatch.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SinkRecordsBatch.java
new file mode 100644
index 00000000..3afae801
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/SinkRecordsBatch.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.grouper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkRecordsBatch {
+
+    private int numberOfRecords;
+    final private List<SinkRecord> sinkRecords;
+    final private String filename;
+    final private long recordCreationDate = System.currentTimeMillis();
+
+    public SinkRecordsBatch(final String filename) {
+        this.filename = filename;
+        sinkRecords = new ArrayList<>();
+        numberOfRecords = 0;
+    }
+
+    public SinkRecordsBatch(final String filename, final List<SinkRecord> sinkRecords) {
+        this.filename = filename;
+        this.sinkRecords = new ArrayList<>(sinkRecords);
+        numberOfRecords = sinkRecords.size();
+    }
+    public SinkRecordsBatch(final String filename, final SinkRecord sinkRecord) {
+        this.filename = filename;
+        this.sinkRecords = new ArrayList<>();
+        this.sinkRecords.add(sinkRecord);
+        numberOfRecords = 1;
+    }
+
+    public void addSinkRecord(final SinkRecord sinkRecord) {
+        this.sinkRecords.add(sinkRecord);
+        this.numberOfRecords++;
+    }
+
+    public List<SinkRecord> getSinkRecords() {
+        // Ensure the sink records can only be modified through the class APIs and not accidentally changed by
+        // another process.
+        return Collections.unmodifiableList(sinkRecords);
+    }
+
+    public void removeSinkRecords(final List<SinkRecord> sinkRecords) {
+        this.sinkRecords.removeAll(sinkRecords);
+    }
+
+    public int getNumberOfRecords() {
+        return numberOfRecords;
+    }
+
+    public String getFilename() {
+        return filename;
+    }
+
+    public long getRecordCreationDate() {
+        return recordCreationDate;
+    }
+
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java
index 55e09763..e36ba1df 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java
@@ -17,13 +17,13 @@
 package io.aiven.kafka.connect.common.grouper;
 
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
@@ -42,13 +42,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
 
     private final Template filenameTemplate;
 
-    private final Map<TopicPartitionKey, SinkRecord> currentHeadRecords = new HashMap<>();
+    private final Map<TopicPartitionKey, PartitionOffset> currentHeadRecords = new HashMap<>();
 
-    private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
+    private final Map<String, SinkRecordsBatch> fileBuffers = new HashMap<>();
 
     private final Function<SinkRecord, Function<Parameter, String>> setTimestampBasedOnRecord;
 
-    private final Rotator<List<SinkRecord>> rotator;
+    private final Rotator<SinkRecordsBatch> rotator;
 
     TopicPartitionKeyRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile,
             final TimestampSource tsSource) {
@@ -64,7 +64,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
             if (unlimited) {
                 return false;
             } else {
-                return buffer == null || buffer.size() >= maxRecordsPerFile;
+                return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
             }
         };
     }
@@ -73,7 +73,7 @@ public void put(final SinkRecord record) {
         Objects.requireNonNull(record, "record cannot be null");
         final String recordKey = resolveRecordKeyFor(record);
-        fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
+        fileBuffers.computeIfAbsent(recordKey, ignored -> new SinkRecordsBatch(recordKey)).addSinkRecord(record);
     }
 
     protected String resolveRecordKeyFor(final SinkRecord record) {
@@ -81,7 +81,8 @@ protected String resolveRecordKeyFor(final SinkRecord record) {
         final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()),
                 key);
 
-        final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record);
+        final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk,
+                ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
         String objectKey = generateObjectKey(tpk, currentHeadRecord, record);
         if (rotator.rotate(fileBuffers.get(objectKey))) {
             // Create new file using this record as the head record.
@@ -102,14 +103,14 @@ private String recordKey(final SinkRecord record) {
         return key;
     }
 
-    public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord,
+    public String generateObjectKey(final TopicPartitionKey tpk, final PartitionOffset headRecord,
             final SinkRecord currentRecord) {
         final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%020d", headRecord.kafkaOffset())
-                : Long.toString(headRecord.kafkaOffset());
+                ? String.format("%020d", headRecord.getOffset())
+                : Long.toString(headRecord.getOffset());
         final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%010d", headRecord.kafkaPartition())
-                : Long.toString(headRecord.kafkaPartition());
+                ? String.format("%010d", headRecord.getPartition())
+                : Long.toString(headRecord.getPartition());
 
         return filenameTemplate.instance()
                 .bindVariable(FilenameTemplateVariable.TOPIC.name, tpk.topicPartition::topic)
@@ -123,8 +124,8 @@ public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord he
     protected String generateNewRecordKey(final SinkRecord record) {
         final var key = recordKey(record);
         final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key);
-        currentHeadRecords.put(tpk, record);
-        return generateObjectKey(tpk, record, record);
+        currentHeadRecords.put(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
+        return generateObjectKey(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()), record);
     }
 
     @Override
@@ -133,9 +134,20 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        final SinkRecordsBatch grouperRecord = fileBuffers.getOrDefault(identifier, null);
+        if (Objects.isNull(grouperRecord)) {
+            return;
+        }
+        grouperRecord.removeSinkRecords(records);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
-        return Collections.unmodifiableMap(fileBuffers);
+        return Collections.unmodifiableMap(fileBuffers.values()
+                .stream()
+                .collect(Collectors.toMap(SinkRecordsBatch::getFilename, SinkRecordsBatch::getSinkRecords)));
     }
 
     public static class TopicPartitionKey {
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java
index 5a080e0a..e3996d77 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java
@@ -17,13 +17,13 @@
 package io.aiven.kafka.connect.common.grouper;
 
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -50,14 +50,14 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
             DateTimeFormatter.ofPattern("dd"), "HH", DateTimeFormatter.ofPattern("HH"));
 
     private final Template filenameTemplate;
+    // Offsets are a Long and Partitions are an Integer
+    private final Map<TopicPartition, PartitionOffset> currentHeadRecords = new HashMap<>();
 
-    private final Map<TopicPartition, SinkRecord> currentHeadRecords = new HashMap<>();
-
-    private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
+    private final Map<String, SinkRecordsBatch> fileBuffers = new HashMap<>();
 
     private final Function<SinkRecord, Function<Parameter, String>> setTimestampBasedOnRecord;
 
-    private final Rotator<List<SinkRecord>> rotator;
+    private final Rotator<SinkRecordsBatch> rotator;
 
     /**
      * A constructor.
@@ -83,7 +83,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
             if (unlimited) {
                 return false;
             } else {
-                return buffer == null || buffer.size() >= maxRecordsPerFile;
+                return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
             }
         };
     }
@@ -92,28 +92,30 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
     public void put(final SinkRecord record) {
         Objects.requireNonNull(record, "record cannot be null");
         final String recordKey = resolveRecordKeyFor(record);
-        fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
+        fileBuffers.computeIfAbsent(recordKey, ignored -> new SinkRecordsBatch(recordKey)).addSinkRecord(record);
     }
 
     protected String resolveRecordKeyFor(final SinkRecord record) {
         final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
-        final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, ignored -> record);
+        final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition,
+                ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
         String recordKey = generateRecordKey(topicPartition, currentHeadRecord, record);
         if (rotator.rotate(fileBuffers.get(recordKey))) {
             // Create new file using this record as the head record.
             recordKey = generateNewRecordKey(record);
         }
+
         return recordKey;
     }
 
-    private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord,
+    private String generateRecordKey(final TopicPartition topicPartition, final PartitionOffset headRecord,
             final SinkRecord currentRecord) {
         final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%020d", headRecord.kafkaOffset())
-                : Long.toString(headRecord.kafkaOffset());
+                ? String.format("%020d", headRecord.getOffset())
+                : Long.toString(headRecord.getOffset());
         final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%010d", headRecord.kafkaPartition())
-                : Long.toString(headRecord.kafkaPartition());
+                ? String.format("%010d", headRecord.getPartition())
+                : Long.toString(headRecord.getPartition());
 
         return filenameTemplate.instance()
                 .bindVariable(FilenameTemplateVariable.TOPIC.name, topicPartition::topic)
@@ -125,8 +127,9 @@ private String generateRecordKey(final TopicPartition topicPartition, final Sink
 
     protected String generateNewRecordKey(final SinkRecord record) {
         final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
-        currentHeadRecords.put(topicPartition, record);
-        return generateRecordKey(topicPartition, record, record);
+        currentHeadRecords.put(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
+        return generateRecordKey(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()),
+                record);
     }
 
     @Override
@@ -135,9 +138,20 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        final SinkRecordsBatch grouperRecord = fileBuffers.getOrDefault(identifier, null);
+        if (Objects.isNull(grouperRecord)) {
+            return;
+        }
+        grouperRecord.removeSinkRecords(records);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
-        return Collections.unmodifiableMap(fileBuffers);
+        return Collections.unmodifiableMap(fileBuffers.values()
+                .stream()
+                .collect(Collectors.toMap(SinkRecordsBatch::getFilename, SinkRecordsBatch::getSinkRecords)));
     }
 
 }
diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
index 6dc7560e..8ba4f9d3 100644
--- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
+++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
@@ -29,7 +29,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -134,46 +133,25 @@ public void put(final Collection<SinkRecord> records) {
         LOGGER.info("Processing {} records", records.size());
         records.forEach(recordGrouper::put);
         if (!isKeyRecordGrouper) {
-            recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
+            recordGrouper.records().forEach(this::writeToS3);
         }
     }
 
     /**
-     * Flush is used alongside the KeyRecordGroupers to initate and complete file writes to S3. When not using a key
+     * Flush is used alongside the KeyRecordGroupers to initiate and complete file writes to S3. When not using a key
      * record grouper, the S3 upload will be initiated by the put command and flush will be used to write the files and
-     * roll over the files/
+     * roll over the files if any records remain in the record grouper for completion.
      *
      * @param offsets
      *            the latest offset sent to put and that is now ready to be flushed.
     */
     @Override
     public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        if (isKeyRecordGrouper) {
-            try {
-                recordGrouper.records().forEach(this::flushToS3);
-            } finally {
-                recordGrouper.clear();
-            }
-        } else {
-            // On Flush Get Active writers
-            final Collection<OutputWriter> activeWriters = writers.values();
-            // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created.
+        try {
+            recordGrouper.records().forEach(this::flushToS3);
+        } finally {
             recordGrouper.clear();
-            // Close
-            activeWriters.forEach(writer -> {
-                try {
-                    // Close active writers && remove from writers Map
-                    // Calling close will write anything in the buffer before closing and complete the S3 multi part
-                    // upload
-                    writer.close();
-                    // Remove once closed
-                    writers.remove(writer);
-                } catch (IOException e) {
-                    throw new ConnectException(e);
-                }
-            });
         }
-    }
     }
 
     /**
@@ -212,19 +190,20 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin
      * @param records
      *            all records in this record grouping, including those already written to S3
      */
-    private void writeToS3(final String filename, final List<SinkRecord> records,
-            final Collection<SinkRecord> recordToBeWritten) {
+    private void writeToS3(final String filename, final List<SinkRecord> records) {
+        // If no new records are supplied in this put operation return immediately
+        if (records.isEmpty()) {
+            return;
+        }
         final SinkRecord sinkRecord = records.get(0);
-        // This writer is being left open until a flush occurs.
-        final OutputWriter writer; // NOPMD CloseResource
+        // Record Grouper returns all records for that filename, all we want is the new batch of records to be added
+        // to the multi part upload.
         try {
-            writer = getOutputWriter(filename, sinkRecord);
-            // Record Grouper returns all records for that filename, all we want is the new batch of records to be added
-            // to the multi part upload.
-            writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList()));
-
+            // This writer is being left open until a flush occurs.
+            getOutputWriter(filename, sinkRecord).writeRecords(records);
+            recordGrouper.clearProcessedRecords(filename, records);
         } catch (IOException e) {
-            throw new ConnectException(e);
+            LOGGER.warn("Unable to write record, will retry on next put or flush operation.", e);
         }
     }
 
@@ -239,7 +218,8 @@ private void writeToS3(final String filename, final List<SinkRecord> records,
      *            all records in this record grouping, including those already written to S3
      */
     private void flushToS3(final String filename, final List<SinkRecord> records) {
-        final SinkRecord sinkRecord = records.get(0);
+
+        final SinkRecord sinkRecord = records.isEmpty() ? null : records.get(0);
         try (var writer = getOutputWriter(filename, sinkRecord)) {
             // For Key based files Record Grouper returns only one record for that filename
             // to the multi part upload.