Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

#386: Implemented passing of CryptoKeyReader. #387

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -41,8 +45,115 @@
@Slf4j
public class FlinkPulsarSink<T> extends FlinkPulsarSinkBase<T> {

public static class Builder<T> {
private String adminUrl;
private String defaultTopicName;
private ClientConfigurationData clientConf;
private Properties properties;
private PulsarSerializationSchema<T> serializationSchema;
private MessageRouter messageRouter = null;
private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE;
private String serviceUrl;
private CryptoKeyReader cryptoKeyReader;
private final Set<String> encryptionKeys = new HashSet<>();

public Builder<T> withAdminUrl(final String adminUrl) {
this.adminUrl = adminUrl;
return this;
}

public Builder<T> withDefaultTopicName(final String defaultTopicName) {
this.defaultTopicName = defaultTopicName;
return this;
}

public Builder<T> withClientConf(final ClientConfigurationData clientConf) {
this.clientConf = clientConf;
return this;
}

public Builder<T> withProperties(final Properties properties) {
this.properties = properties;
return this;
}

public Builder<T> withPulsarSerializationSchema(final PulsarSerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
return this;
}

public Builder<T> withMessageRouter(final MessageRouter messageRouter) {
this.messageRouter = messageRouter;
return this;
}

public Builder<T> withSemantic(final PulsarSinkSemantic semantic) {
this.semantic = semantic;
return this;
}

public Builder<T> withServiceUrl(final String serviceUrl) {
this.serviceUrl = serviceUrl;
return this;
}

public Builder<T> withCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
return this;
}

public Builder<T> withEncryptionKeys(String... encryptionKeys) {
this.encryptionKeys.addAll(Arrays.asList(encryptionKeys));
return this;
}

public FlinkPulsarSink<T> build(){
if (adminUrl == null) {
throw new IllegalStateException("Admin URL must be set.");
}
if (serializationSchema == null) {
throw new IllegalStateException("Serialization schema must be set.");
}
if (semantic == null) {
throw new IllegalStateException("Semantic must be set.");
}
if (properties == null) {
throw new IllegalStateException("Properties must be set.");
}
if (serviceUrl != null && clientConf != null) {
throw new IllegalStateException("Set either client conf or service URL but not both.");
}
if (serviceUrl != null){
clientConf = PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties);
}
if (clientConf == null){
throw new IllegalStateException("Client conf must be set.");
}
if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cryptoKeyReader requires extra serialization check. Use Preconditions.checkState(InstantiationUtil.isSerializable(cryptoKeyReader))

Copy link
Author

@objecttrouve objecttrouve Aug 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, will do.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the check.

throw new IllegalStateException("Set crypto key reader and encryption keys in conjunction.");
}
return new FlinkPulsarSink<>(this);
}

}

private final PulsarSerializationSchema<T> serializationSchema;

public FlinkPulsarSink(final Builder<T> builder) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be public for a Builder pattern.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely. Must have missed it. Will fix.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made c'tors private.

super(
new FlinkPulsarSinkBase.Config<T>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be boilerplate. Why not simply use ctor? Since it's a builder pattern. The end user would never touch this ctor.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The end user would never touch this ctor.

I wasn't sure about which classes an end user could touch. Therefore, I tried hard not to change the public c'tors plus not to add more than absolutely necessary. But if the class is just for internal use, right, it's unnecessary boilerplate. Will change it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to the old c'tor.

.withAdminUrl(builder.adminUrl)
.withDefaultTopicName(builder.defaultTopicName)
.withClientConf(builder.clientConf)
.withProperties(builder.properties)
.withSerializationSchema(builder.serializationSchema)
.withMessageRouter(builder.messageRouter)
.withSemantic(builder.semantic)
.withCryptoKeyReader(builder.cryptoKeyReader)
.withEncryptionKeys(builder.encryptionKeys));
this.serializationSchema = builder.serializationSchema;
}

