[changelog] Add message compaction as an opt-in feature to changelog consumer (#1230)

* [changelog] Add feature to compact events from changelog client

This introduces a configuration option that compacts the data returned from poll() down to only the latest record for each key, while preserving the order of the returned results.
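
For illustration, a minimal usage sketch (the store name and surrounding setup are hypothetical; setShouldCompactMessages and shouldCompactMessages are the accessors added by this commit):

// Opt in to compaction: poll() will then return only the latest record per key,
// positioned at each key's last occurrence. "my_store" is a placeholder.
ChangelogClientConfig config = new ChangelogClientConfig<>()
    .setStoreName("my_store")
    .setShouldCompactMessages(true);
assert config.shouldCompactMessages();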
ZacAttack authored Nov 4, 2024
1 parent e4fac90 commit 7c11753
Showing 3 changed files with 132 additions and 3 deletions.
@@ -14,6 +14,8 @@ public class ChangelogClientConfig<T extends SpecificRecord> {
private String viewName;

private String consumerName = "";

private boolean compactMessages = false;
private ClientConfig<T> innerClientConfig;
private D2ControllerClient d2ControllerClient;

@@ -79,6 +81,11 @@ public ChangelogClientConfig<T> setConsumerName(String consumerName) {
return this;
}

public ChangelogClientConfig<T> setShouldCompactMessages(boolean compactMessages) {
this.compactMessages = compactMessages;
return this;
}

public String getViewName() {
return viewName;
}
@@ -87,6 +94,10 @@ public String getConsumerName() {
return consumerName;
}

public boolean shouldCompactMessages() {
return compactMessages;
}

public ChangelogClientConfig<T> setControllerD2ServiceName(String controllerD2ServiceName) {
this.controllerD2ServiceName = controllerD2ServiceName;
return this;
@@ -207,7 +218,8 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
.setVersionSwapDetectionIntervalTimeInMs(config.getVersionSwapDetectionIntervalTimeInMs())
.setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes())
.setConsumerName(config.consumerName)
.setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval());
.setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval())
.setShouldCompactMessages(config.shouldCompactMessages());
return newConfig;
}
}
@@ -61,6 +61,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -620,6 +621,21 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
if (changeCaptureStats != null) {
changeCaptureStats.recordRecordsConsumed(pubSubMessages.size());
}
if (changelogClientConfig.shouldCompactMessages()) {
Map<K, PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> tempMap = new LinkedHashMap<>();
// LinkedHashMap maintains insertion order, but when a key's value is replaced, the entry
// keeps the position of its first insertion. That isn't quite what we want here: we want
// a single entry per key (just as any map gives us), but positioned at the key's last
// insertion. To achieve that, we remove the entry before re-inserting it.
for (PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> message: pubSubMessages) {
if (tempMap.containsKey(message.getKey())) {
tempMap.remove(message.getKey());
}
tempMap.put(message.getKey(), message);
}
return tempMap.values();
}
return pubSubMessages;
}
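
As an aside (not part of the diff), a self-contained sketch of the LinkedHashMap subtlety the comment above works around:

import java.util.LinkedHashMap;
import java.util.Map;

public class LinkedHashMapOrderDemo {
  public static void main(String[] args) {
    // Plain put: a replaced key keeps the position of its first insertion.
    Map<String, Integer> plain = new LinkedHashMap<>();
    plain.put("a", 1);
    plain.put("b", 2);
    plain.put("a", 3);
    System.out.println(plain); // prints {a=3, b=2}

    // Remove-then-put, as the loop above does: the key moves to the
    // position of its last insertion, which is what compaction needs.
    Map<String, Integer> reinserted = new LinkedHashMap<>();
    reinserted.put("a", 1);
    reinserted.put("b", 2);
    reinserted.remove("a");
    reinserted.put("a", 3);
    System.out.println(reinserted); // prints {b=2, a=3}
  }
}

An access-ordered LinkedHashMap (the three-argument constructor with accessOrder = true) also moves an entry to the end on put, so it could replace the explicit remove, though it would reorder entries on get as well.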

@@ -124,6 +124,7 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup
partition,
oldVersionTopic,
newVersionTopic,
false,
false);
ChangelogClientConfig changelogClientConfig =
getChangelogClientConfig(d2ControllerClient).setViewName("changeCaptureView");
@@ -197,7 +198,16 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE
VeniceChangelogConsumerImpl mockInternalSeekConsumer = Mockito.mock(VeniceChangelogConsumerImpl.class);
Mockito.when(mockInternalSeekConsumer.subscribe(any())).thenReturn(CompletableFuture.completedFuture(null));
Mockito.when(mockInternalSeekConsumer.getPubSubConsumer()).thenReturn(mockPubSubConsumer);
prepareChangeCaptureRecordsToBePolled(0L, 10L, mockPubSubConsumer, oldVersionTopic, 0, oldVersionTopic, null, true);
prepareChangeCaptureRecordsToBePolled(
0L,
10L,
mockPubSubConsumer,
oldVersionTopic,
0,
oldVersionTopic,
null,
true,
false);
VeniceAfterImageConsumerImpl<String, Utf8> veniceChangelogConsumer = new VeniceAfterImageConsumerImpl<>(
changelogClientConfig,
mockPubSubConsumer,
@@ -305,6 +315,7 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept
0,
oldVersionTopic,
null,
false,
false);
pubSubMessages =
(List<PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100);
@@ -321,6 +332,82 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept
verify(mockPubSubConsumer).close();
}

@Test
public void testConsumeAfterImageWithCompaction() throws ExecutionException, InterruptedException {
D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class);
StoreResponse storeResponse = mock(StoreResponse.class);
StoreInfo storeInfo = mock(StoreInfo.class);
doReturn(1).when(storeInfo).getCurrentVersion();
doReturn(2).when(storeInfo).getPartitionCount();
doReturn(storeInfo).when(storeResponse).getStore();
doReturn(storeResponse).when(d2ControllerClient).getStore(storeName);
MultiSchemaResponse multiRMDSchemaResponse = mock(MultiSchemaResponse.class);
MultiSchemaResponse.Schema rmdSchemaFromMultiSchemaResponse = mock(MultiSchemaResponse.Schema.class);
doReturn(rmdSchema.toString()).when(rmdSchemaFromMultiSchemaResponse).getSchemaStr();
doReturn(new MultiSchemaResponse.Schema[] { rmdSchemaFromMultiSchemaResponse }).when(multiRMDSchemaResponse)
.getSchemas();
doReturn(multiRMDSchemaResponse).when(d2ControllerClient).getAllReplicationMetadataSchemas(storeName);

PubSubConsumerAdapter mockPubSubConsumer = mock(PubSubConsumerAdapter.class);
PubSubTopic oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1));
PubSubTopic oldChangeCaptureTopic =
pubSubTopicRepository.getTopic(oldVersionTopic + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);

prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true);
ChangelogClientConfig changelogClientConfig =
new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient)
.setSchemaReader(schemaReader)
.setStoreName(storeName)
.setShouldCompactMessages(true)
.setViewName("");
changelogClientConfig.getInnerClientConfig().setMetricsRepository(new MetricsRepository());
VeniceChangelogConsumerImpl<String, Utf8> veniceChangelogConsumer =
new VeniceAfterImageConsumerImpl<>(changelogClientConfig, mockPubSubConsumer);
Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2);

ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema));
Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion);
veniceChangelogConsumer.setStoreRepository(mockRepository);
veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get();
verify(mockPubSubConsumer).subscribe(new PubSubTopicPartitionImpl(oldVersionTopic, 0), OffsetRecord.LOWEST_OFFSET);

List<PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate>> pubSubMessages =
new ArrayList<>(veniceChangelogConsumer.poll(100));
for (int i = 0; i < 5; i++) {
PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i);
Utf8 messageStr = pubSubMessage.getValue().getCurrentValue();
Assert.assertEquals(messageStr.toString(), "newValue" + i);
}
prepareChangeCaptureRecordsToBePolled(
0L,
10L,
mockPubSubConsumer,
oldChangeCaptureTopic,
0,
oldVersionTopic,
null,
false,
true);
pubSubMessages = new ArrayList<>(veniceChangelogConsumer.poll(100));
Assert.assertFalse(pubSubMessages.isEmpty());
Assert.assertEquals(pubSubMessages.size(), 10);
for (int i = 0; i < 10; i++) {
PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i);
Utf8 pubSubMessageValue = pubSubMessage.getValue().getCurrentValue();
Assert.assertEquals(pubSubMessageValue.toString(), "newValue" + i);
}

veniceChangelogConsumer.close();
verify(mockPubSubConsumer, times(2)).batchUnsubscribe(any());
verify(mockPubSubConsumer).close();
}

@Test
public void testMetricReportingThread() throws InterruptedException {
D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class);
@@ -370,7 +457,8 @@ private void prepareChangeCaptureRecordsToBePolled(
int partition,
PubSubTopic oldVersionTopic,
PubSubTopic newVersionTopic,
boolean addEndOfPushMessage) {
boolean addEndOfPushMessage,
boolean repeatMessages) {
List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> pubSubMessageList = new ArrayList<>();

// Add a start of push message
Expand All @@ -389,6 +477,19 @@ private void prepareChangeCaptureRecordsToBePolled(
pubSubMessageList.add(pubSubMessage);
}

if (repeatMessages) {
for (long i = startIdx; i < endIdx; i++) {
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage = constructChangeCaptureConsumerRecord(
changeCaptureTopic,
partition,
"oldValue" + i,
"newValue" + i,
"key" + i,
Arrays.asList(i, i));
pubSubMessageList.add(pubSubMessage);
}
}

if (addEndOfPushMessage) {
pubSubMessageList.add(constructEndOfPushMessage(changeCaptureTopic, partition, endIdx + 1));
}
