Initiate S3 Multi-part upload on receiving first event #318

Open · wants to merge 3 commits into base: main

Changes from 2 commits
@@ -281,6 +281,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
producerProps.put("linger.ms", 1000);
Contributor Author:
linger.ms was added so that all the test events are sent in one batch; otherwise the flush method could be called between small batches of Kafka events, causing the integration tests to fail.

return new KafkaProducer<>(producerProps);
}
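As a side note on the batching comment above, here is a minimal sketch of the same idea using the ProducerConfig constants instead of raw string keys. The class name and the batch.size value are illustrative and not taken from this PR.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;

final class BatchingProps {
    // Illustrative only: delay sends for up to one second so all test events leave in a single
    // batch, keeping flush() from firing between several tiny batches.
    static Map<String, Object> batchingProps() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024); // optional: a roomy batch size
        return props;
    }
}
```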

@@ -357,6 +357,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
producerProps.put("linger.ms", 1000);
return new KafkaProducer<>(producerProps);
}

@@ -530,6 +530,7 @@ private KafkaProducer<byte[], byte[]> newProducer() {
"org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put("linger.ms", 1000);
return new KafkaProducer<>(producerProps);
}

@@ -125,6 +125,7 @@ void tearDown() {
private KafkaProducer<byte[], byte[]> newProducer() {
final Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
producerProps.put("linger.ms", 1000);
return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
}

@@ -25,9 +25,11 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -40,6 +42,7 @@
import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
import io.aiven.kafka.connect.common.output.OutputWriter;
import io.aiven.kafka.connect.common.templating.Template;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import io.aiven.kafka.connect.s3.config.AwsCredentialProviderFactory;
import io.aiven.kafka.connect.s3.config.S3SinkConfig;
@@ -53,17 +56,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("PMD.ExcessiveImports")
@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.TooManyMethods" })
public final class S3SinkTask extends SinkTask {

private static final Logger LOGGER = LoggerFactory.getLogger(AivenKafkaConnectS3SinkConnector.class);
private static final Logger LOGGER = LoggerFactory.getLogger(S3SinkTask.class);

private RecordGrouper recordGrouper;

private S3SinkConfig config;

private AmazonS3 s3Client;

private Map<String, OutputWriter> writers;

private boolean isKeyRecordGrouper;

AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();

@SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect
@@ -76,6 +83,8 @@ public void start(final Map<String, String> props) {
Objects.requireNonNull(props, "props hasn't been set");
config = new S3SinkConfig(props);
s3Client = createAmazonS3Client(config);
writers = new HashMap<>();
isKeyRecordGrouper = isOfTypeKeyRecordGrouper(config.getFilenameTemplate());
try {
recordGrouper = RecordGrouperFactory.newRecordGrouper(config);
} catch (final Exception e) { // NOPMD AvoidCatchingGenericException
@@ -86,6 +95,20 @@ public void start(final Map<String, String> props) {
}
}

/**
* Determines whether the file is key based (so a single file may be rewritten multiple times per flush) or a
* roll-over file that is reset at each flush.
Comment on lines +98 to +99
Contributor:
Can you explain more about this? What is key based grouping, and why does it mutate the file?

Contributor Author @aindriu-aiven, Oct 29, 2024:
Hey @gharris1727, first of all thanks for taking a look!

In terms of the roll over and key grouping:

We have documentation on the key grouping here (but I will explain below what I am doing): Docs

The S3 Sink (along with all the Sinks provided by Aiven) uses a "Record Grouper". The record grouper uses the file.name.template to determine whether records should be grouped as a 'changelog' or grouped by 'key'.

e.g. {{topic}}-{{partition}}-{{start_offset}} is the default and causes the Record Grouper to group by changelog.
Changelog means records are appended to the same file, and on flush the record files are rolled over to use a new start_offset.

So the original file might be 'logs-0-28.ext' and after flush it will be 'logs-0-45.ext', and each event between offsets 28 and 44 will be written to the file for partition 0.

As we don't enforce a maximum number of events per file or a maximum file size (that would be new, and is something I am looking at in a separate memory-improvements PR), the flush works as a delimiter of sorts to roll the files over.

In compact mode the key looks something like '{{key}}' or '{{key}}-{{topic}}-{{partition}}', and when matching keys appear it either creates a new record or updates the existing one.
To handle this, currently the record grouper removes any existing record and adds the latest record to the file. This then gets written on flush.

The multi part upload does not handle a change to the file, so the options are either to upload every time and immediately close the file to complete the upload, or to wait until the flush and update the record once.

The latter option is, I think, better in terms of API costs, since a file could otherwise be updated multiple times over a 30-60s period.

edit: The downside to this implementation for the compact/key based records is that the upcoming PR to reduce memory usage will only benefit users of 'changelog' grouping, since only there can we delete the records already uploaded in a part by the S3 multi part upload.

Any questions, please let me know.

*
* @param fileNameTemplate
the file name template supplied in the configuration
* @return true if the template resolves to RecordGrouperFactory.KEY_RECORD or RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
*/
private boolean isOfTypeKeyRecordGrouper(final Template fileNameTemplate) {
return RecordGrouperFactory.KEY_RECORD.equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate))
|| RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
.equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate));
}
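Following the reviewer discussion above on changelog vs key grouping, here is a hedged illustration of how the two modes map onto file.name.template values. The class and constant names below are made up for the example; the template strings come from the discussion and the linked docs.

```java
import java.util.Map;

// Illustrative only: shows which file.name.template values drive each grouping mode,
// mirroring the explanation in the review thread above.
public final class GroupingModeExamples {
    // Changelog grouping (default): records append to one file per partition and the
    // file name rolls over to a new start_offset on every flush, e.g. logs-0-28 -> logs-0-45.
    static final Map<String, String> CHANGELOG_MODE = Map.of(
            "file.name.template", "{{topic}}-{{partition}}-{{start_offset}}");

    // Key (compact) grouping: one file per key, rewritten with the latest record at flush,
    // which is why S3SinkTask writes these files only once per flush.
    static final Map<String, String> COMPACT_MODE = Map.of(
            "file.name.template", "{{key}}-{{topic}}-{{partition}}");

    private GroupingModeExamples() {
    }
}
```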

private AmazonS3 createAmazonS3Client(final S3SinkConfig config) {
final var awsEndpointConfig = newEndpointConfiguration(this.config);
final var clientConfig = PredefinedClientConfigurations.defaultConfig()
@@ -110,39 +133,137 @@ public void put(final Collection<SinkRecord> records) {
Objects.requireNonNull(records, "records cannot be null");
LOGGER.info("Processing {} records", records.size());
records.forEach(recordGrouper::put);
if (!isKeyRecordGrouper) {
recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
}
}

/**
* Flush is used alongside the key record groupers to initiate and complete file writes to S3. When not using a key
* record grouper, the S3 upload is initiated by put, and flush is used to finish writing the files and
* roll them over.
*
* @param offsets
* the latest offsets sent to put that are now ready to be flushed.
*/
@Override
public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
try {
recordGrouper.records().forEach(this::flushFile);
} finally {
if (isKeyRecordGrouper) {
try {
recordGrouper.records().forEach(this::flushToS3);
} finally {
recordGrouper.clear();
}
} else {
// On flush, get the active writers
final Collection<OutputWriter> activeWriters = writers.values();
// Clear recordGrouper so it restarts OFFSET HEADS etc. and on next put new writers will be created.
recordGrouper.clear();
// Close the active writers: calling close writes anything left in the buffer and completes the S3
// multi part upload.
activeWriters.forEach(writer -> {
try {
writer.close();
} catch (IOException e) {
throw new ConnectException(e);
}
});
// Drop the closed writers so new ones are created on the next put.
writers.clear();
}

}
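For context on why closing a writer "completes the S3 multi part upload": the OutputWriter sits on top of S3OutputStream, which builds on the standard AWS SDK v1 multipart lifecycle. Below is a minimal standalone sketch of that lifecycle, not the connector's actual implementation; the bucket, key, and helper class are placeholders, and real S3 requires every part except the last to be at least 5 MiB.

```java
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

public final class MultipartLifecycleSketch {

    // Sketch of the three-step lifecycle: initiate -> upload parts -> complete.
    static void uploadInParts(final AmazonS3 s3Client, final String bucket, final String key,
            final List<byte[]> parts) {
        // Initiating the upload is what this PR moves to the arrival of the first event.
        final String uploadId = s3Client
                .initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key))
                .getUploadId();

        final List<PartETag> partETags = new ArrayList<>();
        int partNumber = 1;
        for (final byte[] part : parts) {
            partETags.add(s3Client.uploadPart(new UploadPartRequest()
                    .withBucketName(bucket)
                    .withKey(key)
                    .withUploadId(uploadId)
                    .withPartNumber(partNumber++)
                    .withInputStream(new ByteArrayInputStream(part))
                    .withPartSize(part.length))
                    .getPartETag());
        }

        // Only after complete() does the object become visible in S3; this is what
        // closing the writer at flush time triggers.
        s3Client.completeMultipartUpload(
                new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
    }

    private MultipartLifecycleSketch() {
    }
}
```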

private void flushFile(final String filename, final List<SinkRecord> records) {
Objects.requireNonNull(records, "records cannot be null");
if (records.isEmpty()) {
return;
/**
* getOutputWriter checks whether a compatible OutputWriter already exists for the given file and, if not, creates
* one before returning it to the caller.
*
* @param filename
* used to write to S3
* @param sinkRecord
* a sinkRecord used to create a new S3OutputStream
* @return correct OutputWriter for writing a particular record to S3
*/
private OutputWriter getOutputWriter(final String filename, final SinkRecord sinkRecord) {

if (writers.get(filename) == null) {
final var out = newStreamFor(filename, sinkRecord);
try {
writers.put(filename,
OutputWriter.builder()
.withCompressionType(config.getCompressionType())
.withExternalProperties(config.originalsStrings())
.withOutputFields(config.getOutputFields())
.withEnvelopeEnabled(config.envelopeEnabled())
.build(out, config.getFormatType()));
} catch (IOException e) {
throw new ConnectException(e);
}
}
return writers.get(filename);
}

/**
* writeToS3 appends the new batch of records for this file to the in-progress S3 multi part upload.
*
* @param filename
* the name of the file in S3 to be written to
* @param records
* all records in this record grouping, including those already written to S3
* @param recordToBeWritten
* the records from the current put call that still need to be written to S3
*/
private void writeToS3(final String filename, final List<SinkRecord> records,
final Collection<SinkRecord> recordToBeWritten) {
final SinkRecord sinkRecord = records.get(0);
try (var out = newStreamFor(filename, sinkRecord);
var outputWriter = OutputWriter.builder()
.withCompressionType(config.getCompressionType())
.withExternalProperties(config.originalsStrings())
.withOutputFields(config.getOutputFields())
.withEnvelopeEnabled(config.envelopeEnabled())
.build(out, config.getFormatType())) {
outputWriter.writeRecords(records);
} catch (final IOException e) {
// This writer is being left open until a flush occurs.
final OutputWriter writer; // NOPMD CloseResource
try {
writer = getOutputWriter(filename, sinkRecord);
// The record grouper returns all records for that filename; only the new batch of records should be added
// to the multi part upload.
writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList()));

} catch (IOException e) {
throw new ConnectException(e);
}

}
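To make the filtering in writeToS3 concrete: the record grouper returns every record accumulated for a filename, while the put batch contains only the new records, so their intersection is what gets appended to the open multi part upload. A tiny standalone illustration, with plain strings standing in for SinkRecords:

```java
import java.util.List;
import java.util.stream.Collectors;

public final class FilterNewRecordsSketch {
    public static void main(final String[] args) {
        // Everything the grouper has accumulated for one filename so far.
        final List<String> grouped = List.of("r1", "r2", "r3", "r4");
        // The fresh batch handed to put(); r1 and r2 were already written earlier.
        final List<String> putBatch = List.of("r3", "r4");

        final List<String> toAppend = grouped.stream()
                .filter(putBatch::contains)
                .collect(Collectors.toList());

        System.out.println(toAppend); // [r3, r4] -> only the new records are appended
    }
}
```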

/**
* For the key record grouper the file is written just once, to keep the number of calls to S3 to a minimum. Each
* file contains one record and is written with the latest record when flush is called.
*
* @param filename
* the name of the file in S3 to be written to
* @param records
* all records in this record grouping, including those already written to S3
*/
private void flushToS3(final String filename, final List<SinkRecord> records) {
final SinkRecord sinkRecord = records.get(0);
try (var writer = getOutputWriter(filename, sinkRecord)) {
// For key based files the record grouper returns only one record per filename.
writer.writeRecords(records);
writers.remove(filename, writer);
} catch (IOException e) {
throw new ConnectException(e);
}

}

@Override
public void stop() {
if (!isKeyRecordGrouper) {
writers.forEach((k, v) -> {
try {
v.close();
} catch (IOException e) {
throw new ConnectException(e);
}
});
}
s3Client.shutdown();

LOGGER.info("Stop S3 Sink Task");
}

@@ -152,11 +273,15 @@ public String version() {
}

private OutputStream newStreamFor(final String filename, final SinkRecord record) {
final var fullKey = config.usesFileNameTemplate() ? filename : oldFullKey(record);
final var fullKey = getFileNameTemplate(filename, record);
return new S3OutputStream(config.getAwsS3BucketName(), fullKey, config.getAwsS3PartSize(), s3Client,
config.getServerSideEncryptionAlgorithmName());
}

private String getFileNameTemplate(final String filename, final SinkRecord record) {
return config.usesFileNameTemplate() ? filename : oldFullKey(record);
}

private EndpointConfiguration newEndpointConfiguration(final S3SinkConfig config) {
if (Objects.isNull(config.getAwsS3EndPoint())) {
return null;