Skip to content

Commit

Permalink
[producer] Allow online producer users to specify underlying VeniceWr…
Browse files Browse the repository at this point in the history
…iter configs (#576)

This commit allows the users of online producer to specify arbitrary configs for the underlying VeniceWriter
  • Loading branch information
nisargthakkar authored Aug 9, 2023
1 parent a73b77c commit 380729f
Showing 1 changed file with 1 addition and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import static com.linkedin.venice.ConfigKeys.CLIENT_PRODUCER_THREAD_NUM;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.KAFKA_OVER_SSL;
import static com.linkedin.venice.ConfigKeys.KAFKA_SECURITY_PROTOCOL;
import static com.linkedin.venice.ConfigKeys.SSL_KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;

import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.partitioner.VenicePartitioner;
Expand Down Expand Up @@ -111,13 +109,11 @@ protected void configure(
}

private VeniceWriter<byte[], byte[], byte[]> getVeniceWriter(VersionCreationResponse versionCreationResponse) {
Properties writerProps = new Properties();
Properties writerProps = producerConfigs.getPropertiesCopy();

if (versionCreationResponse.isEnableSSL()) {
writerProps.put(KAFKA_OVER_SSL, "true");
writerProps.put(SSL_KAFKA_BOOTSTRAP_SERVERS, versionCreationResponse.getKafkaBootstrapServers());
writerProps.put(KAFKA_SECURITY_PROTOCOL, producerConfigs.getString(KAFKA_SECURITY_PROTOCOL));
writerProps.putAll(new SSLConfig(producerConfigs).getKafkaSSLConfig());
} else {
writerProps.put(KAFKA_BOOTSTRAP_SERVERS, versionCreationResponse.getKafkaBootstrapServers());
}
Expand Down

0 comments on commit 380729f

Please sign in to comment.