From 0d3a36378c7193579774e3ad5e01fd816df68d44 Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Tue, 22 Aug 2023 10:48:08 -0700 Subject: [PATCH] Code improvements. --- .../com/linkedin/davinci/DaVinciBackend.java | 2 +- .../helix/HelixParticipationService.java | 3 +- .../consumer/KafkaStoreIngestionService.java | 7 +- .../CachedPubSubMetadataGetterTest.java | 1 - .../java/com/linkedin/venice/AdminTool.java | 3 +- .../hadoop/input/kafka/KafkaInputFormat.java | 11 +- .../linkedin/venice/kafka/TopicManager.java | 190 ++++++++---------- .../venice/kafka/TopicManagerRepository.java | 32 +-- .../PartitionOffsetFetcherFactory.java | 16 +- .../PartitionOffsetFetcherImpl.java | 96 +++++---- .../PushStatusStoreVeniceWriterCache.java | 9 +- .../PushStatusStoreWriter.java | 2 +- .../venice/system/store/MetaStoreWriter.java | 11 +- .../venice/kafka/TopicManagerTest.java | 23 +-- .../venice/endToEnd/PushStatusStoreTest.java | 2 +- .../kafka/TopicManagerIntegrationTest.java | 2 +- .../utils/IntegrationTestPushUtils.java | 5 +- .../venice/utils/TestDictionaryUtils.java | 3 +- .../venice/writer/VeniceWriterTest.java | 2 +- .../venice/controller/VeniceHelixAdmin.java | 25 +-- 20 files changed, 212 insertions(+), 233 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index ef9fe393284..4a7fbb488ac 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -190,7 +190,7 @@ public DaVinciBackend( String clusterName = backendConfig.getClusterName(); writerFactoryMap.put(clusterName, writerFactory); pushStatusStoreWriter = - new PushStatusStoreWriter(writerFactoryMap, instanceName, derivedSchemaID, s -> clusterName); + new PushStatusStoreWriter(writerFactoryMap, instanceName, derivedSchemaID, s -> Optional.of(clusterName)); SchemaReader kafkaMessageEnvelopeSchemaReader = ClientFactory.getSchemaReader( ClientConfig.cloneConfig(clientConfig) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java index bbf3d5ade34..63b31762bee 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -307,7 +308,7 @@ private void asyncStart() { veniceWriterFactoryMap, instance.getNodeId(), veniceProperties.getInt(PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1), - s -> clusterName); + s -> Optional.of(clusterName)); // Record replica status in Zookeeper. // Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 63c12775c0a..b8977e6edf9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -315,14 +315,13 @@ public KafkaStoreIngestionService( .setPubSubTopicRepository(pubSubTopicRepository) .setMetricsRepository(metricsRepository) .setLocalKafkaBootstrapServers(serverConfig.getKafkaBootstrapServers()) - .setPubSubConsumerAdapterFactory(pubSubClientsFactory.getConsumerAdapterFactory()) + .setDefaultPubSubClientsFactory(pubSubClientsFactory) .setTopicDeletionStatusPollIntervalMs(DEFAULT_TOPIC_DELETION_STATUS_POLL_INTERVAL_MS) .setTopicMinLogCompactionLagMs(DEFAULT_KAFKA_MIN_LOG_COMPACTION_LAG_MS) .setKafkaOperationTimeoutMs(DEFAULT_KAFKA_OPERATION_TIMEOUT_MS) .setPubSubProperties(this::getPubSubSSLPropertiesFromServerConfig) - .setPubSubAdminAdapterFactory(pubSubClientsFactory.getAdminAdapterFactory()) .setPubSubClientsFactoryMap(pubSubClientsFactoryMap) - .setClusterNameSupplier(s -> serverConfig.getClusterName()) + .setClusterNameSupplier(s -> Optional.of(serverConfig.getClusterName())) .build(); VeniceNotifier notifier = new LogNotifier(); @@ -338,7 +337,7 @@ public KafkaStoreIngestionService( metastoreWriterFactoryMap, zkSharedSchemaRepository.get(), pubSubTopicRepository, - s -> serverConfig.getClusterName()); + s -> Optional.of(serverConfig.getClusterName())); this.metaSystemStoreReplicaStatusNotifier = new MetaSystemStoreReplicaStatusNotifier( serverConfig.getClusterName(), metaStoreWriter, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/CachedPubSubMetadataGetterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/CachedPubSubMetadataGetterTest.java index f6268281496..eaa5b74610f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/CachedPubSubMetadataGetterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/CachedPubSubMetadataGetterTest.java @@ -23,7 +23,6 @@ public class CachedPubSubMetadataGetterTest { private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); - private final String clusterName = "testCluster"; @Test public void testGetEarliestOffset() { diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index 700137940c0..e61c271842c 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -1410,8 +1410,7 @@ private static void deleteKafkaTopic(CommandLine cmd) throws Exception { .setTopicDeletionStatusPollIntervalMs(topicDeletionStatusPollingInterval) .setTopicMinLogCompactionLagMs(0L) .setLocalKafkaBootstrapServers(kafkaBootstrapServer) - .setPubSubConsumerAdapterFactory(PUB_SUB_CLIENTS_FACTORY.getConsumerAdapterFactory()) - .setPubSubAdminAdapterFactory(PUB_SUB_CLIENTS_FACTORY.getAdminAdapterFactory()) + .setDefaultPubSubClientsFactory(PUB_SUB_CLIENTS_FACTORY) .setPubSubTopicRepository(pubSubTopicRepository) .build()) { TopicManager topicManager = topicManagerRepository.getTopicManager(); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormat.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormat.java index dfcc2ff510f..9fc42f6c8e4 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormat.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormat.java @@ -16,11 +16,11 @@ import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.utils.VeniceProperties; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -47,22 +47,17 @@ public class KafkaInputFormat implements InputFormat getLatestOffsets(JobConf config) { VeniceProperties consumerProperties = KafkaInputUtils.getConsumerProperties(config); - String clusterName = "cluster_name"; PubSubClientsFactory pubSubClientsFactory = new PubSubClientsFactory( new ApacheKafkaProducerAdapterFactory(), new ApacheKafkaConsumerAdapterFactory(), new ApacheKafkaAdminAdapterFactory()); - Map pubSubClientsFactoryMap = - Collections.singletonMap(clusterName, pubSubClientsFactory); try (TopicManagerRepository topicManagerRepository = TopicManagerRepository.builder() .setPubSubProperties(k -> consumerProperties) .setLocalKafkaBootstrapServers(config.get(KAFKA_INPUT_BROKER_URL)) .setPubSubTopicRepository(pubSubTopicRepository) - .setPubSubAdminAdapterFactory(new ApacheKafkaAdminAdapterFactory()) - .setPubSubConsumerAdapterFactory(new ApacheKafkaConsumerAdapterFactory()) - .setPubSubClientsFactoryMap(pubSubClientsFactoryMap) - .setClusterNameSupplier(s -> clusterName) + .setDefaultPubSubClientsFactory(pubSubClientsFactory) + .setClusterNameSupplier(s -> Optional.empty()) .build()) { try (TopicManager topicManager = topicManagerRepository.getTopicManager()) { String topic = config.get(KAFKA_INPUT_TOPIC); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManager.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManager.java index 863ee7612ae..45da17aab31 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManager.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManager.java @@ -10,6 +10,7 @@ import com.linkedin.venice.pubsub.PubSubAdminAdapterFactory; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubConstants; +import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicConfiguration; import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -30,6 +31,7 @@ import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.utils.pools.LandFillObjectPool; import io.tehuti.metrics.MetricsRepository; @@ -38,7 +40,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -52,10 +53,10 @@ /** * Topic Manager is shared by multiple cluster's controllers running in one physical Venice controller instance. * - * This class contains one global {@link PubSubConsumerAdapter}, which is not thread-safe, so when you add new functions, + * This class contains per cluster and a default {@link PubSubConsumerAdapter}, which is not thread-safe, so when you add new functions, * which is using this global consumer, please add 'synchronized' keyword, otherwise this {@link TopicManager} * won't be thread-safe, and Kafka consumer will report the following error when multiple threads are trying to - * use the same consumer: PubSubConsumerAdapter is not safe for multi-threaded access. + * use the same consumer: PubSubConsumerAdapter is not safe for multi-thread access. */ public class TopicManager implements Closeable { private static final int FAST_KAFKA_OPERATION_TIMEOUT_MS = Time.MS_PER_SECOND; @@ -82,16 +83,12 @@ public class TopicManager implements Closeable { // Immutable state private final Logger logger; private final String pubSubBootstrapServers; - private final long kafkaOperationTimeoutMs; + private final long pubSubOperationTimeoutMs; private final long topicMinLogCompactionLagMs; - private final PubSubAdminAdapterFactory pubSubAdminAdapterFactory; - // TODO: Use single PubSubAdminAdapter for both read and write operations - private final Lazy pubSubWriteOnlyAdminAdapter; - private final Lazy pubSubReadOnlyAdminAdapter; - - private final Lazy pubSubConsumerAdapter; - private final Map pubSubAdminAdapterMap = new HashMap<>(); - private final Map pubSubConsumerAdapterMap = new HashMap<>(); + private final Lazy pubSubDefaultAdminAdapter; + private final Lazy pubSubDefaultConsumerAdapter; + private final Map pubSubAdminAdapterMap = new VeniceConcurrentHashMap<>(); + private final Map pubSubConsumerAdapterMap = new VeniceConcurrentHashMap<>(); private final PartitionOffsetFetcher partitionOffsetFetcher; private final Map pubSubClientsFactoryMap; private final PubSubTopicRepository pubSubTopicRepository; @@ -102,22 +99,22 @@ public class TopicManager implements Closeable { new LandFillObjectPool<>(KafkaMessageEnvelope::new)); private final Optional optionalMetricsRepository; private final TopicManagerRepository.ClusterNameSupplier clusterNameSupplier; - TopicManagerRepository.SSLPropertiesSupplier pubSubPropertiesSupplier; + private final TopicManagerRepository.SSLPropertiesSupplier pubSubPropertiesSupplier; // It's expensive to grab the topic config over and over again, and it changes infrequently. So we temporarily cache // queried configs. Cache topicConfigCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); private PubSubConsumerAdapter getConsumerAdapter(PubSubTopic pubSubTopic) { - if (pubSubTopic == null) { - return pubSubConsumerAdapter.get(); + Optional clusterName = clusterNameSupplier.get(pubSubTopic); + if (!clusterName.isPresent() || pubSubClientsFactoryMap == null) { + return pubSubDefaultConsumerAdapter.get(); } - String clusterName = clusterNameSupplier.get(pubSubTopic); - if (clusterName == null) { - return pubSubConsumerAdapter.get(); + if (!pubSubClientsFactoryMap.containsKey(clusterName.get())) { + throw new PubSubClientException("No PubSubClientsFactory found for cluster: " + clusterName.get()); } return pubSubConsumerAdapterMap.computeIfAbsent( - clusterName, + clusterName.get(), c -> pubSubClientsFactoryMap.get(c) .getConsumerAdapterFactory() .create( @@ -128,14 +125,14 @@ private PubSubConsumerAdapter getConsumerAdapter(PubSubTopic pubSubTopic) { } private PubSubAdminAdapter getAdminAdapter(PubSubTopic pubSubTopic) { - if (pubSubTopic == null) { - return pubSubReadOnlyAdminAdapter.get(); + Optional clusterName = clusterNameSupplier.get(pubSubTopic); + if (!clusterName.isPresent() || pubSubClientsFactoryMap == null) { + return pubSubDefaultAdminAdapter.get(); } - String clusterName = clusterNameSupplier.get(pubSubTopic); - if (clusterName == null) { - return pubSubReadOnlyAdminAdapter.get(); + if (!pubSubClientsFactoryMap.containsKey(clusterName.get())) { + throw new PubSubClientException("No PubSubClientsFactory found for cluster: " + clusterName.get()); } - return pubSubAdminAdapterMap.computeIfAbsent(clusterName, c -> { + return pubSubAdminAdapterMap.computeIfAbsent(clusterName.get(), c -> { PubSubAdminAdapterFactory pubSubAdminAdapterFactory = pubSubClientsFactoryMap.get(c).getAdminAdapterFactory(); VeniceProperties veniceProperties = pubSubPropertiesSupplier.get(pubSubBootstrapServers); PubSubAdminAdapter adminAdapter = pubSubAdminAdapterFactory.create(veniceProperties, pubSubTopicRepository); @@ -153,19 +150,11 @@ private PubSubAdminAdapter getAdminAdapter(PubSubTopic pubSubTopic) { }); } - public interface AdminSupplier { - PubSubAdminAdapter get(PubSubTopic pubSubTopic); - } - - public interface ConsumerSupplier { - PubSubConsumerAdapter get(PubSubTopic pubSubTopic); - } - public TopicManager(TopicManagerRepository.Builder builder, String pubSubBootstrapServers) { this.logger = LogManager.getLogger(this.getClass().getSimpleName() + " [" + pubSubBootstrapServers + "]"); - this.kafkaOperationTimeoutMs = builder.getKafkaOperationTimeoutMs(); + this.pubSubOperationTimeoutMs = builder.getKafkaOperationTimeoutMs(); this.topicMinLogCompactionLagMs = builder.getTopicMinLogCompactionLagMs(); - this.pubSubAdminAdapterFactory = builder.getPubSubAdminAdapterFactory(); + this.pubSubBootstrapServers = pubSubBootstrapServers; this.pubSubClientsFactoryMap = builder.getPubSubClientsFactoryMap(); @@ -175,49 +164,37 @@ public TopicManager(TopicManagerRepository.Builder builder, String pubSubBootstr this.optionalMetricsRepository = Optional.ofNullable(builder.getMetricsRepository()); - this.pubSubReadOnlyAdminAdapter = Lazy.of(() -> { - PubSubAdminAdapter pubSubReadOnlyAdmin = - pubSubAdminAdapterFactory.create(pubSubPropertiesSupplier.get(pubSubBootstrapServers), pubSubTopicRepository); - pubSubReadOnlyAdmin = createInstrumentedPubSubAdmin( - optionalMetricsRepository, - "ReadOnlyKafkaAdminStats", - pubSubReadOnlyAdmin, - pubSubBootstrapServers); - logger.info( - "{} is using read-only pubsub admin client of class: {}", - this.getClass().getSimpleName(), - pubSubReadOnlyAdmin.getClassName()); - return pubSubReadOnlyAdmin; - }); + PubSubAdminAdapterFactory pubSubAdminAdapterFactory = builder.getPubSubClientsFactory().getAdminAdapterFactory(); + PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory = + builder.getPubSubClientsFactory().getConsumerAdapterFactory(); - this.pubSubWriteOnlyAdminAdapter = Lazy.of(() -> { - PubSubAdminAdapter pubSubWriteOnlyAdmin = + this.pubSubDefaultAdminAdapter = Lazy.of(() -> { + PubSubAdminAdapter pubSubAdminAdapter = pubSubAdminAdapterFactory.create(pubSubPropertiesSupplier.get(pubSubBootstrapServers), pubSubTopicRepository); - pubSubWriteOnlyAdmin = createInstrumentedPubSubAdmin( + pubSubAdminAdapter = createInstrumentedPubSubAdmin( optionalMetricsRepository, - "WriteOnlyKafkaAdminStats", - pubSubWriteOnlyAdmin, + "DefaultKafkaAdminStats", + pubSubAdminAdapter, pubSubBootstrapServers); logger.info( - "{} is using write-only pubsub admin client of class: {}", + "{} is using default pubsub admin client of class: {}", this.getClass().getSimpleName(), - pubSubWriteOnlyAdmin.getClassName()); - return pubSubWriteOnlyAdmin; + pubSubAdminAdapter.getClassName()); + return pubSubAdminAdapter; }); - this.pubSubConsumerAdapter = Lazy.of( - () -> builder.getPubSubConsumerAdapterFactory() - .create( - pubSubPropertiesSupplier.get(pubSubBootstrapServers), - false, - pubSubMessageDeserializer, - pubSubBootstrapServers)); + this.pubSubDefaultConsumerAdapter = Lazy.of( + () -> pubSubConsumerAdapterFactory.create( + pubSubPropertiesSupplier.get(pubSubBootstrapServers), + false, + pubSubMessageDeserializer, + pubSubBootstrapServers)); this.partitionOffsetFetcher = PartitionOffsetFetcherFactory.createDefaultPartitionOffsetFetcher( this::getConsumerAdapter, this::getAdminAdapter, pubSubBootstrapServers, - kafkaOperationTimeoutMs, + pubSubOperationTimeoutMs, optionalMetricsRepository); } @@ -330,7 +307,7 @@ public void createTopic( long startTime = System.currentTimeMillis(); long deadlineMs = - startTime + (useFastKafkaOperationTimeout ? FAST_KAFKA_OPERATION_TIMEOUT_MS : kafkaOperationTimeoutMs); + startTime + (useFastKafkaOperationTimeout ? FAST_KAFKA_OPERATION_TIMEOUT_MS : pubSubOperationTimeoutMs); PubSubTopicConfiguration pubSubTopicConfiguration = new PubSubTopicConfiguration(Optional.of(retentionTimeMs), logCompaction, minIsr, topicMinLogCompactionLagMs); logger.info( @@ -347,7 +324,7 @@ public void createTopic( 10, Duration.ofMillis(200), Duration.ofSeconds(1), - Duration.ofMillis(useFastKafkaOperationTimeout ? FAST_KAFKA_OPERATION_TIMEOUT_MS : kafkaOperationTimeoutMs), + Duration.ofMillis(useFastKafkaOperationTimeout ? FAST_KAFKA_OPERATION_TIMEOUT_MS : pubSubOperationTimeoutMs), CREATE_TOPIC_RETRIABLE_EXCEPTIONS); } catch (Exception e) { if (ExceptionUtils.recursiveClassEquals(e, PubSubTopicExistsException.class)) { @@ -383,34 +360,34 @@ protected void waitUntilTopicCreated(PubSubTopic pubSubTopic, int partitionCount /** * Update retention for the given topic. * If the topic doesn't exist, this operation will throw {@link PubSubTopicDoesNotExistException} - * @param topicName + * @param pubSubTopic * @param retentionInMS * @return true if the retention time config of the input topic gets updated; return false if nothing gets updated */ - public boolean updateTopicRetention(PubSubTopic topicName, long retentionInMS) + public boolean updateTopicRetention(PubSubTopic pubSubTopic, long retentionInMS) throws PubSubTopicDoesNotExistException { - PubSubTopicConfiguration pubSubTopicConfiguration = getTopicConfig(topicName); - return updateTopicRetention(topicName, retentionInMS, pubSubTopicConfiguration); + PubSubTopicConfiguration pubSubTopicConfiguration = getTopicConfig(pubSubTopic); + return updateTopicRetention(pubSubTopic, retentionInMS, pubSubTopicConfiguration); } /** * Update retention for the given topic given a {@link Properties}. - * @param topicName + * @param pubSubTopic * @param expectedRetentionInMs * @param pubSubTopicConfiguration * @return true if the retention time gets updated; false if no update is needed. */ public boolean updateTopicRetention( - PubSubTopic topicName, + PubSubTopic pubSubTopic, long expectedRetentionInMs, PubSubTopicConfiguration pubSubTopicConfiguration) throws PubSubTopicDoesNotExistException { Optional retentionTimeMs = pubSubTopicConfiguration.retentionInMs(); if (!retentionTimeMs.isPresent() || expectedRetentionInMs != retentionTimeMs.get()) { pubSubTopicConfiguration.setRetentionInMs(Optional.of(expectedRetentionInMs)); - getAdminAdapter(topicName).setTopicConfig(topicName, pubSubTopicConfiguration); + getAdminAdapter(pubSubTopic).setTopicConfig(pubSubTopic, pubSubTopicConfiguration); logger.info( "Updated topic: {} with retention.ms: {} in cluster [{}]", - topicName, + pubSubTopic, expectedRetentionInMs, this.pubSubBootstrapServers); return true; @@ -451,7 +428,7 @@ public synchronized void updateTopicCompactionPolicy( || expectedLogCompacted && expectedMinLogCompactionLagMs != currentMinLogCompactionLagMs) { pubSubTopicConfiguration.setLogCompacted(expectedLogCompacted); pubSubTopicConfiguration.setMinLogCompactionLagMs(expectedMinLogCompactionLagMs); - pubSubWriteOnlyAdminAdapter.get().setTopicConfig(topic, pubSubTopicConfiguration); + pubSubDefaultAdminAdapter.get().setTopicConfig(topic, pubSubTopicConfiguration); logger.info( "Kafka compaction policy for topic: {} has been updated from {} to {}, min compaction lag updated from {} to {}", topic, @@ -462,25 +439,25 @@ public synchronized void updateTopicCompactionPolicy( } } - public boolean isTopicCompactionEnabled(PubSubTopic topicName) { - PubSubTopicConfiguration topicProperties = getCachedTopicConfig(topicName); + public boolean isTopicCompactionEnabled(PubSubTopic pubSubTopic) { + PubSubTopicConfiguration topicProperties = getCachedTopicConfig(pubSubTopic); return topicProperties.isLogCompacted(); } - public long getTopicMinLogCompactionLagMs(PubSubTopic topicName) { - PubSubTopicConfiguration topicProperties = getCachedTopicConfig(topicName); + public long getTopicMinLogCompactionLagMs(PubSubTopic pubSubTopic) { + PubSubTopicConfiguration topicProperties = getCachedTopicConfig(pubSubTopic); return topicProperties.minLogCompactionLagMs(); } - public boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR) + public boolean updateTopicMinInSyncReplica(PubSubTopic pubSubTopic, int minISR) throws PubSubTopicDoesNotExistException { - PubSubTopicConfiguration pubSubTopicConfiguration = getTopicConfig(topicName); + PubSubTopicConfiguration pubSubTopicConfiguration = getTopicConfig(pubSubTopic); Optional currentMinISR = pubSubTopicConfiguration.minInSyncReplicas(); // config doesn't exist config is different if (!currentMinISR.isPresent() || !currentMinISR.get().equals(minISR)) { pubSubTopicConfiguration.setMinInSyncReplicas(Optional.of(minISR)); - pubSubWriteOnlyAdminAdapter.get().setTopicConfig(topicName, pubSubTopicConfiguration); - logger.info("Updated topic: {} with min.insync.replicas: {}", topicName, minISR); + pubSubDefaultAdminAdapter.get().setTopicConfig(pubSubTopic, pubSubTopicConfiguration); + logger.info("Updated topic: {} with min.insync.replicas: {}", pubSubTopic, minISR); return true; } // min.insync.replicas has already been updated for this topic before @@ -488,14 +465,14 @@ public boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR) } public Map getAllTopicRetentions() { - return pubSubReadOnlyAdminAdapter.get().getAllTopicRetentions(); + return pubSubDefaultAdminAdapter.get().getAllTopicRetentions(); } /** * Return topic retention time in MS. */ - public long getTopicRetention(PubSubTopic topicName) throws PubSubTopicDoesNotExistException { - PubSubTopicConfiguration pubSubTopicConfiguration = getTopicConfig(topicName); + public long getTopicRetention(PubSubTopic pubSubTopic) throws PubSubTopicDoesNotExistException { + PubSubTopicConfiguration pubSubTopicConfiguration = getTopicConfig(pubSubTopic); return getTopicRetention(pubSubTopicConfiguration); } @@ -508,14 +485,14 @@ public long getTopicRetention(PubSubTopicConfiguration pubSubTopicConfiguration) /** * Check whether topic is absent or truncated - * @param topicName + * @param pubSubTopic * @param truncatedTopicMaxRetentionMs * @return true if the topic does not exist or if it exists but its retention time is below truncated threshold * false if the topic exists and its retention time is above truncated threshold */ - public boolean isTopicTruncated(PubSubTopic topicName, long truncatedTopicMaxRetentionMs) { + public boolean isTopicTruncated(PubSubTopic pubSubTopic, long truncatedTopicMaxRetentionMs) { try { - return isRetentionBelowTruncatedThreshold(getTopicRetention(topicName), truncatedTopicMaxRetentionMs); + return isRetentionBelowTruncatedThreshold(getTopicRetention(pubSubTopic), truncatedTopicMaxRetentionMs); } catch (PubSubTopicDoesNotExistException e) { return true; } @@ -528,34 +505,34 @@ public boolean isRetentionBelowTruncatedThreshold(long retention, long truncated /** * This operation is a little heavy, since it will pull the configs for all the topics. */ - public PubSubTopicConfiguration getTopicConfig(PubSubTopic topicName) throws PubSubTopicDoesNotExistException { - final PubSubTopicConfiguration pubSubTopicConfiguration = getAdminAdapter(topicName).getTopicConfig(topicName); - topicConfigCache.put(topicName, pubSubTopicConfiguration); + public PubSubTopicConfiguration getTopicConfig(PubSubTopic pubSubTopic) throws PubSubTopicDoesNotExistException { + final PubSubTopicConfiguration pubSubTopicConfiguration = getAdminAdapter(pubSubTopic).getTopicConfig(pubSubTopic); + topicConfigCache.put(pubSubTopic, pubSubTopicConfiguration); return pubSubTopicConfiguration; } - public PubSubTopicConfiguration getTopicConfigWithRetry(PubSubTopic topicName) { + public PubSubTopicConfiguration getTopicConfigWithRetry(PubSubTopic pubSubTopic) { final PubSubTopicConfiguration pubSubTopicConfiguration = - getAdminAdapter(topicName).getTopicConfigWithRetry(topicName); - topicConfigCache.put(topicName, pubSubTopicConfiguration); + getAdminAdapter(pubSubTopic).getTopicConfigWithRetry(pubSubTopic); + topicConfigCache.put(pubSubTopic, pubSubTopicConfiguration); return pubSubTopicConfiguration; } /** * Still heavy, but can be called repeatedly to amortize that cost. */ - public PubSubTopicConfiguration getCachedTopicConfig(PubSubTopic topicName) { + public PubSubTopicConfiguration getCachedTopicConfig(PubSubTopic pubSubTopic) { // query the cache first, if it doesn't have it, query it from kafka and store it. - PubSubTopicConfiguration pubSubTopicConfiguration = topicConfigCache.getIfPresent(topicName); + PubSubTopicConfiguration pubSubTopicConfiguration = topicConfigCache.getIfPresent(pubSubTopic); if (pubSubTopicConfiguration == null) { - pubSubTopicConfiguration = getTopicConfigWithRetry(topicName); + pubSubTopicConfiguration = getTopicConfigWithRetry(pubSubTopic); } return pubSubTopicConfiguration; } - public Map getSomeTopicConfigs(Set topicNames) { + public Map getSomeTopicConfigs(Set pubSubTopics) { final Map topicConfigs = - getAdminAdapter(null).getSomeTopicConfigs(topicNames); + pubSubDefaultAdminAdapter.get().getSomeTopicConfigs(pubSubTopics); for (Map.Entry topicConfig: topicConfigs.entrySet()) { topicConfigCache.put(topicConfig.getKey(), topicConfig.getValue()); } @@ -574,10 +551,10 @@ public void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic) { logger.info("Deleting topic: {}", pubSubTopic); try { - pubSubWriteOnlyAdminAdapter.get().deleteTopic(pubSubTopic, Duration.ofMillis(kafkaOperationTimeoutMs)); + pubSubDefaultAdminAdapter.get().deleteTopic(pubSubTopic, Duration.ofMillis(pubSubOperationTimeoutMs)); logger.info("Topic: {} has been deleted", pubSubTopic); } catch (PubSubOpTimeoutException e) { - logger.warn("Failed to delete topic: {} after {} ms", pubSubTopic, kafkaOperationTimeoutMs); + logger.warn("Failed to delete topic: {} after {} ms", pubSubTopic, pubSubOperationTimeoutMs); } catch (PubSubTopicDoesNotExistException e) { // No-op. Topic is deleted already, consider this as a successful deletion. } catch (PubSubClientRetriableException | PubSubClientException e) { @@ -586,7 +563,7 @@ public void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic) { } // let's make sure the topic is deleted - if (pubSubWriteOnlyAdminAdapter.get().containsTopic(pubSubTopic)) { + if (pubSubDefaultAdminAdapter.get().containsTopic(pubSubTopic)) { throw new PubSubTopicExistsException("Topic: " + pubSubTopic.getName() + " still exists after deletion"); } } @@ -615,7 +592,7 @@ public void ensureTopicIsDeletedAndBlockWithRetry(PubSubTopic pubSubTopic) { } public synchronized Set listTopics() { - return pubSubReadOnlyAdminAdapter.get().listAllTopics(); + return pubSubDefaultAdminAdapter.get().listAllTopics(); } /** @@ -748,8 +725,7 @@ public synchronized void close() { Utils.closeQuietlyWithErrorLogged(partitionOffsetFetcher); pubSubConsumerAdapterMap.forEach((clusterName, adapter) -> Utils.closeQuietlyWithErrorLogged(adapter)); pubSubAdminAdapterMap.forEach((clusterName, adapter) -> Utils.closeQuietlyWithErrorLogged(adapter)); - pubSubReadOnlyAdminAdapter.ifPresent(Utils::closeQuietlyWithErrorLogged); - pubSubWriteOnlyAdminAdapter.ifPresent(Utils::closeQuietlyWithErrorLogged); + pubSubDefaultAdminAdapter.ifPresent(Utils::closeQuietlyWithErrorLogged); } // For testing only diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManagerRepository.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManagerRepository.java index c71f2ed5823..3cdaa49c255 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManagerRepository.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/TopicManagerRepository.java @@ -5,12 +5,8 @@ import static com.linkedin.venice.kafka.TopicManager.DEFAULT_KAFKA_OPERATION_TIMEOUT_MS; import com.linkedin.venice.exceptions.VeniceException; -import com.linkedin.venice.pubsub.PubSubAdminAdapterFactory; import com.linkedin.venice.pubsub.PubSubClientsFactory; -import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; -import com.linkedin.venice.pubsub.api.PubSubAdminAdapter; -import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; @@ -18,6 +14,7 @@ import io.tehuti.metrics.MetricsRepository; import java.io.Closeable; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.apache.logging.log4j.LogManager; @@ -82,8 +79,7 @@ public static class Builder { private long kafkaOperationTimeoutMs = DEFAULT_KAFKA_OPERATION_TIMEOUT_MS; private long topicDeletionStatusPollIntervalMs = DEFAULT_TOPIC_DELETION_STATUS_POLL_INTERVAL_MS; private long topicMinLogCompactionLagMs = DEFAULT_KAFKA_MIN_LOG_COMPACTION_LAG_MS; - private PubSubAdminAdapterFactory pubSubAdminAdapterFactory; - private PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory; + private PubSubClientsFactory defaultPubSubClientsFactory; private Map pubSubClientsFactoryMap; private PubSubTopicRepository pubSubTopicRepository; private MetricsRepository metricsRepository; @@ -131,18 +127,14 @@ public PubSubTopicRepository getPubSubTopicRepository() { return pubSubTopicRepository; } - public PubSubAdminAdapterFactory getPubSubAdminAdapterFactory() { - return pubSubAdminAdapterFactory; - } - - public PubSubConsumerAdapterFactory getPubSubConsumerAdapterFactory() { - return pubSubConsumerAdapterFactory; - } - public Map getPubSubClientsFactoryMap() { return pubSubClientsFactoryMap; } + public PubSubClientsFactory getPubSubClientsFactory() { + return defaultPubSubClientsFactory; + } + public SSLPropertiesSupplier getPubSubProperties() { return pubSubProperties; } @@ -175,14 +167,8 @@ public Builder setPubSubTopicRepository(PubSubTopicRepository pubSubTopicReposit return set(() -> this.pubSubTopicRepository = pubSubTopicRepository); } - public Builder setPubSubAdminAdapterFactory( - PubSubAdminAdapterFactory pubSubAdminAdapterFactory) { - return set(() -> this.pubSubAdminAdapterFactory = pubSubAdminAdapterFactory); - } - - public Builder setPubSubConsumerAdapterFactory( - PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory) { - return set(() -> this.pubSubConsumerAdapterFactory = pubSubConsumerAdapterFactory); + public Builder setDefaultPubSubClientsFactory(PubSubClientsFactory pubSubClientsFactory) { + return set(() -> this.defaultPubSubClientsFactory = pubSubClientsFactory); } public Builder setPubSubClientsFactoryMap(Map pubSubClientsFactoryMap) { @@ -203,6 +189,6 @@ public interface SSLPropertiesSupplier { } public interface ClusterNameSupplier { - String get(PubSubTopic pubSubTopic); + Optional get(PubSubTopic pubSubTopic); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/partitionoffset/PartitionOffsetFetcherFactory.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/partitionoffset/PartitionOffsetFetcherFactory.java index 5772f36a9cb..163b70147db 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/partitionoffset/PartitionOffsetFetcherFactory.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/partitionoffset/PartitionOffsetFetcherFactory.java @@ -1,6 +1,8 @@ package com.linkedin.venice.kafka.partitionoffset; -import com.linkedin.venice.kafka.TopicManager; +import com.linkedin.venice.pubsub.api.PubSubAdminAdapter; +import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; +import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.utils.SystemTime; import io.tehuti.metrics.MetricsRepository; import java.util.Optional; @@ -8,8 +10,8 @@ public class PartitionOffsetFetcherFactory { public static PartitionOffsetFetcher createDefaultPartitionOffsetFetcher( - TopicManager.ConsumerSupplier consumerSupplier, - TopicManager.AdminSupplier adminSupplier, + ConsumerSupplier consumerSupplier, + AdminSupplier adminSupplier, String pubSubBootstrapServers, long kafkaOperationTimeoutMs, Optional optionalMetricsRepository) { @@ -29,4 +31,12 @@ public static PartitionOffsetFetcher createDefaultPartitionOffsetFetcher( return partitionOffsetFetcher; } } + + public interface AdminSupplier { + PubSubAdminAdapter get(PubSubTopic pubSubTopic); + } + + public interface ConsumerSupplier { + PubSubConsumerAdapter get(PubSubTopic pubSubTopic); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/partitionoffset/PartitionOffsetFetcherImpl.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/partitionoffset/PartitionOffsetFetcherImpl.java index 57c28498b38..a5bedeb9338 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/partitionoffset/PartitionOffsetFetcherImpl.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/partitionoffset/PartitionOffsetFetcherImpl.java @@ -3,11 +3,11 @@ import static com.linkedin.venice.offsets.OffsetRecord.LOWEST_OFFSET; import com.linkedin.venice.exceptions.VeniceException; -import com.linkedin.venice.kafka.TopicManager; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo; +import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -16,6 +16,7 @@ import com.linkedin.venice.pubsub.api.exceptions.PubSubTopicDoesNotExistException; import com.linkedin.venice.utils.RetryUtils; import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import it.unimi.dsi.fastutil.ints.Int2LongMap; import it.unimi.dsi.fastutil.ints.Int2LongMaps; @@ -41,29 +42,34 @@ public class PartitionOffsetFetcherImpl implements PartitionOffsetFetcher { public static final Duration DEFAULT_KAFKA_OFFSET_API_TIMEOUT = Duration.ofMinutes(1); public static final long NO_PRODUCER_TIME_IN_EMPTY_TOPIC_PARTITION = -1; private static final int KAFKA_POLLING_RETRY_MAX_ATTEMPT = 3; - private final TopicManager.ConsumerSupplier consumerSupplier; - private final TopicManager.AdminSupplier adminSupplier; + private final PartitionOffsetFetcherFactory.ConsumerSupplier consumerSupplier; + private final PartitionOffsetFetcherFactory.AdminSupplier adminSupplier; private final Logger logger; - private final Lock adminConsumerLock; - private final Duration kafkaOperationTimeout; + private final Duration pubSubOperationTimeout; + private final Map adminConsumerLocks; public PartitionOffsetFetcherImpl( - TopicManager.ConsumerSupplier consumerSupplier, - TopicManager.AdminSupplier adminSupplier, - long kafkaOperationTimeoutMs, - String kafkaBootstrapServers) { + PartitionOffsetFetcherFactory.ConsumerSupplier consumerSupplier, + PartitionOffsetFetcherFactory.AdminSupplier adminSupplier, + long pubSubOperationTimeoutMs, + String pubSubBootstrapServers) { this.consumerSupplier = consumerSupplier; this.adminSupplier = adminSupplier; - this.adminConsumerLock = new ReentrantLock(); - this.kafkaOperationTimeout = Duration.ofMillis(kafkaOperationTimeoutMs); + this.pubSubOperationTimeout = Duration.ofMillis(pubSubOperationTimeoutMs); + this.adminConsumerLocks = new VeniceConcurrentHashMap<>(); this.logger = - LogManager.getLogger(PartitionOffsetFetcherImpl.class.getSimpleName() + " [" + kafkaBootstrapServers + "]"); + LogManager.getLogger(PartitionOffsetFetcherImpl.class.getSimpleName() + " [" + pubSubBootstrapServers + "]"); + } + + private Lock getAdminConsumerLock(PubSubConsumerAdapter adminConsumer) { + return adminConsumerLocks.computeIfAbsent(adminConsumer, (key) -> new ReentrantLock()); } @Override public Int2LongMap getTopicLatestOffsets(PubSubTopic topic) { - try (AutoCloseableLock ignore = AutoCloseableLock.of(adminConsumerLock)) { - List partitionInfoList = consumerSupplier.get(topic).partitionsFor(topic); + PubSubConsumerAdapter adminConsumer = consumerSupplier.get(topic); + try (AutoCloseableLock ignore = AutoCloseableLock.of(getAdminConsumerLock(adminConsumer))) { + List partitionInfoList = adminConsumer.partitionsFor(topic); if (partitionInfoList == null || partitionInfoList.isEmpty()) { logger.warn("Unexpected! Topic: {} has a null partition set, returning empty map for latest offsets", topic); return Int2LongMaps.EMPTY_MAP; @@ -73,7 +79,7 @@ public Int2LongMap getTopicLatestOffsets(PubSubTopic topic) { .collect(Collectors.toList()); Map offsetsByTopicPartitions = - consumerSupplier.get(topic).endOffsets(topicPartitions, DEFAULT_KAFKA_OFFSET_API_TIMEOUT); + adminConsumer.endOffsets(topicPartitions, DEFAULT_KAFKA_OFFSET_API_TIMEOUT); Int2LongMap offsetsByTopicPartitionIds = new Int2LongOpenHashMap(offsetsByTopicPartitions.size()); for (Map.Entry offsetByTopicPartition: offsetsByTopicPartitions.entrySet()) { offsetsByTopicPartitionIds @@ -89,7 +95,8 @@ private long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) throws P "Cannot retrieve latest offsets for invalid partition " + pubSubTopicPartition.getPartitionNumber()); } PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic(); - try (AutoCloseableLock ignore = AutoCloseableLock.of(adminConsumerLock)) { + PubSubConsumerAdapter adminConsumer = consumerSupplier.get(pubSubTopic); + try (AutoCloseableLock ignore = AutoCloseableLock.of(getAdminConsumerLock(adminConsumer))) { if (!adminSupplier.get(pubSubTopic) .containsTopicWithPartitionCheckExpectationAndRetry(pubSubTopicPartition, 3, true)) { throw new PubSubTopicDoesNotExistException( @@ -97,8 +104,8 @@ private long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) throws P + pubSubTopicPartition.getPartitionNumber() + " is invalid"); } - Map offsetMap = consumerSupplier.get(pubSubTopic) - .endOffsets(Collections.singletonList(pubSubTopicPartition), DEFAULT_KAFKA_OFFSET_API_TIMEOUT); + Map offsetMap = + adminConsumer.endOffsets(Collections.singletonList(pubSubTopicPartition), DEFAULT_KAFKA_OFFSET_API_TIMEOUT); Long offset = offsetMap.get(pubSubTopicPartition); if (offset != null) { return offset; @@ -142,7 +149,8 @@ private long getEndOffset( @Override public long getPartitionOffsetByTime(PubSubTopicPartition pubSubTopicPartition, long timestamp) { - try (AutoCloseableLock ignore = AutoCloseableLock.of(adminConsumerLock)) { + PubSubConsumerAdapter adminConsumer = consumerSupplier.get(pubSubTopicPartition.getPubSubTopic()); + try (AutoCloseableLock ignore = AutoCloseableLock.of(getAdminConsumerLock(adminConsumer))) { Long result = offsetsForTimesWithRetry(pubSubTopicPartition, timestamp); if (result == null) { result = getOffsetByTimeIfOutOfRange(pubSubTopicPartition, timestamp); @@ -156,10 +164,11 @@ public long getPartitionOffsetByTime(PubSubTopicPartition pubSubTopicPartition, } private Long offsetsForTimesWithRetry(PubSubTopicPartition pubSubTopicPartition, long timestamp) { - try (AutoCloseableLock ignore = AutoCloseableLock.of(adminConsumerLock)) { - PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic(); + PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic(); + PubSubConsumerAdapter adminConsumer = consumerSupplier.get(pubSubTopic); + try (AutoCloseableLock ignore = AutoCloseableLock.of(getAdminConsumerLock(adminConsumer))) { Long topicPartitionOffset = RetryUtils.executeWithMaxAttemptAndExponentialBackoff( - () -> consumerSupplier.get(pubSubTopic).offsetForTime(pubSubTopicPartition, timestamp, kafkaOperationTimeout), + () -> adminConsumer.offsetForTime(pubSubTopicPartition, timestamp, pubSubOperationTimeout), 25, Duration.ofMillis(100), Duration.ofSeconds(5), @@ -169,11 +178,12 @@ private Long offsetsForTimesWithRetry(PubSubTopicPartition pubSubTopicPartition, } } - private Long endOffsetsWithRetry(PubSubTopicPartition partition) { - try (AutoCloseableLock ignore = AutoCloseableLock.of(adminConsumerLock)) { - PubSubTopic pubSubTopic = partition.getPubSubTopic(); + private Long endOffsetsWithRetry(PubSubTopicPartition pubSubTopicPartition) { + PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic(); + PubSubConsumerAdapter adminConsumer = consumerSupplier.get(pubSubTopic); + try (AutoCloseableLock ignore = AutoCloseableLock.of(getAdminConsumerLock(adminConsumer))) { Long topicPartitionOffset = RetryUtils.executeWithMaxAttemptAndExponentialBackoff( - () -> consumerSupplier.get(pubSubTopic).endOffset(partition), + () -> adminConsumer.endOffset(pubSubTopicPartition), 25, Duration.ofMillis(100), Duration.ofSeconds(5), @@ -304,14 +314,15 @@ private List> consumeLatestR "Last record count must be greater than or equal to 1. Got: " + lastRecordsCount); } PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic(); - try (AutoCloseableLock ignore = AutoCloseableLock.of(adminConsumerLock)) { + PubSubConsumerAdapter adminConsumer = consumerSupplier.get(pubSubTopic); + try (AutoCloseableLock ignore = AutoCloseableLock.of(getAdminConsumerLock(adminConsumer))) { if (!adminSupplier.get(pubSubTopic) .containsTopicWithExpectationAndRetry(pubSubTopicPartition.getPubSubTopic(), 3, true)) { throw new PubSubTopicDoesNotExistException(pubSubTopicPartition.getPubSubTopic()); } try { - Map offsetByTopicPartition = consumerSupplier.get(pubSubTopic) - .endOffsets(Collections.singletonList(pubSubTopicPartition), DEFAULT_KAFKA_OFFSET_API_TIMEOUT); + Map offsetByTopicPartition = + adminConsumer.endOffsets(Collections.singletonList(pubSubTopicPartition), DEFAULT_KAFKA_OFFSET_API_TIMEOUT); if (offsetByTopicPartition == null || !offsetByTopicPartition.containsKey(pubSubTopicPartition)) { throw new VeniceException( "Got no results of finding end offsets for topic partition: " + pubSubTopicPartition); @@ -322,8 +333,7 @@ private List> consumeLatestR // Empty topic return Collections.emptyList(); } else { - Long earliestOffset = - consumerSupplier.get(pubSubTopic).beginningOffset(pubSubTopicPartition, DEFAULT_KAFKA_OFFSET_API_TIMEOUT); + Long earliestOffset = adminConsumer.beginningOffset(pubSubTopicPartition, DEFAULT_KAFKA_OFFSET_API_TIMEOUT); if (earliestOffset == null) { throw new VeniceException( "Got no results of finding the earliest offset for topic partition: " + pubSubTopicPartition); @@ -334,7 +344,7 @@ private List> consumeLatestR } else { // poll the last message and retrieve the producer timestamp final long startConsumeOffset = Math.max(latestOffset - lastRecordsCount, earliestOffset); - consumerSupplier.get(pubSubTopic).subscribe(pubSubTopicPartition, startConsumeOffset - 1); + adminConsumer.subscribe(pubSubTopicPartition, startConsumeOffset - 1); List> allConsumedRecords = new ArrayList<>(lastRecordsCount); @@ -356,7 +366,7 @@ private List> consumeLatestR KAFKA_POLLING_RETRY_MAX_ATTEMPT); oneBatchConsumedRecords = - consumerSupplier.get(pubSubTopic).poll(kafkaOperationTimeout.toMillis()).get(pubSubTopicPartition); + adminConsumer.poll(pubSubOperationTimeout.toMillis()).get(pubSubTopicPartition); } if (oneBatchConsumedRecords.isEmpty()) { /** @@ -379,14 +389,15 @@ private List> consumeLatestR } } } finally { - consumerSupplier.get(pubSubTopic).unSubscribe(pubSubTopicPartition); + adminConsumer.unSubscribe(pubSubTopicPartition); } } } @Override public List partitionsFor(PubSubTopic topic) { - try (AutoCloseableLock ignore = AutoCloseableLock.of(adminConsumerLock)) { + PubSubConsumerAdapter adminConsumer = consumerSupplier.get(topic); + try (AutoCloseableLock ignore = AutoCloseableLock.of(getAdminConsumerLock(adminConsumer))) { return consumerSupplier.get(topic).partitionsFor(topic); } } @@ -395,7 +406,8 @@ public List partitionsFor(PubSubTopic topic) { public long getOffsetByTimeIfOutOfRange(PubSubTopicPartition pubSubTopicPartition, long timestamp) throws PubSubTopicDoesNotExistException { PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic(); - try (AutoCloseableLock ignore = AutoCloseableLock.of(adminConsumerLock)) { + PubSubConsumerAdapter adminConsumer = consumerSupplier.get(pubSubTopic); + try (AutoCloseableLock ignore = AutoCloseableLock.of(getAdminConsumerLock(adminConsumer))) { long latestOffset = getLatestOffset(pubSubTopicPartition); if (latestOffset <= 0) { long nextOffset = LOWEST_OFFSET + 1; @@ -417,7 +429,7 @@ public long getOffsetByTimeIfOutOfRange(PubSubTopicPartition pubSubTopicPartitio } try { - consumerSupplier.get(pubSubTopic).subscribe(pubSubTopicPartition, latestOffset - 2); + adminConsumer.subscribe(pubSubTopicPartition, latestOffset - 2); Map>> records = new HashMap<>(); /** * We should retry to get the last record from that topic/partition, never return 0L here because 0L offset @@ -432,7 +444,7 @@ public long getOffsetByTimeIfOutOfRange(PubSubTopicPartition pubSubTopicPartitio latestOffset - 1, attempts, KAFKA_POLLING_RETRY_MAX_ATTEMPT); - records = consumerSupplier.get(pubSubTopic).poll(kafkaOperationTimeout.toMillis()); + records = adminConsumer.poll(pubSubOperationTimeout.toMillis()); } if (records.isEmpty()) { /** @@ -483,7 +495,7 @@ public long getOffsetByTimeIfOutOfRange(PubSubTopicPartition pubSubTopicPartitio timestamp); return resultOffset; } finally { - consumerSupplier.get(pubSubTopic).unSubscribe(pubSubTopicPartition); + adminConsumer.unSubscribe(pubSubTopicPartition); } } } @@ -493,7 +505,8 @@ public long getOffsetByTimeIfOutOfRange(PubSubTopicPartition pubSubTopicPartitio */ private long getEarliestOffset(PubSubTopicPartition pubSubTopicPartition) throws PubSubTopicDoesNotExistException { PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic(); - try (AutoCloseableLock ignore = AutoCloseableLock.of(adminConsumerLock)) { + PubSubConsumerAdapter adminConsumer = consumerSupplier.get(pubSubTopic); + try (AutoCloseableLock ignore = AutoCloseableLock.of(getAdminConsumerLock(adminConsumer))) { if (!adminSupplier.get(pubSubTopic) .containsTopicWithExpectationAndRetry(pubSubTopicPartition.getPubSubTopic(), 3, true)) { throw new PubSubTopicDoesNotExistException( @@ -503,8 +516,7 @@ private long getEarliestOffset(PubSubTopicPartition pubSubTopicPartition) throws throw new IllegalArgumentException( "Cannot retrieve latest offsets for invalid partition " + pubSubTopicPartition.getPartitionNumber()); } - Long offset = - consumerSupplier.get(pubSubTopic).beginningOffset(pubSubTopicPartition, DEFAULT_KAFKA_OFFSET_API_TIMEOUT); + Long offset = adminConsumer.beginningOffset(pubSubTopicPartition, DEFAULT_KAFKA_OFFSET_API_TIMEOUT); if (offset == null) { throw new VeniceException( "offset result returned from beginningOffsets does not contain entry: " + pubSubTopicPartition); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java index f043cfc58e0..7518fb7348b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java @@ -3,6 +3,7 @@ import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.exceptions.PubSubClientException; import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; @@ -11,6 +12,7 @@ import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; import java.util.Map; +import java.util.Optional; import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -51,8 +53,11 @@ public VeniceWriter prepareVeniceWriter(String storeName) { .setChunkingEnabled(false) .setPartitionCount(1) .build(); - String clusterName = clusterNameSupplier.get(pubSubTopicRepository.getTopic(rtTopic)); - return writerFactoryMap.get(clusterName).createVeniceWriter(options); + Optional clusterName = clusterNameSupplier.get(pubSubTopicRepository.getTopic(rtTopic)); + if (!clusterName.isPresent()) { + throw new PubSubClientException("Failed to get cluster name for topic: " + rtTopic); + } + return writerFactoryMap.get(clusterName.get()).createVeniceWriter(options); }); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java index 087839bc4e9..8a5b19009cc 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java @@ -171,6 +171,6 @@ public void close() { } public interface ClusterNameSupplier { - String get(PubSubTopic pubSubTopic); + Optional get(PubSubTopic pubSubTopic); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java index 97c6e2675c5..8edc7399a78 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java @@ -11,6 +11,7 @@ import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.pubsub.api.exceptions.PubSubClientException; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.schema.SchemaEntry; @@ -38,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; @@ -435,8 +437,11 @@ VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) { .setChunkingEnabled(false) .setPartitionCount(1) .build(); - - return writerFactoryMap.get(clusterNameSupplier.get(rtTopic)).createVeniceWriter(options); + Optional clusterName = clusterNameSupplier.get(rtTopic); + if (!clusterName.isPresent()) { + throw new PubSubClientException("Failed to get cluster name for topic: " + rtTopic); + } + return writerFactoryMap.get(clusterName.get()).createVeniceWriter(options); }); } @@ -491,6 +496,6 @@ public void close() throws IOException { } public interface ClusterNameSupplier { - String get(PubSubTopic pubSubTopic); + Optional get(PubSubTopic pubSubTopic); } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/TopicManagerTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/TopicManagerTest.java index 8210804829b..d276910c776 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/TopicManagerTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/TopicManagerTest.java @@ -66,6 +66,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -112,24 +113,21 @@ protected void createTopicManager() { PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory = mock(PubSubConsumerAdapterFactory.class); doReturn(mockInMemoryConsumer).when(pubSubConsumerAdapterFactory).create(any(), anyBoolean(), any(), anyString()); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = mock(PubSubProducerAdapterFactory.class); - Map pubSubClientsFactoryMap = Collections.singletonMap( - clusterName, - new PubSubClientsFactory( - pubSubProducerAdapterFactory, - pubSubConsumerAdapterFactory, - pubSubAdminAdapterFactory)); + PubSubClientsFactory pubSubClientsFactory = + new PubSubClientsFactory(pubSubProducerAdapterFactory, pubSubConsumerAdapterFactory, pubSubAdminAdapterFactory); + Map pubSubClientsFactoryMap = + Collections.singletonMap(clusterName, pubSubClientsFactory); topicManager = TopicManagerRepository.builder() .setPubSubProperties(k -> VeniceProperties.empty()) .setPubSubTopicRepository(pubSubTopicRepository) .setLocalKafkaBootstrapServers("localhost:1234") - .setPubSubConsumerAdapterFactory(pubSubConsumerAdapterFactory) - .setPubSubAdminAdapterFactory(pubSubAdminAdapterFactory) + .setDefaultPubSubClientsFactory(pubSubClientsFactory) .setKafkaOperationTimeoutMs(500L) .setTopicDeletionStatusPollIntervalMs(100L) .setTopicMinLogCompactionLagMs(MIN_COMPACTION_LAG) .setPubSubClientsFactoryMap(pubSubClientsFactoryMap) - .setClusterNameSupplier(s -> clusterName) + .setClusterNameSupplier(s -> Optional.of(clusterName)) .build() .getTopicManager(); } @@ -545,6 +543,8 @@ public void testTimeoutOnGettingMaxOffset() throws IOException { doReturn(mockPubSubConsumer).when(consumerAdapterFactory).create(any(), anyBoolean(), any(), anyString()); doReturn(mockPubSubAdminAdapter).when(adminAdapterFactory).create(any(), eq(pubSubTopicRepository)); Map pubSubClientsFactoryMap = new HashMap<>(); + PubSubClientsFactory pubSubClientsFactory = + new PubSubClientsFactory(producerAdapterFactory, consumerAdapterFactory, adminAdapterFactory); pubSubClientsFactoryMap.put( clusterName, new PubSubClientsFactory(producerAdapterFactory, consumerAdapterFactory, adminAdapterFactory)); @@ -553,13 +553,12 @@ public void testTimeoutOnGettingMaxOffset() throws IOException { .setPubSubProperties(k -> VeniceProperties.empty()) .setPubSubTopicRepository(pubSubTopicRepository) .setLocalKafkaBootstrapServers(localPubSubBrokerAddress) - .setPubSubAdminAdapterFactory(adminAdapterFactory) - .setPubSubConsumerAdapterFactory(consumerAdapterFactory) + .setDefaultPubSubClientsFactory(pubSubClientsFactory) .setKafkaOperationTimeoutMs(DEFAULT_KAFKA_OPERATION_TIMEOUT_MS) .setTopicDeletionStatusPollIntervalMs(100) .setTopicMinLogCompactionLagMs(MIN_COMPACTION_LAG) .setPubSubClientsFactoryMap(pubSubClientsFactoryMap) - .setClusterNameSupplier(s -> clusterName) + .setClusterNameSupplier(s -> Optional.of(clusterName)) .build() .getTopicManager()) { Assert.assertThrows( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java index 348c507c3dc..0c3469f2af4 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java @@ -273,7 +273,7 @@ public void testIncrementalPushStatusReadingFromPushStatusStoreInController() th clusterName, cluster.getLeaderVeniceController().getVeniceHelixAdmin().getVeniceWriterFactory()); PushStatusStoreRecordDeleter statusStoreDeleter = - new PushStatusStoreRecordDeleter(writerFactoryMap, s -> clusterName); + new PushStatusStoreRecordDeleter(writerFactoryMap, s -> Optional.of(clusterName)); // After deleting the inc push status belonging to just one partition we should expect // SOIP from the controller since other partition has replicas with EOIP status diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/TopicManagerIntegrationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/TopicManagerIntegrationTest.java index 9feaee1a2ff..650dc6b3314 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/TopicManagerIntegrationTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/TopicManagerIntegrationTest.java @@ -39,7 +39,7 @@ protected void createTopicManager() { topicManager = IntegrationTestPushUtils .getTopicManagerRepo( - 500L, + 1000L, 100L, MIN_COMPACTION_LAG, pubSubBrokerWrapper, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java index 4ecf0e8230b..7df06a4e9ac 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java @@ -378,13 +378,12 @@ public static TopicManagerRepository getTopicManagerRepo( .setPubSubProperties(k -> new VeniceProperties(properties)) .setPubSubTopicRepository(pubSubTopicRepository) .setLocalKafkaBootstrapServers(pubSubBootstrapServers) - .setPubSubConsumerAdapterFactory(pubSubBrokerWrapper.getPubSubClientsFactory().getConsumerAdapterFactory()) - .setPubSubAdminAdapterFactory(pubSubBrokerWrapper.getPubSubClientsFactory().getAdminAdapterFactory()) + .setDefaultPubSubClientsFactory(pubSubBrokerWrapper.getPubSubClientsFactory()) .setPubSubClientsFactoryMap(pubSubClientsFactoryMap) .setKafkaOperationTimeoutMs(kafkaOperationTimeoutMs) .setTopicDeletionStatusPollIntervalMs(topicDeletionStatusPollIntervalMs) .setTopicMinLogCompactionLagMs(topicMinLogCompactionLagMs) - .setClusterNameSupplier(s -> clusterName) + .setClusterNameSupplier(s -> Optional.of(clusterName)) .build(); } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/TestDictionaryUtils.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/TestDictionaryUtils.java index 816d5c669a5..aa875a8c1cf 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/TestDictionaryUtils.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/TestDictionaryUtils.java @@ -41,8 +41,6 @@ public class TestDictionaryUtils { private PubSubProducerAdapterFactory pubSubProducerAdapterFactory; private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); - private final String clusterName = "test_cluster"; - private String getTopic() { String callingFunction = Thread.currentThread().getStackTrace()[2].getMethodName(); PubSubTopic pubSubTopic = @@ -68,6 +66,7 @@ public void setUp() { mockTime = new TestMockTime(); pubSubBrokerWrapper = ServiceFactory.getPubSubBroker( new PubSubBrokerConfigs.Builder().setMockTime(mockTime).setRegionName(STANDALONE_REGION_NAME).build()); + String clusterName = "test_cluster"; manager = IntegrationTestPushUtils .getTopicManagerRepo( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java index dbff9b9b5f8..74f418d6f3e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java @@ -48,12 +48,12 @@ public class VeniceWriterTest { private TopicManager topicManager; private PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory; private PubSubProducerAdapterFactory pubSubProducerAdapterFactory; - private final String clusterName = "test_cluster"; private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); @BeforeClass public void setUp() { + String clusterName = "test_cluster"; pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(); pubSubConsumerAdapterFactory = pubSubBrokerWrapper.getPubSubClientsFactory().getConsumerAdapterFactory(); pubSubProducerAdapterFactory = pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory(); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 2970ecec673..a09a572d2a7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -517,8 +517,7 @@ public VeniceHelixAdmin( .setTopicMinLogCompactionLagMs(DEFAULT_KAFKA_MIN_LOG_COMPACTION_LAG_MS) .setKafkaOperationTimeoutMs(DEFAULT_KAFKA_OPERATION_TIMEOUT_MS) .setPubSubProperties(this::getPubSubSSLPropertiesFromControllerConfig) - .setPubSubAdminAdapterFactory(pubSubClientsFactory.getAdminAdapterFactory()) - .setPubSubConsumerAdapterFactory(veniceConsumerFactory) + .setDefaultPubSubClientsFactory(pubSubClientsFactory) .setPubSubClientsFactoryMap(pubSubClientsFactoryMap) .setClusterNameSupplier(this::discoverClusterFromStore) .build(); @@ -698,29 +697,25 @@ public void handleDeletedInstances(Set deletedInstances) { } } - private String discoverClusterFromStore(PubSubTopic pubSubTopic) { + private Optional discoverClusterFromStore(PubSubTopic pubSubTopic) { if (pubSubTopic.getPubSubTopicType().equals(PubSubTopicType.ADMIN_TOPIC)) { - return AdminTopicUtils.getClusterNameFromTopicName(pubSubTopic.getName()); + return Optional.of(AdminTopicUtils.getClusterNameFromTopicName(pubSubTopic.getName())); } String storeName = pubSubTopic.getStoreName(); if (VeniceSystemStoreUtils.isSystemStore(storeName)) { - String clusterName; - if (storeName.equals(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getSystemStoreName())) { - clusterName = multiClusterConfigs.getSystemSchemaClusterName(); - return clusterName; - } else if (storeName.equals(AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getSystemStoreName())) { - clusterName = multiClusterConfigs.getSystemSchemaClusterName(); - return clusterName; + if (storeName.equals(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getSystemStoreName()) + || storeName.equals(AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getSystemStoreName())) { + return Optional.of(multiClusterConfigs.getSystemSchemaClusterName()); } } try { String clusterName = discoverCluster(pubSubTopic.getStoreName()).getFirst(); - LOGGER.info("Discovered cluster {} from store for topic {}", clusterName, pubSubTopic); - return clusterName; + LOGGER.debug("Discovered cluster {} from store for topic {}", clusterName, pubSubTopic); + return Optional.of(clusterName); } catch (VeniceNoStoreException e) { - LOGGER.error("Failed to discover cluster from store for topic {}", pubSubTopic, e); - return null; + LOGGER.debug("Failed to discover cluster from store for topic {}", pubSubTopic, e); + return Optional.empty(); } }