From 49a32c62096322203d6887e5a4249353b96535c2 Mon Sep 17 00:00:00 2001 From: Gaojie Liu Date: Tue, 22 Aug 2023 14:40:22 -0700 Subject: [PATCH] [vpj] Fixed KIF repush to handle chunk cleanup messages (#603) The behavior of the regression is as follows: 1. The chunk delete message will use `-10` as schema id. 2. When KIF encounters a delete message, it would ignore the schema id and treat the key as a top-level key and it would use non-chunk-suffix to decide the key portion from the composite key, which is wrong. 3. During shuffle and sort phase, if there is no collsion between the regular keys and the wrong key extracted from #2, no issue, and if there is a collision (which is rare), this entry will be removed from repush, which would result in data loss. This code change fixes the regression in the following way: 1. Use delete.schema_id to decide how to handle delete key. 2. Skip the chunk cleanup message when assembling large message in repush. There is an existing E2E test for this scenario: PartialUpdateTest#testReplicationMetadataChunkingE2E And the reason it didn't fail is that the exact condition as described above didn't trigger. --- .../input/kafka/KafkaInputRecordReader.java | 7 ++- .../input/kafka/VeniceKafkaInputReducer.java | 2 +- .../input/kafka/chunk/ChunkAssembler.java | 4 ++ .../input/kafka/chunk/TestChunkAssembler.java | 54 ++++++++++++------- .../linkedin/venice/writer/VeniceWriter.java | 4 +- .../venice/endToEnd/PartialUpdateTest.java | 5 +- 6 files changed, 51 insertions(+), 25 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java index 207693d7cb..5d56a68612 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java @@ -244,7 +244,12 @@ private int getSchemaIdFromValue(KafkaMessageEnvelope kafkaMessageEnvelope) thro Put put = (Put) kafkaMessageEnvelope.payloadUnion; return put.schemaId; case DELETE: - return -1; + /** + * The chunk cleanup message will use schema id: {@link com.linkedin.venice.serialization.avro.AvroProtocolDefinition#CHUNK}, + * so we will extract the schema id from the payload, instead of -1. + */ + Delete delete = (Delete) kafkaMessageEnvelope.payloadUnion; + return delete.schemaId; default: throw new IOException("Unexpected '" + messageType + "' message from Kafka topic partition: " + topicPartition); } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/VeniceKafkaInputReducer.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/VeniceKafkaInputReducer.java index 8dafd5d8c2..978080d9e3 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/VeniceKafkaInputReducer.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/VeniceKafkaInputReducer.java @@ -130,7 +130,7 @@ protected DuplicateKeyPrinter initDuplicateKeyPrinter(JobConf job) { protected VeniceWriterMessage extract(BytesWritable key, Iterator valueIterator, Reporter reporter) { KafkaInputMapperKey mapperKey = KAFKA_INPUT_MAPPER_KEY_AVRO_SPECIFIC_DESERIALIZER .deserialize(ByteBuffer.wrap(key.getBytes(), 0, key.getLength())); - byte[] keyBytes = mapperKey.key.array(); + byte[] keyBytes = ByteUtils.extractByteArray(mapperKey.key); if (!valueIterator.hasNext()) { throw new VeniceException("There is no value corresponding to key bytes: " + ByteUtils.toHexString(keyBytes)); } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/chunk/ChunkAssembler.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/chunk/ChunkAssembler.java index af6d8ee49f..a0679696e1 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/chunk/ChunkAssembler.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/chunk/ChunkAssembler.java @@ -86,6 +86,10 @@ public ValueBytesAndSchemaId assembleAndGetValue(final byte[] keyBytes, final It // Ignore older entries since a more recent manifest is discovered. continue; } + if (reusedMapperValue.schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) { + // Ignore all the chunk cleanup messages. + continue; + } /** * The latest event is a delete event. */ diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java index a94492528e..5bb154827b 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java @@ -16,6 +16,7 @@ import com.linkedin.venice.storage.protocol.ChunkId; import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; import com.linkedin.venice.storage.protocol.ChunkedValueManifest; +import com.linkedin.venice.writer.VeniceWriter; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -120,7 +121,7 @@ public void testNoCompleteLargeValueWithMissingManifest() { messageSequenceNumber, VALUE_SCHEMA_ID, 0); - values.remove(0); // Remove the first value which should be a manifest + values.remove(1); // Remove the second value which should be a manifest ChunkAssembler.ValueBytesAndSchemaId assembledValue = chunkAssembler.assembleAndGetValue(serializedKey, values.iterator()); @@ -145,7 +146,7 @@ public void testNoCompleteLargeValueWithMissingChunk() { messageSequenceNumber, VALUE_SCHEMA_ID, 0); - int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values.size() - 2) + 1; + int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values.size() - 3) + 2; values.remove(indexOfMissingChunk); // Remove a chunk chunkAssembler.assembleAndGetValue(serializedKey, values.iterator()); } @@ -188,7 +189,7 @@ public void testOneCompleteLargeValueAndOneIncompleteLargeValue() { VALUE_SCHEMA_ID_2, totalChunkCount1 + 1); - values2.remove(0); // Remove the manifest from the second sequence + values2.remove(1); // Remove the manifest from the second sequence List allValues = new ArrayList<>(); allValues.addAll(values2); allValues.addAll(values1); @@ -240,9 +241,9 @@ public void testOneCompleteLargeValueAndOneIncompleteLargeValueCase2() { chunkId2.segmentNumber, chunkId2.messageSequenceNumber, VALUE_SCHEMA_ID_2, - totalChunkCount1); + values1.size()); - int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values2.size() - 2) + 1; + int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values2.size() - 3) + 2; values2.remove(indexOfMissingChunk); // Remove a chunk from the second sequence List allValues = new ArrayList<>(); allValues.addAll(values2); @@ -469,7 +470,7 @@ public void testRegularValueAtTheEndWithIncompleteLargeValue() { VALUE_SCHEMA_ID, 0); // Randomly remove a value to simulate the incomplete large value - values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1); + values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2); byte[] regularValueBytes = createChunkBytes(100, 23); values.add(0, createRegularValue(regularValueBytes, VALUE_SCHEMA_ID_2, totalChunkCount + 1, MapperValueType.PUT)); @@ -581,7 +582,7 @@ public void testDeleteValueAtTheEndWithIncompleteLargeValue() { VALUE_SCHEMA_ID, 0); // Randomly remove a value to simulate the incomplete large value - values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1); + values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2); // "Delete value" at the end values.add(0, createRegularValue(new byte[0], -1, totalChunkCount + 1, MapperValueType.DELETE)); @@ -635,7 +636,7 @@ public void testRegularValueAndIncompleteLargeValue() { // regular value wins VALUE_SCHEMA_ID, 1)); // Randomly remove a value chunk to simulate the incomplete large value - values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1); + values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2); byte[] regularValueBytes = createChunkBytes(100, 23); values.add(createRegularValue(regularValueBytes, VALUE_SCHEMA_ID_2, 0, MapperValueType.PUT)); @@ -731,7 +732,7 @@ public void testDeleteValueAndIncompleteLargeValue() { VALUE_SCHEMA_ID, 1)); // Randomly remove a chunk value to simulate the incomplete large value - values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1); + values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2); values.add(createRegularValue(new byte[0], -1, 0, MapperValueType.DELETE)); ChunkAssembler.ValueBytesAndSchemaId assembledValue = @@ -755,6 +756,7 @@ private BytesWritable createRegularValue(byte[] valueBytes, int schemaId, int of /** * Create a sequence of chunks and a manifest. E.g. chunk_0, chunk_1, ..., chunk_N, chunk_manifest + * This function will also add a chunk cleanup message after manifest. */ private List createKafkaInputMapperValues( byte[] serializedKey, @@ -824,19 +826,35 @@ private List createKafkaInputMapperValues( chunkedRmdManifest.schemaId = valueSchemaID; chunkedRmdManifest.size = rmdChunkInfo.totalChunkCount * rmdChunkInfo.eachCountSizeInBytes; } - KafkaInputMapperValue lastMapperValue = new KafkaInputMapperValue(); - lastMapperValue.valueType = MapperValueType.PUT; - lastMapperValue.offset = currOffset; - lastMapperValue.schemaId = CHUNK_MANIFEST_SCHEMA_ID; - lastMapperValue.value = ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedValueManifest)); - lastMapperValue.chunkedKeySuffix = ByteBuffer + KafkaInputMapperValue mapperValueForManifest = new KafkaInputMapperValue(); + mapperValueForManifest.valueType = MapperValueType.PUT; + mapperValueForManifest.offset = currOffset++; + mapperValueForManifest.schemaId = CHUNK_MANIFEST_SCHEMA_ID; + mapperValueForManifest.value = + ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedValueManifest)); + mapperValueForManifest.chunkedKeySuffix = ByteBuffer .wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", KeyWithChunkingSuffixSerializer.NON_CHUNK_KEY_SUFFIX)); - lastMapperValue.replicationMetadataPayload = (rmdChunkInfo == null) + mapperValueForManifest.replicationMetadataPayload = (rmdChunkInfo == null) ? ByteBuffer.wrap(new byte[0]) : ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedRmdManifest)); - lastMapperValue.replicationMetadataVersionId = 1; + mapperValueForManifest.replicationMetadataVersionId = 1; + values.add(serialize(mapperValueForManifest)); + + // Add one chunk cleanup message + KafkaInputMapperValue valueChunkCleanupMessage = new KafkaInputMapperValue(); + valueChunkCleanupMessage.valueType = MapperValueType.DELETE; + valueChunkCleanupMessage.offset = currOffset++; + valueChunkCleanupMessage.schemaId = CHUNK_VALUE_SCHEMA_ID; + valueChunkCleanupMessage.value = VeniceWriter.EMPTY_BYTE_BUFFER; + + ChunkedKeySuffix chunkedKeySuffix = + createChunkedKeySuffix(segmentNumber, sequenceNumber, valueChunkInfo.totalChunkCount * 2); + valueChunkCleanupMessage.chunkedKeySuffix = + ByteBuffer.wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", chunkedKeySuffix)); + valueChunkCleanupMessage.replicationMetadataVersionId = VeniceWriter.VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; + valueChunkCleanupMessage.replicationMetadataPayload = VeniceWriter.EMPTY_BYTE_BUFFER; + values.add(serialize(valueChunkCleanupMessage)); - values.add(serialize(lastMapperValue)); // The offset of the messages will be in descending order. Collections.reverse(values); return values; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 83cd31806e..f76e333f75 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -190,9 +190,9 @@ public class VeniceWriter extends AbstractVeniceWriter { */ public static final int VENICE_DEFAULT_VALUE_SCHEMA_ID = -1; - private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; + public static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY); + public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY); public static final LeaderMetadataWrapper DEFAULT_LEADER_METADATA_WRAPPER = new LeaderMetadataWrapper(DEFAULT_UPSTREAM_OFFSET, DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java index 0a9813b69e..abd7399e1e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java @@ -475,8 +475,8 @@ public void testNonAAPartialUpdateChunkDeletion() throws IOException { * This integration test performs a few actions to test RMD chunking logic: * (1) Send a bunch of large UPDATE messages to make sure eventually the key's value + RMD size greater than 1MB and * thus trigger chunking / RMD chunking. - * (2) Run a KIF repush to make sure it handles RMD chunks correctly. - * (3) Send a DELETE message to partially delete some of the items in the map field. + * (2) Run a KIF re-push to make sure it handles RMD chunks correctly. + * (3) Send a DELETE message to partially delete some items in the map field. * (4) Send a DELETE message to fully delete the record. */ @Test(timeOut = TEST_TIMEOUT_MS * 4) @@ -651,7 +651,6 @@ public void testReplicationMetadataChunkingE2E() throws IOException { throw new VeniceException(e); } }); - // Validate RMD bytes after PUT requests. String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2); validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {