From a466e7e5c15726c6af6bd53398687c2379f586e5 Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Thu, 24 Aug 2023 17:57:16 -0700 Subject: [PATCH] Adding properties to venice writer property passing. --- .../linkedin/venice/VeniceClusterInitializer.java | 5 +++-- .../AdminConsumptionTaskIntegrationTest.java | 2 +- .../linkedin/venice/endToEnd/DaVinciClientTest.java | 4 ++-- .../venice/endToEnd/DaVinciComputeTest.java | 13 +++++++------ .../endToEnd/DaVinciLiveUpdateSuppressionTest.java | 3 ++- .../venice/endToEnd/TestActiveActiveIngestion.java | 7 ++++--- .../venice/endToEnd/TestHelixCustomizedView.java | 5 +++-- .../venice/endToEnd/TestLeaderReplicaFailover.java | 9 ++++++--- .../utils/AbstractClientEndToEndSetup.java | 8 +++++--- .../hadoop/input/kafka/TestKafkaInputFormat.java | 3 +-- .../input/kafka/TestKafkaInputRecordReader.java | 5 ++--- ...erDeletingSstFilesWithActiveActiveIngestion.java | 8 +++++--- .../restart/TestRestartServerDuringIngestion.java | 11 +++++++---- .../java/com/linkedin/venice/router/TestRead.java | 5 +++-- .../com/linkedin/venice/router/TestRouterRetry.java | 5 +++-- .../com/linkedin/venice/router/TestStreaming.java | 5 +++-- .../storagenode/ReadComputeValidationTest.java | 13 +++++++------ .../venice/storagenode/StorageNodeComputeTest.java | 9 +++++---- .../venice/storagenode/StorageNodeReadTest.java | 5 +++-- .../throttle/TestRouterReadQuotaThrottler.java | 3 ++- .../venice/utils/IntegrationTestPushUtils.java | 12 ++++++++++++ .../java/com/linkedin/venice/utils/TestUtils.java | 8 -------- 22 files changed, 86 insertions(+), 62 deletions(-) diff --git a/internal/venice-avro-compatibility-test/src/test/java/com/linkedin/venice/VeniceClusterInitializer.java b/internal/venice-avro-compatibility-test/src/test/java/com/linkedin/venice/VeniceClusterInitializer.java index 84b9c9fbcb..189cdf0323 100644 --- a/internal/venice-avro-compatibility-test/src/test/java/com/linkedin/venice/VeniceClusterInitializer.java +++ b/internal/venice-avro-compatibility-test/src/test/java/com/linkedin/venice/VeniceClusterInitializer.java @@ -19,6 +19,7 @@ import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.writer.VeniceWriter; @@ -186,8 +187,8 @@ private void pushSyntheticData() throws ExecutionException, InterruptedException PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - VeniceWriterFactory vwFactory = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + VeniceWriterFactory vwFactory = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); try (VeniceWriter veniceWriter = vwFactory.createVeniceWriter( new VeniceWriterOptions.Builder(pushVersionTopic).setKeySerializer(keySerializer).build())) { veniceWriter.broadcastStartOfPush(Collections.emptyMap()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java index 27c3e7fb40..63f10b9e84 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java @@ -78,7 +78,7 @@ public void testSkipMessageEndToEnd() throws ExecutionException, InterruptedExce PubSubProducerAdapterFactory pubSubProducerAdapterFactory = pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory(); VeniceWriter writer = - TestUtils.getVeniceWriterFactory(pubSubBrokerWrapper.getAddress(), pubSubProducerAdapterFactory) + IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory) .createVeniceWriter(new VeniceWriterOptions.Builder(adminTopic.getName()).build())) { byte[] message = getStoreCreationMessage(clusterName, storeName, owner, "invalid_key_schema", valueSchema, 1); long badOffset = writer.put(new byte[0], message, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index d56e5bcd2d..2e229e40b2 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -363,7 +363,7 @@ public void testUnstableIngestionIsolation() throws Exception { final int pushVersion = newVersion.getVersion(); String topic = newVersion.getKafkaTopic(); VeniceWriterFactory vwFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(DEFAULT_VALUE_SCHEMA); @@ -749,7 +749,7 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); VeniceWriterFactory vwFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(DEFAULT_VALUE_SCHEMA); int valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciComputeTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciComputeTest.java index f194b02276..eef7a98a93 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciComputeTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciComputeTest.java @@ -35,6 +35,7 @@ import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; @@ -185,7 +186,7 @@ public void testComputeOnStoreWithQTFDScompliantSchema() throws Exception { VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); VeniceWriterFactory vwFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD); @@ -293,7 +294,7 @@ public void testReadComputeMissingField() throws Exception { VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); VeniceWriterFactory vwFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE); @@ -427,7 +428,7 @@ public void testReadComputeSwappedFields() throws Exception { VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); VeniceWriterFactory vwFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE); @@ -540,7 +541,7 @@ public void testComputeStreamingExecute() throws ExecutionException, Interrupted VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); VeniceWriterFactory vwFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_STEAMING_COMPUTE); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STREAMING_COMPUTE); @@ -639,7 +640,7 @@ public void testPartialKeyLookupWithRocksDBBlockBasedTable() throws ExecutionExc VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); VeniceWriterFactory vwFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_PARTIAL_KEY_LOOKUP); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE); @@ -740,7 +741,7 @@ public void testPartialKeyLookupWithRocksDBPlainTable() throws ExecutionExceptio VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); VeniceWriterFactory vwFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_PARTIAL_KEY_LOOKUP); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciLiveUpdateSuppressionTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciLiveUpdateSuppressionTest.java index 91d058a5f3..2203a709f2 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciLiveUpdateSuppressionTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciLiveUpdateSuppressionTest.java @@ -27,6 +27,7 @@ import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.utils.DataProviderUtils; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.writer.VeniceWriter; @@ -106,7 +107,7 @@ public void testLiveUpdateSuppression(IngestionMode ingestionMode) throws Except PubSubProducerAdapterFactory pubSubProducerAdapterFactory = cluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); VeniceWriterFactory vwFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(DEFAULT_VALUE_SCHEMA); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java index c888b91ffd..475c204f93 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java @@ -486,10 +486,11 @@ public void testActiveActiveStoreRestart() throws Exception { -1)); String topic = versionCreationResponse.getKafkaTopic(); - String kafkaUrl = versionCreationResponse.getKafkaBootstrapServers(); + PubSubBrokerWrapper pubSubBrokerWrapper = clusterWrapper.getPubSubBrokerWrapper(); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = - clusterWrapper.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(kafkaUrl, pubSubProducerAdapterFactory); + pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory(); + VeniceWriterFactory veniceWriterFactory = + IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory); try (VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topic).build())) { veniceWriter.broadcastStartOfPush(true, Collections.emptyMap()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHelixCustomizedView.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHelixCustomizedView.java index 80b718402d..e5a85896c0 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHelixCustomizedView.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHelixCustomizedView.java @@ -20,6 +20,7 @@ import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.utils.HelixUtils; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; @@ -101,8 +102,8 @@ public void setUp() throws VeniceClientException { valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STR); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - veniceWriter = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory) + veniceWriter = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory) .createVeniceWriter( new VeniceWriterOptions.Builder(storeVersionName).setKeySerializer(keySerializer) .setValueSerializer(valueSerializer) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestLeaderReplicaFailover.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestLeaderReplicaFailover.java index cb4fb5aa04..3ef8ba0904 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestLeaderReplicaFailover.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestLeaderReplicaFailover.java @@ -17,6 +17,7 @@ import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.hadoop.VenicePushJob; import com.linkedin.venice.helix.HelixBaseRoutingRepository; +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.integration.utils.VeniceServerWrapper; @@ -26,6 +27,7 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.serializer.AvroSerializer; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; @@ -133,10 +135,11 @@ public void testLeaderReplicaFailover() throws Exception { -1)); String topic = versionCreationResponse.getKafkaTopic(); - String kafkaUrl = versionCreationResponse.getKafkaBootstrapServers(); + PubSubBrokerWrapper pubSubBrokerWrapper = clusterWrapper.getPubSubBrokerWrapper(); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = - clusterWrapper.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(kafkaUrl, pubSubProducerAdapterFactory); + pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory(); + VeniceWriterFactory veniceWriterFactory = + IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory); HelixBaseRoutingRepository routingDataRepo = clusterWrapper.getLeaderVeniceController() .getVeniceHelixAdmin() .getHelixVeniceClusterResources(clusterWrapper.getClusterName()) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java index 989a87a9bb..e19182e3cc 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java @@ -39,6 +39,7 @@ import com.linkedin.venice.fastclient.schema.TestValueSchema; import com.linkedin.venice.helix.HelixReadOnlySchemaRepository; import com.linkedin.venice.integration.utils.D2TestUtils; +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; @@ -52,6 +53,7 @@ import com.linkedin.venice.systemstore.schemas.StoreMetaKey; import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.utils.DataProviderUtils; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.SslUtils; import com.linkedin.venice.utils.TestUtils; @@ -256,10 +258,10 @@ protected void prepareData() throws Exception { keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_STR); valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STR); + PubSubBrokerWrapper pubSubBrokerWrapper = veniceCluster.getPubSubBrokerWrapper(); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = - veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - veniceWriter = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory) + pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory(); + veniceWriter = IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory) .createVeniceWriter( new VeniceWriterOptions.Builder(storeVersionName).setKeySerializer(keySerializer) .setValueSerializer(valueSerializer) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java index f25cfabfd4..c684668820 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java @@ -11,7 +11,6 @@ import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.utils.IntegrationTestPushUtils; -import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.writer.VeniceWriter; @@ -65,7 +64,7 @@ public String getTopic(int numRecord, int numPartition) { PubSubProducerAdapterFactory pubSubProducerAdapterFactory = pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory(); VeniceWriterFactory veniceWriterFactory = - TestUtils.getVeniceWriterFactory(pubSubBrokerWrapper.getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory); try (VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topicName).build())) { for (int i = 0; i < numRecord; ++i) { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputRecordReader.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputRecordReader.java index f7d5b95adc..540f77fa33 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputRecordReader.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputRecordReader.java @@ -16,7 +16,6 @@ import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.Pair; -import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.writer.VeniceWriter; @@ -61,8 +60,8 @@ public void cleanUp() throws IOException { public String getTopic(int numRecord, Pair updateRange, Pair deleteRange) { String topicName = Utils.getUniqueString("test_kafka_input_format") + "_v1"; manager.createTopic(pubSubTopicRepository.getTopic(topicName), 1, 1, true); - VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory( - pubSubBrokerWrapper.getAddress(), + VeniceWriterFactory veniceWriterFactory = IntegrationTestPushUtils.getVeniceWriterFactory( + pubSubBrokerWrapper, pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory()); try (VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topicName).build())) { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java index ba99cdf3c8..4337a7ad86 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java @@ -29,6 +29,7 @@ import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.integration.utils.D2TestUtils; +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.TestVeniceServer; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; @@ -265,10 +266,11 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles, newVersion = versionToBePushed; String topic = versionCreationResponse.getKafkaTopic(); - String kafkaUrl = versionCreationResponse.getKafkaBootstrapServers(); + PubSubBrokerWrapper pubSubBrokerWrapper = clusterWrapper.getPubSubBrokerWrapper(); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = - clusterWrapper.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(kafkaUrl, pubSubProducerAdapterFactory); + pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory(); + VeniceWriterFactory veniceWriterFactory = + IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory); startKey += numKeys; // to have different version having different set of keys int endKey = startKey + numKeys; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerDuringIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerDuringIngestion.java index 996cc59877..7d3f072f11 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerDuringIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerDuringIngestion.java @@ -10,6 +10,7 @@ import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.hadoop.VenicePushJob; +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.integration.utils.VeniceRouterWrapper; @@ -119,8 +120,9 @@ public void testIngestionRecovery() throws ExecutionException, InterruptedExcept -1)); } String topic = versionCreationResponse.getKafkaTopic(); - String kafkaUrl = versionCreationResponse.getKafkaBootstrapServers(); - VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(kafkaUrl, pubSubProducerAdapterFactory); + PubSubBrokerWrapper pubSubBrokerWrapper = cluster.getPubSubBrokerWrapper(); + VeniceWriterFactory veniceWriterFactory = + IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory); try (VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topic).build())) { veniceWriter.broadcastStartOfPush(true, Collections.emptyMap()); @@ -266,8 +268,9 @@ public void testIngestionDrainer() { -1)); } String topic = versionCreationResponse.getKafkaTopic(); - String kafkaUrl = versionCreationResponse.getKafkaBootstrapServers(); - VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(kafkaUrl, pubSubProducerAdapterFactory); + PubSubBrokerWrapper pubSubBrokerWrapper = cluster.getPubSubBrokerWrapper(); + VeniceWriterFactory veniceWriterFactory = + IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory); try (VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topic).build())) { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRead.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRead.java index 25792b41a6..3fc8211b5b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRead.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRead.java @@ -38,6 +38,7 @@ import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.stats.StatsErrorCode; import com.linkedin.venice.tehuti.MetricsUtils; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.SslUtils; import com.linkedin.venice.utils.TestUtils; @@ -186,8 +187,8 @@ public void setUp() throws VeniceClientException, ExecutionException, Interrupte valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STR); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - veniceWriter = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory) + veniceWriter = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory) .createVeniceWriter( new VeniceWriterOptions.Builder(storeVersionName).setKeySerializer(keySerializer) .setValueSerializer(valueSerializer) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRouterRetry.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRouterRetry.java index 562b2ce230..8ada96feb0 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRouterRetry.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRouterRetry.java @@ -18,6 +18,7 @@ import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.tehuti.MetricsUtils; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.SslUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; @@ -91,8 +92,8 @@ public void setUp() throws VeniceClientException, ExecutionException, Interrupte VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STR); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - veniceWriter = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory) + veniceWriter = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory) .createVeniceWriter( new VeniceWriterOptions.Builder(storeVersionName).setKeySerializer(keySerializer) .setValueSerializer(valueSerializer) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestStreaming.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestStreaming.java index 14fb1519a4..1da5fa2472 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestStreaming.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestStreaming.java @@ -38,6 +38,7 @@ import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.tehuti.MetricsUtils; import com.linkedin.venice.utils.DataProviderUtils; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; @@ -136,8 +137,8 @@ public void setUp() throws InterruptedException, ExecutionException, VeniceClien VeniceCompressor compressor = compressorFactory.getCompressor(compressionStrategy); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - veniceWriter = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory) + veniceWriter = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory) .createVeniceWriter(new VeniceWriterOptions.Builder(storeVersionName).setKeySerializer(keySerializer).build()); final int pushVersion = Version.parseVersionFromKafkaTopicName(storeVersionName); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/ReadComputeValidationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/ReadComputeValidationTest.java index 4c4c745d16..58cb7aa09f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/ReadComputeValidationTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/ReadComputeValidationTest.java @@ -25,6 +25,7 @@ import com.linkedin.venice.serialization.DefaultSerializer; import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; @@ -145,8 +146,8 @@ public void testComputeMissingField() throws Exception { String topic = newVersion.getKafkaTopic(); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - VeniceWriterFactory vwFactory = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + VeniceWriterFactory vwFactory = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); try ( VeniceWriter veniceWriter = vwFactory.createVeniceWriter( new VeniceWriterOptions.Builder(topic).setKeySerializer(keySerializer) @@ -233,8 +234,8 @@ public void testComputeSwappedFields() throws Exception { String topic = newVersion.getKafkaTopic(); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - VeniceWriterFactory vwFactory = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + VeniceWriterFactory vwFactory = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); try ( VeniceWriter veniceWriter = vwFactory .createVeniceWriter(new VeniceWriterOptions.Builder(topic).setKeySerializer(keySerializer).build()); @@ -348,8 +349,8 @@ public void testComputeOnStoreWithQTFDScompliantSchema() throws Exception { valuesByKey.put(key2, value2); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - VeniceWriterFactory vwFactory = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + VeniceWriterFactory vwFactory = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); try ( VeniceWriter veniceWriter = vwFactory .createVeniceWriter(new VeniceWriterOptions.Builder(topic).setKeySerializer(keySerializer).build()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/StorageNodeComputeTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/StorageNodeComputeTest.java index 88faa304b2..418cbdf16d 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/StorageNodeComputeTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/StorageNodeComputeTest.java @@ -28,6 +28,7 @@ import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.tehuti.MetricsUtils; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.writer.VeniceWriter; @@ -180,8 +181,8 @@ public void testCompute(CompressionStrategy compressionStrategy, ValueSize value PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - VeniceWriterFactory vwFactory = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + VeniceWriterFactory vwFactory = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); try (VeniceWriter veniceWriter = vwFactory.createVeniceWriter( new VeniceWriterOptions.Builder(topic).setKeySerializer(keySerializer) .setValueSerializer(new DefaultSerializer()) @@ -293,8 +294,8 @@ public void testComputeRequestSize() throws Exception { try ( PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - VeniceWriter veniceWriter = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory) + VeniceWriter veniceWriter = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory) .createVeniceWriter( new VeniceWriterOptions.Builder(newVersion.getKafkaTopic()).setKeySerializer(keySerializer).build()); AvroGenericStoreClient storeClient = ClientFactory.getAndStartGenericAvroClient( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/StorageNodeReadTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/StorageNodeReadTest.java index 11d07abfb7..24983cc4dd 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/StorageNodeReadTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/storagenode/StorageNodeReadTest.java @@ -33,6 +33,7 @@ import com.linkedin.venice.serializer.RecordDeserializer; import com.linkedin.venice.serializer.RecordSerializer; import com.linkedin.venice.serializer.SerializerDeserializerFactory; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; @@ -114,8 +115,8 @@ public void setUp() throws InterruptedException, ExecutionException, VeniceClien valueSerializer = new VeniceAvroKafkaSerializer(stringSchema); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); - veniceWriter = TestUtils - .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory) + veniceWriter = IntegrationTestPushUtils + .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory) .createVeniceWriter( new VeniceWriterOptions.Builder(storeVersionName).setKeySerializer(keySerializer) .setValueSerializer(valueSerializer) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/throttle/TestRouterReadQuotaThrottler.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/throttle/TestRouterReadQuotaThrottler.java index e4be6c78c0..d350fabbf4 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/throttle/TestRouterReadQuotaThrottler.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/throttle/TestRouterReadQuotaThrottler.java @@ -18,6 +18,7 @@ import com.linkedin.venice.router.throttle.ReadRequestThrottler; import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; +import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; @@ -56,7 +57,7 @@ public void setUp() throws Exception { PubSubProducerAdapterFactory pubSubProducerAdapterFactory = cluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); VeniceWriterFactory writerFactory = - TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory); + IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); try (VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(stringSchema); VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(stringSchema); VeniceWriter writer = writerFactory.createVeniceWriter( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java index 05863290fb..a986ecf876 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java @@ -40,9 +40,11 @@ import com.linkedin.venice.kafka.TopicManagerRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.samza.VeniceObjectWithTimestamp; import com.linkedin.venice.samza.VeniceSystemFactory; +import com.linkedin.venice.writer.VeniceWriterFactory; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -383,4 +385,14 @@ public static TopicManagerRepository getTopicManagerRepo( .setTopicMinLogCompactionLagMs(topicMinLogCompactionLagMs) .build(); } + + public static VeniceWriterFactory getVeniceWriterFactory( + PubSubBrokerWrapper pubSubBrokerWrapper, + PubSubProducerAdapterFactory pubSubProducerAdapterFactory) { + Properties veniceWriterProperties = new Properties(); + veniceWriterProperties.put(KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerWrapper.getAddress()); + veniceWriterProperties + .putAll(PubSubBrokerWrapper.getBrokerDetailsForClients(Collections.singletonList(pubSubBrokerWrapper))); + return TestUtils.getVeniceWriterFactory(veniceWriterProperties, pubSubProducerAdapterFactory); + } } diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java index b82e1575ce..e6e6805f6c 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java @@ -570,14 +570,6 @@ public static String getClusterToD2String(Map clusterToD2) { return clusterToD2.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(",")); } - public static VeniceWriterFactory getVeniceWriterFactory( - String kafkaBootstrapServers, - PubSubProducerAdapterFactory pubSubProducerAdapterFactory) { - Properties properties = new Properties(); - properties.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers); - return getVeniceWriterFactory(properties, pubSubProducerAdapterFactory); - } - public static VeniceWriterFactory getVeniceWriterFactory( Properties properties, PubSubProducerAdapterFactory pubSubProducerAdapterFactory) {