diff --git a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroIntegrationTest.java b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroIntegrationTest.java
index 2c14c47c..086acb67 100644
--- a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroIntegrationTest.java
+++ b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroIntegrationTest.java
@@ -281,6 +281,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
         producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
+        producerProps.put("linger.ms", 1000);
         return new KafkaProducer<>(producerProps);
     }
 
diff --git a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroParquetIntegrationTest.java b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroParquetIntegrationTest.java
index 380af3e5..dede24c7 100644
--- a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroParquetIntegrationTest.java
+++ b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroParquetIntegrationTest.java
@@ -357,6 +357,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
         producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
+        producerProps.put("linger.ms", 1000);
         return new KafkaProducer<>(producerProps);
     }
 
diff --git a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/IntegrationTest.java b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/IntegrationTest.java
index 55ace34e..580c008c 100644
--- a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/IntegrationTest.java
+++ b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/IntegrationTest.java
@@ -530,6 +530,7 @@ private KafkaProducer<byte[], byte[]> newProducer() {
                 "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put("linger.ms", 1000);
         return new KafkaProducer<>(producerProps);
     }
 
diff --git a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/ParquetIntegrationTest.java b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/ParquetIntegrationTest.java
index 4c6a34b0..277239cd 100644
--- a/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/ParquetIntegrationTest.java
+++ b/s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/ParquetIntegrationTest.java
@@ -125,6 +125,7 @@ void tearDown() {
     private KafkaProducer<byte[], byte[]> newProducer() {
         final Map<String, Object> producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put("linger.ms", 1000);
         return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
     }
 
diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
index 7b09745a..6dc7560e 100644
--- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
+++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
@@ -42,6 +42,7 @@
 import io.aiven.kafka.connect.common.grouper.RecordGrouper;
 import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
 import io.aiven.kafka.connect.common.output.OutputWriter;
+import io.aiven.kafka.connect.common.templating.Template;
 import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
 import io.aiven.kafka.connect.s3.config.AwsCredentialProviderFactory;
 import io.aiven.kafka.connect.s3.config.S3SinkConfig;
@@ -68,6 +69,8 @@ public final class S3SinkTask extends SinkTask {
 
     private Map<String, OutputWriter> writers;
 
+    private boolean isKeyRecordGrouper;
+
     AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();
 
     @SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect
@@ -81,6 +84,7 @@ public void start(final Map<String, String> props) {
         config = new S3SinkConfig(props);
         s3Client = createAmazonS3Client(config);
         writers = new HashMap<>();
+        isKeyRecordGrouper = isOfTypeKeyRecordGrouper(config.getFilenameTemplate());
         try {
             recordGrouper = RecordGrouperFactory.newRecordGrouper(config);
         } catch (final Exception e) { // NOPMD AvoidCatchingGenericException
@@ -91,6 +95,20 @@
         }
     }
 
+    /**
+     * Determines whether the filename template is key based, in which case a single file may be updated multiple
+     * times per flush, or whether it is a rollover file that is reset at each flush.
+     *
+     * @param fileNameTemplate
+     *            the filename template supplied in the configuration
+     * @return true if the type is RecordGrouperFactory.KEY_RECORD or RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
+     */
+    private boolean isOfTypeKeyRecordGrouper(final Template fileNameTemplate) {
+        return RecordGrouperFactory.KEY_RECORD.equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate))
+                || RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
+                        .equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate));
+    }
+
     private AmazonS3 createAmazonS3Client(final S3SinkConfig config) {
         final var awsEndpointConfig = newEndpointConfiguration(this.config);
         final var clientConfig = PredefinedClientConfigurations.defaultConfig()
@@ -115,34 +133,46 @@ public void put(final Collection<SinkRecord> records) {
         Objects.requireNonNull(records, "records cannot be null");
         LOGGER.info("Processing {} records", records.size());
         records.forEach(recordGrouper::put);
-
-        recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
-
+        if (!isKeyRecordGrouper) {
+            recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
+        }
     }
 
     /**
-     * Flush is used to roll over file and complete the S3 Mutli part upload.
+     * Flush is used alongside the key record groupers to initiate and complete file writes to S3. When not using a
+     * key record grouper, the S3 upload is initiated by the put call and flush is used to write out the open files
+     * and roll them over.
      *
      * @param offsets
+     *            the latest offsets sent to put that are now ready to be flushed.
      */
     @Override
     public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        // On Flush Get Active writers
-        final Collection<OutputWriter> activeWriters = writers.values();
-        // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created.
-        recordGrouper.clear();
-        // Close
-        activeWriters.forEach(writer -> {
+        if (isKeyRecordGrouper) {
             try {
-                // Close active writers && remove from writers Map
-                // Calling close will write anything in the buffer before closing and complete the S3 multi part upload
-                writer.close();
-                // Remove once closed
-                writers.remove(writer);
-            } catch (IOException e) {
-                throw new ConnectException(e);
+                recordGrouper.records().forEach(this::flushToS3);
+            } finally {
+                recordGrouper.clear();
             }
-        });
+        } else {
+            // On Flush Get Active writers
+            final Collection<OutputWriter> activeWriters = writers.values();
+            // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created.
+            recordGrouper.clear();
+            // Close
+            activeWriters.forEach(writer -> {
+                try {
+                    // Close active writers && remove from writers Map
+                    // Calling close will write anything in the buffer before closing and complete the S3 multi part
+                    // upload
+                    writer.close();
+                    // Remove once closed
+                    writers.remove(writer);
+                } catch (IOException e) {
+                    throw new ConnectException(e);
+                }
+            });
+        }
     }
 
