diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java index ca2b86c701..8a1d54feb0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java @@ -14,6 +14,8 @@ public class ChangelogClientConfig { private String viewName; private String consumerName = ""; + + private boolean compactMessages = false; private ClientConfig innerClientConfig; private D2ControllerClient d2ControllerClient; @@ -79,6 +81,11 @@ public ChangelogClientConfig setConsumerName(String consumerName) { return this; } + public ChangelogClientConfig setShouldCompactMessages(boolean compactMessages) { + this.compactMessages = compactMessages; + return this; + } + public String getViewName() { return viewName; } @@ -87,6 +94,10 @@ public String getConsumerName() { return consumerName; } + public boolean shouldCompactMessages() { + return compactMessages; + } + public ChangelogClientConfig setControllerD2ServiceName(String controllerD2ServiceName) { this.controllerD2ServiceName = controllerD2ServiceName; return this; @@ -207,7 +218,8 @@ public static ChangelogClientConfig cloneConfig(Ch .setVersionSwapDetectionIntervalTimeInMs(config.getVersionSwapDetectionIntervalTimeInMs()) .setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes()) .setConsumerName(config.consumerName) - .setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval()); + .setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval()) + .setShouldCompactMessages(config.shouldCompactMessages()); return newConfig; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index d464cdfbfe..79b8c404ba 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -61,6 +61,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -620,6 +621,19 @@ protected Collection, VeniceChangeCoordinate>> i if (changeCaptureStats != null) { changeCaptureStats.recordRecordsConsumed(pubSubMessages.size()); } + if (changelogClientConfig.shouldCompactMessages()) { + Map, VeniceChangeCoordinate>> tempMap = new LinkedHashMap<>(); + // LinkedHashMap iterates in insertion order, and re-inserting a key that is already present replaces its value + // but leaves the entry at the position of its FIRST insertion. Compaction must keep exactly one message per key + // and place it where that key was LAST seen in the polled batch, so the key is removed before every put: + // remove() is a no-op for a key that is not present yet, and for a key that is present it forces the following + // put() to append the entry at the end of the iteration order.
+ for (PubSubMessage, VeniceChangeCoordinate> message: pubSubMessages) { + tempMap.remove(message.getKey()); + tempMap.put(message.getKey(), message); + } + return tempMap.values(); + } return pubSubMessages; } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index c9db72213d..9be396e7c2 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -124,6 +124,7 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup partition, oldVersionTopic, newVersionTopic, + false, false); ChangelogClientConfig changelogClientConfig = getChangelogClientConfig(d2ControllerClient).setViewName("changeCaptureView"); @@ -197,7 +198,16 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE VeniceChangelogConsumerImpl mockInternalSeekConsumer = Mockito.mock(VeniceChangelogConsumerImpl.class); Mockito.when(mockInternalSeekConsumer.subscribe(any())).thenReturn(CompletableFuture.completedFuture(null)); Mockito.when(mockInternalSeekConsumer.getPubSubConsumer()).thenReturn(mockPubSubConsumer); - prepareChangeCaptureRecordsToBePolled(0L, 10L, mockPubSubConsumer, oldVersionTopic, 0, oldVersionTopic, null, true); + prepareChangeCaptureRecordsToBePolled( + 0L, + 10L, + mockPubSubConsumer, + oldVersionTopic, + 0, + oldVersionTopic, + null, + true, + false); VeniceAfterImageConsumerImpl veniceChangelogConsumer = new VeniceAfterImageConsumerImpl<>( changelogClientConfig, mockPubSubConsumer, @@ -305,6 +315,7 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept 0, oldVersionTopic, null, + false, false); pubSubMessages = 
(List, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100); @@ -321,6 +332,82 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept verify(mockPubSubConsumer).close(); } + @Test + public void testConsumeAfterImageWithCompaction() throws ExecutionException, InterruptedException { + D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class); + StoreResponse storeResponse = mock(StoreResponse.class); + StoreInfo storeInfo = mock(StoreInfo.class); + doReturn(1).when(storeInfo).getCurrentVersion(); + doReturn(2).when(storeInfo).getPartitionCount(); + doReturn(storeInfo).when(storeResponse).getStore(); + doReturn(storeResponse).when(d2ControllerClient).getStore(storeName); + MultiSchemaResponse multiRMDSchemaResponse = mock(MultiSchemaResponse.class); + MultiSchemaResponse.Schema rmdSchemaFromMultiSchemaResponse = mock(MultiSchemaResponse.Schema.class); + doReturn(rmdSchema.toString()).when(rmdSchemaFromMultiSchemaResponse).getSchemaStr(); + doReturn(new MultiSchemaResponse.Schema[] { rmdSchemaFromMultiSchemaResponse }).when(multiRMDSchemaResponse) + .getSchemas(); + doReturn(multiRMDSchemaResponse).when(d2ControllerClient).getAllReplicationMetadataSchemas(storeName); + + PubSubConsumerAdapter mockPubSubConsumer = mock(PubSubConsumerAdapter.class); + PubSubTopic oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1)); + PubSubTopic oldChangeCaptureTopic = + pubSubTopicRepository.getTopic(oldVersionTopic + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); + + prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true); + ChangelogClientConfig changelogClientConfig = + new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient) + .setSchemaReader(schemaReader) + .setStoreName(storeName) + .setShouldCompactMessages(true) + .setViewName(""); + changelogClientConfig.getInnerClientConfig().setMetricsRepository(new MetricsRepository()); + 
VeniceChangelogConsumerImpl veniceChangelogConsumer = + new VeniceAfterImageConsumerImpl<>(changelogClientConfig, mockPubSubConsumer); + Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); + + ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class); + Store store = mock(Store.class); + Version mockVersion = new VersionImpl(storeName, 1, "foo"); + Mockito.when(store.getCurrentVersion()).thenReturn(1); + Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + Mockito.when(mockRepository.getStore(anyString())).thenReturn(store); + Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema)); + Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); + veniceChangelogConsumer.setStoreRepository(mockRepository); + veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get(); + verify(mockPubSubConsumer).subscribe(new PubSubTopicPartitionImpl(oldVersionTopic, 0), OffsetRecord.LOWEST_OFFSET); + + List, VeniceChangeCoordinate>> pubSubMessages = + new ArrayList<>(veniceChangelogConsumer.poll(100)); + for (int i = 0; i < 5; i++) { + PubSubMessage, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i); + Utf8 messageStr = pubSubMessage.getValue().getCurrentValue(); + Assert.assertEquals(messageStr.toString(), "newValue" + i); + } + prepareChangeCaptureRecordsToBePolled( + 0L, + 10L, + mockPubSubConsumer, + oldChangeCaptureTopic, + 0, + oldVersionTopic, + null, + false, + true); + pubSubMessages = new ArrayList<>(veniceChangelogConsumer.poll(100)); + Assert.assertFalse(pubSubMessages.isEmpty()); + Assert.assertEquals(pubSubMessages.size(), 10); + for (int i = 0; i < 10; i++) { + PubSubMessage, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i); + Utf8 pubSubMessageValue = pubSubMessage.getValue().getCurrentValue(); + Assert.assertEquals(pubSubMessageValue.toString(), "newValue" + i); + } + + 
veniceChangelogConsumer.close(); + verify(mockPubSubConsumer, times(2)).batchUnsubscribe(any()); + verify(mockPubSubConsumer).close(); + } + @Test public void testMetricReportingThread() throws InterruptedException { D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class); @@ -370,7 +457,8 @@ private void prepareChangeCaptureRecordsToBePolled( int partition, PubSubTopic oldVersionTopic, PubSubTopic newVersionTopic, - boolean addEndOfPushMessage) { + boolean addEndOfPushMessage, + boolean repeatMessages) { List> pubSubMessageList = new ArrayList<>(); // Add a start of push message @@ -389,6 +477,19 @@ private void prepareChangeCaptureRecordsToBePolled( pubSubMessageList.add(pubSubMessage); } + if (repeatMessages) { + for (long i = startIdx; i < endIdx; i++) { + PubSubMessage pubSubMessage = constructChangeCaptureConsumerRecord( + changeCaptureTopic, + partition, + "oldValue" + i, + "newValue" + i, + "key" + i, + Arrays.asList(i, i)); + pubSubMessageList.add(pubSubMessage); + } + } + if (addEndOfPushMessage) { pubSubMessageList.add(constructEndOfPushMessage(changeCaptureTopic, partition, endIdx + 1)); }