From 682cf6503943212bc66832e1d5521a96d890ba4f Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Wed, 13 Nov 2024 15:44:37 -0800 Subject: [PATCH 01/11] Drop storage partitions gracefully --- .../com/linkedin/davinci/DaVinciBackend.java | 1 + .../ingestion/DefaultIngestionBackend.java | 4 ++- .../isolated/IsolatedIngestionServer.java | 1 + .../ActiveActiveStoreIngestionTask.java | 3 ++ .../kafka/consumer/ConsumerActionType.java | 2 +- .../consumer/KafkaStoreIngestionService.java | 30 +++++++++++++++++++ .../LeaderFollowerStoreIngestionTask.java | 3 ++ .../kafka/consumer/StoreIngestionTask.java | 24 +++++++++++++-- .../consumer/StoreIngestionTaskFactory.java | 4 +++ .../ActiveActiveStoreIngestionTaskTest.java | 3 ++ .../consumer/StoreIngestionTaskTest.java | 25 ++++++++++++++++ .../linkedin/venice/server/VeniceServer.java | 1 + 12 files changed, 96 insertions(+), 5 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 494a73c0ab..ea8c156624 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -252,6 +252,7 @@ public DaVinciBackend( .map(objectCacheConfig -> new ObjectCacheBackend(clientConfig, objectCacheConfig, schemaRepository)); ingestionService = new KafkaStoreIngestionService( + storageService, storageService.getStorageEngineRepository(), configLoader, storageMetadataService, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index a43ed1decf..f1e633d1fb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -145,8 +145,10 @@ public void dropStoragePartitionGracefully( final int waitIntervalInSecond = 1; final int maxRetry = timeoutInSeconds / waitIntervalInSecond; getStoreIngestionService().stopConsumptionAndWait(storeConfig, partition, waitIntervalInSecond, maxRetry, true); + getStoreIngestionService().dropStoragePartitionGracefully(storeConfig, partition); + // Drops corresponding data partition from storage. - this.storageService.dropStorePartition(storeConfig, partition, removeEmptyStorageEngine); + // this.storageService.dropStorePartition(storeConfig, partition, removeEmptyStorageEngine); } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java index 2456fcfe31..e15eb80753 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java @@ -695,6 +695,7 @@ private void initializeIsolatedIngestionServer() { // Create KafkaStoreIngestionService storeIngestionService = new KafkaStoreIngestionService( + storageService, storageService.getStorageEngineRepository(), configLoader, storageMetadataService, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index a4f1a17095..90bc6598bf 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -15,6 +15,7 @@ import 
com.linkedin.davinci.replication.merge.RmdSerDe; import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache; import com.linkedin.davinci.stats.AggVersionedIngestionStats; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter; import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter; @@ -103,6 +104,7 @@ private static class ReusableObjects { private final ThreadLocal threadLocalReusableObjects = ThreadLocal.withInitial(ReusableObjects::new); public ActiveActiveStoreIngestionTask( + StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, @@ -114,6 +116,7 @@ public ActiveActiveStoreIngestionTask( Optional cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { super( + storageService, builder, store, version, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java index 398191044d..8aaca3c07f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerActionType.java @@ -4,7 +4,7 @@ * An Enum enumerating all valid types of {@link ConsumerAction}. */ public enum ConsumerActionType { - SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1), + SUBSCRIBE(1), UNSUBSCRIBE(1), RESET_OFFSET(1), PAUSE(1), RESUME(1), DROP_PARTITION(1), /** * KILL action has higher priority than others, so that once KILL action is added to the action queue, * we will process it immediately to avoid doing throw-away works. 
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index ba3bb10c1a..70509915c6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -31,6 +31,7 @@ import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.view.VeniceViewWriterFactory; import com.linkedin.venice.SSLConfig; @@ -130,6 +131,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements private static final String GROUP_ID_FORMAT = "%s_%s"; private static final Logger LOGGER = LogManager.getLogger(KafkaStoreIngestionService.class); + private final StorageService storageService; private final VeniceConfigLoader veniceConfigLoader; @@ -190,6 +192,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements private final ExecutorService aaWCWorkLoadProcessingThreadPool; public KafkaStoreIngestionService( + StorageService storageService, StorageEngineRepository storageEngineRepository, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, @@ -212,6 +215,7 @@ public KafkaStoreIngestionService( PubSubClientsFactory pubSubClientsFactory, Optional sslFactory, HeartbeatMonitoringService heartbeatMonitoringService) { + this.storageService = storageService; this.cacheBackend = cacheBackend; this.recordTransformerFunction = recordTransformerFunction; this.storageMetadataService = 
storageMetadataService; @@ -519,6 +523,7 @@ private StoreIngestionTask createStoreIngestionTask( }; return ingestionTaskFactory.getNewIngestionTask( + storageService, store, version, getKafkaConsumerProperties(veniceStoreVersionConfig), @@ -920,6 +925,31 @@ public void stopConsumptionAndWait( } } + /** + * Drops the corresponding Venice Partition gracefully. + * This should only be called after {@link #stopConsumptionAndWait} has been called + * @param veniceStore Venice Store for the partition. + * @param partitionId Venice partition's id. + */ + public CompletableFuture dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) { + final String topic = veniceStore.getStoreVersionName(); + + if (isPartitionConsuming(topic, partitionId)) { + throw new VeniceException("Tried to drop storage partition that is still consuming"); + } + + try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) { + StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic); + if (ingestionTask != null && ingestionTask.isRunning()) { + return ingestionTask + .dropPartition(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId)); + } else { + LOGGER.warn("Ignoring drop partition message for Topic {} Partition {}", topic, partitionId); + return CompletableFuture.completedFuture(null); + } + } + } + /** * This function will try to kill the ingestion tasks belonging to non-current versions. 
* And this is mainly being used by memory limiter feature to free up resources when encountering memory diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 76042645a2..9ebff318fa 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -23,6 +23,7 @@ import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper; import com.linkedin.davinci.schema.merge.MergeRecordHelper; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter; import com.linkedin.davinci.store.AbstractStorageEngine; @@ -204,6 +205,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { private final Version version; public LeaderFollowerStoreIngestionTask( + StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, @@ -215,6 +217,7 @@ public LeaderFollowerStoreIngestionTask( Optional cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { super( + storageService, builder, store, version, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 8ac571b5d1..33c289e4de 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2,9 +2,7 @@ import static com.linkedin.davinci.ingestion.LagType.OFFSET_LAG; import static com.linkedin.davinci.ingestion.LagType.TIME_LAG; -import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.RESET_OFFSET; -import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.SUBSCRIBE; -import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.UNSUBSCRIBE; +import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.*; import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.LEADER; import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY; import static com.linkedin.davinci.validation.KafkaDataIntegrityValidator.DISABLED; @@ -34,6 +32,7 @@ import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.StoragePartitionConfig; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; @@ -189,6 +188,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); /** storage destination for consumption */ + protected final StorageService storageService; protected final StorageEngineRepository storageEngineRepository; protected final AbstractStorageEngine storageEngine; @@ -344,6 +344,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final ExecutorService parallelProcessingThreadPool; public StoreIngestionTask( + StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, @@ -361,6 +362,7 @@ public StoreIngestionTask( 
this.databaseSyncBytesIntervalForTransactionalMode = storeConfig.getDatabaseSyncBytesIntervalForTransactionalMode(); this.databaseSyncBytesIntervalForDeferredWriteMode = storeConfig.getDatabaseSyncBytesIntervalForDeferredWriteMode(); this.kafkaProps = kafkaConsumerProperties; + this.storageService = storageService; this.storageEngineRepository = builder.getStorageEngineRepository(); this.storageMetadataService = builder.getStorageMetadataService(); this.storeRepository = builder.getMetadataRepo(); @@ -630,6 +632,18 @@ public synchronized CompletableFuture unSubscribePartition( return consumerAction.getFuture(); } + /** + * + * Adds an asynchronous partition drop request for the task. + * This is always a Helix triggered action + */ + public synchronized CompletableFuture dropPartition(PubSubTopicPartition topicPartition) { + throwIfNotRunning(); + ConsumerAction consumerAction = new ConsumerAction(DROP_PARTITION, topicPartition, nextSeqNum(), true); + consumerActionsQueue.add(consumerAction); + return consumerAction.getFuture(); + } + public boolean hasAnySubscription() { return !partitionConsumptionStateMap.isEmpty(); } @@ -2135,6 +2149,10 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws LOGGER.info("Kill this consumer task for Topic: {}", topic); // Throw the exception here to break the consumption loop, and then this task is marked as error status. 
throw new VeniceIngestionTaskKilledException(KILLED_JOB_MESSAGE + topic); + case DROP_PARTITION: + LOGGER.info("{} Dropping partition: {}", ingestionTaskName, topicPartition); + this.storageService.dropStorePartition(storeConfig, partition, true); + LOGGER.info("{} Dropped partition: {}", ingestionTaskName, topicPartition); default: throw new UnsupportedOperationException(operation.name() + " is not supported in " + getClass().getName()); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java index 6a48231c3c..4c1ef625fa 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java @@ -11,6 +11,7 @@ import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.view.VeniceViewWriterFactory; import com.linkedin.venice.kafka.protocol.state.PartitionState; @@ -43,6 +44,7 @@ private StoreIngestionTaskFactory(Builder builder) { } public StoreIngestionTask getNewIngestionTask( + StorageService storageService, Store store, Version version, Properties kafkaConsumerProperties, @@ -54,6 +56,7 @@ public StoreIngestionTask getNewIngestionTask( DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) { if (version.isActiveActiveReplicationEnabled()) { return new ActiveActiveStoreIngestionTask( + storageService, builder, store, version, @@ -66,6 +69,7 @@ public StoreIngestionTask getNewIngestionTask( recordTransformerFunction); } return new 
LeaderFollowerStoreIngestionTask( + storageService, builder, store, version, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java index fa8e79576a..3311fc95c9 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java @@ -28,6 +28,7 @@ import com.linkedin.davinci.stats.AggVersionedIngestionStats; import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.ChunkingUtils; import com.linkedin.davinci.store.AbstractStorageEngine; @@ -231,6 +232,7 @@ public void testisReadyToServeAnnouncedWithRTLag() { Version mockVersion = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID); mockVersion.setHybridStoreConfig(hybridStoreConfig); + StorageService storageService = mock(StorageService.class); Store store = new ZKStore( STORE_NAME, "Felix", @@ -250,6 +252,7 @@ public void testisReadyToServeAnnouncedWithRTLag() { VeniceStoreVersionConfig storeVersionConfig = new VeniceStoreVersionConfig(STORE_NAME + "_v1", new VeniceProperties(kafkaConsumerProperties)); ActiveActiveStoreIngestionTask ingestionTask = new ActiveActiveStoreIngestionTask( + storageService, builder, store, mockVersion, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 4966daf8dc..f18609dfe7 100644 --- 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -96,6 +96,7 @@ import com.linkedin.davinci.stats.KafkaConsumerServiceStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.AbstractStorageIterator; import com.linkedin.davinci.store.AbstractStoragePartition; @@ -818,6 +819,7 @@ private void runTest( true, aaConfig, storeVersionConfigOverride); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigsUnderTest.store; Version version = storeAndVersionConfigsUnderTest.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigsUnderTest.storeVersionConfig; @@ -835,6 +837,7 @@ private void runTest( storeIngestionTaskUnderTest = spy( ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -2759,6 +2762,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte false, false, AA_ON); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -2778,6 +2782,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -2895,6 +2900,7 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica false, true, aaConfig); + StorageService 
storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -2926,6 +2932,7 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3119,6 +3126,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT false, true, AA_ON); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3142,6 +3150,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3258,6 +3267,7 @@ public void testCheckAndLogIfLagIsAcceptableForHybridStore( false, true, aaConfig); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3287,6 +3297,7 @@ public void testCheckAndLogIfLagIsAcceptableForHybridStore( Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3423,6 +3434,7 @@ public void 
testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node false, true, AA_ON); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3439,6 +3451,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3518,6 +3531,7 @@ public void testProcessTopicSwitch(NodeType nodeType) { new HybridStoreConfigImpl(100, 100, 100, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); MockStoreVersionConfigs storeAndVersionConfigs = setupStoreAndVersionMocks(2, partitionerConfig, Optional.of(hybridStoreConfig), false, true, AA_OFF); + StorageService storageService = mock(StorageService.class); Store mockStore = storeAndVersionConfigs.store; Version version = storeAndVersionConfigs.version; VeniceStoreVersionConfig storeConfig = storeAndVersionConfigs.storeVersionConfig; @@ -3534,6 +3548,7 @@ public void testProcessTopicSwitch(NodeType nodeType) { kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, version, kafkaProps, @@ -3577,6 +3592,7 @@ public void testUpdateConsumedUpstreamRTOffsetMapDuringRTSubscription(AAConfig a doReturn(VersionStatus.STARTED).when(mockVersion).getStatus(); ReadOnlyStoreRepository mockReadOnlyStoreRepository = mock(ReadOnlyStoreRepository.class); + StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); doReturn(storeName).when(mockStore).getName(); 
doReturn(mockStore).when(mockReadOnlyStoreRepository).getStoreOrThrow(eq(storeName)); @@ -3606,6 +3622,7 @@ public void testUpdateConsumedUpstreamRTOffsetMapDuringRTSubscription(AAConfig a LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, mockVersion, mockKafkaConsumerProperties, @@ -3711,6 +3728,7 @@ public void testLeaderShouldSubscribeToCorrectVTOffset() { doReturn("localhost").when(version).getPushStreamSourceAddress(); Store store = mock(Store.class); + StorageService storageService = mock(StorageService.class); doReturn(version).when(store).getVersion(eq(1)); String versionTopicName = "testStore_v1"; @@ -3719,6 +3737,7 @@ public void testLeaderShouldSubscribeToCorrectVTOffset() { doReturn(versionTopicName).when(storeConfig).getStoreVersionName(); LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = spy( new LeaderFollowerStoreIngestionTask( + storageService, builder, store, version, @@ -4214,6 +4233,7 @@ public void testBatchOnlyStoreDataRecovery() { DataRecoveryVersionConfig dataRecoveryVersionConfig = new DataRecoveryVersionConfigImpl("dc-0", false, 1); doReturn(dataRecoveryVersionConfig).when(version).getDataRecoveryVersionConfig(); + StorageService storageService = mock(StorageService.class); Store store = mock(Store.class); doReturn(version).when(store).getVersion(eq(1)); @@ -4230,6 +4250,7 @@ public void testBatchOnlyStoreDataRecovery() { null).build(); doReturn(Version.parseStoreFromVersionTopic(topic)).when(store).getName(); storeIngestionTaskUnderTest = ingestionTaskFactory.getNewIngestionTask( + storageService, store, version, new Properties(), @@ -4289,6 +4310,7 @@ public void testMaybeSendIngestionHeartbeat( NodeType nodeType, HybridConfig hybridConfig) { String storeName = Utils.getUniqueString("store"); + StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); 
doReturn(storeName).when(mockStore).getName(); String versionTopic = Version.composeKafkaTopic(storeName, 1); @@ -4345,6 +4367,7 @@ public void testMaybeSendIngestionHeartbeat( .build(); LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, mockVersion, mockKafkaConsumerProperties, @@ -4378,6 +4401,7 @@ public void testMaybeSendIngestionHeartbeat( @Test public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws InterruptedException { String storeName = Utils.getUniqueString("store"); + StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); doReturn(storeName).when(mockStore).getName(); String versionTopic = Version.composeKafkaTopic(storeName, 1); @@ -4432,6 +4456,7 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter .build(); LeaderFollowerStoreIngestionTask ingestionTask = (LeaderFollowerStoreIngestionTask) ingestionTaskFactory.getNewIngestionTask( + storageService, mockStore, mockVersion, mockKafkaConsumerProperties, diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 2611f92dec..7554d63587 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -365,6 +365,7 @@ private List createServices() { // create and add KafkaSimpleConsumerService this.kafkaStoreIngestionService = new KafkaStoreIngestionService( + storageService, storageService.getStorageEngineRepository(), veniceConfigLoader, storageMetadataService, From b49d9c4cb9fdca31aeb877f17826f3f608f12cd7 Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Wed, 13 Nov 2024 15:55:43 -0800 Subject: [PATCH 02/11] Pass storageService to constructors --- 
.../davinci/kafka/consumer/StoreIngestionTask.java | 5 ++++- .../kafka/consumer/KafkaStoreIngestionServiceTest.java | 7 +++++++ .../linkedin/davinci/kafka/consumer/PushTimeoutTest.java | 5 +++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 33c289e4de..6a86e5a012 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2,7 +2,10 @@ import static com.linkedin.davinci.ingestion.LagType.OFFSET_LAG; import static com.linkedin.davinci.ingestion.LagType.TIME_LAG; -import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.*; +import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.DROP_PARTITION; +import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.RESET_OFFSET; +import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.SUBSCRIBE; +import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.UNSUBSCRIBE; import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.LEADER; import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY; import static com.linkedin.davinci.validation.KafkaDataIntegrityValidator.DISABLED; diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index 65ca79333e..5dbde97de8 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ 
b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -22,6 +22,7 @@ import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.AbstractStorageEngineTest; import com.linkedin.venice.exceptions.VeniceNoStoreException; @@ -72,6 +73,7 @@ @Test public abstract class KafkaStoreIngestionServiceTest { + private StorageService mockStorageService; private StorageEngineRepository mockStorageEngineRepository; private VeniceConfigLoader mockVeniceConfigLoader; private StorageMetadataService storageMetadataService; @@ -88,6 +90,7 @@ public abstract class KafkaStoreIngestionServiceTest { @BeforeClass public void setUp() { + mockStorageService = mock(StorageService.class); mockStorageEngineRepository = mock(StorageEngineRepository.class); doReturn(mock(AbstractStorageEngine.class)).when(mockStorageEngineRepository).getLocalStorageEngine(anyString()); storageMetadataService = mock(StorageMetadataService.class); @@ -149,6 +152,7 @@ private void setupMockConfig() { @Test public void testDisableMetricsEmission() { kafkaStoreIngestionService = new KafkaStoreIngestionService( + mockStorageService, mockStorageEngineRepository, mockVeniceConfigLoader, storageMetadataService, @@ -233,6 +237,7 @@ public void testGetIngestingTopicsNotWithOnlineVersion() { // Without starting the ingestion service test getIngestingTopicsWithVersionStatusNotOnline would return the correct // topics under different scenarios. 
kafkaStoreIngestionService = new KafkaStoreIngestionService( + mockStorageService, mockStorageEngineRepository, mockVeniceConfigLoader, storageMetadataService, @@ -321,6 +326,7 @@ public void testGetIngestingTopicsNotWithOnlineVersion() { @Test public void testCloseStoreIngestionTask() { kafkaStoreIngestionService = new KafkaStoreIngestionService( + mockStorageService, mockStorageEngineRepository, mockVeniceConfigLoader, storageMetadataService, @@ -386,6 +392,7 @@ public void testCloseStoreIngestionTask() { @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngestion) { kafkaStoreIngestionService = new KafkaStoreIngestionService( + mockStorageService, mockStorageEngineRepository, mockVeniceConfigLoader, storageMetadataService, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java index 966ea0906e..9042bc2848 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PushTimeoutTest.java @@ -12,6 +12,7 @@ import com.linkedin.davinci.stats.AggHostLevelIngestionStats; import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.venice.exceptions.VeniceTimeoutException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; @@ -55,6 +56,7 @@ public void testPushTimeoutForLeaderFollowerStores() { .setHostLevelIngestionStats(mockAggStoreIngestionStats) .setPubSubTopicRepository(pubSubTopicRepository); + StorageService storageService = mock(StorageService.class); Store mockStore = builder.getMetadataRepo().getStoreOrThrow(storeName); 
Version version = mockStore.getVersion(versionNumber); @@ -66,6 +68,7 @@ public void testPushTimeoutForLeaderFollowerStores() { doReturn(versionTopic).when(mockVeniceStoreVersionConfig).getStoreVersionName(); LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = new LeaderFollowerStoreIngestionTask( + storageService, builder, mockStore, version, @@ -113,6 +116,7 @@ public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() { .setHostLevelIngestionStats(mockAggStoreIngestionStats) .setPubSubTopicRepository(pubSubTopicRepository); + StorageService storageService = mock(StorageService.class); Store mockStore = builder.getMetadataRepo().getStoreOrThrow(storeName); Version version = mockStore.getVersion(versionNumber); @@ -139,6 +143,7 @@ public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() { doReturn(mockOffsetRecord).when(mockStorageMetadataService).getLastOffset(eq(versionTopic), eq(0)); LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = new LeaderFollowerStoreIngestionTask( + storageService, builder, mockStore, version, From 772166b25b4930f7679c3457dec96b3b566e55e9 Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Thu, 14 Nov 2024 10:18:10 -0800 Subject: [PATCH 03/11] If ingestion task isn't running, drop partition synchronously instead --- .../ingestion/DefaultIngestionBackend.java | 3 --- .../consumer/KafkaStoreIngestionService.java | 16 +++++++++------- .../kafka/consumer/StoreIngestionTask.java | 5 +++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index f1e633d1fb..79b8c9959f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -146,9 +146,6 @@ public void dropStoragePartitionGracefully( final int maxRetry = timeoutInSeconds / waitIntervalInSecond; getStoreIngestionService().stopConsumptionAndWait(storeConfig, partition, waitIntervalInSecond, maxRetry, true); getStoreIngestionService().dropStoragePartitionGracefully(storeConfig, partition); - - // Drops corresponding data partition from storage. - // this.storageService.dropStorePartition(storeConfig, partition, removeEmptyStorageEngine); } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 70509915c6..88fe571964 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -919,8 +919,8 @@ public void stopConsumptionAndWait( if (isIsolatedIngestion) { LOGGER.info("Ingestion task for topic {} will be kept open for the access from main process.", topicName); } else { - LOGGER.info("Shutting down ingestion task of topic {}", topicName); - shutdownStoreIngestionTask(topicName); + // LOGGER.info("Shutting down ingestion task of topic {}", topicName); + // shutdownStoreIngestionTask(topicName); } } } @@ -931,7 +931,7 @@ public void stopConsumptionAndWait( * @param veniceStore Venice Store for the partition. * @param partitionId Venice partition's id. 
*/ - public CompletableFuture dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) { + public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) { final String topic = veniceStore.getStoreVersionName(); if (isPartitionConsuming(topic, partitionId)) { @@ -941,11 +941,13 @@ public CompletableFuture dropStoragePartitionGracefully(VeniceStoreVersion try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) { StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic); if (ingestionTask != null && ingestionTask.isRunning()) { - return ingestionTask - .dropPartition(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId)); + ingestionTask.dropPartition(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId)); } else { - LOGGER.warn("Ignoring drop partition message for Topic {} Partition {}", topic, partitionId); - return CompletableFuture.completedFuture(null); + LOGGER.info( + "Ingestion task isn't running for Topic {} Partition {}. Dropping partition synchronously instead", + veniceStore.getStoreVersionName(), + partitionId); + this.storageService.dropStorePartition(veniceStore, partitionId, true); } } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 6a86e5a012..98e029660f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -640,11 +640,10 @@ public synchronized CompletableFuture unSubscribePartition( * Adds an asynchronous partition drop request for the task. 
* This is always a Helix triggered action */ - public synchronized CompletableFuture dropPartition(PubSubTopicPartition topicPartition) { + public synchronized void dropPartition(PubSubTopicPartition topicPartition) { throwIfNotRunning(); ConsumerAction consumerAction = new ConsumerAction(DROP_PARTITION, topicPartition, nextSeqNum(), true); consumerActionsQueue.add(consumerAction); - return consumerAction.getFuture(); } public boolean hasAnySubscription() { @@ -3908,6 +3907,8 @@ public synchronized void close() { // This method signals the run method to end, which closes the // resources before exiting. + LOGGER.info("Closed ingestionTask"); + if (recordTransformer != null) { long startTime = System.currentTimeMillis(); recordTransformer.onEndVersionIngestion(); From ee2591dc522969f6759ecc55c5c32b0f2b7d875f Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Thu, 14 Nov 2024 10:19:45 -0800 Subject: [PATCH 04/11] Cleanup --- .../davinci/kafka/consumer/KafkaStoreIngestionService.java | 4 ++-- .../linkedin/davinci/kafka/consumer/StoreIngestionTask.java | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 88fe571964..37925de828 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -919,8 +919,8 @@ public void stopConsumptionAndWait( if (isIsolatedIngestion) { LOGGER.info("Ingestion task for topic {} will be kept open for the access from main process.", topicName); } else { - // LOGGER.info("Shutting down ingestion task of topic {}", topicName); - // shutdownStoreIngestionTask(topicName); + LOGGER.info("Shutting down ingestion task of topic {}", topicName); + 
shutdownStoreIngestionTask(topicName); } } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 98e029660f..d9dea12449 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -3907,8 +3907,6 @@ public synchronized void close() { // This method signals the run method to end, which closes the // resources before exiting. - LOGGER.info("Closed ingestionTask"); - if (recordTransformer != null) { long startTime = System.currentTimeMillis(); recordTransformer.onEndVersionIngestion(); From de4d3d0b0dc8199ecd02ce7ec4b2ba2917175b06 Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Thu, 14 Nov 2024 15:44:50 -0800 Subject: [PATCH 05/11] Add unit test testDropStoragePartitionGracefully --- .../KafkaStoreIngestionServiceTest.java | 57 ++++++++++++++++--- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index 5dbde97de8..565eb8eefe 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -4,13 +4,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.atMostOnce; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static 
org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -53,9 +47,11 @@ import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager; import io.tehuti.metrics.MetricsRepository; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import it.unimi.dsi.fastutil.objects.Object2IntMaps; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.NavigableMap; @@ -63,6 +59,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.apache.avro.Schema; import org.mockito.Mockito; @@ -496,4 +493,50 @@ public void testHasCurrentVersionBootstrapping() { assertTrue(KafkaStoreIngestionService.hasCurrentVersionBootstrapping(mapContainsCurrentBootstrappingTask)); } + + @Test + public void testDropStoragePartitionGracefully() throws NoSuchFieldException, IllegalAccessException { + kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class); + String topicName = "test-store_v1"; + int partitionId = 0; + VeniceProperties veniceProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB); + VeniceStoreVersionConfig config = new VeniceStoreVersionConfig(topicName, veniceProperties); + doCallRealMethod().when(kafkaStoreIngestionService).dropStoragePartitionGracefully(config, partitionId); + + Field topicLockManagerField = kafkaStoreIngestionService.getClass().getDeclaredField("topicLockManager"); + topicLockManagerField.setAccessible(true); + topicLockManagerField.set(kafkaStoreIngestionService, new 
ResourceAutoClosableLockManager<>(ReentrantLock::new)); + + NavigableMap topicNameToIngestionTaskMap = mock(NavigableMap.class); + Field topicNameToIngestionTaskMapField = + kafkaStoreIngestionService.getClass().getDeclaredField("topicNameToIngestionTaskMap"); + topicNameToIngestionTaskMapField.setAccessible(true); + topicNameToIngestionTaskMapField.set(kafkaStoreIngestionService, topicNameToIngestionTaskMap); + + PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class); + Field pubSubTopicRepositoryField = kafkaStoreIngestionService.getClass().getDeclaredField("pubSubTopicRepository"); + pubSubTopicRepositoryField.setAccessible(true); + pubSubTopicRepositoryField.set(kafkaStoreIngestionService, pubSubTopicRepository); + + StorageService storageService = mock(StorageService.class); + Field storageServiceField = kafkaStoreIngestionService.getClass().getDeclaredField("storageService"); + storageServiceField.setAccessible(true); + storageServiceField.set(kafkaStoreIngestionService, storageService); + + StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class); + when(topicNameToIngestionTaskMap.get(topicName)).thenReturn(storeIngestionTask); + + PubSubTopic pubSubTopic = mock(PubSubTopic.class); + when(pubSubTopicRepository.getTopic(topicName)).thenReturn(pubSubTopic); + + // Verify that when the ingestion task is running, it drops the store partition asynchronously + when(storeIngestionTask.isRunning()).thenReturn(true); + kafkaStoreIngestionService.dropStoragePartitionGracefully(config, partitionId); + verify(storeIngestionTask).dropPartition(any()); + + // Verify that when the ingestion task isn't running, it drops the store partition synchronously + when(storeIngestionTask.isRunning()).thenReturn(false); + kafkaStoreIngestionService.dropStoragePartitionGracefully(config, partitionId); + verify(storageService).dropStorePartition(config, partitionId, true); + } } From 7db261b30e408abaded71b8c70bc9250018fd5ca Mon Sep 17 00:00:00 
2001 From: Koorous Vargha Date: Thu, 14 Nov 2024 16:52:12 -0800 Subject: [PATCH 06/11] Test SIT dropPartition --- .../kafka/consumer/KafkaStoreIngestionServiceTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index 565eb8eefe..54c3f7d3e6 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -59,6 +59,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.apache.avro.Schema; @@ -524,7 +525,14 @@ public void testDropStoragePartitionGracefully() throws NoSuchFieldException, Il storageServiceField.set(kafkaStoreIngestionService, storageService); StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class); + + PriorityBlockingQueue consumerActionsQueue = mock(PriorityBlockingQueue.class); + Field consumerActionsQueueField = StoreIngestionTask.class.getDeclaredField("consumerActionsQueue"); + consumerActionsQueueField.setAccessible(true); + consumerActionsQueueField.set(storeIngestionTask, consumerActionsQueue); + when(topicNameToIngestionTaskMap.get(topicName)).thenReturn(storeIngestionTask); + doCallRealMethod().when(storeIngestionTask).dropPartition(any()); PubSubTopic pubSubTopic = mock(PubSubTopic.class); when(pubSubTopicRepository.getTopic(topicName)).thenReturn(pubSubTopic); @@ -533,6 +541,7 @@ public void testDropStoragePartitionGracefully() throws NoSuchFieldException, Il when(storeIngestionTask.isRunning()).thenReturn(true); 
kafkaStoreIngestionService.dropStoragePartitionGracefully(config, partitionId); verify(storeIngestionTask).dropPartition(any()); + verify(consumerActionsQueue).add(any()); // Verify that when the ingestion task isn't running, it drops the store partition synchronously when(storeIngestionTask.isRunning()).thenReturn(false); From fee2146933c14988973111813d877f697c6c8524 Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Thu, 14 Nov 2024 19:53:00 -0800 Subject: [PATCH 07/11] Add more logging and add integration test --- .../consumer/KafkaStoreIngestionService.java | 4 ++ .../kafka/consumer/StoreIngestionTask.java | 1 + .../KafkaStoreIngestionServiceTest.java | 10 ++++- .../venice/server/VeniceServerTest.java | 39 +++++++++++++++++++ 4 files changed, 53 insertions(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 37925de828..9e71e2f1aa 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -941,6 +941,10 @@ public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) { StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic); if (ingestionTask != null && ingestionTask.isRunning()) { + LOGGER.info( + "Ingestion task is still running for Topic {} Partition {}. 
Dropping partition asynchronously.", + veniceStore.getStoreVersionName(), + partitionId); ingestionTask.dropPartition(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId)); } else { LOGGER.info( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index d9dea12449..bda0846a26 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2155,6 +2155,7 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws LOGGER.info("{} Dropping partition: {}", ingestionTaskName, topicPartition); this.storageService.dropStorePartition(storeConfig, partition, true); LOGGER.info("{} Dropped partition: {}", ingestionTaskName, topicPartition); + break; default: throw new UnsupportedOperationException(operation.name() + " is not supported in " + getClass().getName()); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index 54c3f7d3e6..8119178ea8 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -4,7 +4,15 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static 
org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java index 2e978c5087..0e7237e3f6 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java @@ -9,6 +9,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse; import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.StorageService; import com.linkedin.r2.message.rest.RestRequest; import com.linkedin.r2.message.rest.RestRequestBuilder; import com.linkedin.r2.message.rest.RestResponse; @@ -49,6 +50,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.Encoder; @@ -338,4 +340,41 @@ public void testStartServerWithSystemSchemaInitialization() { }); } } + + @Test + public void testDropStorePartitionGracefully() { + try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 1, 0)) { + Properties featureProperties = new Properties(); + featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true)); + featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true)); + 
VeniceServerWrapper server = cluster.getVeniceServers().get(0); + Assert.assertTrue(server.getVeniceServer().isStarted()); + + StorageService storageService = server.getVeniceServer().getStorageService(); + Assert.assertTrue(server.getVeniceServer().isStarted()); + StorageEngineRepository repository = storageService.getStorageEngineRepository(); + Assert + .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine."); + + // Create a new store + String storeName = cluster.createStore(1); + String storeVersionName = Version.composeKafkaTopic(storeName, 1); + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning()); + Assert.assertEquals(storageService.getStorageEngine(storeVersionName).getPartitionIds().size(), 3); + + // Add servers to trigger a rebalance, which will redistribute and drop partitions for the current participant + cluster.addVeniceServer(featureProperties, new Properties()); + cluster.addVeniceServer(featureProperties, new Properties()); + cluster.addVeniceServer(featureProperties, new Properties()); + + repository = server.getVeniceServer().getStorageService().getStorageEngineRepository(); + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + // Number of partitions should have decreased due to rebalancing + Assert.assertTrue(storageService.getStorageEngine(storeVersionName).getPartitionIds().size() < 3); + }); + } + } } From 483fa9d927a2bf997499949b74b67ce6cd3ebb71 Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Thu, 14 Nov 2024 20:34:35 -0800 Subject: [PATCH 08/11] Add synchronous partition drop --- .../consumer/KafkaStoreIngestionService.java | 4 +- .../venice/server/VeniceServerTest.java | 40 +++++++++++++++++-- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git 
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 9e71e2f1aa..2f30c15063 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -942,13 +942,13 @@ public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic); if (ingestionTask != null && ingestionTask.isRunning()) { LOGGER.info( - "Ingestion task is still running for Topic {} Partition {}. Dropping partition asynchronously.", + "Ingestion task is still running for Topic {}. Dropping partition {} asynchronously", veniceStore.getStoreVersionName(), partitionId); ingestionTask.dropPartition(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId)); } else { LOGGER.info( - "Ingestion task isn't running for Topic {} Partition {}. Dropping partition synchronously instead", + "Ingestion task isn't running for Topic {}. 
Dropping partition {} synchronously", veniceStore.getStoreVersionName(), partitionId); this.storageService.dropStorePartition(veniceStore, partitionId, true); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java index 0e7237e3f6..f1101f34b8 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java @@ -342,7 +342,7 @@ public void testStartServerWithSystemSchemaInitialization() { } @Test - public void testDropStorePartitionGracefully() { + public void testDropStorePartitionAsynchronously() { try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 1, 0)) { Properties featureProperties = new Properties(); featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true)); @@ -352,7 +352,7 @@ public void testDropStorePartitionGracefully() { StorageService storageService = server.getVeniceServer().getStorageService(); Assert.assertTrue(server.getVeniceServer().isStarted()); - StorageEngineRepository repository = storageService.getStorageEngineRepository(); + final StorageEngineRepository repository = storageService.getStorageEngineRepository(); Assert .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine."); @@ -368,13 +368,45 @@ public void testDropStorePartitionGracefully() { cluster.addVeniceServer(featureProperties, new Properties()); cluster.addVeniceServer(featureProperties, new Properties()); - repository = server.getVeniceServer().getStorageService().getStorageEngineRepository(); Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { - // Number of partitions 
should have decreased due to rebalancing + // Partitions should have been dropped asynchronously due to rebalancing Assert.assertTrue(storageService.getStorageEngine(storeVersionName).getPartitionIds().size() < 3); }); } } + + @Test + public void testDropStorePartitionSynchronously() { + try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 1, 0)) { + Properties featureProperties = new Properties(); + featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true)); + featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true)); + VeniceServerWrapper server = cluster.getVeniceServers().get(0); + Assert.assertTrue(server.getVeniceServer().isStarted()); + + StorageService storageService = server.getVeniceServer().getStorageService(); + Assert.assertTrue(server.getVeniceServer().isStarted()); + final StorageEngineRepository repository = storageService.getStorageEngineRepository(); + Assert + .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine."); + + // Create a new store + String storeName = cluster.createStore(1); + String storeVersionName = Version.composeKafkaTopic(storeName, 1); + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1); + Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning()); + Assert.assertEquals(storageService.getStorageEngine(storeVersionName).getPartitionIds().size(), 3); + + cluster.useControllerClient(controllerClient -> { + controllerClient.disableAndDeleteStore(storeName); + }); + + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + // All partitions should have been dropped synchronously + Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 0); + }); + } + } } From 13a1de823da2622b6087b850d614d9526cccc0bd Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Fri, 15 Nov 2024 15:14:44 -0800 Subject: [PATCH 09/11] Remove redundant constructor 
parameter --- .../src/main/java/com/linkedin/davinci/DaVinciBackend.java | 1 - .../davinci/ingestion/isolated/IsolatedIngestionServer.java | 1 - .../davinci/kafka/consumer/KafkaStoreIngestionService.java | 4 +--- .../kafka/consumer/KafkaStoreIngestionServiceTest.java | 5 +---- .../main/java/com/linkedin/venice/server/VeniceServer.java | 1 - 5 files changed, 2 insertions(+), 10 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index ea8c156624..141af20a36 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -253,7 +253,6 @@ public DaVinciBackend( ingestionService = new KafkaStoreIngestionService( storageService, - storageService.getStorageEngineRepository(), configLoader, storageMetadataService, clusterInfoProvider, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java index e15eb80753..b232b77fbd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java @@ -696,7 +696,6 @@ private void initializeIsolatedIngestionServer() { // Create KafkaStoreIngestionService storeIngestionService = new KafkaStoreIngestionService( storageService, - storageService.getStorageEngineRepository(), configLoader, storageMetadataService, clusterInfoProvider, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 
2f30c15063..4b54d17396 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -29,7 +29,6 @@ import com.linkedin.davinci.stats.AggVersionedIngestionStats; import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; -import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; @@ -193,7 +192,6 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements public KafkaStoreIngestionService( StorageService storageService, - StorageEngineRepository storageEngineRepository, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, @@ -452,7 +450,7 @@ public void handleStoreDeleted(Store store) { ingestionTaskFactory = StoreIngestionTaskFactory.builder() .setVeniceWriterFactory(veniceWriterFactory) - .setStorageEngineRepository(storageEngineRepository) + .setStorageEngineRepository(storageService.getStorageEngineRepository()) .setStorageMetadataService(storageMetadataService) .setLeaderFollowerNotifiersQueue(leaderFollowerNotifiers) .setSchemaRepository(schemaRepo) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index 8119178ea8..d226f0c888 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ 
b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -98,6 +98,7 @@ public abstract class KafkaStoreIngestionServiceTest { public void setUp() { mockStorageService = mock(StorageService.class); mockStorageEngineRepository = mock(StorageEngineRepository.class); + when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository); doReturn(mock(AbstractStorageEngine.class)).when(mockStorageEngineRepository).getLocalStorageEngine(anyString()); storageMetadataService = mock(StorageMetadataService.class); mockClusterInfoProvider = mock(ClusterInfoProvider.class); @@ -159,7 +160,6 @@ private void setupMockConfig() { public void testDisableMetricsEmission() { kafkaStoreIngestionService = new KafkaStoreIngestionService( mockStorageService, - mockStorageEngineRepository, mockVeniceConfigLoader, storageMetadataService, mockClusterInfoProvider, @@ -244,7 +244,6 @@ public void testGetIngestingTopicsNotWithOnlineVersion() { // topics under different scenarios. 
kafkaStoreIngestionService = new KafkaStoreIngestionService( mockStorageService, - mockStorageEngineRepository, mockVeniceConfigLoader, storageMetadataService, mockClusterInfoProvider, @@ -333,7 +332,6 @@ public void testGetIngestingTopicsNotWithOnlineVersion() { public void testCloseStoreIngestionTask() { kafkaStoreIngestionService = new KafkaStoreIngestionService( mockStorageService, - mockStorageEngineRepository, mockVeniceConfigLoader, storageMetadataService, mockClusterInfoProvider, @@ -399,7 +397,6 @@ public void testCloseStoreIngestionTask() { public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngestion) { kafkaStoreIngestionService = new KafkaStoreIngestionService( mockStorageService, - mockStorageEngineRepository, mockVeniceConfigLoader, storageMetadataService, mockClusterInfoProvider, diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 7554d63587..9db76f074c 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -366,7 +366,6 @@ private List createServices() { // create and add KafkaSimpleConsumerService this.kafkaStoreIngestionService = new KafkaStoreIngestionService( storageService, - storageService.getStorageEngineRepository(), veniceConfigLoader, storageMetadataService, new StaticClusterInfoProvider(Collections.singleton(clusterConfig.getClusterName())), From 04d43f1d71103cf2025f87448820224c43f5eff8 Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Fri, 15 Nov 2024 16:22:46 -0800 Subject: [PATCH 10/11] When a KILL is issued, make sure to drop any partitions that haven't been processed yet --- .../kafka/consumer/StoreIngestionTask.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git 
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index bda0846a26..8467c21bb0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -345,6 +345,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final boolean batchReportIncPushStatusEnabled; protected final ExecutorService parallelProcessingThreadPool; + private final Set<Integer> pendingPartitionDrops = Collections.synchronizedSet(new HashSet<>()); public StoreIngestionTask( StorageService storageService, @@ -644,6 +645,7 @@ public synchronized void dropPartition(PubSubTopicPartition topicPartition) { throwIfNotRunning(); ConsumerAction consumerAction = new ConsumerAction(DROP_PARTITION, topicPartition, nextSeqNum(), true); consumerActionsQueue.add(consumerAction); + pendingPartitionDrops.add(topicPartition.getPartitionNumber()); } public boolean hasAnySubscription() { @@ -2149,11 +2151,25 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws break; case KILL: LOGGER.info("Kill this consumer task for Topic: {}", topic); + + if (!this.pendingPartitionDrops.isEmpty()) { + LOGGER.info( + "Partitions {} are pending to be dropped for Topic: {}. Dropping them before killing consumer task.", + this.pendingPartitionDrops, + topic); + synchronized (this.pendingPartitionDrops) { + for (Integer partitionToDrop: this.pendingPartitionDrops) { + this.storageService.dropStorePartition(storeConfig, partitionToDrop, true); + } + this.pendingPartitionDrops.clear(); + } + } // Throw the exception here to break the consumption loop, and then this task is marked as error status.
throw new VeniceIngestionTaskKilledException(KILLED_JOB_MESSAGE + topic); case DROP_PARTITION: LOGGER.info("{} Dropping partition: {}", ingestionTaskName, topicPartition); this.storageService.dropStorePartition(storeConfig, partition, true); + this.pendingPartitionDrops.remove(partition); LOGGER.info("{} Dropped partition: {}", ingestionTaskName, topicPartition); break; default: From fb034c8767ac0a373c8c8fa98860f5da5f63b221 Mon Sep 17 00:00:00 2001 From: Koorous Vargha Date: Fri, 15 Nov 2024 16:50:55 -0800 Subject: [PATCH 11/11] Fix unit test --- .../kafka/consumer/KafkaStoreIngestionServiceTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index d226f0c888..77202178cd 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -536,6 +536,11 @@ public void testDropStoragePartitionGracefully() throws NoSuchFieldException, Il consumerActionsQueueField.setAccessible(true); consumerActionsQueueField.set(storeIngestionTask, consumerActionsQueue); + Set pendingPartitionDrops = mock(Set.class); + Field pendingPartitionDropsField = StoreIngestionTask.class.getDeclaredField("pendingPartitionDrops"); + pendingPartitionDropsField.setAccessible(true); + pendingPartitionDropsField.set(storeIngestionTask, pendingPartitionDrops); + when(topicNameToIngestionTaskMap.get(topicName)).thenReturn(storeIngestionTask); doCallRealMethod().when(storeIngestionTask).dropPartition(any());