Skip to content

Commit

Permalink
Code improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed Aug 23, 2023
1 parent 0d2f85e commit 0d3a363
Show file tree
Hide file tree
Showing 20 changed files with 212 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public DaVinciBackend(
String clusterName = backendConfig.getClusterName();
writerFactoryMap.put(clusterName, writerFactory);
pushStatusStoreWriter =
new PushStatusStoreWriter(writerFactoryMap, instanceName, derivedSchemaID, s -> clusterName);
new PushStatusStoreWriter(writerFactoryMap, instanceName, derivedSchemaID, s -> Optional.of(clusterName));

SchemaReader kafkaMessageEnvelopeSchemaReader = ClientFactory.getSchemaReader(
ClientConfig.cloneConfig(clientConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -307,7 +308,7 @@ private void asyncStart() {
veniceWriterFactoryMap,
instance.getNodeId(),
veniceProperties.getInt(PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1),
s -> clusterName);
s -> Optional.of(clusterName));

// Record replica status in Zookeeper.
// Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,13 @@ public KafkaStoreIngestionService(
.setPubSubTopicRepository(pubSubTopicRepository)
.setMetricsRepository(metricsRepository)
.setLocalKafkaBootstrapServers(serverConfig.getKafkaBootstrapServers())
.setPubSubConsumerAdapterFactory(pubSubClientsFactory.getConsumerAdapterFactory())
.setDefaultPubSubClientsFactory(pubSubClientsFactory)
.setTopicDeletionStatusPollIntervalMs(DEFAULT_TOPIC_DELETION_STATUS_POLL_INTERVAL_MS)
.setTopicMinLogCompactionLagMs(DEFAULT_KAFKA_MIN_LOG_COMPACTION_LAG_MS)
.setKafkaOperationTimeoutMs(DEFAULT_KAFKA_OPERATION_TIMEOUT_MS)
.setPubSubProperties(this::getPubSubSSLPropertiesFromServerConfig)
.setPubSubAdminAdapterFactory(pubSubClientsFactory.getAdminAdapterFactory())
.setPubSubClientsFactoryMap(pubSubClientsFactoryMap)
.setClusterNameSupplier(s -> serverConfig.getClusterName())
.setClusterNameSupplier(s -> Optional.of(serverConfig.getClusterName()))
.build();

VeniceNotifier notifier = new LogNotifier();
Expand All @@ -338,7 +337,7 @@ public KafkaStoreIngestionService(
metastoreWriterFactoryMap,
zkSharedSchemaRepository.get(),
pubSubTopicRepository,
s -> serverConfig.getClusterName());
s -> Optional.of(serverConfig.getClusterName()));
this.metaSystemStoreReplicaStatusNotifier = new MetaSystemStoreReplicaStatusNotifier(
serverConfig.getClusterName(),
metaStoreWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

public class CachedPubSubMetadataGetterTest {
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
private final String clusterName = "testCluster";

@Test
public void testGetEarliestOffset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1410,8 +1410,7 @@ private static void deleteKafkaTopic(CommandLine cmd) throws Exception {
.setTopicDeletionStatusPollIntervalMs(topicDeletionStatusPollingInterval)
.setTopicMinLogCompactionLagMs(0L)
.setLocalKafkaBootstrapServers(kafkaBootstrapServer)
.setPubSubConsumerAdapterFactory(PUB_SUB_CLIENTS_FACTORY.getConsumerAdapterFactory())
.setPubSubAdminAdapterFactory(PUB_SUB_CLIENTS_FACTORY.getAdminAdapterFactory())
.setDefaultPubSubClientsFactory(PUB_SUB_CLIENTS_FACTORY)
.setPubSubTopicRepository(pubSubTopicRepository)
.build()) {
TopicManager topicManager = topicManagerRepository.getTopicManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
Expand All @@ -47,22 +47,17 @@ public class KafkaInputFormat implements InputFormat<KafkaInputMapperKey, KafkaI

protected Map<TopicPartition, Long> getLatestOffsets(JobConf config) {
VeniceProperties consumerProperties = KafkaInputUtils.getConsumerProperties(config);
String clusterName = "cluster_name";
PubSubClientsFactory pubSubClientsFactory = new PubSubClientsFactory(
new ApacheKafkaProducerAdapterFactory(),
new ApacheKafkaConsumerAdapterFactory(),
new ApacheKafkaAdminAdapterFactory());
Map<String, PubSubClientsFactory> pubSubClientsFactoryMap =
Collections.singletonMap(clusterName, pubSubClientsFactory);

try (TopicManagerRepository topicManagerRepository = TopicManagerRepository.builder()
.setPubSubProperties(k -> consumerProperties)
.setLocalKafkaBootstrapServers(config.get(KAFKA_INPUT_BROKER_URL))
.setPubSubTopicRepository(pubSubTopicRepository)
.setPubSubAdminAdapterFactory(new ApacheKafkaAdminAdapterFactory())
.setPubSubConsumerAdapterFactory(new ApacheKafkaConsumerAdapterFactory())
.setPubSubClientsFactoryMap(pubSubClientsFactoryMap)
.setClusterNameSupplier(s -> clusterName)
.setDefaultPubSubClientsFactory(pubSubClientsFactory)
.setClusterNameSupplier(s -> Optional.empty())
.build()) {
try (TopicManager topicManager = topicManagerRepository.getTopicManager()) {
String topic = config.get(KAFKA_INPUT_TOPIC);
Expand Down
Loading

0 comments on commit 0d3a363

Please sign in to comment.