public FlinkPulsarSink(
String adminUrl,
Optional<String> defaultTopicName,
Expand All @@ -51,9 +162,14 @@ public FlinkPulsarSink(
PulsarSerializationSchema serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {

super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic);
this.serializationSchema = serializationSchema;
this(new Builder<T>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor old ctor doen't look like a good choice.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal was basically to delegate as much as possible to a single c'tor that contains the interesting logic, instead of repeating variants of the logic inside the c'tor all over the place.
But to be honest, I don't mind very much. So I'll just revert to the old c'tor.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to the old c'tor.

.withAdminUrl(adminUrl)
.withDefaultTopicName(defaultTopicName.orElse(null))
.withClientConf(clientConf)
.withProperties(properties)
.withPulsarSerializationSchema(serializationSchema)
.withMessageRouter(messageRouter)
.withSemantic(semantic));
}

public FlinkPulsarSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
Expand All @@ -62,10 +63,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand All @@ -86,6 +89,64 @@
@Slf4j
abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, FlinkPulsarSinkBase.PulsarTransactionState<T>, Void> implements CheckpointedFunction {

public static class Config<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need builder for this base class?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much the same reasoning as implemented throughout. A builder or config object makes it easier to add parameters in the future, without breaking an API or resulting in more and more overloaded c'tors. I have to admit I'm not sure which classes exactly were intended strictly for private purposes, so I just applied the pattern all over the place.
(Knowing that builders and config objects imply a lot of boilerplate.)

If overloading c'tors or changing the API of the class is OK, I'm completely fine with it though. So I'll change this piece, too.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to the old c'tor.

private String adminUrl;
private Optional<String> defaultTopicName;
private ClientConfigurationData clientConf;
private Properties properties;
private PulsarSerializationSchema<T> serializationSchema;
private MessageRouter messageRouter;
private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE;
private CryptoKeyReader cryptoKeyReader;
private Set<String> encryptionKeys = new HashSet<>();

public Config<T> withAdminUrl(final String adminUrl) {
this.adminUrl = adminUrl;
return this;
}

public Config<T> withDefaultTopicName(final String defaultTopicName) {
this.defaultTopicName = Optional.ofNullable(defaultTopicName);
return this;
}

public Config<T> withClientConf(ClientConfigurationData clientConf) {
this.clientConf = clientConf;
return this;
}

public Config<T> withProperties(final Properties properties) {
this.properties = properties;
return this;
}

public Config<T> withSerializationSchema(final PulsarSerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
return this;
}

public Config<T> withMessageRouter(final MessageRouter messageRouter) {
this.messageRouter = messageRouter;
return this;
}

public Config<T> withSemantic(final PulsarSinkSemantic semantic) {
this.semantic = semantic;
return this;
}

public Config<T> withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
return this;
}

public Config<T> withEncryptionKeys(final Set<String> encryptionKeys) {
this.encryptionKeys = encryptionKeys;
return this;
}

}

protected String adminUrl;

protected ClientConfigurationData clientConfigurationData;
Expand Down Expand Up @@ -143,6 +204,10 @@ abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, Flin

protected transient Map<String, Producer<T>> topic2Producer;

private final CryptoKeyReader cryptoKeyReader;

private final Set<String> encryptionKeys;

public FlinkPulsarSinkBase(
String adminUrl,
Optional<String> defaultTopicName,
Expand All @@ -153,34 +218,29 @@ public FlinkPulsarSinkBase(
this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE);
}

public FlinkPulsarSinkBase(
String adminUrl,
Optional<String> defaultTopicName,
ClientConfigurationData clientConf,
Properties properties,
PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {
public FlinkPulsarSinkBase(final Config<T> config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may just keep this ctor.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much the same as above, I'll change it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to the old c'tor.

super(new TransactionStateSerializer(), VoidSerializer.INSTANCE);

this.adminUrl = checkNotNull(adminUrl);
this.semantic = semantic;
this.adminUrl = checkNotNull(config.adminUrl);
this.semantic = config.semantic;
this.cryptoKeyReader = config.cryptoKeyReader;
this.encryptionKeys = config.encryptionKeys;

if (defaultTopicName.isPresent()) {
if (config.defaultTopicName.isPresent()) {
this.forcedTopic = true;
this.defaultTopic = defaultTopicName.get();
this.defaultTopic = config.defaultTopicName.get();
} else {
this.forcedTopic = false;
this.defaultTopic = null;
}

this.serializationSchema = serializationSchema;
this.serializationSchema = config.serializationSchema;

this.messageRouter = messageRouter;
this.messageRouter = config.messageRouter;

this.clientConfigurationData = clientConf;
this.clientConfigurationData = config.clientConf;

this.properties = checkNotNull(properties);
this.properties = checkNotNull(config.properties);

this.caseInsensitiveParams =
SourceSinkUtils.toCaceInsensitiveParams(Maps.fromProperties(properties));
Expand Down Expand Up @@ -216,6 +276,25 @@ public FlinkPulsarSinkBase(
}
}

public FlinkPulsarSinkBase(
String adminUrl,
Optional<String> defaultTopicName,
ClientConfigurationData clientConf,
Properties properties,
PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {
this(new Config<T>()
.withAdminUrl(adminUrl)
.withDefaultTopicName(defaultTopicName.orElse(null))
.withClientConf(clientConf)
.withProperties(properties)
.withSerializationSchema(serializationSchema)
.withMessageRouter(messageRouter)
.withSemantic(semantic)
);
}

public FlinkPulsarSinkBase(
String serviceUrl,
String adminUrl,
Expand Down Expand Up @@ -340,6 +419,12 @@ protected Producer<T> createProducer(
// maximizing the throughput
.batchingMaxBytes(5 * 1024 * 1024)
.loadConf(producerConf);
if (cryptoKeyReader != null){
builder.cryptoKeyReader(cryptoKeyReader);
for (final String encryptionKey : this.encryptionKeys) {
builder.addEncryptionKey(encryptionKey);
}
}
if (messageRouter == null) {
return builder.create();
} else {
Expand Down
Loading