Pass additional properties through to VeniceWriter creation in integration tests.
Hao Xu committed Sep 7, 2023
1 parent 08866ab commit a466e7e
Showing 22 changed files with 86 additions and 62 deletions.
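Every file below makes the same substitution: instead of building a VeniceWriterFactory from just the broker address via TestUtils, the integration tests now hand the whole PubSubBrokerWrapper to IntegrationTestPushUtils, which, per the commit title, lets broker-specific properties travel with the writer configuration. A minimal before/after sketch of the call-site change — it assumes the pubSubBrokerWrapper, pubSubProducerAdapterFactory, and topicName variables that the surrounding tests already define; only the two factory calls are taken verbatim from the diff:

// Old call site (removed below): only the broker address reaches the factory.
VeniceWriterFactory oldFactory =
    TestUtils.getVeniceWriterFactory(pubSubBrokerWrapper.getAddress(), pubSubProducerAdapterFactory);

// New call site (added below): the wrapper itself is passed, so the helper can derive the
// address and, per the commit title, forward additional writer properties as well.
VeniceWriterFactory newFactory =
    IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory);

// Writer creation itself is unchanged by this commit.
try (VeniceWriter<byte[], byte[], byte[]> writer =
    newFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topicName).build())) {
  writer.broadcastStartOfPush(Collections.emptyMap());
}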
@@ -19,6 +19,7 @@
 import com.linkedin.venice.pushmonitor.ExecutionStatus;
 import com.linkedin.venice.serialization.VeniceKafkaSerializer;
 import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
+import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.writer.VeniceWriter;
@@ -186,8 +187,8 @@ private void pushSyntheticData() throws ExecutionException, InterruptedException

 PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
 veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory();
-VeniceWriterFactory vwFactory = TestUtils
-.getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+VeniceWriterFactory vwFactory = IntegrationTestPushUtils
+.getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);
 try (VeniceWriter<Object, byte[], byte[]> veniceWriter = vwFactory.createVeniceWriter(
 new VeniceWriterOptions.Builder(pushVersionTopic).setKeySerializer(keySerializer).build())) {
 veniceWriter.broadcastStartOfPush(Collections.emptyMap());
@@ -78,7 +78,7 @@ public void testSkipMessageEndToEnd() throws ExecutionException, InterruptedException
 PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
 pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
 VeniceWriter<byte[], byte[], byte[]> writer =
-TestUtils.getVeniceWriterFactory(pubSubBrokerWrapper.getAddress(), pubSubProducerAdapterFactory)
+IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory)
 .createVeniceWriter(new VeniceWriterOptions.Builder(adminTopic.getName()).build())) {
 byte[] message = getStoreCreationMessage(clusterName, storeName, owner, "invalid_key_schema", valueSchema, 1);
 long badOffset = writer.put(new byte[0], message, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)
@@ -363,7 +363,7 @@ public void testUnstableIngestionIsolation() throws Exception {
 final int pushVersion = newVersion.getVersion();
 String topic = newVersion.getKafkaTopic();
 VeniceWriterFactory vwFactory =
-TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);
 VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA);
 VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(DEFAULT_VALUE_SCHEMA);

@@ -749,7 +749,7 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception {
 VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
 String topic = newVersion.getKafkaTopic();
 VeniceWriterFactory vwFactory =
-TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);
 VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA);
 VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(DEFAULT_VALUE_SCHEMA);
 int valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID;
@@ -35,6 +35,7 @@
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.serialization.VeniceKafkaSerializer;
 import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
+import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.PropertyBuilder;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Utils;
@@ -185,7 +186,7 @@ public void testComputeOnStoreWithQTFDScompliantSchema() throws Exception {
 VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
 String topic = newVersion.getKafkaTopic();
 VeniceWriterFactory vwFactory =
-TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);

 VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA);
 VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD);
@@ -293,7 +294,7 @@ public void testReadComputeMissingField() throws Exception {
 VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
 String topic = newVersion.getKafkaTopic();
 VeniceWriterFactory vwFactory =
-TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);

 VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA);
 VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE);
@@ -427,7 +428,7 @@ public void testReadComputeSwappedFields() throws Exception {
 VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
 String topic = newVersion.getKafkaTopic();
 VeniceWriterFactory vwFactory =
-TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);

 VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA);
 VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE);
@@ -540,7 +541,7 @@ public void testComputeStreamingExecute() throws ExecutionException, InterruptedException
 VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
 String topic = newVersion.getKafkaTopic();
 VeniceWriterFactory vwFactory =
-TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);

 VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_STEAMING_COMPUTE);
 VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STREAMING_COMPUTE);
@@ -639,7 +640,7 @@ public void testPartialKeyLookupWithRocksDBBlockBasedTable() throws ExecutionException
 VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
 String topic = newVersion.getKafkaTopic();
 VeniceWriterFactory vwFactory =
-TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);

 VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_PARTIAL_KEY_LOOKUP);
 VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE);
@@ -740,7 +741,7 @@ public void testPartialKeyLookupWithRocksDBPlainTable() throws ExecutionException
 VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
 String topic = newVersion.getKafkaTopic();
 VeniceWriterFactory vwFactory =
-TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);

 VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_PARTIAL_KEY_LOOKUP);
 VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_FOR_COMPUTE);
@@ -27,6 +27,7 @@
 import com.linkedin.venice.serialization.VeniceKafkaSerializer;
 import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
 import com.linkedin.venice.utils.DataProviderUtils;
+import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.writer.VeniceWriter;
@@ -106,7 +107,7 @@ public void testLiveUpdateSuppression(IngestionMode ingestionMode) throws Exception {
 PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
 cluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory();
 VeniceWriterFactory vwFactory =
-TestUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);
 VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA);
 VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(DEFAULT_VALUE_SCHEMA);

@@ -486,10 +486,11 @@ public void testActiveActiveStoreRestart() throws Exception {
 -1));

 String topic = versionCreationResponse.getKafkaTopic();
-String kafkaUrl = versionCreationResponse.getKafkaBootstrapServers();
+PubSubBrokerWrapper pubSubBrokerWrapper = clusterWrapper.getPubSubBrokerWrapper();
 PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
-clusterWrapper.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory();
-VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(kafkaUrl, pubSubProducerAdapterFactory);
+pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
+VeniceWriterFactory veniceWriterFactory =
+IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory);
 try (VeniceWriter<byte[], byte[], byte[]> veniceWriter =
 veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topic).build())) {
 veniceWriter.broadcastStartOfPush(true, Collections.emptyMap());
@@ -20,6 +20,7 @@
 import com.linkedin.venice.serialization.VeniceKafkaSerializer;
 import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
 import com.linkedin.venice.utils.HelixUtils;
+import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.ObjectMapperFactory;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Time;
@@ -101,8 +102,8 @@ public void setUp() throws VeniceClientException {
 valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STR);
 PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
 veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory();
-veniceWriter = TestUtils
-.getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory)
+veniceWriter = IntegrationTestPushUtils
+.getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory)
 .createVeniceWriter(
 new VeniceWriterOptions.Builder(storeVersionName).setKeySerializer(keySerializer)
 .setValueSerializer(valueSerializer)
@@ -17,6 +17,7 @@
 import com.linkedin.venice.controllerapi.VersionCreationResponse;
 import com.linkedin.venice.hadoop.VenicePushJob;
 import com.linkedin.venice.helix.HelixBaseRoutingRepository;
+import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
 import com.linkedin.venice.integration.utils.ServiceFactory;
 import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
 import com.linkedin.venice.integration.utils.VeniceServerWrapper;
@@ -26,6 +27,7 @@
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.serializer.AvroSerializer;
+import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.Utils;
@@ -133,10 +135,11 @@ public void testLeaderReplicaFailover() throws Exception {
 -1));

 String topic = versionCreationResponse.getKafkaTopic();
-String kafkaUrl = versionCreationResponse.getKafkaBootstrapServers();
+PubSubBrokerWrapper pubSubBrokerWrapper = clusterWrapper.getPubSubBrokerWrapper();
 PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
-clusterWrapper.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory();
-VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(kafkaUrl, pubSubProducerAdapterFactory);
+pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
+VeniceWriterFactory veniceWriterFactory =
+IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory);
 HelixBaseRoutingRepository routingDataRepo = clusterWrapper.getLeaderVeniceController()
 .getVeniceHelixAdmin()
 .getHelixVeniceClusterResources(clusterWrapper.getClusterName())
@@ -39,6 +39,7 @@
 import com.linkedin.venice.fastclient.schema.TestValueSchema;
 import com.linkedin.venice.helix.HelixReadOnlySchemaRepository;
 import com.linkedin.venice.integration.utils.D2TestUtils;
+import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
 import com.linkedin.venice.integration.utils.ServiceFactory;
 import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
 import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
@@ -52,6 +53,7 @@
 import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
 import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
 import com.linkedin.venice.utils.DataProviderUtils;
+import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.PropertyBuilder;
 import com.linkedin.venice.utils.SslUtils;
 import com.linkedin.venice.utils.TestUtils;
@@ -256,10 +258,10 @@ protected void prepareData() throws Exception {
 keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_STR);
 valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STR);

+PubSubBrokerWrapper pubSubBrokerWrapper = veniceCluster.getPubSubBrokerWrapper();
 PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
-veniceCluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory();
-veniceWriter = TestUtils
-.getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper().getAddress(), pubSubProducerAdapterFactory)
+pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
+veniceWriter = IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory)
 .createVeniceWriter(
 new VeniceWriterOptions.Builder(storeVersionName).setKeySerializer(keySerializer)
 .setValueSerializer(valueSerializer)
@@ -11,7 +11,6 @@
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
 import com.linkedin.venice.utils.IntegrationTestPushUtils;
-import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.writer.VeniceWriter;
@@ -65,7 +64,7 @@ public String getTopic(int numRecord, int numPartition) {
 PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
 pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
 VeniceWriterFactory veniceWriterFactory =
-TestUtils.getVeniceWriterFactory(pubSubBrokerWrapper.getAddress(), pubSubProducerAdapterFactory);
+IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory);
 try (VeniceWriter<byte[], byte[], byte[]> veniceWriter =
 veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topicName).build())) {
 for (int i = 0; i < numRecord; ++i) {
@@ -16,7 +16,6 @@
 import com.linkedin.venice.utils.ByteUtils;
 import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.Pair;
-import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.writer.VeniceWriter;
@@ -61,8 +60,8 @@ public void cleanUp() throws IOException {
 public String getTopic(int numRecord, Pair<Integer, Integer> updateRange, Pair<Integer, Integer> deleteRange) {
 String topicName = Utils.getUniqueString("test_kafka_input_format") + "_v1";
 manager.createTopic(pubSubTopicRepository.getTopic(topicName), 1, 1, true);
-VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(
-pubSubBrokerWrapper.getAddress(),
+VeniceWriterFactory veniceWriterFactory = IntegrationTestPushUtils.getVeniceWriterFactory(
+pubSubBrokerWrapper,
 pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory());
 try (VeniceWriter<byte[], byte[], byte[]> veniceWriter =
 veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topicName).build())) {
@@ -29,6 +29,7 @@
 import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
 import com.linkedin.venice.controllerapi.VersionCreationResponse;
 import com.linkedin.venice.integration.utils.D2TestUtils;
+import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
 import com.linkedin.venice.integration.utils.ServiceFactory;
 import com.linkedin.venice.integration.utils.TestVeniceServer;
 import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
@@ -265,10 +266,11 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
 newVersion = versionToBePushed;

 String topic = versionCreationResponse.getKafkaTopic();
-String kafkaUrl = versionCreationResponse.getKafkaBootstrapServers();
+PubSubBrokerWrapper pubSubBrokerWrapper = clusterWrapper.getPubSubBrokerWrapper();
 PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
-clusterWrapper.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory();
-VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(kafkaUrl, pubSubProducerAdapterFactory);
+pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
+VeniceWriterFactory veniceWriterFactory =
+IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory);

 startKey += numKeys; // to have different version having different set of keys
 int endKey = startKey + numKeys;
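For orientation only, a hypothetical sketch of what the shared helper could look like. The real IntegrationTestPushUtils.getVeniceWriterFactory may be implemented quite differently; getAdditionalProperties(), the ConfigKeys constant, and the VeniceWriterFactory constructor used below are assumptions, not confirmed Venice APIs.

// HYPOTHETICAL sketch - illustrates only the "property passing" idea from the commit title.
public static VeniceWriterFactory getVeniceWriterFactory(
    PubSubBrokerWrapper pubSubBrokerWrapper,
    PubSubProducerAdapterFactory producerAdapterFactory) {
  Properties props = new Properties();
  // The broker address, which the old TestUtils variant received directly as a String.
  props.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerWrapper.getAddress());
  // ASSUMED accessor: extra per-broker properties that every test producer should carry.
  props.putAll(pubSubBrokerWrapper.getAdditionalProperties());
  // ASSUMED constructor signature for VeniceWriterFactory.
  return new VeniceWriterFactory(props, producerAdapterFactory, null);
}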