@@ -157,12 +187,11 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
      * @return correct OutputWriter for writing a particular record to S3
      */
     private OutputWriter getOutputWriter(final String filename, final SinkRecord sinkRecord) {
-        final String fileNameTemplate = getFileNameTemplate(filename, sinkRecord);
-        if (writers.get(fileNameTemplate) == null) {
+        if (writers.get(filename) == null) {
             final var out = newStreamFor(filename, sinkRecord);
             try {
-                writers.put(fileNameTemplate,
+                writers.put(filename,
                         OutputWriter.builder()
                                 .withCompressionType(config.getCompressionType())
                                 .withExternalProperties(config.originalsStrings())
@@ -173,7 +202,7 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin
                 throw new ConnectException(e);
             }
         }
-        return writers.get(fileNameTemplate);
+        return writers.get(filename);
     }
 
     /**
@@ -182,12 +211,9 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin
      *            the name of the file in S3 to be written to
      * @param records
      *            all records in this record grouping, including those already written to S3
-     * @param recordToBeWritten
-     *            new records from put() which are to be written to S3
      */
     private void writeToS3(final String filename, final List<SinkRecord> records,
             final Collection<SinkRecord> recordToBeWritten) {
-        final SinkRecord sinkRecord = records.get(0);
         // This writer is being left open until a flush occurs.
         final OutputWriter writer; // NOPMD CloseResource
 
@@ -196,6 +222,29 @@ private void writeToS3(final String filename, final List<SinkRecord> records,
             // Record Grouper returns all records for that filename, all we want is the new batch of records to be added
             // to the multi part upload.
             writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList()));
+
+        } catch (IOException e) {
+            throw new ConnectException(e);
+        }
+
+    }
+
+    /**
+     * For the key record groupers each file is written just once per flush, to keep the number of calls to S3 to a
Each file + * contains one record and is written once with the latest record when flush is called + * + * @param filename + * the name of the file in S3 to be written to + * @param records + * all records in this record grouping, including those already written to S3 + */ + private void flushToS3(final String filename, final List records) { + final SinkRecord sinkRecord = records.get(0); + try (var writer = getOutputWriter(filename, sinkRecord)) { + // For Key based files Record Grouper returns only one record for that filename + // to the multi part upload. + writer.writeRecords(records); + writers.remove(filename, writer); } catch (IOException e) { throw new ConnectException(e); } @@ -204,13 +253,15 @@ private void writeToS3(final String filename, final List records, @Override public void stop() { - writers.forEach((k, v) -> { - try { - v.close(); - } catch (IOException e) { - throw new ConnectException(e); - } - }); + if (!isKeyRecordGrouper) { + writers.forEach((k, v) -> { + try { + v.close(); + } catch (IOException e) { + throw new ConnectException(e); + } + }); + } s3Client.shutdown(); LOGGER.info("Stop S3 Sink Task"); diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java index 83858666..795c7c8b 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java @@ -706,6 +706,7 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException { properties.put(S3SinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false"); properties.put(S3SinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "json"); properties.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "prefix-"); + properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); final S3SinkTask task = new S3SinkTask(); task.start(properties); @@ -773,6 +774,88 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException { } + @Test + void mutliPartUploadUsingKeyPartitioning() throws IOException { + final String compression = "none"; + properties.put(S3SinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); + properties.put(S3SinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value"); + properties.put(S3SinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false"); + properties.put(S3SinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "json"); + properties.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "prefix-"); + // Compact/key 'mode' value only updated + properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}"); + + final S3SinkTask task = new S3SinkTask(); + task.start(properties); + int timestamp = 1000; + int offset1 = 10; + int offset2 = 20; + int offset3 = 30; + final List> allRecords = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + allRecords.add(List.of( + createRecordWithStructValueSchema("topic0", 0, "key0", "name0" + i, offset1++, timestamp++), + createRecordWithStructValueSchema("topic0", 1, "key1", "name1" + i, offset2++, timestamp++), + createRecordWithStructValueSchema("topic1", 0, "key2", "name2" + i, offset3++, timestamp++))); + } + final TopicPartition tp00 = new TopicPartition("topic0", 0); + final TopicPartition tp01 = new TopicPartition("topic0", 1); + final TopicPartition tp10 = new TopicPartition("topic1", 0); + final Collection tps = List.of(tp00, tp01, tp10); + task.open(tps); + + allRecords.forEach(task::put); + + final Map offsets = new HashMap<>(); + 
+        offsets.put(tp00, new OffsetAndMetadata(offset1));
+        offsets.put(tp01, new OffsetAndMetadata(offset2));
+        offsets.put(tp10, new OffsetAndMetadata(offset3));
+        task.flush(offsets);
+
+        final CompressionType compressionType = CompressionType.forName(compression);
+
+        List<String> expectedBlobs = Lists.newArrayList(
+                "prefix-topic0-0-00000000000000000012" + compressionType.extension(),
+                "prefix-topic0-1-00000000000000000022" + compressionType.extension(),
+                "prefix-topic1-0-00000000000000000032" + compressionType.extension());
+
+        assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));
+
+        assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000012", compression))
+                .containsExactly("[", "{\"name\":\"name02\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000022", compression))
+                .containsExactly("[", "{\"name\":\"name12\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000032", compression))
+                .containsExactly("[", "{\"name\":\"name22\"}", "]");
+        // Reset and send another batch of records to S3
+        allRecords.clear();
+        for (int i = 0; i < 3; i++) {
+            allRecords.add(List.of(
+                    createRecordWithStructValueSchema("topic0", 0, "key0", "name01" + i, offset1++, timestamp++),
+                    createRecordWithStructValueSchema("topic0", 1, "key1", "name11" + i, offset2++, timestamp++),
+                    createRecordWithStructValueSchema("topic1", 0, "key2", "name21" + i, offset3++, timestamp++)));
+        }
+        allRecords.forEach(task::put);
+        offsets.clear();
+        offsets.put(tp00, new OffsetAndMetadata(offset1));
+        offsets.put(tp01, new OffsetAndMetadata(offset2));
+        offsets.put(tp10, new OffsetAndMetadata(offset3));
+        task.flush(offsets);
+        expectedBlobs.clear();
+
+        expectedBlobs = Lists.newArrayList("prefix-topic0-0-00000000000000000015" + compressionType.extension(),
+                "prefix-topic0-1-00000000000000000025" + compressionType.extension(),
+                "prefix-topic1-0-00000000000000000035" + compressionType.extension());
+        assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));
+        assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000015", compression))
+                .containsExactly("[", "{\"name\":\"name012\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000025", compression))
+                .containsExactly("[", "{\"name\":\"name112\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000035", compression))
+                .containsExactly("[", "{\"name\":\"name212\"}", "]");
+
+    }
+
     private SinkRecord createRecordWithStringValueSchema(final String topic, final int partition, final String key,
             final String value, final int offset, final long timestamp) {
         return new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, offset,