From 380729fefb3b1478b281e90e09d8c43108d4888b Mon Sep 17 00:00:00 2001 From: Nisarg Thakkar Date: Wed, 9 Aug 2023 11:40:35 -0700 Subject: [PATCH] [producer] Allow online producer users to specify underlying VeniceWriter configs (#576) This commit allows the users of online producer to specify arbitrary configs for the underlying VeniceWriter --- .../linkedin/venice/producer/AbstractVeniceProducer.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/clients/venice-producer/src/main/java/com/linkedin/venice/producer/AbstractVeniceProducer.java b/clients/venice-producer/src/main/java/com/linkedin/venice/producer/AbstractVeniceProducer.java index c451a53375..0c4f0f6f69 100644 --- a/clients/venice-producer/src/main/java/com/linkedin/venice/producer/AbstractVeniceProducer.java +++ b/clients/venice-producer/src/main/java/com/linkedin/venice/producer/AbstractVeniceProducer.java @@ -3,11 +3,9 @@ import static com.linkedin.venice.ConfigKeys.CLIENT_PRODUCER_THREAD_NUM; import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; import static com.linkedin.venice.ConfigKeys.KAFKA_OVER_SSL; -import static com.linkedin.venice.ConfigKeys.KAFKA_SECURITY_PROTOCOL; import static com.linkedin.venice.ConfigKeys.SSL_KAFKA_BOOTSTRAP_SERVERS; import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; -import com.linkedin.venice.SSLConfig; import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.partitioner.VenicePartitioner; @@ -111,13 +109,11 @@ protected void configure( } private VeniceWriter getVeniceWriter(VersionCreationResponse versionCreationResponse) { - Properties writerProps = new Properties(); + Properties writerProps = producerConfigs.getPropertiesCopy(); if (versionCreationResponse.isEnableSSL()) { writerProps.put(KAFKA_OVER_SSL, "true"); writerProps.put(SSL_KAFKA_BOOTSTRAP_SERVERS, versionCreationResponse.getKafkaBootstrapServers()); - writerProps.put(KAFKA_SECURITY_PROTOCOL, producerConfigs.getString(KAFKA_SECURITY_PROTOCOL)); - writerProps.putAll(new SSLConfig(producerConfigs).getKafkaSSLConfig()); } else { writerProps.put(KAFKA_BOOTSTRAP_SERVERS, versionCreationResponse.getKafkaBootstrapServers()); }