From 04ed86fce1afb9f1b52b14805e9044b4fc81e416 Mon Sep 17 00:00:00 2001 From: Christopher Smith Date: Thu, 18 Jan 2018 02:48:39 -0800 Subject: [PATCH 1/2] Added STS sdk module. Bumped version to v1.0 of connect API --- pom.xml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 34c0b0b..63ded66 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ org.apache.kafka connect-api - 0.11.0.2 + 1.0.0 com.amazonaws @@ -37,6 +37,11 @@ 6.9.10 test + + com.amazonaws + aws-java-sdk-sts + 1.11.265 + From 40e909c2e07849a133b15d1d38b9ec101f84daea Mon Sep 17 00:00:00 2001 From: Christopher Smith Date: Thu, 18 Jan 2018 02:52:46 -0800 Subject: [PATCH 2/2] Minimalist tweaks to add assume role support. This is ugly and has duplicate code, but a cleaner code base would be a more intrusive patch. --- .../kafka/AmazonKinesisSinkConnector.java | 19 +++++++++++++++- .../kinesis/kafka/AmazonKinesisSinkTask.java | 22 +++++++++++++++++-- .../kinesis/kafka/FirehoseSinkConnector.java | 17 ++++++++++++++ .../kinesis/kafka/FirehoseSinkTask.java | 20 ++++++++++++++++- 4 files changed, 74 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java index 7adcdd5..beeb6b2 100644 --- a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java +++ b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java @@ -11,6 +11,7 @@ import org.apache.kafka.connect.sink.SinkConnector; public class AmazonKinesisSinkConnector extends SinkConnector { + public static final String STS_SESSION_NAME_DEFAULT = "AmazonKinesisSink"; public static final String REGION = "region"; @@ -46,6 +47,10 @@ public class AmazonKinesisSinkConnector extends SinkConnector { public static final String SLEEP_CYCLES = "sleepCycles"; + public static final String PRODUCER_ROLE = "producerRole"; + + public static final String STS_SESSION_NAME = "stsSessionName"; + private String region; private String streamName; @@ -80,6 +85,10 @@ public class AmazonKinesisSinkConnector extends SinkConnector { private String sleepCycles; + private String producerRole; + + private String stsSessionName; + @Override public void start(Map props) { region = props.get(REGION); @@ -99,6 +108,8 @@ public void start(Map props) { outstandingRecordsThreshold = props.get(OUTSTANDING_RECORDS_THRESHOLD); sleepPeriod = props.get(SLEEP_PERIOD); sleepCycles = props.get(SLEEP_CYCLES); + producerRole = props.get(PRODUCER_ROLE); + stsSessionName = props.getOrDefault(STS_SESSION_NAME, STS_SESSION_NAME_DEFAULT); } @Override @@ -198,6 +209,12 @@ public List> taskConfigs(int maxTasks) { config.put(SLEEP_CYCLES, sleepCycles); else config.put(SLEEP_CYCLES, "10"); + + if (producerRole != null) + config.put(PRODUCER_ROLE, producerRole); + + if (stsSessionName != null) + config.put(STS_SESSION_NAME, stsSessionName); configs.add(config); @@ -218,4 +235,4 @@ public ConfigDef config() { return new ConfigDef(); } -} \ No newline at end of file +} diff --git a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java index c49d13e..4b8c745 100644 --- a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java +++ b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java @@ -11,7 +11,9 @@ import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.sink.SinkTaskContext; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.services.kinesis.producer.Attempt; import com.amazonaws.services.kinesis.producer.KinesisProducer; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; @@ -23,7 +25,6 @@ import com.google.common.util.concurrent.ListenableFuture; public class AmazonKinesisSinkTask extends SinkTask { - private String streamName; private String regionName; @@ -58,6 +59,10 @@ public class AmazonKinesisSinkTask extends SinkTask { private int sleepCycles; + private String producerRole; + + private String stsSessionName; + private SinkTaskContext sinkTaskContext; private Map producerMap = new HashMap(); @@ -264,6 +269,10 @@ public void start(Map props) { sleepCycles = Integer.parseInt(props.get(AmazonKinesisSinkConnector.SLEEP_CYCLES)); + producerRole = props.get(AmazonKinesisSinkConnector.PRODUCER_ROLE); + + stsSessionName = props.get(AmazonKinesisSinkConnector.STS_SESSION_NAME); + if (!singleKinesisProducerPerPartition) kinesisProducer = getKinesisProducer(); @@ -300,10 +309,19 @@ public void stop() { } + + private AWSCredentialsProvider getCredentialsProvider() { + if (producerRole == null) { + return new DefaultAWSCredentialsProviderChain(); + } + + return new STSAssumeRoleSessionCredentialsProvider.Builder(producerRole, stsSessionName).build(); + } + private KinesisProducer getKinesisProducer() { KinesisProducerConfiguration config = new KinesisProducerConfiguration(); config.setRegion(regionName); - config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain()); + config.setCredentialsProvider(getCredentialsProvider()); config.setMaxConnections(maxConnections); config.setAggregationEnabled(aggregration); diff --git a/src/main/java/com/amazon/kinesis/kafka/FirehoseSinkConnector.java b/src/main/java/com/amazon/kinesis/kafka/FirehoseSinkConnector.java index 6e6b68d..420a509 100644 --- a/src/main/java/com/amazon/kinesis/kafka/FirehoseSinkConnector.java +++ b/src/main/java/com/amazon/kinesis/kafka/FirehoseSinkConnector.java @@ -11,6 +11,7 @@ import org.apache.kafka.connect.sink.SinkConnector; public class FirehoseSinkConnector extends SinkConnector { + public static final String STS_SESSION_NAME_DEFAULT = "AmazonKinesisSink"; public static final String DELIVERY_STREAM = "deliveryStream"; @@ -22,6 +23,10 @@ public class FirehoseSinkConnector extends SinkConnector { public static final String BATCH_SIZE_IN_BYTES = "batchSizeInBytes"; + public static final String PRODUCER_ROLE = "producerRole"; + + public static final String STS_SESSION_NAME = "stsSessionName"; + private String deliveryStream; private String region; @@ -32,6 +37,10 @@ public class FirehoseSinkConnector extends SinkConnector { private String batchSizeInBytes; + private String producerRole; + + private String stsSessionName; + private final String MAX_BATCH_SIZE = "500"; private final String MAX_BATCH_SIZE_IN_BYTES = "3670016"; @@ -44,6 +53,8 @@ public void start(Map props) { batch = props.get(BATCH); batchSize = props.get(BATCH_SIZE); batchSizeInBytes = props.get(BATCH_SIZE_IN_BYTES); + producerRole = props.get(PRODUCER_ROLE); + stsSessionName = props.getOrDefault(STS_SESSION_NAME, STS_SESSION_NAME_DEFAULT); } @Override @@ -81,6 +92,12 @@ public List> taskConfigs(int maxTasks) { else config.put(BATCH_SIZE_IN_BYTES, MAX_BATCH_SIZE_IN_BYTES); + if (producerRole != null) + config.put(PRODUCER_ROLE, producerRole); + + if (stsSessionName != null) + config.put(STS_SESSION_NAME, stsSessionName); + configs.add(config); } return configs; diff --git a/src/main/java/com/amazon/kinesis/kafka/FirehoseSinkTask.java b/src/main/java/com/amazon/kinesis/kafka/FirehoseSinkTask.java index 2c4a7dd..d1af42c 100644 --- a/src/main/java/com/amazon/kinesis/kafka/FirehoseSinkTask.java +++ b/src/main/java/com/amazon/kinesis/kafka/FirehoseSinkTask.java @@ -11,7 +11,9 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.regions.RegionUtils; import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; import com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException; @@ -39,6 +41,10 @@ public class FirehoseSinkTask extends SinkTask { private int batchSizeInBytes; + private String producerRole; + + private String stsSessionName; + @Override public String version() { return new FirehoseSinkConnector().version(); @@ -58,6 +64,14 @@ public void put(Collection sinkRecords) { } + private AWSCredentialsProvider getCredentialsProvider() { + if (producerRole == null) { + return new DefaultAWSCredentialsProviderChain(); + } + + return new STSAssumeRoleSessionCredentialsProvider.Builder(producerRole, stsSessionName).build(); + } + @Override public void start(Map props) { @@ -69,7 +83,11 @@ public void start(Map props) { deliveryStreamName = props.get(FirehoseSinkConnector.DELIVERY_STREAM); - firehoseClient = new AmazonKinesisFirehoseClient(new DefaultAWSCredentialsProviderChain()); + producerRole = props.get(AmazonKinesisSinkConnector.PRODUCER_ROLE); + + stsSessionName = props.get(AmazonKinesisSinkConnector.STS_SESSION_NAME); + + firehoseClient = new AmazonKinesisFirehoseClient(getCredentialsProvider()); firehoseClient.setRegion(RegionUtils.getRegion(props.get(FirehoseSinkConnector.REGION)));