Add compatibility for compact records to update only on flush, while allowing changelog records to initiate multipart upload.

Signed-off-by: Aindriu Lavelle <[email protected]>
aindriu-aiven committed Oct 30, 2024
1 parent 4776d6d commit f719afd
Showing 6 changed files with 170 additions and 32 deletions.
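In short, the behaviour added here is driven by the file name template: a key based template produces compact files that are only rewritten on flush, while an offset based template produces changelog style files whose S3 multipart upload is initiated from put(). A minimal sketch of the two template settings, reusing the S3SinkConfig constant that the tests below use (bucket, credential and output format settings are omitted, so this is illustrative only):

import java.util.HashMap;
import java.util.Map;

import io.aiven.kafka.connect.s3.config.S3SinkConfig;

public class FileNameTemplateModes {
    public static void main(final String[] args) {
        // Compact mode: one file per key, rewritten with the latest value on each flush.
        final Map<String, String> compact = new HashMap<>();
        compact.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}");

        // Changelog mode: roll-over files named by topic/partition/start offset; the S3
        // multipart upload is initiated from put() and completed on flush.
        final Map<String, String> changelog = new HashMap<>();
        changelog.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");

        System.out.println(compact);
        System.out.println(changelog);
    }
}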
@@ -281,6 +281,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
producerProps.put("linger.ms", 1000);
return new KafkaProducer<>(producerProps);
}

@@ -357,6 +357,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
producerProps.put("linger.ms", 1000);
return new KafkaProducer<>(producerProps);
}

@@ -530,6 +530,7 @@ private KafkaProducer<byte[], byte[]> newProducer() {
"org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put("linger.ms", 1000);
return new KafkaProducer<>(producerProps);
}

@@ -125,6 +125,7 @@ void tearDown() {
private KafkaProducer<byte[], byte[]> newProducer() {
final Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
producerProps.put("linger.ms", 1000);
return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
}
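The linger.ms=1000 added to each test producer above makes the producer wait up to a second before sending, so test records are batched together rather than delivered one at a time, presumably so that each put() to the sink task sees a multi-record batch. A small self-contained sketch of the same setting using the ProducerConfig constant:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;

public class LingerConfigSketch {
    public static void main(final String[] args) {
        final Map<String, Object> producerProps = new HashMap<>();
        // Identical to producerProps.put("linger.ms", 1000) in the diffs above.
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        System.out.println(producerProps);
    }
}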

@@ -42,6 +42,7 @@
import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
import io.aiven.kafka.connect.common.output.OutputWriter;
import io.aiven.kafka.connect.common.templating.Template;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import io.aiven.kafka.connect.s3.config.AwsCredentialProviderFactory;
import io.aiven.kafka.connect.s3.config.S3SinkConfig;
@@ -68,6 +69,8 @@ public final class S3SinkTask extends SinkTask {

private Map<String, OutputWriter> writers;

private boolean isKeyRecordGrouper;

AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();

@SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect
@@ -81,6 +84,7 @@ public void start(final Map<String, String> props) {
config = new S3SinkConfig(props);
s3Client = createAmazonS3Client(config);
writers = new HashMap<>();
isKeyRecordGrouper = isOfTypeKeyRecordGrouper(config.getFilenameTemplate());
try {
recordGrouper = RecordGrouperFactory.newRecordGrouper(config);
} catch (final Exception e) { // NOPMD AvoidCatchingGenericException
@@ -91,6 +95,20 @@ public void start(final Map<String, String> props) {
}
}

/**
* This determines if the file is key based, meaning a single file can be updated multiple times per flush, or if
* it is a roll over file which is reset at each flush.
*
* @param fileNameTemplate
* the file name template supplied in the configuration
* @return true if the template resolves to RecordGrouperFactory.KEY_RECORD or RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
*/
private boolean isOfTypeKeyRecordGrouper(final Template fileNameTemplate) {
return RecordGrouperFactory.KEY_RECORD.equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate))
|| RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
.equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate));
}

private AmazonS3 createAmazonS3Client(final S3SinkConfig config) {
final var awsEndpointConfig = newEndpointConfiguration(this.config);
final var clientConfig = PredefinedClientConfigurations.defaultConfig()
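For illustration, the distinction drawn by the check above can be reduced to whether the template groups by key. The following is a simplified, self-contained stand-in that keys off the raw template string; the task itself delegates to RecordGrouperFactory.resolveRecordGrouperType on the parsed Template rather than doing string matching:

public class GrouperTypeSketch {

    // Simplified stand-in: templates that group by key produce compact, key based files;
    // anything else produces roll-over, offset based files.
    static boolean isKeyBased(final String fileNameTemplate) {
        return fileNameTemplate.contains("{{key}}");
    }

    public static void main(final String[] args) {
        // true: compact file, rewritten in place on each flush
        System.out.println(isKeyBased("{{key}}-{{topic}}"));
        // false: roll-over file, multipart upload initiated from put()
        System.out.println(isKeyBased("{{topic}}-{{partition}}-{{start_offset}}"));
    }
}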
@@ -115,34 +133,46 @@ public void put(final Collection<SinkRecord> records) {
Objects.requireNonNull(records, "records cannot be null");
LOGGER.info("Processing {} records", records.size());
records.forEach(recordGrouper::put);

recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));

if (!isKeyRecordGrouper) {
recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
}
}

/**
* Flush is used to roll over file and complete the S3 Mutli part upload.
* Flush is used alongside the KeyRecordGroupers to initiate and complete file writes to S3. When not using a key
* record grouper, the S3 upload will be initiated by the put command and flush will be used to write the files and
* roll over the files.
*
* @param offsets
* the latest offset sent to put and that is now ready to be flushed.
*/
@Override
public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
// On Flush Get Active writers
final Collection<OutputWriter> activeWriters = writers.values();
// Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created.
recordGrouper.clear();
// Close
activeWriters.forEach(writer -> {
if (isKeyRecordGrouper) {
try {
// Close active writers && remove from writers Map
// Calling close will write anything in the buffer before closing and complete the S3 multi part upload
writer.close();
// Remove once closed
writers.remove(writer);
} catch (IOException e) {
throw new ConnectException(e);
recordGrouper.records().forEach(this::flushToS3);
} finally {
recordGrouper.clear();
}
});
} else {
// On Flush Get Active writers
final Collection<OutputWriter> activeWriters = writers.values();
// Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created.
recordGrouper.clear();
// Close
activeWriters.forEach(writer -> {
try {
// Close active writers && remove from writers Map
// Calling close will write anything in the buffer before closing and complete the S3 multi part
// upload
writer.close();
// Remove once closed
writers.remove(writer);
} catch (IOException e) {
throw new ConnectException(e);
}
});
}

}
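Read together with put() above: for a key based template, put() only groups records and flush() writes each key's file once with its latest record, closes the writer (completing the multipart upload) and clears the grouper; for any other template, put() appends the new batch to open writers, initiating the multipart upload, and flush() closes those writers and clears the grouper. A condensed walkthrough in the style of the tests further down — the S3SinkTask package and the single-setting config are assumptions here, and real bucket, credential and format settings would be required for start() to succeed:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.s3.S3SinkTask;
import io.aiven.kafka.connect.s3.config.S3SinkConfig;

public class FlushModeWalkthrough {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}"); // compact mode
        // ... plus bucket name, credentials and output format settings.

        final TopicPartition tp = new TopicPartition("topic0", 0);
        final S3SinkTask task = new S3SinkTask();
        task.start(props);
        task.open(List.of(tp));
        // Compact mode: the record is only grouped here, nothing is written to S3 yet.
        task.put(List.of(new SinkRecord("topic0", 0, Schema.STRING_SCHEMA, "key0",
                Schema.STRING_SCHEMA, "value0", 10)));
        // The single file for key0 is written once with the latest record and the upload completed.
        task.flush(Map.of(tp, new OffsetAndMetadata(11)));
        task.stop();
    }
}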

@@ -157,12 +187,11 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
* @return correct OutputWriter for writing a particular record to S3
*/
private OutputWriter getOutputWriter(final String filename, final SinkRecord sinkRecord) {
final String fileNameTemplate = getFileNameTemplate(filename, sinkRecord);

if (writers.get(fileNameTemplate) == null) {
if (writers.get(filename) == null) {
final var out = newStreamFor(filename, sinkRecord);
try {
writers.put(fileNameTemplate,
writers.put(filename,
OutputWriter.builder()
.withCompressionType(config.getCompressionType())
.withExternalProperties(config.originalsStrings())
@@ -173,7 +202,7 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin
throw new ConnectException(e);
}
}
return writers.get(fileNameTemplate);
return writers.get(filename);
}

/**
@@ -182,12 +211,9 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin
* the name of the file in S3 to be written to
* @param records
* all records in this record grouping, including those already written to S3
* @param recordToBeWritten
* new records from put() which are to be written to S3
*/
private void writeToS3(final String filename, final List<SinkRecord> records,
final Collection<SinkRecord> recordToBeWritten) {

final SinkRecord sinkRecord = records.get(0);
// This writer is being left open until a flush occurs.
final OutputWriter writer; // NOPMD CloseResource
@@ -196,6 +222,29 @@ private void writeToS3(final String filename, final List<SinkRecord> records,
// Record Grouper returns all records for that filename, all we want is the new batch of records to be added
// to the multi part upload.
writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList()));

} catch (IOException e) {
throw new ConnectException(e);
}

}

/**
* For the key record grouper the file is written just once to reduce the number of calls to S3 to a minimum. Each file
* contains one record and is written once with the latest record when flush is called.
*
* @param filename
* the name of the file in S3 to be written to
* @param records
* all records in this record grouping, including those already written to S3
*/
private void flushToS3(final String filename, final List<SinkRecord> records) {
final SinkRecord sinkRecord = records.get(0);
try (var writer = getOutputWriter(filename, sinkRecord)) {
// For key based files the record grouper returns only one record for that filename;
// writing it and closing the writer completes the S3 multi part upload.
writer.writeRecords(records);
writers.remove(filename, writer);
} catch (IOException e) {
throw new ConnectException(e);
}
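A side note on the removal above: writers.remove(filename, writer) is the conditional two-argument Map.remove, so the entry is only dropped if filename is still mapped to this exact writer instance. A small self-contained illustration of that behaviour (the names are illustrative, not taken from the connector):

import java.util.HashMap;
import java.util.Map;

public class ConditionalRemoveDemo {
    public static void main(final String[] args) {
        final Map<String, String> writers = new HashMap<>();
        writers.put("file-a", "writer-1");

        // false: "file-a" is currently mapped to "writer-1", not "writer-2", so nothing is removed.
        System.out.println(writers.remove("file-a", "writer-2"));
        // true: key and value both match, so the entry is removed.
        System.out.println(writers.remove("file-a", "writer-1"));
        System.out.println(writers); // {}
    }
}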
@@ -204,13 +253,15 @@ private void writeToS3(final String filename, final List<SinkRecord> records,

@Override
public void stop() {
writers.forEach((k, v) -> {
try {
v.close();
} catch (IOException e) {
throw new ConnectException(e);
}
});
if (!isKeyRecordGrouper) {
writers.forEach((k, v) -> {
try {
v.close();
} catch (IOException e) {
throw new ConnectException(e);
}
});
}
s3Client.shutdown();

LOGGER.info("Stop S3 Sink Task");
@@ -706,6 +706,7 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException {
properties.put(S3SinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false");
properties.put(S3SinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "json");
properties.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "prefix-");
properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");

final S3SinkTask task = new S3SinkTask();
task.start(properties);
@@ -773,6 +774,88 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException {

}

@Test
void mutliPartUploadUsingKeyPartitioning() throws IOException {
final String compression = "none";
properties.put(S3SinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
properties.put(S3SinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
properties.put(S3SinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false");
properties.put(S3SinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "json");
properties.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "prefix-");
// Compact/key 'mode': only the latest value per key is kept
properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}");

final S3SinkTask task = new S3SinkTask();
task.start(properties);
int timestamp = 1000;
int offset1 = 10;
int offset2 = 20;
int offset3 = 30;
final List<List<SinkRecord>> allRecords = new ArrayList<>();
for (int i = 0; i < 3; i++) {
allRecords.add(List.of(
createRecordWithStructValueSchema("topic0", 0, "key0", "name0" + i, offset1++, timestamp++),
createRecordWithStructValueSchema("topic0", 1, "key1", "name1" + i, offset2++, timestamp++),
createRecordWithStructValueSchema("topic1", 0, "key2", "name2" + i, offset3++, timestamp++)));
}
final TopicPartition tp00 = new TopicPartition("topic0", 0);
final TopicPartition tp01 = new TopicPartition("topic0", 1);
final TopicPartition tp10 = new TopicPartition("topic1", 0);
final Collection<TopicPartition> tps = List.of(tp00, tp01, tp10);
task.open(tps);

allRecords.forEach(task::put);

final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(tp00, new OffsetAndMetadata(offset1));
offsets.put(tp01, new OffsetAndMetadata(offset2));
offsets.put(tp10, new OffsetAndMetadata(offset3));
task.flush(offsets);

final CompressionType compressionType = CompressionType.forName(compression);

List<String> expectedBlobs = Lists.newArrayList(
"prefix-topic0-0-00000000000000000012" + compressionType.extension(),
"prefix-topic0-1-00000000000000000022" + compressionType.extension(),
"prefix-topic1-0-00000000000000000032" + compressionType.extension());

assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));

assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000012", compression))
.containsExactly("[", "{\"name\":\"name02\"}", "]");
assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000022", compression))
.containsExactly("[", "{\"name\":\"name12\"}", "]");
assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000032", compression))
.containsExactly("[", "{\"name\":\"name22\"}", "]");
// Reset and send another batch of records to S3
allRecords.clear();
for (int i = 0; i < 3; i++) {
allRecords.add(List.of(
createRecordWithStructValueSchema("topic0", 0, "key0", "name01" + i, offset1++, timestamp++),
createRecordWithStructValueSchema("topic0", 1, "key1", "name11" + i, offset2++, timestamp++),
createRecordWithStructValueSchema("topic1", 0, "key2", "name21" + i, offset3++, timestamp++)));
}
allRecords.forEach(task::put);
offsets.clear();
offsets.put(tp00, new OffsetAndMetadata(offset1));
offsets.put(tp01, new OffsetAndMetadata(offset2));
offsets.put(tp10, new OffsetAndMetadata(offset3));
task.flush(offsets);
expectedBlobs.clear();

expectedBlobs = Lists.newArrayList("prefix-topic0-0-00000000000000000015" + compressionType.extension(),
"prefix-topic0-1-00000000000000000025" + compressionType.extension(),
"prefix-topic1-0-00000000000000000035" + compressionType.extension());
assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));
assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000015", compression))
.containsExactly("[", "{\"name\":\"name012\"}", "]");
assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000025", compression))
.containsExactly("[", "{\"name\":\"name112\"}", "]");
assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000035", compression))
.containsExactly("[", "{\"name\":\"name212\"}", "]");

}

private SinkRecord createRecordWithStringValueSchema(final String topic, final int partition, final String key,
final String value, final int offset, final long timestamp) {
return new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, offset,
