From 42f0d4fe8de793e47a4796499d82d3c963b4e7d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Objet=20Trouv=C3=A9?= Date: Sun, 8 Aug 2021 13:01:59 +0200 Subject: [PATCH 1/4] #386: Implemented passing of `CryptoKeyReader`. As requested [here](https://github.com/streamnative/pulsar-flink/issues/386), it's now possible to pass a `CryptoKeyReader` (and encryption keys) to `FlinkPulsarSource` and `FlinkPulsarSink`. Used builder pattern for easy extensibility without breaking or excessively overloading public c'tors. Added integration test. (Maybe it can be moved to one of the other tests to avoid overhead.) --- .../connectors/pulsar/FlinkPulsarSink.java | 122 +++++++++- .../pulsar/FlinkPulsarSinkBase.java | 117 ++++++++-- .../connectors/pulsar/FlinkPulsarSource.java | 140 ++++++++--- .../pulsar/internal/PulsarFetcher.java | 199 +++++++++++++--- .../pulsar/internal/ReaderThread.java | 97 ++++++++ .../pulsar/SinkSrcRoundtripTest.java | 218 ++++++++++++++++++ 6 files changed, 816 insertions(+), 77 deletions(-) create mode 100644 pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SinkSrcRoundtripTest.java diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java index 34b481c7..0833da23 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java @@ -19,6 +19,7 @@ import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.TypedMessageBuilder; @@ -26,9 +27,12 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -41,8 +45,115 @@ @Slf4j public class FlinkPulsarSink extends FlinkPulsarSinkBase { + public static class Builder { + private String adminUrl; + private String defaultTopicName; + private ClientConfigurationData clientConf; + private Properties properties; + private PulsarSerializationSchema serializationSchema; + private MessageRouter messageRouter = null; + private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE; + private String serviceUrl; + private CryptoKeyReader cryptoKeyReader; + private final Set encryptionKeys = new HashSet<>(); + + public Builder withAdminUrl(final String adminUrl) { + this.adminUrl = adminUrl; + return this; + } + + public Builder withDefaultTopicName(final String defaultTopicName) { + this.defaultTopicName = defaultTopicName; + return this; + } + + public Builder withClientConf(final ClientConfigurationData clientConf) { + this.clientConf = clientConf; + return this; + } + + public Builder withProperties(final Properties properties) { + this.properties = properties; + return this; + } + + public Builder withPulsarSerializationSchema(final PulsarSerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + return this; + } + + public Builder withMessageRouter(final MessageRouter messageRouter) { + this.messageRouter = messageRouter; + return this; + } + + public Builder withSemantic(final PulsarSinkSemantic semantic) { + this.semantic = semantic; + return this; + } + + public Builder withServiceUrl(final String serviceUrl) { + this.serviceUrl = serviceUrl; + return this; + } + + public Builder withCryptoKeyReader(CryptoKeyReader cryptoKeyReader) { + this.cryptoKeyReader = cryptoKeyReader; + return this; + } + + public Builder withEncryptionKeys(String... encryptionKeys) { + this.encryptionKeys.addAll(Arrays.asList(encryptionKeys)); + return this; + } + + public FlinkPulsarSink build(){ + if (adminUrl == null) { + throw new IllegalStateException("Admin URL must be set."); + } + if (serializationSchema == null) { + throw new IllegalStateException("Serialization schema must be set."); + } + if (semantic == null) { + throw new IllegalStateException("Semantic must be set."); + } + if (properties == null) { + throw new IllegalStateException("Properties must be set."); + } + if (serviceUrl != null && clientConf != null) { + throw new IllegalStateException("Set either client conf or service URL but not both."); + } + if (serviceUrl != null){ + clientConf = PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties); + } + if (clientConf == null){ + throw new IllegalStateException("Client conf must be set."); + } + if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){ + throw new IllegalStateException("Set crypto key reader and encryption keys in conjunction."); + } + return new FlinkPulsarSink<>(this); + } + + } + private final PulsarSerializationSchema serializationSchema; + public FlinkPulsarSink(final Builder builder) { + super( + new FlinkPulsarSinkBase.Config() + .withAdminUrl(builder.adminUrl) + .withDefaultTopicName(builder.defaultTopicName) + .withClientConf(builder.clientConf) + .withProperties(builder.properties) + .withSerializationSchema(builder.serializationSchema) + .withMessageRouter(builder.messageRouter) + .withSemantic(builder.semantic) + .withCryptoKeyReader(builder.cryptoKeyReader) + .withEncryptionKeys(builder.encryptionKeys)); + this.serializationSchema = builder.serializationSchema; + } + public FlinkPulsarSink( String adminUrl, Optional defaultTopicName, @@ -51,9 +162,14 @@ public FlinkPulsarSink( PulsarSerializationSchema serializationSchema, MessageRouter messageRouter, PulsarSinkSemantic semantic) { - - super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic); - this.serializationSchema = serializationSchema; + this(new Builder() + .withAdminUrl(adminUrl) + .withDefaultTopicName(defaultTopicName.orElse(null)) + .withClientConf(clientConf) + .withProperties(properties) + .withPulsarSerializationSchema(serializationSchema) + .withMessageRouter(messageRouter) + .withSemantic(semantic)); } public FlinkPulsarSink( diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index 5d1887c2..78932108 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -62,10 +63,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -86,6 +89,64 @@ @Slf4j abstract class FlinkPulsarSinkBase extends TwoPhaseCommitSinkFunction, Void> implements CheckpointedFunction { + public static class Config { + private String adminUrl; + private Optional defaultTopicName; + private ClientConfigurationData clientConf; + private Properties properties; + private PulsarSerializationSchema serializationSchema; + private MessageRouter messageRouter; + private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE; + private CryptoKeyReader cryptoKeyReader; + private Set encryptionKeys = new HashSet<>(); + + public Config withAdminUrl(final String adminUrl) { + this.adminUrl = adminUrl; + return this; + } + + public Config withDefaultTopicName(final String defaultTopicName) { + this.defaultTopicName = Optional.ofNullable(defaultTopicName); + return this; + } + + public Config withClientConf(ClientConfigurationData clientConf) { + this.clientConf = clientConf; + return this; + } + + public Config withProperties(final Properties properties) { + this.properties = properties; + return this; + } + + public Config withSerializationSchema(final PulsarSerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + return this; + } + + public Config withMessageRouter(final MessageRouter messageRouter) { + this.messageRouter = messageRouter; + return this; + } + + public Config withSemantic(final PulsarSinkSemantic semantic) { + this.semantic = semantic; + return this; + } + + public Config withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) { + this.cryptoKeyReader = cryptoKeyReader; + return this; + } + + public Config withEncryptionKeys(final Set encryptionKeys) { + this.encryptionKeys = encryptionKeys; + return this; + } + + } + protected String adminUrl; protected ClientConfigurationData clientConfigurationData; @@ -143,6 +204,10 @@ abstract class FlinkPulsarSinkBase extends TwoPhaseCommitSinkFunction> topic2Producer; + private final CryptoKeyReader cryptoKeyReader; + + private final Set encryptionKeys; + public FlinkPulsarSinkBase( String adminUrl, Optional defaultTopicName, @@ -153,34 +218,29 @@ public FlinkPulsarSinkBase( this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE); } - public FlinkPulsarSinkBase( - String adminUrl, - Optional defaultTopicName, - ClientConfigurationData clientConf, - Properties properties, - PulsarSerializationSchema serializationSchema, - MessageRouter messageRouter, - PulsarSinkSemantic semantic) { + public FlinkPulsarSinkBase(final Config config) { super(new TransactionStateSerializer(), VoidSerializer.INSTANCE); - this.adminUrl = checkNotNull(adminUrl); - this.semantic = semantic; + this.adminUrl = checkNotNull(config.adminUrl); + this.semantic = config.semantic; + this.cryptoKeyReader = config.cryptoKeyReader; + this.encryptionKeys = config.encryptionKeys; - if (defaultTopicName.isPresent()) { + if (config.defaultTopicName.isPresent()) { this.forcedTopic = true; - this.defaultTopic = defaultTopicName.get(); + this.defaultTopic = config.defaultTopicName.get(); } else { this.forcedTopic = false; this.defaultTopic = null; } - this.serializationSchema = serializationSchema; + this.serializationSchema = config.serializationSchema; - this.messageRouter = messageRouter; + this.messageRouter = config.messageRouter; - this.clientConfigurationData = clientConf; + this.clientConfigurationData = config.clientConf; - this.properties = checkNotNull(properties); + this.properties = checkNotNull(config.properties); this.caseInsensitiveParams = SourceSinkUtils.toCaceInsensitiveParams(Maps.fromProperties(properties)); @@ -216,6 +276,25 @@ public FlinkPulsarSinkBase( } } + public FlinkPulsarSinkBase( + String adminUrl, + Optional defaultTopicName, + ClientConfigurationData clientConf, + Properties properties, + PulsarSerializationSchema serializationSchema, + MessageRouter messageRouter, + PulsarSinkSemantic semantic) { + this(new Config() + .withAdminUrl(adminUrl) + .withDefaultTopicName(defaultTopicName.orElse(null)) + .withClientConf(clientConf) + .withProperties(properties) + .withSerializationSchema(serializationSchema) + .withMessageRouter(messageRouter) + .withSemantic(semantic) + ); + } + public FlinkPulsarSinkBase( String serviceUrl, String adminUrl, @@ -340,6 +419,12 @@ protected Producer createProducer( // maximizing the throughput .batchingMaxBytes(5 * 1024 * 1024) .loadConf(producerConf); + if (cryptoKeyReader != null){ + builder.cryptoKeyReader(cryptoKeyReader); + for (final String encryptionKey : this.encryptionKeys) { + builder.addEncryptionKey(encryptionKey); + } + } if (messageRouter == null) { return builder.create(); } else { diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index 3edd5ce4..30cebfbf 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -63,6 +63,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -102,6 +103,76 @@ public class FlinkPulsarSource CheckpointListener, CheckpointedFunction { + public static class Builder { + private String adminUrl; + private Properties properties; + private String serviceUrl; + ClientConfigurationData clientConf; + PulsarDeserializationSchema deserializer; + private CryptoKeyReader cryptoKeyReader; + + public Builder withAdminUrl(final String adminUrl) { + this.adminUrl = adminUrl; + return this; + } + + public Builder withProperties(final Properties properties) { + this.properties = properties; + return this; + } + + public Builder withServiceUrl(final String serviceUrl) { + this.serviceUrl = serviceUrl; + return this; + } + + public Builder withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) { + this.cryptoKeyReader = cryptoKeyReader; + return this; + } + + public Builder withClientConfigurationData(final ClientConfigurationData clientConf){ + this.clientConf = clientConf; + return this; + } + + public Builder withPulsarDeserializionSchema(final PulsarDeserializationSchema deserializer){ + if (this.deserializer != null){ + throw new IllegalStateException("Deserializer was already set."); + } + this.deserializer = deserializer; + return this; + } + + public Builder withDeserializionSchema(final DeserializationSchema deserializer){ + if (this.deserializer != null){ + throw new IllegalStateException("Deserializer was already set."); + } + this.deserializer = PulsarDeserializationSchema.valueOnly(deserializer); + return this; + } + + public FlinkPulsarSource build(){ + if (adminUrl == null){ + throw new IllegalStateException("Admin URL must be set."); + } + if (properties == null){ + throw new IllegalStateException("Properties must be set."); + } + if ((serviceUrl != null && clientConf != null)){ + throw new IllegalStateException("Please specify either service URL plus properties or client conf but not both."); + } + if (serviceUrl != null){ + clientConf = PulsarClientUtils.newClientConf(serviceUrl, properties); + } + if (clientConf == null){ + throw new IllegalStateException("Client conf mustn't be null. Either provide a client conf or a service URL plus properties."); + } + return new FlinkPulsarSource<>(this); + } + + } + /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */ public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; @@ -229,15 +300,14 @@ public class FlinkPulsarSource private transient int numParallelTasks; - public FlinkPulsarSource( - String adminUrl, - ClientConfigurationData clientConf, - PulsarDeserializationSchema deserializer, - Properties properties) { - this.adminUrl = checkNotNull(adminUrl); - this.clientConfigurationData = checkNotNull(clientConf); - this.deserializer = deserializer; - this.properties = properties; + private final CryptoKeyReader cryptoKeyReader; + + public FlinkPulsarSource(final Builder builder) { + this.adminUrl = checkNotNull(builder.adminUrl); + this.clientConfigurationData = checkNotNull(builder.clientConf); + this.deserializer = builder.deserializer; + this.properties = builder.properties; + this.cryptoKeyReader = builder.cryptoKeyReader; this.caseInsensitiveParams = SourceSinkUtils.validateStreamSourceOptions(Maps.fromProperties(properties)); this.readerConf = @@ -259,6 +329,18 @@ public FlinkPulsarSource( this.oldStateVersion = SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion); } + public FlinkPulsarSource( + String adminUrl, + ClientConfigurationData clientConf, + PulsarDeserializationSchema deserializer, + Properties properties) { + this(new Builder() + .withAdminUrl(adminUrl) + .withClientConfigurationData(clientConf) + .withPulsarDeserializionSchema(deserializer) + .withProperties(properties)); + } + public FlinkPulsarSource( String serviceUrl, String adminUrl, @@ -595,27 +677,25 @@ protected PulsarFetcher createFetcher( StreamingRuntimeContext streamingRuntime, boolean useMetrics, Set excludeStartMessageIds) throws Exception { - - //readerConf.putIfAbsent(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, getSubscriptionName()); - - return new PulsarFetcher<>( - sourceContext, - seedTopicsWithInitialOffsets, - excludeStartMessageIds, - watermarkStrategy, - processingTimeProvider, - autoWatermarkInterval, - userCodeClassLoader, - streamingRuntime, - clientConfigurationData, - readerConf, - pollTimeoutMs, - commitMaxRetries, - deserializer, - metadataReader, - streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP), - useMetrics - ); + return new PulsarFetcher.Builder() + .withSourceContext(sourceContext) + .withSeedTopicsWithInitialOffsets(seedTopicsWithInitialOffsets) + .withExcludeStartMessageIds(excludeStartMessageIds) + .withWatermarkStrategy(watermarkStrategy) + .withProcessingTimeProvider(processingTimeProvider) + .withAutoWatermarkInterval(autoWatermarkInterval) + .withUserCodeClassLoader(userCodeClassLoader) + .withRuntimeContext(streamingRuntime) + .withClientConf(clientConfigurationData) + .withReaderConf(readerConf) + .withPollTimeoutMs(pollTimeoutMs) + .withCommitMaxRetries(commitMaxRetries) + .withDeserializer(deserializer) + .withMetadataReader(metadataReader) + .withConsumerMetricGroup(streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP)) + .withUseMetrics(useMetrics) + .withCryptoKeyReader(cryptoKeyReader) + .build(); } public void joinDiscoveryLoopThread() throws InterruptedException { diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java index 7c8a5ba9..0e69575d 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java @@ -27,6 +27,7 @@ import org.apache.flink.util.SerializedValue; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.shade.com.google.common.collect.ImmutableList; @@ -55,6 +56,119 @@ @Slf4j public class PulsarFetcher { + public static class Builder { + private SourceContext sourceContext; + private Map seedTopicsWithInitialOffsets; + private SerializedValue> watermarkStrategy; + private ProcessingTimeService processingTimeProvider; + private long autoWatermarkInterval; + private ClassLoader userCodeClassLoader; + private StreamingRuntimeContext runtimeContext; + private ClientConfigurationData clientConf; + private Map readerConf; + private int pollTimeoutMs; + private PulsarDeserializationSchema deserializer; + private PulsarMetadataReader metadataReader; + private MetricGroup consumerMetricGroup; + private boolean useMetrics; + private Set excludeStartMessageIds = Collections.emptySet(); + private int commitMaxRetries = 3; + private CryptoKeyReader cryptoKeyReader; + + public Builder withSourceContext(final SourceContext sourceContext) { + this.sourceContext = sourceContext; + return this; + } + + public Builder withSeedTopicsWithInitialOffsets(final Map seedTopicsWithInitialOffsets) { + this.seedTopicsWithInitialOffsets = seedTopicsWithInitialOffsets; + return this; + } + + public Builder withWatermarkStrategy(final SerializedValue> watermarkStrategy) { + this.watermarkStrategy = watermarkStrategy; + return this; + } + + public Builder withProcessingTimeProvider(final ProcessingTimeService processingTimeProvider) { + this.processingTimeProvider = processingTimeProvider; + return this; + } + + public Builder withAutoWatermarkInterval(final long autoWatermarkInterval) { + this.autoWatermarkInterval = autoWatermarkInterval; + return this; + } + + public Builder withUserCodeClassLoader(final ClassLoader userCodeClassLoader) { + this.userCodeClassLoader = userCodeClassLoader; + return this; + } + + public Builder withRuntimeContext(final StreamingRuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + return this; + } + + public Builder withClientConf(final ClientConfigurationData clientConf) { + this.clientConf = clientConf; + return this; + } + + public Builder withReaderConf(final Map readerConf) { + this.readerConf = readerConf; + return this; + } + + public Builder withPollTimeoutMs(final int pollTimeoutMs) { + this.pollTimeoutMs = pollTimeoutMs; + return this; + } + + public Builder withDeserializer(final PulsarDeserializationSchema deserializer) { + this.deserializer = deserializer; + return this; + } + + public Builder withMetadataReader(final PulsarMetadataReader metadataReader) { + this.metadataReader = metadataReader; + return this; + } + + public Builder withConsumerMetricGroup(final MetricGroup consumerMetricGroup) { + this.consumerMetricGroup = consumerMetricGroup; + return this; + } + + public Builder withUseMetrics(final boolean useMetrics) { + this.useMetrics = useMetrics; + return this; + } + + public Builder withExcludeStartMessageIds(final Set excludeStartMessageIds) { + this.excludeStartMessageIds = excludeStartMessageIds; + return this; + } + + public Builder withCommitMaxRetries(final int commitMaxRetries) { + this.commitMaxRetries = commitMaxRetries; + return this; + } + + public Builder withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) { + this.cryptoKeyReader = cryptoKeyReader; + return this; + } + + public PulsarFetcher build() throws Exception { + return new PulsarFetcher(this); + } + } + + public static Builder builder(){ + return new Builder<>(); + } + private static final int NO_TIMESTAMPS_WATERMARKS = 0; private static final int WITH_WATERMARK_GENERATOR = 1; @@ -146,6 +260,8 @@ public class PulsarFetcher { */ private final MetricGroup consumerMetricGroup; + private final CryptoKeyReader cryptoKeyReader; + public PulsarFetcher( SourceContext sourceContext, Map seedTopicsWithInitialOffsets, @@ -199,27 +315,51 @@ public PulsarFetcher( MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception { - this.sourceContext = sourceContext; + this(new Builder() + .withSourceContext(sourceContext) + .withUseMetrics(useMetrics) + .withConsumerMetricGroup(consumerMetricGroup) + .withSeedTopicsWithInitialOffsets(seedTopicsWithInitialOffsets) + .withExcludeStartMessageIds(excludeStartMessageIds) + .withUserCodeClassLoader(userCodeClassLoader) + .withRuntimeContext(runtimeContext) + .withClientConf(clientConf) + .withReaderConf(readerConf) + .withPollTimeoutMs(pollTimeoutMs) + .withCommitMaxRetries(commitMaxRetries) + .withDeserializer(deserializer) + .withMetadataReader(metadataReader) + .withWatermarkStrategy(watermarkStrategy) + .withProcessingTimeProvider(processingTimeProvider) + .withAutoWatermarkInterval(autoWatermarkInterval) + ); + + } + + private PulsarFetcher(final Builder builder) throws Exception { + + this.sourceContext = builder.sourceContext; this.watermarkOutput = new SourceContextWatermarkOutputAdapter<>(sourceContext); this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput); - this.useMetrics = useMetrics; - this.consumerMetricGroup = checkNotNull(consumerMetricGroup); - this.seedTopicsWithInitialOffsets = seedTopicsWithInitialOffsets; - this.excludeStartMessageIds = excludeStartMessageIds; + this.useMetrics = builder.useMetrics; + this.consumerMetricGroup = checkNotNull(builder.consumerMetricGroup); + this.seedTopicsWithInitialOffsets = builder.seedTopicsWithInitialOffsets; + this.excludeStartMessageIds = builder.excludeStartMessageIds; this.checkpointLock = sourceContext.getCheckpointLock(); - this.userCodeClassLoader = userCodeClassLoader; - this.runtimeContext = runtimeContext; - this.clientConf = clientConf; - this.readerConf = readerConf == null ? new HashMap<>() : readerConf; + this.userCodeClassLoader = builder.userCodeClassLoader; + this.runtimeContext = builder.runtimeContext; + this.clientConf = builder.clientConf; + this.readerConf = builder.readerConf == null ? new HashMap<>() : builder.readerConf; this.failOnDataLoss = SourceSinkUtils.getFailOnDataLossAndRemoveKey(this.readerConf); this.useEarliestWhenDataLoss = SourceSinkUtils.getUseEarliestWhenDataLossAndRemoveKey(this.readerConf); - this.pollTimeoutMs = pollTimeoutMs; - this.commitMaxRetries = commitMaxRetries; - this.deserializer = deserializer; - this.metadataReader = metadataReader; + this.pollTimeoutMs = builder.pollTimeoutMs; + this.commitMaxRetries = builder.commitMaxRetries; + this.deserializer = builder.deserializer; + this.metadataReader = builder.metadataReader; + this.cryptoKeyReader = builder.cryptoKeyReader; // figure out what we watermark mode we will be using - this.watermarkStrategy = watermarkStrategy; + this.watermarkStrategy = builder.watermarkStrategy; if (watermarkStrategy == null) { timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS; @@ -254,13 +394,13 @@ public PulsarFetcher( } // if we have periodic watermarks, kick off the interval scheduler - if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) { + if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && builder.autoWatermarkInterval > 0) { PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter<>( checkpointLock, subscribedPartitionStates, watermarkOutputMultiplexer, - processingTimeProvider, - autoWatermarkInterval); + builder.processingTimeProvider, + builder.autoWatermarkInterval); periodicEmitter.start(); } @@ -533,17 +673,20 @@ protected List> getSubscribedTopicStates() { } protected ReaderThread createReaderThread(ExceptionProxy exceptionProxy, PulsarTopicState state) { - return new ReaderThread<>( - this, - state, - clientConf, - readerConf, - deserializer, - pollTimeoutMs, - exceptionProxy, - failOnDataLoss, - useEarliestWhenDataLoss, - excludeStartMessageIds.contains(state.getTopicRange())); + + return new ReaderThread.Builder() + .withOwner(this) + .withState(state) + .withClientConf(clientConf) + .withReaderConf(readerConf) + .withDeserializer(deserializer) + .withPollTimeoutMs(pollTimeoutMs) + .withExceptionProxy(exceptionProxy) + .withFailOnDataLoss(failOnDataLoss) + .withUseEarliestWhenDataLoss(useEarliestWhenDataLoss) + .withExcludeMessageId(excludeStartMessageIds.contains(state.getTopicRange())) + .withCryptoKeyReader(cryptoKeyReader) + .build(); } /** diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java index fefac9c0..e027bf8a 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java @@ -18,6 +18,7 @@ import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; @@ -39,6 +40,80 @@ @Slf4j public class ReaderThread extends Thread { + public static class Builder { + private PulsarFetcher owner; + private PulsarTopicState state; + private ClientConfigurationData clientConf; + private Map readerConf; + private PulsarDeserializationSchema deserializer; + private int pollTimeoutMs; + private ExceptionProxy exceptionProxy; + private boolean failOnDataLoss; + private boolean useEarliestWhenDataLoss; + private boolean excludeMessageId; + private CryptoKeyReader cryptoKeyReader; + + public Builder withOwner(final PulsarFetcher owner) { + this.owner = owner; + return this; + } + + public Builder withState(final PulsarTopicState state) { + this.state = state; + return this; + } + + public Builder withClientConf(final ClientConfigurationData clientConf) { + this.clientConf = clientConf; + return this; + } + + public Builder withReaderConf(final Map readerConf) { + this.readerConf = readerConf; + return this; + } + + public Builder withDeserializer(final PulsarDeserializationSchema deserializer) { + this.deserializer = deserializer; + return this; + } + + public Builder withPollTimeoutMs(final int pollTimeoutMs) { + this.pollTimeoutMs = pollTimeoutMs; + return this; + } + + public Builder withExceptionProxy(final ExceptionProxy exceptionProxy) { + this.exceptionProxy = exceptionProxy; + return this; + } + + public Builder withFailOnDataLoss(final boolean failOnDataLoss) { + this.failOnDataLoss = failOnDataLoss; + return this; + } + + public Builder withUseEarliestWhenDataLoss(final boolean useEarliestWhenDataLoss) { + this.useEarliestWhenDataLoss = useEarliestWhenDataLoss; + return this; + } + + public Builder withExcludeMessageId(final boolean excludeMessageId) { + this.excludeMessageId = excludeMessageId; + return this; + } + + public Builder withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) { + this.cryptoKeyReader = cryptoKeyReader; + return this; + } + + public ReaderThread build(){ + return new ReaderThread<>(this); + } + + } + protected final PulsarFetcher owner; protected final PulsarTopicState state; protected final ClientConfigurationData clientConf; @@ -58,6 +133,8 @@ public class ReaderThread extends Thread { protected volatile Reader reader = null; + private final CryptoKeyReader cryptoKeyReader; + public ReaderThread( PulsarFetcher owner, PulsarTopicState state, @@ -76,6 +153,7 @@ public ReaderThread( this.topicRange = state.getTopicRange(); this.startMessageId = state.getOffset(); + this.cryptoKeyReader = null; } public ReaderThread( @@ -95,6 +173,22 @@ public ReaderThread( this.excludeMessageId = excludeMessageId; } + private ReaderThread(final Builder builder){ + this.owner = builder.owner; + this.state = builder.state; + this.clientConf = builder.clientConf; + this.readerConf = builder.readerConf; + this.deserializer = builder.deserializer; + this.pollTimeoutMs = builder.pollTimeoutMs; + this.exceptionProxy = builder.exceptionProxy; + this.topicRange = state.getTopicRange(); + this.startMessageId = state.getOffset(); + this.failOnDataLoss = builder.failOnDataLoss; + this.useEarliestWhenDataLoss = builder.useEarliestWhenDataLoss; + this.excludeMessageId = builder.excludeMessageId; + this.cryptoKeyReader = builder.cryptoKeyReader; + } + @Override public void run() { log.info("Starting to fetch from {} at {}, failOnDataLoss {}", topicRange, startMessageId, failOnDataLoss); @@ -129,6 +223,9 @@ protected void createActualReader() throws PulsarClientException { .topic(topicRange.getTopic()) .startMessageId(startMessageId) .loadConf(readerConf); + if (cryptoKeyReader != null){ + readerBuilder.cryptoKeyReader(cryptoKeyReader); + } log.info("Create a reader at topic {} starting from message {} (inclusive) : config = {}", topicRange, startMessageId, readerConf); if (!excludeMessageId){ diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SinkSrcRoundtripTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SinkSrcRoundtripTest.java new file mode 100644 index 00000000..3bb867d8 --- /dev/null +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SinkSrcRoundtripTest.java @@ -0,0 +1,218 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper; + +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.EncryptionKeyInfo; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicName; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.TOPIC_SINGLE_OPTION_KEY; +import static org.junit.Assert.assertTrue; + +public class SinkSrcRoundtripTest extends PulsarTestBase { + + private static final String TOPIC_UNENCRYPTED = TopicName.get("persistent", "public", "default", "unencrypted-topic").toString(); + private static final String TOPIC_ENCRYPTED = TopicName.get("persistent", "public", "default", "encrypted-topic").toString(); + + @Test(timeout = 60 * 1000) + public void testUnencryptedRoundtrip() throws Exception { + final Properties properties = new Properties(); + final String topic = TOPIC_UNENCRYPTED; + properties.put(TOPIC_SINGLE_OPTION_KEY, topic); + final SimpleStringSchema schema = new SimpleStringSchema(); + final PulsarSerializationSchemaWrapper serializationSchema = new PulsarSerializationSchemaWrapper.Builder<>(schema) + .useSpecialMode(Schema.STRING) + .build(); + + final FlinkPulsarSink sink = new FlinkPulsarSink.Builder() + .withAdminUrl(getAdminUrl()) + .withServiceUrl(getServiceUrl()) + .withDefaultTopicName(topic) + .withProperties(properties) + .withPulsarSerializationSchema(serializationSchema) + .build(); + + final FlinkPulsarSource src = new FlinkPulsarSource.Builder() + .withAdminUrl(getAdminUrl()) + .withServiceUrl(getServiceUrl()) + .withProperties(properties) + .withDeserializionSchema(schema) + .build() + .setStartFromEarliest(); + + roundtrip(sink, src); + } + + private static class MyCryptoKeyReader implements CryptoKeyReader { + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map metadata) { + EncryptionKeyInfo pub = new EncryptionKeyInfo(); + pub.setKey(("" + + "-----BEGIN RSA PUBLIC KEY-----\n" + + "MIICCgKCAgEAyuz24j5TMVi9PthJpGYQ0sGJ+5uKPoVeoQQPvagOlIhKMCpjitmO\n" + + "gZo5LvMFT0lACVzdxVkUpqEbRc9upopLDAhhHeHTauOxi7kH1iDO9kMKE5uy5QK5\n" + + "JveG56drwIXrvQyz9NmEIkmSG/Ruisl9zDr+t3cdyLdwE4T0YU8JCj1Ex3Pnlr8c\n" + + "iJoYV7VZoqJPJF3Le3PbXORxn5lXDnZcVkdDCDSLITAHegN5DU6/UJqIgwMQo0fQ\n" + + "2dMiZ/VzqiPF1RHrBoM7/P2u507k2ZOS4jrIbKj9YdV4AibTDEYIhtSyIPkEJGDJ\n" + + "GZINSxcfW9+exnq28B+NsJwwKNVoU60Phui6PcCOF4BXRYWMajJyLIrTNo8cFL5Z\n" + + "Kid1+5BrYxCBdWDHJ4KiaT/y8y9q8ea+kUrUgp3VbsDstN1gU3vxzPcvaiTZc5xK\n" + + "puEg4bbY5waN1y8JyfjwRUBk29CTXh3wQbd81DBjykfVL6OcbcVH8V0qLN7uhAPm\n" + + "EGFyyqlwH93HsSCTHjJpkxBj2gO8n7/5YQJe9181tz+0xofc9kpDT1YTxdHxJevA\n" + + "UsRfkrCGbwAdE2QhGmzwJSJCPdIanTEGRK8fr/6T0EM7TwrmHgLsCybpqdMil15u\n" + + "8crgr8N7wTfm/iikdVs1sOtkjfG5WoNwm8XqS7g4CrVBFIVu/v8o++ECAwEAAQ==\n" + + "-----END RSA PUBLIC KEY-----") + .getBytes()); + return pub; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map metadata) { + EncryptionKeyInfo priv = new EncryptionKeyInfo(); + priv.setKey(("" + + "-----BEGIN RSA PRIVATE KEY-----\n" + + "MIIJKAIBAAKCAgEAyuz24j5TMVi9PthJpGYQ0sGJ+5uKPoVeoQQPvagOlIhKMCpj\n" + + "itmOgZo5LvMFT0lACVzdxVkUpqEbRc9upopLDAhhHeHTauOxi7kH1iDO9kMKE5uy\n" + + "5QK5JveG56drwIXrvQyz9NmEIkmSG/Ruisl9zDr+t3cdyLdwE4T0YU8JCj1Ex3Pn\n" + + "lr8ciJoYV7VZoqJPJF3Le3PbXORxn5lXDnZcVkdDCDSLITAHegN5DU6/UJqIgwMQ\n" + + "o0fQ2dMiZ/VzqiPF1RHrBoM7/P2u507k2ZOS4jrIbKj9YdV4AibTDEYIhtSyIPkE\n" + + "JGDJGZINSxcfW9+exnq28B+NsJwwKNVoU60Phui6PcCOF4BXRYWMajJyLIrTNo8c\n" + + "FL5ZKid1+5BrYxCBdWDHJ4KiaT/y8y9q8ea+kUrUgp3VbsDstN1gU3vxzPcvaiTZ\n" + + "c5xKpuEg4bbY5waN1y8JyfjwRUBk29CTXh3wQbd81DBjykfVL6OcbcVH8V0qLN7u\n" + + "hAPmEGFyyqlwH93HsSCTHjJpkxBj2gO8n7/5YQJe9181tz+0xofc9kpDT1YTxdHx\n" + + "JevAUsRfkrCGbwAdE2QhGmzwJSJCPdIanTEGRK8fr/6T0EM7TwrmHgLsCybpqdMi\n" + + "l15u8crgr8N7wTfm/iikdVs1sOtkjfG5WoNwm8XqS7g4CrVBFIVu/v8o++ECAwEA\n" + + "AQKCAgBB5lCKypizxtC2bwEDXY4LE4Ue67Uqdp9zhOEjw0bw343QNIPdHKfV2OLH\n" + + "J27K/8vG/pyasUIultVHh4S0muaiQrpfPO4uoUEQUgeEd2Uevkiwc3jWPFsql2n9\n" + + "IvawMA2NeGmck2MAy4migG/BrIuo3mPH6uwGOeQwwpWmYEdcRudmKnLEFs5KYliT\n" + + "azZvxWwUME2bitVrRljL7r1B2hhEgKH5MS8ZmQJkkmomczNYFsdMXJtzmyftBU8A\n" + + "Gcr1LubZOhdsJwQ9NZkuTwWszur9gv+BoiOfOPbfJAKX0sqEFuC+KoA43CGSp0af\n" + + "4yNw758db86nDmgyOZa+PAfEXMhUf/3VFbnkca8w9vAP5w8b0W8GtCflcnC+vb1y\n" + + "qcToLbHaDnmhHLUdxHVj2XLTNxYFxmCmPhS+R8XRX4yPftni1skxjNOa6QbmHgVq\n" + + "7DgXcJJxJBk5+EIJrujk/eXzfU2zwUsYMt1dFLzwAvdc2fyoe8CCiBN+VUrhGoTF\n" + + "3IvYMzHYAdpxTdhKtQfShdI+tS3PLiEbzB3DxHH6H9X6Q0HZdLyGRufpqDW6DW1M\n" + + "zzCJIXRIcj8oZk36JsoH00RGOsCjdaeDOL13a3LCn0yBjuRPRVD0UlJA3EZT4/Ic\n" + + "QNvbgQe6zLbeIWWaYcstcAyjBCcW0mzEdlyUFIjU9LIVuJRhkQKCAQEA+D6p8KSb\n" + + "LhkwDGrt8X60QxattrRGVir5o/7lGCHkcnNBQUNXXy/8hrS41qsRhDHPSYHZFLwV\n" + + "66sIolkWhA9JEdZKfT/CsamHkfho8aitHh8YShdOog1OZQsunhFkaLzXO1mvDSUF\n" + + "W5ZLogUWy4oAduVwb4ANzNuW0pMGTYSzQHNJoHb91c/iu755av3VxeuxmomaKqBB\n" + + "h5TrNkHA+fUFvJqn/GZI0ZVyAqvIbyn7qN8y9rZcTa05L8oR2oyL7WVWF61ym03O\n" + + "V3f1gjSOU12acAGx5T6B6Bofb0v1xGgb4NMjacZcJ/lur9VmDZMlDt6XG6zWNalS\n" + + "/cB36JKQRgijfQKCAQEA0UPca7dG8cRryYpKYfHcCX3RIjp89XB83SWBpvNJr2+m\n" + + "BBHlsHPyU/NAVrh7rCL4S+TMsp8Dl/6sRjcon5R0a1gJfHPjmLTVx9tAD5Jbmn14\n" + + "zGemQDaGsV1DHxtL6B1a7qlKqq2QM4N6vzz18DvAkISs74iO9b+RwSFRLvkdmYQk\n" + + "e2CXDedNIPQlABV4nJlIQD6dAFBSdibf2xC8goXfxEtcRK6wf9/2SR0SVeOxnKDi\n" + + "KkeA9tqChW4Skk+WCHmDL8k375aYgViJcTmFD1cJHlj0lQGfk+gnxJnJwQJL7Cqg\n" + + "UYJxrX667sNnga8dcwFjqLF/hzAb75d+8cTXAE8fNQKCAQBOwdK4fgCdh3AvAF2t\n" + + "GD2oazGBnYATJl89IEkeduI7TUWOpwa5NEgxlHRv5qYQAp14/LEaWvG5avG6T/lM\n" + + "vGy6M/o98lSaeOaB8QWaZaFGxSa3mt1fnEka1YlcrLfmYsMGGVXoHa6td+lW5bZt\n" + + "rMKo9fHN7hpyu9gFxo9hWJBmCi15s0ak5udQGQX8Y7vGpxgZpz45983SbfSRqhrH\n" + + "Mm03gPl6ohjIJVmeb1GPswoccXOBwilWm3ZhKwKvC5f5IQVHTcfmbbDhHzXMsU/W\n" + + "MwQkNOVzjXk5YdBHRxoZzc3KbjH2BPCH3iK3tkRCWkSPix71sMflDms+BioEpzsO\n" + + "fP8hAoIBAQDRIYdr4pq0zP6HSHvzjDjBB4r0MQ1mX8d5Xp1GkkYmXGbGFHi+MfGQ\n" + + "Mj4vLGjz63LGrd5f+AgoYywZc9BWQo9iI3Y/eLWQi9BFzfgkV7jSGOibJk6AR72u\n" + + "DS0iLi5axtN0RZ1IGvJMeO43ph2GusBD7UPCkm+EarGoF7rBPdZ18BhhcHMlQu3S\n" + + "rAs6HTsPDSSmh6xxftQaHdmDXSN3MYEh88o/HXFoKhNAmBwV19pNVH8Rj6nziQX9\n" + + "gLZwn7apu33+SJJtDsxUH34juD8gyHNlb7LmItwufUkY8jQtfjUPzL2xF7Kxl0AL\n" + + "kx6i/LVqlI3bLZ/sI4kXlQgZaAUR2wCtAoIBAG/EIzsdMmM5ym7FX1KuQX1FsEFN\n" + + "avzwjnpzR4dp3m8/IS2yTKxyb83zHauox+1m6PnEU8yihvJkAt4JJSk9aC4PeZt+\n" + + "b3MV6lHGgN4Q5EjGGOwEDh4yfyCW6eJACyIc7eyySXA0W3dvnZChgp6zav/3rZr7\n" + + "mETaPxWH9AYy2U9KLzSYNPrYVnPz97vjwFLRAsWQCRhYm/4mU/53GCqsxW7lW1f1\n" + + "e6NqFGHNw6ubOl4p3lNX53mYp4Fn7YJNumAftEBd8r9LS7HYL7XUzYrrgt57WVMt\n" + + "8svUxDU98U1YCSNWAupd4se08tdprO5czULIEjCqS1we4J07hICeWEaxfss=\n" + + "-----END RSA PRIVATE KEY-----").getBytes()); + return priv; + } + } + + @Test(timeout = 60 * 1000) + public void testEncryptedRoundtrip() throws Exception { + + final Properties properties = new Properties(); + final String topic = TOPIC_ENCRYPTED; + properties.put(TOPIC_SINGLE_OPTION_KEY, topic); + final SimpleStringSchema schema = new SimpleStringSchema(); + final PulsarSerializationSchemaWrapper serializationSchema = new PulsarSerializationSchemaWrapper.Builder<>(schema) + .useSpecialMode(Schema.STRING) + .build(); + + CryptoKeyReader cryptoKeyReader = new MyCryptoKeyReader(); + + final FlinkPulsarSink sink = new FlinkPulsarSink.Builder() + .withAdminUrl(getAdminUrl()) + .withServiceUrl(getServiceUrl()) + .withDefaultTopicName(topic) + .withProperties(properties) + .withPulsarSerializationSchema(serializationSchema) + .withCryptoKeyReader(cryptoKeyReader) + .withEncryptionKeys("key1", "key2") + .build(); + + final FlinkPulsarSource src = new FlinkPulsarSource.Builder() + .withAdminUrl(getAdminUrl()) + .withServiceUrl(getServiceUrl()) + .withProperties(properties) + .withDeserializionSchema(schema) + .withCryptoKeyReader(cryptoKeyReader) + .build() + .setStartFromEarliest(); + + roundtrip(sink, src); + } + + private void roundtrip(FlinkPulsarSink sink, FlinkPulsarSource src) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + ArrayList input = new ArrayList<>(); + input.add("Hello"); + input.add("world"); + input.add("!"); + + final DataStream out = env + .fromCollection(input); + out.addSink(sink); + + final DataStream in = env.addSource(src); + in.addSink(new PrintSinkFunction<>()); + final Iterator iterator = in.executeAndCollect("Roundtrip Job"); + + final List results = new LinkedList<>(); + while (iterator.hasNext()){ + final String item = iterator.next(); + results.add(item); + assertTrue(input.containsAll(results)); + if (results.size() >= input.size()){ + // Happy! + break; + } + } + } + +} From 05e3b78da7682c4c6ea90b26b30074833de4536f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Objet=20Trouv=C3=A9?= Date: Wed, 15 Sep 2021 15:31:55 +0200 Subject: [PATCH 2/4] #386: Made c'tors private. --- .../flink/streaming/connectors/pulsar/FlinkPulsarSink.java | 2 +- .../flink/streaming/connectors/pulsar/FlinkPulsarSource.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java index 0833da23..5d37aa81 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java @@ -139,7 +139,7 @@ public FlinkPulsarSink build(){ private final PulsarSerializationSchema serializationSchema; - public FlinkPulsarSink(final Builder builder) { + private FlinkPulsarSink(final Builder builder) { super( new FlinkPulsarSinkBase.Config() .withAdminUrl(builder.adminUrl) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index 30cebfbf..1933d953 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -302,7 +302,7 @@ public FlinkPulsarSource build(){ private final CryptoKeyReader cryptoKeyReader; - public FlinkPulsarSource(final Builder builder) { + private FlinkPulsarSource(final Builder builder) { this.adminUrl = checkNotNull(builder.adminUrl); this.clientConfigurationData = checkNotNull(builder.clientConf); this.deserializer = builder.deserializer; From 50b9e9b0a8291bbbe34872bd677f0223ad2b545e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Objet=20Trouv=C3=A9?= Date: Wed, 15 Sep 2021 16:26:37 +0200 Subject: [PATCH 3/4] #386: Restored private API c'tors. Turned out builder pattern wasn't necessary here. There's one path now for the CryptoKeyReader (plus the encryption keys) and possibly others where it's null (and the keys are empty). --- .../connectors/pulsar/FlinkPulsarSink.java | 16 +- .../pulsar/FlinkPulsarSinkBase.java | 124 ++--------- .../connectors/pulsar/FlinkPulsarSource.java | 41 ++-- .../pulsar/internal/PulsarFetcher.java | 204 +++--------------- .../pulsar/internal/ReaderThread.java | 100 +-------- 5 files changed, 88 insertions(+), 397 deletions(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java index 5d37aa81..dede0aed 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java @@ -107,6 +107,10 @@ public Builder withEncryptionKeys(String... encryptionKeys) { return this; } + private Optional getDefaultTopicName() { + return Optional.ofNullable(defaultTopicName); + } + public FlinkPulsarSink build(){ if (adminUrl == null) { throw new IllegalStateException("Admin URL must be set."); @@ -140,17 +144,7 @@ public FlinkPulsarSink build(){ private final PulsarSerializationSchema serializationSchema; private FlinkPulsarSink(final Builder builder) { - super( - new FlinkPulsarSinkBase.Config() - .withAdminUrl(builder.adminUrl) - .withDefaultTopicName(builder.defaultTopicName) - .withClientConf(builder.clientConf) - .withProperties(builder.properties) - .withSerializationSchema(builder.serializationSchema) - .withMessageRouter(builder.messageRouter) - .withSemantic(builder.semantic) - .withCryptoKeyReader(builder.cryptoKeyReader) - .withEncryptionKeys(builder.encryptionKeys)); + super(builder.adminUrl, builder.getDefaultTopicName(), builder.clientConf, builder.properties, builder.serializationSchema, builder.messageRouter, builder.semantic, builder.cryptoKeyReader, builder.encryptionKeys); this.serializationSchema = builder.serializationSchema; } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index 78932108..5f03f2d4 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -89,64 +89,6 @@ @Slf4j abstract class FlinkPulsarSinkBase extends TwoPhaseCommitSinkFunction, Void> implements CheckpointedFunction { - public static class Config { - private String adminUrl; - private Optional defaultTopicName; - private ClientConfigurationData clientConf; - private Properties properties; - private PulsarSerializationSchema serializationSchema; - private MessageRouter messageRouter; - private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE; - private CryptoKeyReader cryptoKeyReader; - private Set encryptionKeys = new HashSet<>(); - - public Config withAdminUrl(final String adminUrl) { - this.adminUrl = adminUrl; - return this; - } - - public Config withDefaultTopicName(final String defaultTopicName) { - this.defaultTopicName = Optional.ofNullable(defaultTopicName); - return this; - } - - public Config withClientConf(ClientConfigurationData clientConf) { - this.clientConf = clientConf; - return this; - } - - public Config withProperties(final Properties properties) { - this.properties = properties; - return this; - } - - public Config withSerializationSchema(final PulsarSerializationSchema serializationSchema) { - this.serializationSchema = serializationSchema; - return this; - } - - public Config withMessageRouter(final MessageRouter messageRouter) { - this.messageRouter = messageRouter; - return this; - } - - public Config withSemantic(final PulsarSinkSemantic semantic) { - this.semantic = semantic; - return this; - } - - public Config withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) { - this.cryptoKeyReader = cryptoKeyReader; - return this; - } - - public Config withEncryptionKeys(final Set encryptionKeys) { - this.encryptionKeys = encryptionKeys; - return this; - } - - } - protected String adminUrl; protected ClientConfigurationData clientConfigurationData; @@ -215,32 +157,39 @@ public FlinkPulsarSinkBase( Properties properties, PulsarSerializationSchema serializationSchema, MessageRouter messageRouter) { - this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE); + this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE, null, new HashSet<>()); } - public FlinkPulsarSinkBase(final Config config) { + public FlinkPulsarSinkBase( + String adminUrl, + Optional defaultTopicName, + ClientConfigurationData clientConf, + Properties properties, + PulsarSerializationSchema serializationSchema, + MessageRouter messageRouter, + PulsarSinkSemantic semantic, + final CryptoKeyReader cryptoKeyReader, + final Set encryptionKeys) { super(new TransactionStateSerializer(), VoidSerializer.INSTANCE); - this.adminUrl = checkNotNull(config.adminUrl); - this.semantic = config.semantic; - this.cryptoKeyReader = config.cryptoKeyReader; - this.encryptionKeys = config.encryptionKeys; + this.adminUrl = checkNotNull(adminUrl); + this.semantic = semantic; - if (config.defaultTopicName.isPresent()) { + if (defaultTopicName.isPresent()) { this.forcedTopic = true; - this.defaultTopic = config.defaultTopicName.get(); + this.defaultTopic = defaultTopicName.get(); } else { this.forcedTopic = false; this.defaultTopic = null; } - this.serializationSchema = config.serializationSchema; + this.serializationSchema = serializationSchema; - this.messageRouter = config.messageRouter; + this.messageRouter = messageRouter; - this.clientConfigurationData = config.clientConf; + this.clientConfigurationData = clientConf; - this.properties = checkNotNull(config.properties); + this.properties = checkNotNull(properties); this.caseInsensitiveParams = SourceSinkUtils.toCaceInsensitiveParams(Maps.fromProperties(properties)); @@ -274,40 +223,9 @@ public FlinkPulsarSinkBase(final Config config) { if (this.clientConfigurationData.getServiceUrl() == null) { throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration"); } - } - public FlinkPulsarSinkBase( - String adminUrl, - Optional defaultTopicName, - ClientConfigurationData clientConf, - Properties properties, - PulsarSerializationSchema serializationSchema, - MessageRouter messageRouter, - PulsarSinkSemantic semantic) { - this(new Config() - .withAdminUrl(adminUrl) - .withDefaultTopicName(defaultTopicName.orElse(null)) - .withClientConf(clientConf) - .withProperties(properties) - .withSerializationSchema(serializationSchema) - .withMessageRouter(messageRouter) - .withSemantic(semantic) - ); - } - - public FlinkPulsarSinkBase( - String serviceUrl, - String adminUrl, - Optional defaultTopicName, - Properties properties, - PulsarSerializationSchema serializationSchema, - MessageRouter messageRouter) { - this(adminUrl, - defaultTopicName, - PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties), - properties, - serializationSchema, - messageRouter); + this.cryptoKeyReader = cryptoKeyReader; + this.encryptionKeys = encryptionKeys; } @Override diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index 1933d953..a1709d32 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -677,25 +677,28 @@ protected PulsarFetcher createFetcher( StreamingRuntimeContext streamingRuntime, boolean useMetrics, Set excludeStartMessageIds) throws Exception { - return new PulsarFetcher.Builder() - .withSourceContext(sourceContext) - .withSeedTopicsWithInitialOffsets(seedTopicsWithInitialOffsets) - .withExcludeStartMessageIds(excludeStartMessageIds) - .withWatermarkStrategy(watermarkStrategy) - .withProcessingTimeProvider(processingTimeProvider) - .withAutoWatermarkInterval(autoWatermarkInterval) - .withUserCodeClassLoader(userCodeClassLoader) - .withRuntimeContext(streamingRuntime) - .withClientConf(clientConfigurationData) - .withReaderConf(readerConf) - .withPollTimeoutMs(pollTimeoutMs) - .withCommitMaxRetries(commitMaxRetries) - .withDeserializer(deserializer) - .withMetadataReader(metadataReader) - .withConsumerMetricGroup(streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP)) - .withUseMetrics(useMetrics) - .withCryptoKeyReader(cryptoKeyReader) - .build(); + + //readerConf.putIfAbsent(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, getSubscriptionName()); + + return new PulsarFetcher<>( + sourceContext, + seedTopicsWithInitialOffsets, + excludeStartMessageIds, + watermarkStrategy, + processingTimeProvider, + autoWatermarkInterval, + userCodeClassLoader, + streamingRuntime, + clientConfigurationData, + readerConf, + pollTimeoutMs, + commitMaxRetries, + deserializer, + metadataReader, + streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP), + useMetrics, + cryptoKeyReader + ); } public void joinDiscoveryLoopThread() throws InterruptedException { diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java index 0e69575d..721eb880 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java @@ -56,119 +56,6 @@ @Slf4j public class PulsarFetcher { - public static class Builder { - private SourceContext sourceContext; - private Map seedTopicsWithInitialOffsets; - private SerializedValue> watermarkStrategy; - private ProcessingTimeService processingTimeProvider; - private long autoWatermarkInterval; - private ClassLoader userCodeClassLoader; - private StreamingRuntimeContext runtimeContext; - private ClientConfigurationData clientConf; - private Map readerConf; - private int pollTimeoutMs; - private PulsarDeserializationSchema deserializer; - private PulsarMetadataReader metadataReader; - private MetricGroup consumerMetricGroup; - private boolean useMetrics; - private Set excludeStartMessageIds = Collections.emptySet(); - private int commitMaxRetries = 3; - private CryptoKeyReader cryptoKeyReader; - - public Builder withSourceContext(final SourceContext sourceContext) { - this.sourceContext = sourceContext; - return this; - } - - public Builder withSeedTopicsWithInitialOffsets(final Map seedTopicsWithInitialOffsets) { - this.seedTopicsWithInitialOffsets = seedTopicsWithInitialOffsets; - return this; - } - - public Builder withWatermarkStrategy(final SerializedValue> watermarkStrategy) { - this.watermarkStrategy = watermarkStrategy; - return this; - } - - public Builder withProcessingTimeProvider(final ProcessingTimeService processingTimeProvider) { - this.processingTimeProvider = processingTimeProvider; - return this; - } - - public Builder withAutoWatermarkInterval(final long autoWatermarkInterval) { - this.autoWatermarkInterval = autoWatermarkInterval; - return this; - } - - public Builder withUserCodeClassLoader(final ClassLoader userCodeClassLoader) { - this.userCodeClassLoader = userCodeClassLoader; - return this; - } - - public Builder withRuntimeContext(final StreamingRuntimeContext runtimeContext) { - this.runtimeContext = runtimeContext; - return this; - } - - public Builder withClientConf(final ClientConfigurationData clientConf) { - this.clientConf = clientConf; - return this; - } - - public Builder withReaderConf(final Map readerConf) { - this.readerConf = readerConf; - return this; - } - - public Builder withPollTimeoutMs(final int pollTimeoutMs) { - this.pollTimeoutMs = pollTimeoutMs; - return this; - } - - public Builder withDeserializer(final PulsarDeserializationSchema deserializer) { - this.deserializer = deserializer; - return this; - } - - public Builder withMetadataReader(final PulsarMetadataReader metadataReader) { - this.metadataReader = metadataReader; - return this; - } - - public Builder withConsumerMetricGroup(final MetricGroup consumerMetricGroup) { - this.consumerMetricGroup = consumerMetricGroup; - return this; - } - - public Builder withUseMetrics(final boolean useMetrics) { - this.useMetrics = useMetrics; - return this; - } - - public Builder withExcludeStartMessageIds(final Set excludeStartMessageIds) { - this.excludeStartMessageIds = excludeStartMessageIds; - return this; - } - - public Builder withCommitMaxRetries(final int commitMaxRetries) { - this.commitMaxRetries = commitMaxRetries; - return this; - } - - public Builder withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) { - this.cryptoKeyReader = cryptoKeyReader; - return this; - } - - public PulsarFetcher build() throws Exception { - return new PulsarFetcher(this); - } - } - - public static Builder builder(){ - return new Builder<>(); - } - private static final int NO_TIMESTAMPS_WATERMARKS = 0; private static final int WITH_WATERMARK_GENERATOR = 1; @@ -293,7 +180,8 @@ public PulsarFetcher( deserializer, metadataReader, consumerMetricGroup, - useMetrics + useMetrics, + null ); } @@ -313,53 +201,31 @@ public PulsarFetcher( PulsarDeserializationSchema deserializer, PulsarMetadataReader metadataReader, MetricGroup consumerMetricGroup, - boolean useMetrics) throws Exception { - - this(new Builder() - .withSourceContext(sourceContext) - .withUseMetrics(useMetrics) - .withConsumerMetricGroup(consumerMetricGroup) - .withSeedTopicsWithInitialOffsets(seedTopicsWithInitialOffsets) - .withExcludeStartMessageIds(excludeStartMessageIds) - .withUserCodeClassLoader(userCodeClassLoader) - .withRuntimeContext(runtimeContext) - .withClientConf(clientConf) - .withReaderConf(readerConf) - .withPollTimeoutMs(pollTimeoutMs) - .withCommitMaxRetries(commitMaxRetries) - .withDeserializer(deserializer) - .withMetadataReader(metadataReader) - .withWatermarkStrategy(watermarkStrategy) - .withProcessingTimeProvider(processingTimeProvider) - .withAutoWatermarkInterval(autoWatermarkInterval) - ); - - } + boolean useMetrics, + final CryptoKeyReader cryptoKeyReader) throws Exception { - private PulsarFetcher(final Builder builder) throws Exception { - - this.sourceContext = builder.sourceContext; + this.sourceContext = sourceContext; this.watermarkOutput = new SourceContextWatermarkOutputAdapter<>(sourceContext); this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput); - this.useMetrics = builder.useMetrics; - this.consumerMetricGroup = checkNotNull(builder.consumerMetricGroup); - this.seedTopicsWithInitialOffsets = builder.seedTopicsWithInitialOffsets; - this.excludeStartMessageIds = builder.excludeStartMessageIds; + this.useMetrics = useMetrics; + this.consumerMetricGroup = checkNotNull(consumerMetricGroup); + this.seedTopicsWithInitialOffsets = seedTopicsWithInitialOffsets; + this.excludeStartMessageIds = excludeStartMessageIds; this.checkpointLock = sourceContext.getCheckpointLock(); - this.userCodeClassLoader = builder.userCodeClassLoader; - this.runtimeContext = builder.runtimeContext; - this.clientConf = builder.clientConf; - this.readerConf = builder.readerConf == null ? new HashMap<>() : builder.readerConf; + this.userCodeClassLoader = userCodeClassLoader; + this.runtimeContext = runtimeContext; + this.clientConf = clientConf; + this.readerConf = readerConf == null ? new HashMap<>() : readerConf; this.failOnDataLoss = SourceSinkUtils.getFailOnDataLossAndRemoveKey(this.readerConf); this.useEarliestWhenDataLoss = SourceSinkUtils.getUseEarliestWhenDataLossAndRemoveKey(this.readerConf); - this.pollTimeoutMs = builder.pollTimeoutMs; - this.commitMaxRetries = builder.commitMaxRetries; - this.deserializer = builder.deserializer; - this.metadataReader = builder.metadataReader; - this.cryptoKeyReader = builder.cryptoKeyReader; + this.pollTimeoutMs = pollTimeoutMs; + this.commitMaxRetries = commitMaxRetries; + this.deserializer = deserializer; + this.metadataReader = metadataReader; + this.cryptoKeyReader = cryptoKeyReader; // figure out what we watermark mode we will be using - this.watermarkStrategy = builder.watermarkStrategy; + this.watermarkStrategy = watermarkStrategy; if (watermarkStrategy == null) { timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS; @@ -394,13 +260,13 @@ private PulsarFetcher(final Builder builder) throws Exception { } // if we have periodic watermarks, kick off the interval scheduler - if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && builder.autoWatermarkInterval > 0) { + if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) { PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter<>( checkpointLock, subscribedPartitionStates, watermarkOutputMultiplexer, - builder.processingTimeProvider, - builder.autoWatermarkInterval); + processingTimeProvider, + autoWatermarkInterval); periodicEmitter.start(); } @@ -673,20 +539,18 @@ protected List> getSubscribedTopicStates() { } protected ReaderThread createReaderThread(ExceptionProxy exceptionProxy, PulsarTopicState state) { - - return new ReaderThread.Builder() - .withOwner(this) - .withState(state) - .withClientConf(clientConf) - .withReaderConf(readerConf) - .withDeserializer(deserializer) - .withPollTimeoutMs(pollTimeoutMs) - .withExceptionProxy(exceptionProxy) - .withFailOnDataLoss(failOnDataLoss) - .withUseEarliestWhenDataLoss(useEarliestWhenDataLoss) - .withExcludeMessageId(excludeStartMessageIds.contains(state.getTopicRange())) - .withCryptoKeyReader(cryptoKeyReader) - .build(); + return new ReaderThread<>( + this, + state, + clientConf, + readerConf, + deserializer, + pollTimeoutMs, + exceptionProxy, + failOnDataLoss, + useEarliestWhenDataLoss, + excludeStartMessageIds.contains(state.getTopicRange()), + cryptoKeyReader); } /** diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java index e027bf8a..7ae4f569 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java @@ -40,80 +40,6 @@ @Slf4j public class ReaderThread extends Thread { - public static class Builder { - private PulsarFetcher owner; - private PulsarTopicState state; - private ClientConfigurationData clientConf; - private Map readerConf; - private PulsarDeserializationSchema deserializer; - private int pollTimeoutMs; - private ExceptionProxy exceptionProxy; - private boolean failOnDataLoss; - private boolean useEarliestWhenDataLoss; - private boolean excludeMessageId; - private CryptoKeyReader cryptoKeyReader; - - public Builder withOwner(final PulsarFetcher owner) { - this.owner = owner; - return this; - } - - public Builder withState(final PulsarTopicState state) { - this.state = state; - return this; - } - - public Builder withClientConf(final ClientConfigurationData clientConf) { - this.clientConf = clientConf; - return this; - } - - public Builder withReaderConf(final Map readerConf) { - this.readerConf = readerConf; - return this; - } - - public Builder withDeserializer(final PulsarDeserializationSchema deserializer) { - this.deserializer = deserializer; - return this; - } - - public Builder withPollTimeoutMs(final int pollTimeoutMs) { - this.pollTimeoutMs = pollTimeoutMs; - return this; - } - - public Builder withExceptionProxy(final ExceptionProxy exceptionProxy) { - this.exceptionProxy = exceptionProxy; - return this; - } - - public Builder withFailOnDataLoss(final boolean failOnDataLoss) { - this.failOnDataLoss = failOnDataLoss; - return this; - } - - public Builder withUseEarliestWhenDataLoss(final boolean useEarliestWhenDataLoss) { - this.useEarliestWhenDataLoss = useEarliestWhenDataLoss; - return this; - } - - public Builder withExcludeMessageId(final boolean excludeMessageId) { - this.excludeMessageId = excludeMessageId; - return this; - } - - public Builder withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) { - this.cryptoKeyReader = cryptoKeyReader; - return this; - } - - public ReaderThread build(){ - return new ReaderThread<>(this); - } - - } - protected final PulsarFetcher owner; protected final PulsarTopicState state; protected final ClientConfigurationData clientConf; @@ -142,7 +68,8 @@ public ReaderThread( Map readerConf, PulsarDeserializationSchema deserializer, int pollTimeoutMs, - ExceptionProxy exceptionProxy) { + ExceptionProxy exceptionProxy, + final CryptoKeyReader cryptoKeyReader) { this.owner = owner; this.state = state; this.clientConf = clientConf; @@ -153,7 +80,7 @@ public ReaderThread( this.topicRange = state.getTopicRange(); this.startMessageId = state.getOffset(); - this.cryptoKeyReader = null; + this.cryptoKeyReader = cryptoKeyReader; } public ReaderThread( @@ -166,29 +93,14 @@ public ReaderThread( ExceptionProxy exceptionProxy, boolean failOnDataLoss, boolean useEarliestWhenDataLoss, - boolean excludeMessageId) { - this(owner, state, clientConf, readerConf, deserializer, pollTimeoutMs, exceptionProxy); + boolean excludeMessageId, + final CryptoKeyReader cryptoKeyReader) { + this(owner, state, clientConf, readerConf, deserializer, pollTimeoutMs, exceptionProxy, cryptoKeyReader); this.failOnDataLoss = failOnDataLoss; this.useEarliestWhenDataLoss = useEarliestWhenDataLoss; this.excludeMessageId = excludeMessageId; } - private ReaderThread(final Builder builder){ - this.owner = builder.owner; - this.state = builder.state; - this.clientConf = builder.clientConf; - this.readerConf = builder.readerConf; - this.deserializer = builder.deserializer; - this.pollTimeoutMs = builder.pollTimeoutMs; - this.exceptionProxy = builder.exceptionProxy; - this.topicRange = state.getTopicRange(); - this.startMessageId = state.getOffset(); - this.failOnDataLoss = builder.failOnDataLoss; - this.useEarliestWhenDataLoss = builder.useEarliestWhenDataLoss; - this.excludeMessageId = builder.excludeMessageId; - this.cryptoKeyReader = builder.cryptoKeyReader; - } - @Override public void run() { log.info("Starting to fetch from {} at {}, failOnDataLoss {}", topicRange, startMessageId, failOnDataLoss); From a0b27cade489518a1992664d2b848a07b60f872f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Objet=20Trouv=C3=A9?= Date: Wed, 15 Sep 2021 16:41:34 +0200 Subject: [PATCH 4/4] #386: Check serializability of crypto key reader and encryption keys. --- .../flink/streaming/connectors/pulsar/FlinkPulsarSink.java | 4 ++++ .../flink/streaming/connectors/pulsar/FlinkPulsarSource.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java index dede0aed..2a2c0523 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java @@ -35,7 +35,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; +import static org.apache.flink.util.InstantiationUtil.isSerializable; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * Write data to Flink. @@ -136,6 +138,8 @@ public FlinkPulsarSink build(){ if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){ throw new IllegalStateException("Set crypto key reader and encryption keys in conjunction."); } + checkState(isSerializable(cryptoKeyReader)); + checkState(isSerializable(encryptionKeys)); return new FlinkPulsarSink<>(this); } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index a1709d32..e02305eb 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -88,8 +88,11 @@ import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_FAILED_METRICS_COUNTER; import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_SUCCEEDED_METRICS_COUNTER; import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP; +import static org.apache.flink.util.InstantiationUtil.isSerializable; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + /** * Pulsar data source. @@ -168,6 +171,7 @@ public FlinkPulsarSource build(){ if (clientConf == null){ throw new IllegalStateException("Client conf mustn't be null. Either provide a client conf or a service URL plus properties."); } + checkState(isSerializable(cryptoKeyReader)); return new FlinkPulsarSource<>(this); }