diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java index 70df3d96..1dd297ce 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java @@ -16,217 +16,32 @@ package io.aiven.kafka.connect.common.config; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; - -import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator; -import io.aiven.kafka.connect.common.config.validators.OutputFieldsEncodingValidator; -import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator; -import io.aiven.kafka.connect.common.config.validators.OutputTypeValidator; -import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory; -import io.aiven.kafka.connect.common.templating.Template; - -public class AivenCommonConfig extends AbstractConfig { - public static final String FORMAT_OUTPUT_FIELDS_CONFIG = "format.output.fields"; - public static final String FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG = "format.output.fields.value.encoding"; - public static final String FORMAT_OUTPUT_TYPE_CONFIG = "format.output.type"; - public static final String FORMAT_OUTPUT_ENVELOPE_CONFIG = "format.output.envelope"; - public static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type"; - public static final String FILE_MAX_RECORDS = "file.max.records"; - public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone"; - public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source"; - public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; - - private static final String GROUP_COMPRESSION = "File Compression"; - private static final String GROUP_FORMAT = "Format"; - private static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}"; - - private static final String GROUP_RETRY_BACKOFF_POLICY = "Retry backoff policy"; - public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms"; +@Deprecated +public class AivenCommonConfig extends SinkCommonConfig { protected AivenCommonConfig(final ConfigDef definition, final Map originals) { super(definition, originals); - // TODO: calls getOutputFields, can be overridden in subclasses. - validate(); // NOPMD ConstructorCallsOverridableMethod - } - - private void validate() { - // Special checks for output json envelope config. - final List outputFields = getOutputFields(); - final Boolean outputEnvelopConfig = envelopeEnabled(); - if (!outputEnvelopConfig && outputFields.toArray().length != 1) { - final String msg = String.format("When %s is %s, %s must contain only one field", - FORMAT_OUTPUT_ENVELOPE_CONFIG, false, FORMAT_OUTPUT_FIELDS_CONFIG); - throw new ConfigException(msg); - } - validateKeyFilenameTemplate(); } protected static void addKafkaBackoffPolicy(final ConfigDef configDef) { - configDef.define(KAFKA_RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, null, new ConfigDef.Validator() { - - final long maximumBackoffPolicy = TimeUnit.HOURS.toMillis(24); - - @Override - public void ensureValid(final String name, final Object value) { - if (Objects.isNull(value)) { - return; - } - assert value instanceof Long; - final var longValue = (Long) value; - if (longValue < 0) { - throw new ConfigException(name, value, "Value must be at least 0"); - } else if (longValue > maximumBackoffPolicy) { - throw new ConfigException(name, value, - "Value must be no more than " + maximumBackoffPolicy + " (24 hours)"); - } - } - }, ConfigDef.Importance.MEDIUM, - "The retry backoff in milliseconds. " - + "This config is used to notify Kafka Connect to retry delivering a message batch or " - + "performing recovery in case of transient exceptions. Maximum value is " - + TimeUnit.HOURS.toMillis(24) + " (24 hours).", - GROUP_RETRY_BACKOFF_POLICY, 1, ConfigDef.Width.NONE, KAFKA_RETRY_BACKOFF_MS_CONFIG); - } - - public Long getKafkaRetryBackoffMs() { - return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG); + SinkCommonConfig.addKafkaBackoffPolicy(configDef); } protected static void addOutputFieldsFormatConfigGroup(final ConfigDef configDef, final OutputFieldType defaultFieldType) { - int formatGroupCounter = 0; - - addFormatTypeConfig(configDef, formatGroupCounter); - - configDef.define(FORMAT_OUTPUT_FIELDS_CONFIG, ConfigDef.Type.LIST, - Objects.isNull(defaultFieldType) ? null : defaultFieldType.name, // NOPMD NullAssignment - new OutputFieldsValidator(), ConfigDef.Importance.MEDIUM, - "Fields to put into output files. " + "The supported values are: " + OutputField.SUPPORTED_OUTPUT_FIELDS - + ".", - GROUP_FORMAT, formatGroupCounter++, ConfigDef.Width.NONE, FORMAT_OUTPUT_FIELDS_CONFIG, - FixedSetRecommender.ofSupportedValues(OutputFieldType.names())); - - configDef.define(FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, ConfigDef.Type.STRING, - OutputFieldEncodingType.BASE64.name, new OutputFieldsEncodingValidator(), ConfigDef.Importance.MEDIUM, - "The type of encoding for the value field. " + "The supported values are: " - + OutputFieldEncodingType.SUPPORTED_FIELD_ENCODING_TYPES + ".", - GROUP_FORMAT, formatGroupCounter++, ConfigDef.Width.NONE, FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, - FixedSetRecommender.ofSupportedValues(OutputFieldEncodingType.names())); - - configDef.define(FORMAT_OUTPUT_ENVELOPE_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, - "Whether to enable envelope for entries with single field.", GROUP_FORMAT, formatGroupCounter, - ConfigDef.Width.SHORT, FORMAT_OUTPUT_ENVELOPE_CONFIG); + SinkCommonConfig.addOutputFieldsFormatConfigGroup(configDef, defaultFieldType); } protected static void addFormatTypeConfig(final ConfigDef configDef, final int formatGroupCounter) { - final String supportedFormatTypes = FormatType.names() - .stream() - .map(f -> "'" + f + "'") - .collect(Collectors.joining(", ")); - configDef.define(FORMAT_OUTPUT_TYPE_CONFIG, ConfigDef.Type.STRING, FormatType.CSV.name, - new OutputTypeValidator(), ConfigDef.Importance.MEDIUM, - "The format type of output content" + "The supported values are: " + supportedFormatTypes + ".", - GROUP_FORMAT, formatGroupCounter, ConfigDef.Width.NONE, FORMAT_OUTPUT_TYPE_CONFIG, - FixedSetRecommender.ofSupportedValues(FormatType.names())); - } - - public FormatType getFormatType() { - return FormatType.forName(getString(FORMAT_OUTPUT_TYPE_CONFIG)); + SinkCommonConfig.addFormatTypeConfig(configDef, formatGroupCounter); } protected static void addCompressionTypeConfig(final ConfigDef configDef, final CompressionType defaultCompressionType) { - configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, - Objects.isNull(defaultCompressionType) ? null : defaultCompressionType.name, // NOPMD NullAssignment - new FileCompressionTypeValidator(), ConfigDef.Importance.MEDIUM, - "The compression type used for files put on GCS. " + "The supported values are: " - + CompressionType.SUPPORTED_COMPRESSION_TYPES + ".", - GROUP_COMPRESSION, 1, ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG, - FixedSetRecommender.ofSupportedValues(CompressionType.names())); - - } - - public CompressionType getCompressionType() { - return CompressionType.forName(getString(FILE_COMPRESSION_TYPE_CONFIG)); - } - - public Boolean envelopeEnabled() { - return getBoolean(FORMAT_OUTPUT_ENVELOPE_CONFIG); - } - - public OutputFieldEncodingType getOutputFieldEncodingType() { - return OutputFieldEncodingType.forName(getString(FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG)); - } - - public final Template getFilenameTemplate() { - return Template.of(getFilename()); - } - - protected final void validateKeyFilenameTemplate() { - // Special checks for {{key}} filename template. - final Template filenameTemplate = getFilenameTemplate(); - final String groupType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); - if (isKeyBased(groupType) && getMaxRecordsPerFile() > 1) { - final String msg = String.format("When %s is %s, %s must be either 1 or not set", FILE_NAME_TEMPLATE_CONFIG, - filenameTemplate, FILE_MAX_RECORDS); - throw new ConfigException(msg); - } - } - - public final String getFilename() { - return resolveFilenameTemplate(); - } - - private String resolveFilenameTemplate() { - String fileNameTemplate = getString(FILE_NAME_TEMPLATE_CONFIG); - if (fileNameTemplate == null) { - fileNameTemplate = FormatType.AVRO.equals(getFormatType()) - ? DEFAULT_FILENAME_TEMPLATE + ".avro" + getCompressionType().extension() - : DEFAULT_FILENAME_TEMPLATE + getCompressionType().extension(); - } - return fileNameTemplate; - } - - public final ZoneId getFilenameTimezone() { - return ZoneId.of(getString(FILE_NAME_TIMESTAMP_TIMEZONE)); - } - - public final TimestampSource getFilenameTimestampSource() { - return TimestampSource.of(getFilenameTimezone(), - TimestampSource.Type.of(getString(FILE_NAME_TIMESTAMP_SOURCE))); + SinkCommonConfig.addCompressionTypeConfig(configDef, defaultCompressionType); } - public final int getMaxRecordsPerFile() { - return getInt(FILE_MAX_RECORDS); - } - - public List getOutputFields() { - final List result = new ArrayList<>(); - for (final String outputFieldTypeStr : getList(FORMAT_OUTPUT_FIELDS_CONFIG)) { - final OutputFieldType fieldType = OutputFieldType.forName(outputFieldTypeStr); - final OutputFieldEncodingType encodingType; - if (fieldType == OutputFieldType.VALUE || fieldType == OutputFieldType.KEY) { - encodingType = getOutputFieldEncodingType(); - } else { - encodingType = OutputFieldEncodingType.NONE; - } - result.add(new OutputField(fieldType, encodingType)); - } - return result; - } - - private Boolean isKeyBased(final String groupType) { - return RecordGrouperFactory.KEY_RECORD.equals(groupType) - || RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD.equals(groupType); - } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java new file mode 100644 index 00000000..eae5bc38 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java @@ -0,0 +1,68 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +public abstract class CommonConfig extends AbstractConfig { + protected static final String GROUP_COMPRESSION = "File Compression"; + protected static final String GROUP_FORMAT = "Format"; + + private static final String GROUP_RETRY_BACKOFF_POLICY = "Retry backoff policy"; + public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms"; + + public CommonConfig(ConfigDef definition, Map originals) { // NOPMD + super(definition, originals); + } + + protected static void addKafkaBackoffPolicy(final ConfigDef configDef) { + configDef.define(KAFKA_RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, null, new ConfigDef.Validator() { + + final long maximumBackoffPolicy = TimeUnit.HOURS.toMillis(24); + + @Override + public void ensureValid(final String name, final Object value) { + if (Objects.isNull(value)) { + return; + } + assert value instanceof Long; + final var longValue = (Long) value; + if (longValue < 0) { + throw new ConfigException(name, value, "Value must be at least 0"); + } else if (longValue > maximumBackoffPolicy) { + throw new ConfigException(name, value, + "Value must be no more than " + maximumBackoffPolicy + " (24 hours)"); + } + } + }, ConfigDef.Importance.MEDIUM, + "The retry backoff in milliseconds. " + + "This config is used to notify Kafka Connect to retry delivering a message batch or " + + "performing recovery in case of transient exceptions. Maximum value is " + + TimeUnit.HOURS.toMillis(24) + " (24 hours).", + GROUP_RETRY_BACKOFF_POLICY, 1, ConfigDef.Width.NONE, KAFKA_RETRY_BACKOFF_MS_CONFIG); + } + + public Long getKafkaRetryBackoffMs() { + return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG); + } + +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java new file mode 100644 index 00000000..390d2ceb --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SinkCommonConfig.java @@ -0,0 +1,197 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator; +import io.aiven.kafka.connect.common.config.validators.OutputFieldsEncodingValidator; +import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator; +import io.aiven.kafka.connect.common.config.validators.OutputTypeValidator; +import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory; +import io.aiven.kafka.connect.common.templating.Template; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public abstract class SinkCommonConfig extends CommonConfig { + public static final String FORMAT_OUTPUT_FIELDS_CONFIG = "format.output.fields"; + public static final String FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG = "format.output.fields.value.encoding"; + public static final String FORMAT_OUTPUT_TYPE_CONFIG = "format.output.type"; + public static final String FORMAT_OUTPUT_ENVELOPE_CONFIG = "format.output.envelope"; + public static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type"; + public static final String FILE_MAX_RECORDS = "file.max.records"; + public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone"; + public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source"; + public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; + private static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}"; + + @SuppressFBWarnings("CT_CONSTRUCTOR_THROW") + public SinkCommonConfig(ConfigDef definition, Map originals) { // NOPMD + super(definition, originals); + // TODO: calls getOutputFields, can be overridden in subclasses. + validate(); // NOPMD ConstructorCallsOverridableMethod + } + + private void validate() { + // Special checks for output json envelope config. + final List outputFields = getOutputFields(); + final Boolean outputEnvelopConfig = envelopeEnabled(); + if (!outputEnvelopConfig && outputFields.toArray().length != 1) { + final String msg = String.format("When %s is %s, %s must contain only one field", + FORMAT_OUTPUT_ENVELOPE_CONFIG, false, FORMAT_OUTPUT_FIELDS_CONFIG); + throw new ConfigException(msg); + } + validateKeyFilenameTemplate(); + } + + protected static void addOutputFieldsFormatConfigGroup(final ConfigDef configDef, + final OutputFieldType defaultFieldType) { + int formatGroupCounter = 0; + + addFormatTypeConfig(configDef, formatGroupCounter); + + configDef.define(FORMAT_OUTPUT_FIELDS_CONFIG, ConfigDef.Type.LIST, + Objects.isNull(defaultFieldType) ? null : defaultFieldType.name, // NOPMD NullAssignment + new OutputFieldsValidator(), ConfigDef.Importance.MEDIUM, + "Fields to put into output files. " + "The supported values are: " + OutputField.SUPPORTED_OUTPUT_FIELDS + + ".", + GROUP_FORMAT, formatGroupCounter++, ConfigDef.Width.NONE, FORMAT_OUTPUT_FIELDS_CONFIG, + FixedSetRecommender.ofSupportedValues(OutputFieldType.names())); + + configDef.define(FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, ConfigDef.Type.STRING, + OutputFieldEncodingType.BASE64.name, new OutputFieldsEncodingValidator(), ConfigDef.Importance.MEDIUM, + "The type of encoding for the value field. " + "The supported values are: " + + OutputFieldEncodingType.SUPPORTED_FIELD_ENCODING_TYPES + ".", + GROUP_FORMAT, formatGroupCounter++, ConfigDef.Width.NONE, FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, + FixedSetRecommender.ofSupportedValues(OutputFieldEncodingType.names())); + + configDef.define(FORMAT_OUTPUT_ENVELOPE_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, + "Whether to enable envelope for entries with single field.", GROUP_FORMAT, formatGroupCounter, + ConfigDef.Width.SHORT, FORMAT_OUTPUT_ENVELOPE_CONFIG); + } + + protected static void addFormatTypeConfig(final ConfigDef configDef, final int formatGroupCounter) { + final String supportedFormatTypes = FormatType.names() + .stream() + .map(f -> "'" + f + "'") + .collect(Collectors.joining(", ")); + configDef.define(FORMAT_OUTPUT_TYPE_CONFIG, ConfigDef.Type.STRING, FormatType.CSV.name, + new OutputTypeValidator(), ConfigDef.Importance.MEDIUM, + "The format type of output content" + "The supported values are: " + supportedFormatTypes + ".", + GROUP_FORMAT, formatGroupCounter, ConfigDef.Width.NONE, FORMAT_OUTPUT_TYPE_CONFIG, + FixedSetRecommender.ofSupportedValues(FormatType.names())); + } + + public FormatType getFormatType() { + return FormatType.forName(getString(FORMAT_OUTPUT_TYPE_CONFIG)); + } + + protected static void addCompressionTypeConfig(final ConfigDef configDef, + final CompressionType defaultCompressionType) { + configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, + Objects.isNull(defaultCompressionType) ? null : defaultCompressionType.name, // NOPMD NullAssignment + new FileCompressionTypeValidator(), ConfigDef.Importance.MEDIUM, + "The compression type used for files put on GCS. " + "The supported values are: " + + CompressionType.SUPPORTED_COMPRESSION_TYPES + ".", + GROUP_COMPRESSION, 1, ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG, + FixedSetRecommender.ofSupportedValues(CompressionType.names())); + + } + + public CompressionType getCompressionType() { + return CompressionType.forName(getString(FILE_COMPRESSION_TYPE_CONFIG)); + } + + public Boolean envelopeEnabled() { + return getBoolean(FORMAT_OUTPUT_ENVELOPE_CONFIG); + } + + public OutputFieldEncodingType getOutputFieldEncodingType() { + return OutputFieldEncodingType.forName(getString(FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG)); + } + + public final Template getFilenameTemplate() { + return Template.of(getFilename()); + } + + protected final void validateKeyFilenameTemplate() { + // Special checks for {{key}} filename template. + final Template filenameTemplate = getFilenameTemplate(); + final String groupType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); + if (isKeyBased(groupType) && getMaxRecordsPerFile() > 1) { + final String msg = String.format("When %s is %s, %s must be either 1 or not set", FILE_NAME_TEMPLATE_CONFIG, + filenameTemplate, FILE_MAX_RECORDS); + throw new ConfigException(msg); + } + } + + public final String getFilename() { + return resolveFilenameTemplate(); + } + + private String resolveFilenameTemplate() { + String fileNameTemplate = getString(FILE_NAME_TEMPLATE_CONFIG); + if (fileNameTemplate == null) { + fileNameTemplate = FormatType.AVRO.equals(getFormatType()) + ? DEFAULT_FILENAME_TEMPLATE + ".avro" + getCompressionType().extension() + : DEFAULT_FILENAME_TEMPLATE + getCompressionType().extension(); + } + return fileNameTemplate; + } + + public final ZoneId getFilenameTimezone() { + return ZoneId.of(getString(FILE_NAME_TIMESTAMP_TIMEZONE)); + } + + public final TimestampSource getFilenameTimestampSource() { + return TimestampSource.of(getFilenameTimezone(), + TimestampSource.Type.of(getString(FILE_NAME_TIMESTAMP_SOURCE))); + } + + public final int getMaxRecordsPerFile() { + return getInt(FILE_MAX_RECORDS); + } + + private Boolean isKeyBased(final String groupType) { + return RecordGrouperFactory.KEY_RECORD.equals(groupType) + || RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD.equals(groupType); + } + + public List getOutputFields() { + final List result = new ArrayList<>(); + for (final String outputFieldTypeStr : getList(FORMAT_OUTPUT_FIELDS_CONFIG)) { + final OutputFieldType fieldType = OutputFieldType.forName(outputFieldTypeStr); + final OutputFieldEncodingType encodingType; + if (fieldType == OutputFieldType.VALUE || fieldType == OutputFieldType.KEY) { + encodingType = getOutputFieldEncodingType(); + } else { + encodingType = OutputFieldEncodingType.NONE; + } + result.add(new OutputField(fieldType, encodingType)); + } + return result; + } + +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java new file mode 100644 index 00000000..e363d7c9 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; + +public class SourceCommonConfig extends CommonConfig { + public SourceCommonConfig(ConfigDef definition, Map originals) {// NOPMD + super(definition, originals); + } +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java index a43ea0d2..a81ef88c 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java @@ -25,9 +25,9 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import io.aiven.kafka.connect.common.config.AivenCommonConfig; import io.aiven.kafka.connect.common.config.FilenameTemplateVariable; import io.aiven.kafka.connect.common.config.FormatType; +import io.aiven.kafka.connect.common.config.SinkCommonConfig; import io.aiven.kafka.connect.common.templating.Template; import com.google.common.collect.Sets; @@ -141,7 +141,7 @@ public static String resolveRecordGrouperType(final Template template) { } @SuppressWarnings("PMD.CognitiveComplexity") - public static RecordGrouper newRecordGrouper(final AivenCommonConfig config) { + public static RecordGrouper newRecordGrouper(final SinkCommonConfig config) { final Template fileNameTemplate = config.getFilenameTemplate(); final String grType = resolveRecordGrouperType(fileNameTemplate); if (KEY_RECORD.equals(grType)) { diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java index 49d3a745..6be1633b 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java @@ -18,7 +18,7 @@ import java.util.Objects; -import io.aiven.kafka.connect.s3.S3BaseConfig; +import io.aiven.kafka.connect.s3.S3SinkBaseConfig; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; @@ -29,7 +29,7 @@ public class AwsCredentialProviderFactory { - public AWSCredentialsProvider getProvider(final S3BaseConfig config) { + public AWSCredentialsProvider getProvider(final S3SinkBaseConfig config) { if (config.hasAwsStsRole()) { return getStsProvider(config); } @@ -40,7 +40,7 @@ public AWSCredentialsProvider getProvider(final S3BaseConfig config) { return new AWSStaticCredentialsProvider(awsCredentials); } - private AWSCredentialsProvider getStsProvider(final S3BaseConfig config) { + private AWSCredentialsProvider getStsProvider(final S3SinkBaseConfig config) { final AwsStsRole awsstsRole = config.getStsRole(); final AWSSecurityTokenService sts = securityTokenService(config); return new STSAssumeRoleSessionCredentialsProvider.Builder(awsstsRole.getArn(), awsstsRole.getSessionName()) @@ -50,7 +50,7 @@ private AWSCredentialsProvider getStsProvider(final S3BaseConfig config) { .build(); } - private AWSSecurityTokenService securityTokenService(final S3BaseConfig config) { + private AWSSecurityTokenService securityTokenService(final S3SinkBaseConfig config) { if (config.hasStsEndpointConfig()) { final AWSSecurityTokenServiceClientBuilder stsBuilder = AWSSecurityTokenServiceClientBuilder.standard(); stsBuilder.setEndpointConfiguration(config.getAwsEndpointConfiguration()); diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java b/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3SinkBaseConfig.java similarity index 98% rename from s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java rename to s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3SinkBaseConfig.java index 8f91d78a..8e1cdc58 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3SinkBaseConfig.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigException; -import io.aiven.kafka.connect.common.config.AivenCommonConfig; +import io.aiven.kafka.connect.common.config.SinkCommonConfig; import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator; import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword; import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator; @@ -47,8 +47,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3BaseConfig extends AivenCommonConfig { - public static final Logger LOGGER = LoggerFactory.getLogger(S3BaseConfig.class); +public class S3SinkBaseConfig extends SinkCommonConfig { + public static final Logger LOGGER = LoggerFactory.getLogger(S3SinkBaseConfig.class); @Deprecated public static final String OUTPUT_COMPRESSION = "output_compression"; @@ -130,7 +130,7 @@ public class S3BaseConfig extends AivenCommonConfig { // in other words we can't use values greater than 30 public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3; - protected S3BaseConfig(ConfigDef definition, Map originals) { // NOPMD UnusedAssignment + protected S3SinkBaseConfig(ConfigDef definition, Map originals) { // NOPMD UnusedAssignment super(definition, handleDeprecatedYyyyUppercase(originals)); } diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3SourceBaseConfig.java b/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3SourceBaseConfig.java new file mode 100644 index 00000000..f88fe4e9 --- /dev/null +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3SourceBaseConfig.java @@ -0,0 +1,449 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.s3; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigException; + +import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator; +import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword; +import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator; +import io.aiven.kafka.connect.common.config.validators.UrlValidator; +import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; +import io.aiven.kafka.connect.iam.AwsStsRole; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.RegionUtils; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.internal.BucketNameUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3SourceBaseConfig extends SourceCommonConfig { + public static final Logger LOGGER = LoggerFactory.getLogger(S3SourceBaseConfig.class); + + @Deprecated + public static final String OUTPUT_COMPRESSION = "output_compression"; + @Deprecated + public static final String OUTPUT_COMPRESSION_TYPE_GZIP = "gzip"; + @Deprecated + public static final String OUTPUT_COMPRESSION_TYPE_NONE = "none"; + + @Deprecated + public static final String OUTPUT_FIELDS = "output_fields"; + @Deprecated + public static final String TIMESTAMP_TIMEZONE = "timestamp.timezone"; + @Deprecated + public static final String TIMESTAMP_SOURCE = "timestamp.source"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_KEY = "key"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_OFFSET = "offset"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_TIMESTAMP = "timestamp"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_VALUE = "value"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_HEADERS = "headers"; + + @Deprecated + public static final String AWS_ACCESS_KEY_ID = "aws_access_key_id"; + @Deprecated + public static final String AWS_SECRET_ACCESS_KEY = "aws_secret_access_key"; + @Deprecated + public static final String AWS_S3_BUCKET = "aws_s3_bucket"; + @Deprecated + public static final String AWS_S3_ENDPOINT = "aws_s3_endpoint"; + @Deprecated + public static final String AWS_S3_REGION = "aws_s3_region"; + @Deprecated + public static final String AWS_S3_PREFIX = "aws_s3_prefix"; + + // FIXME since we support so far both old style and new style of property names + // Importance was set to medium, + // as soon we will migrate to new values it must be set to HIGH + // same for default value + public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id"; + public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key"; + public static final String AWS_CREDENTIALS_PROVIDER_CONFIG = "aws.credentials.provider"; + public static final String AWS_CREDENTIAL_PROVIDER_DEFAULT = "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"; + public static final String AWS_S3_BUCKET_NAME_CONFIG = "aws.s3.bucket.name"; + public static final String AWS_S3_SSE_ALGORITHM_CONFIG = "aws.s3.sse.algorithm"; + public static final String AWS_S3_ENDPOINT_CONFIG = "aws.s3.endpoint"; + public static final String AWS_S3_REGION_CONFIG = "aws.s3.region"; + public static final String AWS_S3_PART_SIZE = "aws.s3.part.size.bytes"; + + public static final String AWS_S3_PREFIX_CONFIG = "aws.s3.prefix"; + public static final String AWS_STS_ROLE_ARN = "aws.sts.role.arn"; + public static final String AWS_STS_ROLE_EXTERNAL_ID = "aws.sts.role.external.id"; + public static final String AWS_STS_ROLE_SESSION_NAME = "aws.sts.role.session.name"; + public static final String AWS_STS_ROLE_SESSION_DURATION = "aws.sts.role.session.duration"; + public static final String AWS_STS_CONFIG_ENDPOINT = "aws.sts.config.endpoint"; + + public static final String AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG = "aws.s3.backoff.delay.ms"; + public static final String AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG = "aws.s3.backoff.max.delay.ms"; + public static final String AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG = "aws.s3.backoff.max.retries"; + + private static final String GROUP_AWS = "AWS"; + private static final String GROUP_AWS_STS = "AWS STS"; + + private static final String GROUP_S3_RETRY_BACKOFF_POLICY = "S3 retry backoff policy"; + + public static final int DEFAULT_PART_SIZE = 5 * 1024 * 1024; + + // Default values from AWS SDK, since they are hidden + public static final int AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT = 100; + public static final int AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 20_000; + // Comment in AWS SDK for max retries: + // Maximum retry limit. Avoids integer overflow issues. + // + // NOTE: If the value is greater than 30, there can be integer overflow + // issues during delay calculation. + // in other words we can't use values greater than 30 + public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3; + + protected S3SourceBaseConfig(ConfigDef definition, Map originals) { // NOPMD UnusedAssignment + super(definition, handleDeprecatedYyyyUppercase(originals)); + } + + private static Map handleDeprecatedYyyyUppercase(final Map properties) { + if (!properties.containsKey(AWS_S3_PREFIX_CONFIG)) { + return properties; + } + + final var result = new HashMap<>(properties); + for (final var prop : List.of(AWS_S3_PREFIX_CONFIG)) { + if (properties.containsKey(prop)) { + String template = properties.get(prop); + final String originalTemplate = template; + + final var unitYyyyPattern = Pattern.compile("\\{\\{\\s*timestamp\\s*:\\s*unit\\s*=\\s*YYYY\\s*}}"); + template = unitYyyyPattern.matcher(template) + .replaceAll(matchResult -> matchResult.group().replace("YYYY", "yyyy")); + + if (!template.equals(originalTemplate)) { + LOGGER.warn("{{timestamp:unit=YYYY}} is no longer supported, " + + "please use {{timestamp:unit=yyyy}} instead. " + "It was automatically replaced: {}", + template); + } + + result.put(prop, template); + } + } + return result; + } + + protected static void addS3RetryPolicies(final ConfigDef configDef) { + var retryPolicyGroupCounter = 0; + configDef.define(AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG, Type.LONG, AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT, + ConfigDef.Range.atLeast(1L), Importance.MEDIUM, + "S3 default base sleep time for non-throttled exceptions in milliseconds. " + "Default is " + + AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT + ".", + GROUP_S3_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG); + configDef.define(AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG, Type.LONG, AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, + ConfigDef.Range.atLeast(1L), Importance.MEDIUM, + "S3 maximum back-off time before retrying a request in milliseconds. " + "Default is " + + AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT + ".", + GROUP_S3_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG); + configDef.define(AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG, Type.INT, S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT, + ConfigDef.Range.between(1L, 30), Importance.MEDIUM, + "Maximum retry limit " + "(if the value is greater than 30, " + + "there can be integer overflow issues during delay calculation). " + "Default is " + + S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT + ".", + GROUP_S3_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG); + } + + protected static void addAwsConfigGroup(final ConfigDef configDef) { + int awsGroupCounter = 0; + + configDef.define(AWS_ACCESS_KEY_ID_CONFIG, Type.PASSWORD, null, new NonEmptyPassword(), Importance.MEDIUM, + "AWS Access Key ID", GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_ACCESS_KEY_ID_CONFIG); + + configDef.define(AWS_SECRET_ACCESS_KEY_CONFIG, Type.PASSWORD, null, new NonEmptyPassword(), Importance.MEDIUM, + "AWS Secret Access Key", GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, + AWS_SECRET_ACCESS_KEY_CONFIG); + + configDef.define(AWS_CREDENTIALS_PROVIDER_CONFIG, Type.CLASS, AWS_CREDENTIAL_PROVIDER_DEFAULT, + Importance.MEDIUM, + "When you initialize a new " + "service client without supplying any arguments, " + + "the AWS SDK for Java attempts to find temporary " + + "credentials by using the default credential " + "provider chain implemented by the " + + "DefaultAWSCredentialsProviderChain class.", + + GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_CREDENTIALS_PROVIDER_CONFIG); + + configDef.define(AWS_S3_BUCKET_NAME_CONFIG, Type.STRING, null, new BucketNameValidator(), Importance.MEDIUM, + "AWS S3 Bucket name", GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_S3_BUCKET_NAME_CONFIG); + + // AWS S3 Server Side Encryption Algorithm configuration + // Example values: 'AES256' for S3-managed keys, 'aws:kms' for AWS KMS-managed keys + configDef.define(AWS_S3_SSE_ALGORITHM_CONFIG, Type.STRING, null, new ConfigDef.NonEmptyString(), + Importance.MEDIUM, "AWS S3 Server Side Encryption Algorithm. Example values: 'AES256', 'aws:kms'.", + GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_S3_SSE_ALGORITHM_CONFIG); + + configDef.define(AWS_S3_ENDPOINT_CONFIG, Type.STRING, null, new UrlValidator(), Importance.LOW, + "Explicit AWS S3 Endpoint Address, mainly for testing", GROUP_AWS, awsGroupCounter++, + ConfigDef.Width.NONE, AWS_S3_ENDPOINT_CONFIG); + + configDef.define(AWS_S3_REGION_CONFIG, Type.STRING, null, new AwsRegionValidator(), Importance.MEDIUM, + "AWS S3 Region, e.g. us-east-1", GROUP_AWS, awsGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_S3_REGION_CONFIG); + } + + protected static void addAwsStsConfigGroup(final ConfigDef configDef) { + int awsStsGroupCounter = 0; + configDef.define(AWS_STS_ROLE_ARN, Type.STRING, null, new ConfigDef.NonEmptyString(), Importance.MEDIUM, + "AWS STS Role", GROUP_AWS_STS, awsStsGroupCounter++, // NOPMD + // UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_ROLE_ARN); + + configDef.define(AWS_STS_ROLE_SESSION_NAME, Type.STRING, null, new ConfigDef.NonEmptyString(), + Importance.MEDIUM, "AWS STS Session name", GROUP_AWS_STS, awsStsGroupCounter++, // NOPMD + // UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_ROLE_SESSION_NAME); + + configDef.define(AWS_STS_ROLE_SESSION_DURATION, Type.INT, 3600, + ConfigDef.Range.between(AwsStsRole.MIN_SESSION_DURATION, AwsStsRole.MAX_SESSION_DURATION), + Importance.MEDIUM, "AWS STS Session duration", GROUP_AWS_STS, awsStsGroupCounter++, // NOPMD + // UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_ROLE_SESSION_DURATION); + + configDef.define(AWS_STS_ROLE_EXTERNAL_ID, Type.STRING, null, new ConfigDef.NonEmptyString(), Importance.MEDIUM, + "AWS STS External Id", GROUP_AWS_STS, awsStsGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_ROLE_EXTERNAL_ID); + + configDef.define(AWS_STS_CONFIG_ENDPOINT, Type.STRING, AwsStsEndpointConfig.AWS_STS_GLOBAL_ENDPOINT, + new ConfigDef.NonEmptyString(), Importance.MEDIUM, "AWS STS Config Endpoint", GROUP_AWS_STS, + awsStsGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_CONFIG_ENDPOINT); + } + + protected static void addDeprecatedConfiguration(final ConfigDef configDef) { + configDef.define(AWS_S3_PREFIX_CONFIG, Type.STRING, null, new ConfigDef.NonEmptyString(), Importance.MEDIUM, + "[Deprecated] Use `file.name.template` instead. Prefix for stored objects, e.g. cluster-1/", GROUP_AWS, + 0, ConfigDef.Width.NONE, AWS_S3_PREFIX_CONFIG); + + configDef.define(AWS_ACCESS_KEY_ID, Type.PASSWORD, null, new NonEmptyPassword() { + @Override + public void ensureValid(final String name, final Object value) { + LOGGER.info(AWS_ACCESS_KEY_ID + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, value); + } + }, Importance.MEDIUM, "AWS Access Key ID"); + + configDef.define(AWS_SECRET_ACCESS_KEY, Type.PASSWORD, null, new NonEmptyPassword() { + @Override + public void ensureValid(final String name, final Object value) { + LOGGER.info( + AWS_SECRET_ACCESS_KEY + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, value); + } + }, Importance.MEDIUM, "AWS Secret Access Key"); + + configDef.define(AWS_S3_BUCKET, Type.STRING, null, new BucketNameValidator() { + @Override + public void ensureValid(final String name, final Object object) { + LOGGER.info(AWS_S3_BUCKET + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, object); + } + }, Importance.MEDIUM, "AWS S3 Bucket name"); + + configDef.define(AWS_S3_ENDPOINT, Type.STRING, null, new UrlValidator() { + @Override + public void ensureValid(final String name, final Object object) { + LOGGER.info(AWS_S3_ENDPOINT + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, object); + } + }, Importance.LOW, "Explicit AWS S3 Endpoint Address, mainly for testing"); + + configDef.define(AWS_S3_REGION, Type.STRING, null, new AwsRegionValidator() { + @Override + public void ensureValid(final String name, final Object object) { + LOGGER.info(AWS_S3_REGION + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, object); + } + }, Importance.MEDIUM, "AWS S3 Region, e.g. us-east-1"); + + configDef.define(AWS_S3_PREFIX, Type.STRING, null, new ConfigDef.NonEmptyString() { + @Override + public void ensureValid(final String name, final Object object) { + LOGGER.info(AWS_S3_PREFIX + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, object); + } + }, Importance.MEDIUM, "Prefix for stored objects, e.g. cluster-1/"); + + configDef.define(OUTPUT_FIELDS, Type.LIST, null, new OutputFieldsValidator() { + @Override + public void ensureValid(final String name, final Object value) { + LOGGER.info(OUTPUT_FIELDS + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, value); + } + }, Importance.MEDIUM, + "Output fields. A comma separated list of one or more: " + OUTPUT_FIELD_NAME_KEY + ", " + + OUTPUT_FIELD_NAME_OFFSET + ", " + OUTPUT_FIELD_NAME_TIMESTAMP + ", " + OUTPUT_FIELD_NAME_VALUE + + ", " + OUTPUT_FIELD_NAME_HEADERS); + + configDef.define(OUTPUT_COMPRESSION, Type.STRING, null, new FileCompressionTypeValidator() { + @Override + public void ensureValid(final String name, final Object value) { + LOGGER.info(OUTPUT_COMPRESSION + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, value); + } + }, Importance.MEDIUM, "Output compression. Valid values are: " + OUTPUT_COMPRESSION_TYPE_GZIP + " and " + + OUTPUT_COMPRESSION_TYPE_NONE); + } + + // Custom Validators + protected static class AwsRegionValidator implements ConfigDef.Validator { + private static final String SUPPORTED_AWS_REGIONS = Arrays.stream(Regions.values()) + .map(Regions::getName) + .collect(Collectors.joining(", ")); + + @Override + public void ensureValid(final String name, final Object value) { + if (Objects.nonNull(value)) { + final String valueStr = (String) value; + final Region region = RegionUtils.getRegion(valueStr); + if (!RegionUtils.getRegions().contains(region)) { + throw new ConfigException(name, valueStr, "supported values are: " + SUPPORTED_AWS_REGIONS); + } + } + } + } + + private static class BucketNameValidator implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + try { + if (value != null) { + BucketNameUtils.validateBucketName((String) value); + } + } catch (final IllegalArgumentException e) { + throw new ConfigException("Illegal bucket name: " + e.getMessage()); + } + } + } + + public AwsStsRole getStsRole() { + return new AwsStsRole(getString(AWS_STS_ROLE_ARN), getString(AWS_STS_ROLE_EXTERNAL_ID), + getString(AWS_STS_ROLE_SESSION_NAME), getInt(AWS_STS_ROLE_SESSION_DURATION)); + } + + public boolean hasAwsStsRole() { + return getStsRole().isValid(); + } + + public boolean hasStsEndpointConfig() { + return getStsEndpointConfig().isValid(); + } + + public AwsStsEndpointConfig getStsEndpointConfig() { + return new AwsStsEndpointConfig(getString(AWS_STS_CONFIG_ENDPOINT), getString(AWS_S3_REGION_CONFIG)); + } + + public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() { + final AwsStsEndpointConfig config = getStsEndpointConfig(); + return new AwsClientBuilder.EndpointConfiguration(config.getServiceEndpoint(), config.getSigningRegion()); + } + + public BasicAWSCredentials getAwsCredentials() { + if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID_CONFIG)) + && Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) { + + return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID_CONFIG).value(), + getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value()); + } else if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID)) + && Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY))) { + LOGGER.warn("Config options {} and {} are deprecated", AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY); + return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID).value(), + getPassword(AWS_SECRET_ACCESS_KEY).value()); + } + return null; + } + + public String getAwsS3EndPoint() { + return Objects.nonNull(getString(AWS_S3_ENDPOINT_CONFIG)) + ? getString(AWS_S3_ENDPOINT_CONFIG) + : getString(AWS_S3_ENDPOINT); + } + + public Region getAwsS3Region() { + // we have priority of properties if old one not set or both old and new one set + // the new property value will be selected + if (Objects.nonNull(getString(AWS_S3_REGION_CONFIG))) { + return RegionUtils.getRegion(getString(AWS_S3_REGION_CONFIG)); + } else if (Objects.nonNull(getString(AWS_S3_REGION))) { + return RegionUtils.getRegion(getString(AWS_S3_REGION)); + } else { + return RegionUtils.getRegion(Regions.US_EAST_1.getName()); + } + } + + public String getAwsS3BucketName() { + return Objects.nonNull(getString(AWS_S3_BUCKET_NAME_CONFIG)) + ? getString(AWS_S3_BUCKET_NAME_CONFIG) + : getString(AWS_S3_BUCKET); + } + + public String getServerSideEncryptionAlgorithmName() { + return getString(AWS_S3_SSE_ALGORITHM_CONFIG); + } + + public String getAwsS3Prefix() { + return Objects.nonNull(getString(AWS_S3_PREFIX_CONFIG)) + ? getString(AWS_S3_PREFIX_CONFIG) + : getString(AWS_S3_PREFIX); + } + + public int getAwsS3PartSize() { + return getInt(AWS_S3_PART_SIZE); + } + + public long getS3RetryBackoffDelayMs() { + return getLong(AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG); + } + + public long getS3RetryBackoffMaxDelayMs() { + return getLong(AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG); + } + + public int getS3RetryBackoffMaxRetries() { + return getInt(AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG); + } + + public AWSCredentialsProvider getCustomCredentialsProvider() { + return getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class); + } + +} diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3Utility.java b/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3Utility.java index 4ba49025..657664a9 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3Utility.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3Utility.java @@ -32,7 +32,7 @@ public class S3Utility { private final AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory(); - public AmazonS3 createAmazonS3Client(final S3BaseConfig config) { + public AmazonS3 createAmazonS3Client(final S3SinkBaseConfig config) { final var awsEndpointConfig = newEndpointConfiguration(config); final var clientConfig = PredefinedClientConfigurations.defaultConfig() .withRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION, @@ -51,7 +51,7 @@ public AmazonS3 createAmazonS3Client(final S3BaseConfig config) { return s3ClientBuilder.build(); } - private AwsClientBuilder.EndpointConfiguration newEndpointConfiguration(final S3BaseConfig config) { + private AwsClientBuilder.EndpointConfiguration newEndpointConfiguration(final S3SinkBaseConfig config) { if (Objects.isNull(config.getAwsS3EndPoint())) { return null; } diff --git a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java index 17b4e94d..cafb2b6f 100644 --- a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java +++ b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java @@ -21,8 +21,8 @@ import java.util.HashMap; import java.util.Map; -import io.aiven.kafka.connect.s3.S3BaseConfig; -import io.aiven.kafka.connect.tools.AwsCredentialConfig; +import io.aiven.kafka.connect.s3.S3SinkBaseConfig; +import io.aiven.kafka.connect.tools.AwsCredentialBaseConfig; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; @@ -39,21 +39,21 @@ final class AwsCredentialProviderFactoryTest { public void setUp() { factory = new AwsCredentialProviderFactory(); props = new HashMap<>(); - props.put(S3BaseConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket"); - props.put(S3BaseConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); + props.put(S3SinkBaseConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket"); + props.put(S3SinkBaseConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); } @Test void createsStsCredentialProviderIfSpecified() { - props.put(S3BaseConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); - props.put(S3BaseConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); - props.put(S3BaseConfig.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask"); - props.put(S3BaseConfig.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME"); - props.put(S3BaseConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName()); - props.put(S3BaseConfig.AWS_STS_CONFIG_ENDPOINT, "https://sts.us-east-1.amazonaws.com"); + props.put(S3SinkBaseConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); + props.put(S3SinkBaseConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); + props.put(S3SinkBaseConfig.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask"); + props.put(S3SinkBaseConfig.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME"); + props.put(S3SinkBaseConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName()); + props.put(S3SinkBaseConfig.AWS_STS_CONFIG_ENDPOINT, "https://sts.us-east-1.amazonaws.com"); - final var config = new AwsCredentialConfig(props); + final var config = new AwsCredentialBaseConfig(props); final var credentialProvider = factory.getProvider(config); assertThat(credentialProvider).isInstanceOf(STSAssumeRoleSessionCredentialsProvider.class); @@ -61,10 +61,10 @@ void createsStsCredentialProviderIfSpecified() { @Test void createStaticCredentialProviderByDefault() { - props.put(S3BaseConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); - props.put(S3BaseConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); + props.put(S3SinkBaseConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); + props.put(S3SinkBaseConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); - final var config = new AwsCredentialConfig(props); + final var config = new AwsCredentialBaseConfig(props); final var credentialProvider = factory.getProvider(config); assertThat(credentialProvider).isInstanceOf(AWSStaticCredentialsProvider.class); @@ -72,7 +72,7 @@ void createStaticCredentialProviderByDefault() { @Test void createDefaultCredentialsWhenNoCredentialsSpecified() { - final var config = new AwsCredentialConfig(props); + final var config = new AwsCredentialBaseConfig(props); final var credentialProvider = factory.getProvider(config); assertThat(credentialProvider).isInstanceOf(DefaultAWSCredentialsProviderChain.class); diff --git a/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java b/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialBaseConfig.java similarity index 91% rename from s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java rename to s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialBaseConfig.java index 48e0f0a5..67829c93 100644 --- a/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java +++ b/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialBaseConfig.java @@ -23,10 +23,10 @@ import io.aiven.kafka.connect.common.config.AivenCommonConfig; import io.aiven.kafka.connect.common.config.CompressionType; import io.aiven.kafka.connect.common.config.OutputFieldType; -import io.aiven.kafka.connect.s3.S3BaseConfig; +import io.aiven.kafka.connect.s3.S3SinkBaseConfig; -public class AwsCredentialConfig extends S3BaseConfig { - public AwsCredentialConfig(final Map properties) { +public class AwsCredentialBaseConfig extends S3SinkBaseConfig { + public AwsCredentialBaseConfig(final Map properties) { super(configDef(), properties); } diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java index 38d5c511..1f410c75 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java @@ -44,8 +44,8 @@ import io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; import io.aiven.kafka.connect.iam.AwsStsRole; -import io.aiven.kafka.connect.s3.S3BaseConfig; import io.aiven.kafka.connect.s3.S3OutputStream; +import io.aiven.kafka.connect.s3.S3SinkBaseConfig; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.regions.Region; @@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({ "PMD.TooManyMethods", "PMD.GodClass", "PMD.ExcessiveImports" }) -final public class S3SinkConfig extends S3BaseConfig { +final public class S3SinkConfig extends S3SinkBaseConfig { public static final Logger LOGGER = LoggerFactory.getLogger(S3SinkConfig.class);