From d0ee62357f2f7d7f5f66779d4ed2341b286482b5 Mon Sep 17 00:00:00 2001 From: Gaojie Liu Date: Wed, 30 Oct 2024 11:34:59 -0700 Subject: [PATCH] [server][common] Fixed bug in AA/WC parallel processing support (#1252) * [server][common] Fixed bug in AA/WC parallel processing support This PR fixed the following issues: 1. AASIT should pass non-null `KeyLevelLocksManager` to `IngestionBatchProcessor`, otherwise, race condition will happen. 2. Fixed the locking order in `IngestionBatchProcessor` to avoid deadlock. 3. Updated `SparseConcurrentList#computeIfAbsent` to skip adjust list size if the computed result is `null`. --- .../ActiveActiveStoreIngestionTask.java | 6 +- .../consumer/IngestionBatchProcessor.java | 75 +++++++++++-------- .../LeaderFollowerStoreIngestionTask.java | 4 +- .../PubSubMessageProcessedResultWrapper.java | 4 +- .../kafka/consumer/StoreIngestionTask.java | 6 +- .../linkedin/davinci/utils/ByteArrayKey.java | 12 ++- .../consumer/IngestionBatchProcessorTest.java | 27 ++++++- .../consumer/StoreIngestionTaskTest.java | 8 ++ .../venice/utils/SparseConcurrentList.java | 6 ++ .../utils/SparseConcurrentListTest.java | 4 + 10 files changed, 105 insertions(+), 47 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 7b2b2c46a3..a4f1a17095 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -153,14 +153,14 @@ public ActiveActiveStoreIngestionTask( this.remoteIngestionRepairService = builder.getRemoteIngestionRepairService(); this.ingestionBatchProcessorLazy = Lazy.of(() -> { if (!serverConfig.isAAWCWorkloadParallelProcessingEnabled()) { - LOGGER.info("AA/WC workload parallel processing enabled is false"); + LOGGER.info("AA/WC workload parallel processing is disabled for store version: {}", getKafkaVersionTopic()); return null; } - LOGGER.info("AA/WC workload parallel processing enabled is true"); + LOGGER.info("AA/WC workload parallel processing is enabled for store version: {}", getKafkaVersionTopic()); return new IngestionBatchProcessor( kafkaVersionTopic, parallelProcessingThreadPool, - null, + keyLevelLocksManager.get(), this::processActiveActiveMessage, isWriteComputationEnabled, isActiveActiveReplicationEnabled(), diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessor.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessor.java index 299de9c0f0..f862560b79 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessor.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessor.java @@ -14,9 +14,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; @@ -27,6 +28,8 @@ * resources to speed up the leader ingestion. */ public class IngestionBatchProcessor { + private static final TreeMap EMPTY_TREE_MAP = new TreeMap(); + interface ProcessingFunction { PubSubMessageProcessedResult apply( PubSubMessage consumerRecord, @@ -71,32 +74,41 @@ public IngestionBatchProcessor( this.version = Version.parseVersionFromKafkaTopicName(storeVersionName); } + // For testing + KeyLevelLocksManager getLockManager() { + return this.lockManager; + } + /** * When {@link #lockManager} is not null, this function will try to lock all the keys * (except Control Messages) passed by the params. */ - public List lockKeys(List> records) { + public NavigableMap lockKeys( + List> records) { if (lockManager != null) { - List locks = new ArrayList<>(records.size()); + /** + * Need to use a {@link TreeMap} to make sure the locking will be executed in a deterministic order, otherwise + * deadlock can happen. + * Considering there could be multiple consumers, which are executing this function concurrently, and if they + * are trying to lock the same set of keys with different orders, deadlock can happen. + */ + TreeMap keyLockMap = new TreeMap<>(); records.forEach(r -> { if (!r.getKey().isControlMessage()) { - ReentrantLock lock = lockManager.acquireLockByKey(ByteArrayKey.wrap(r.getKey().getKey())); - locks.add(lock); - lock.lock(); + keyLockMap.computeIfAbsent(ByteArrayKey.wrap(r.getKey().getKey()), k -> lockManager.acquireLockByKey(k)); } }); - return locks; + keyLockMap.forEach((k, v) -> v.lock()); + return keyLockMap; } - return Collections.emptyList(); + return Collections.emptyNavigableMap(); } - public void unlockKeys(List> records, List locks) { + public void unlockKeys(NavigableMap keyLockMap) { if (lockManager != null) { - locks.forEach(lock -> lock.unlock()); - records.forEach(r -> { - if (!r.getKey().isControlMessage()) { - lockManager.releaseLock(ByteArrayKey.wrap(r.getKey().getKey())); - } + keyLockMap.descendingMap().forEach((key, lock) -> { + lock.unlock(); + lockManager.releaseLock(key); }); } } @@ -123,23 +135,9 @@ public List> resultList = new ArrayList<>(records.size()); - records.forEach(r -> { - resultList.add(new PubSubMessageProcessedResultWrapper<>(r)); - if (!r.getTopicPartition().getPubSubTopic().isRealTime()) { - isAllMessagesFromRTTopic.set(false); - } - }); - if (!isWriteComputationEnabled && !isActiveActiveReplicationEnabled) { - return resultList; - } - // Only handle records from the real-time topic - if (!isAllMessagesFromRTTopic.get()) { - return resultList; - } - /** * We would like to process the messages belonging to the same key sequentially to avoid race conditions. */ @@ -147,14 +145,25 @@ public List>> keyGroupMap = new HashMap<>(records.size()); - for (PubSubMessageProcessedResultWrapper r: resultList) { - PubSubMessage message = r.getMessage(); - if (!message.getKey().isControlMessage()) { + for (PubSubMessage message: records) { + if (!message.getTopicPartition().getPubSubTopic().isRealTime()) { + isAllMessagesFromRTTopic = false; + } + PubSubMessageProcessedResultWrapper resultWrapper = new PubSubMessageProcessedResultWrapper<>(message); + resultList.add(resultWrapper); + if (!message.getKey().isControlMessage() && isAllMessagesFromRTTopic) { ByteArrayKey byteArrayKey = ByteArrayKey.wrap(message.getKey().getKey()); - keyGroupMap.computeIfAbsent(byteArrayKey, (ignored) -> new ArrayList<>()).add(r); + keyGroupMap.computeIfAbsent(byteArrayKey, (ignored) -> new ArrayList<>()).add(resultWrapper); totalNumOfRecords++; } } + if (!isWriteComputationEnabled && !isActiveActiveReplicationEnabled) { + return resultList; + } + // Only handle records from the real-time topic + if (!isAllMessagesFromRTTopic) { + return resultList; + } aggVersionedIngestionStats .recordBatchProcessingRequest(storeName, version, totalNumOfRecords, System.currentTimeMillis()); hostLevelIngestionStats.recordBatchProcessingRequest(totalNumOfRecords); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 9d3ef49104..e83fa0bfeb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -340,10 +340,10 @@ public LeaderFollowerStoreIngestionTask( serverConfig.isComputeFastAvroEnabled()); this.ingestionBatchProcessingLazy = Lazy.of(() -> { if (!serverConfig.isAAWCWorkloadParallelProcessingEnabled()) { - LOGGER.info("AA/WC workload parallel processing enabled is false"); + LOGGER.info("AA/WC workload parallel processing is disabled for store version: {}", getKafkaVersionTopic()); return null; } - LOGGER.info("AA/WC workload parallel processing enabled is true"); + LOGGER.info("AA/WC workload parallel processing is enabled for store version: {}", getKafkaVersionTopic()); return new IngestionBatchProcessor( kafkaVersionTopic, parallelProcessingThreadPool, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java index 98a9ca3feb..09bdaf17ea 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubMessageProcessedResultWrapper.java @@ -19,7 +19,7 @@ public PubSubMessageProcessedResult getProcessedResult() { return processedResult; } - public void setProcessedResult(PubSubMessageProcessedResult transformedResult) { - this.processedResult = transformedResult; + public void setProcessedResult(PubSubMessageProcessedResult processedResult) { + this.processedResult = processedResult; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 68bd865c76..e611ab5fd6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -38,6 +38,7 @@ import com.linkedin.davinci.store.StoragePartitionConfig; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.record.ValueRecord; +import com.linkedin.davinci.utils.ByteArrayKey; import com.linkedin.davinci.utils.ChunkAssembler; import com.linkedin.davinci.validation.KafkaDataIntegrityValidator; import com.linkedin.davinci.validation.PartitionTracker; @@ -125,6 +126,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -1262,7 +1264,7 @@ protected void produceToStoreBufferServiceOrKafkaInBatch( * Process records batch by batch. */ for (List> batch: batches) { - List locks = ingestionBatchProcessor.lockKeys(batch); + NavigableMap keyLockMap = ingestionBatchProcessor.lockKeys(batch); try { long beforeProcessingPerRecordTimestampNs = System.nanoTime(); List> processedResults = @@ -1288,7 +1290,7 @@ protected void produceToStoreBufferServiceOrKafkaInBatch( elapsedTimeForPuttingIntoQueue); } } finally { - ingestionBatchProcessor.unlockKeys(batch, locks); + ingestionBatchProcessor.unlockKeys(keyLockMap); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ByteArrayKey.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ByteArrayKey.java index 92b8d456d3..3a4fcb13d5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ByteArrayKey.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ByteArrayKey.java @@ -1,12 +1,13 @@ package com.linkedin.davinci.utils; +import com.linkedin.venice.utils.ByteUtils; import java.util.Arrays; /** * A low overhead immutable container of byte[] suitable for use as a map key. */ -public class ByteArrayKey { +public class ByteArrayKey implements Comparable { private final byte[] content; private final int hashCode; @@ -31,6 +32,10 @@ public boolean equals(Object o) { return Arrays.equals(content, that.content); } + public byte[] getContent() { + return this.content; + } + @Override public int hashCode() { return this.hashCode; @@ -39,4 +44,9 @@ public int hashCode() { public static ByteArrayKey wrap(byte[] content) { return new ByteArrayKey(content); } + + @Override + public int compareTo(ByteArrayKey o) { + return ByteUtils.compare(content, o.content); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java index 768e3ae364..5c2ec5fc01 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java @@ -4,6 +4,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -31,6 +32,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.NavigableMap; import java.util.concurrent.locks.ReentrantLock; import org.testng.annotations.Test; @@ -116,21 +118,38 @@ public void lockKeysTest() { true, mock(AggVersionedIngestionStats.class), mock(HostLevelIngestionStats.class)); - List locks = batchProcessor.lockKeys(Arrays.asList(rtMessage1, rtMessage2)); + /** + * Switch the input order to make sure the `lockKeys` function would sort them when locking. + */ + NavigableMap keyLockMap = + batchProcessor.lockKeys(Arrays.asList(rtMessage2, rtMessage1)); verify(mockKeyLevelLocksManager).acquireLockByKey(ByteArrayKey.wrap(key1)); verify(mockKeyLevelLocksManager).acquireLockByKey(ByteArrayKey.wrap(key2)); verify(lockForKey1).lock(); verify(lockForKey2).lock(); - assertEquals(locks.get(0), lockForKey1); - assertEquals(locks.get(1), lockForKey2); + // Verify the order + ReentrantLock[] locks = keyLockMap.values().toArray(new ReentrantLock[0]); + assertEquals(locks[0], lockForKey1); + assertEquals(locks[1], lockForKey2); // unlock test - batchProcessor.unlockKeys(Arrays.asList(rtMessage1, rtMessage2), locks); + batchProcessor.unlockKeys(keyLockMap); verify(lockForKey1).unlock(); verify(lockForKey2).unlock(); verify(mockKeyLevelLocksManager).releaseLock(ByteArrayKey.wrap(key1)); verify(mockKeyLevelLocksManager).releaseLock(ByteArrayKey.wrap(key2)); + + // Duplicate messages in the batch + keyLockMap = batchProcessor.lockKeys(Arrays.asList(rtMessage1, rtMessage2, rtMessage1)); + verify(mockKeyLevelLocksManager, times(2)).acquireLockByKey(ByteArrayKey.wrap(key1)); + verify(mockKeyLevelLocksManager, times(2)).acquireLockByKey(ByteArrayKey.wrap(key2)); + verify(lockForKey1, times(2)).lock(); + verify(lockForKey2, times(2)).lock(); + // Verify the order + locks = keyLockMap.values().toArray(new ReentrantLock[0]); + assertEquals(locks[0], lockForKey1); + assertEquals(locks[1], lockForKey2); } @Test diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index aa5967c098..31e94689c5 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -3152,6 +3152,14 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT Optional.empty(), null); + if (hybridConfig.equals(HYBRID) && nodeType.equals(LEADER) && isAaWCParallelProcessingEnabled()) { + assertTrue(storeIngestionTaskUnderTest instanceof ActiveActiveStoreIngestionTask); + ActiveActiveStoreIngestionTask activeActiveStoreIngestionTask = + (ActiveActiveStoreIngestionTask) storeIngestionTaskUnderTest; + assertNotNull(activeActiveStoreIngestionTask.getIngestionBatchProcessor()); + assertNotNull(activeActiveStoreIngestionTask.getIngestionBatchProcessor().getLockManager()); + } + String rtTopicName = Version.composeRealTimeTopic(mockStore.getName()); PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rtTopicName); TopicSwitch topicSwitchWithMultipleSourceKafkaServers = new TopicSwitch(); diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/SparseConcurrentList.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/SparseConcurrentList.java index e2e8d2f9a1..5bf13cbb84 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/SparseConcurrentList.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/SparseConcurrentList.java @@ -249,6 +249,12 @@ public E computeIfAbsent(int index, IntFunction mappingFunction) { element = get(index); if (element == null) { element = mappingFunction.apply(index); + /** + * Don't update the list if the computed result is `null`. + */ + if (element == null) { + return null; + } /** * It's important NOT to call {@link #handleSizeDuringMutation(Object, Object)} since {@link #set(int, Object)} * already calls it. diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/utils/SparseConcurrentListTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/utils/SparseConcurrentListTest.java index f7e2f83ccd..099f3aaba1 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/utils/SparseConcurrentListTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/utils/SparseConcurrentListTest.java @@ -86,6 +86,10 @@ public void testNonNullSize() { assertEquals(scl.values().size(), scl.nonNullSize()); assertFalse(scl.isEmpty()); + // Compute if absent for an unpopulated index with computed result as `null`. + scl.computeIfAbsent(40, k -> null); + assertEquals(scl.size(), 8); + // Go back to the initial state... scl.clear(); assertEquals(scl.size(), 0);