diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java
index 207693d7cb..5d56a68612 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java
@@ -244,7 +244,12 @@ private int getSchemaIdFromValue(KafkaMessageEnvelope kafkaMessageEnvelope) thro
         Put put = (Put) kafkaMessageEnvelope.payloadUnion;
         return put.schemaId;
       case DELETE:
-        return -1;
+        /**
+         * The chunk cleanup message will use schema id: {@link com.linkedin.venice.serialization.avro.AvroProtocolDefinition#CHUNK},
+         * so we will extract the schema id from the payload, instead of -1.
+         */
+        Delete delete = (Delete) kafkaMessageEnvelope.payloadUnion;
+        return delete.schemaId;
       default:
         throw new IOException("Unexpected '" + messageType + "' message from Kafka topic partition: " + topicPartition);
     }
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/VeniceKafkaInputReducer.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/VeniceKafkaInputReducer.java
index 8dafd5d8c2..978080d9e3 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/VeniceKafkaInputReducer.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/VeniceKafkaInputReducer.java
@@ -130,7 +130,7 @@ protected DuplicateKeyPrinter initDuplicateKeyPrinter(JobConf job) {
   protected VeniceWriterMessage extract(BytesWritable key, Iterator valueIterator, Reporter reporter) {
     KafkaInputMapperKey mapperKey = KAFKA_INPUT_MAPPER_KEY_AVRO_SPECIFIC_DESERIALIZER
         .deserialize(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()));
-    byte[] keyBytes = mapperKey.key.array();
+    byte[] keyBytes = ByteUtils.extractByteArray(mapperKey.key);
     if (!valueIterator.hasNext()) {
       throw new VeniceException("There is no value corresponding to key bytes: " + ByteUtils.toHexString(keyBytes));
     }
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/chunk/ChunkAssembler.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/chunk/ChunkAssembler.java
index af6d8ee49f..a0679696e1 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/chunk/ChunkAssembler.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/chunk/ChunkAssembler.java
@@ -86,6 +86,10 @@ public ValueBytesAndSchemaId assembleAndGetValue(final byte[] keyBytes, final It
         // Ignore older entries since a more recent manifest is discovered.
         continue;
       }
+      if (reusedMapperValue.schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
+        // Ignore all the chunk cleanup messages.
+        continue;
+      }
       /**
        * The latest event is a delete event.
       */
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java
index a94492528e..5bb154827b 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.java
@@ -16,6 +16,7 @@
 import com.linkedin.venice.storage.protocol.ChunkId;
 import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
 import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
+import com.linkedin.venice.writer.VeniceWriter;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -120,7 +121,7 @@ public void testNoCompleteLargeValueWithMissingManifest() {
         messageSequenceNumber,
         VALUE_SCHEMA_ID,
         0);
-    values.remove(0); // Remove the first value which should be a manifest
+    values.remove(1); // Remove the second value which should be a manifest

     ChunkAssembler.ValueBytesAndSchemaId assembledValue =
         chunkAssembler.assembleAndGetValue(serializedKey, values.iterator());
@@ -145,7 +146,7 @@ public void testNoCompleteLargeValueWithMissingChunk() {
         messageSequenceNumber,
         VALUE_SCHEMA_ID,
         0);
-    int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values.size() - 2) + 1;
+    int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values.size() - 3) + 2;
     values.remove(indexOfMissingChunk); // Remove a chunk
     chunkAssembler.assembleAndGetValue(serializedKey, values.iterator());
   }
@@ -188,7 +189,7 @@ public void testOneCompleteLargeValueAndOneIncompleteLargeValue() {
         VALUE_SCHEMA_ID_2,
         totalChunkCount1 + 1);

-    values2.remove(0); // Remove the manifest from the second sequence
+    values2.remove(1); // Remove the manifest from the second sequence
     List allValues = new ArrayList<>();
     allValues.addAll(values2);
     allValues.addAll(values1);
@@ -240,9 +241,9 @@ public void testOneCompleteLargeValueAndOneIncompleteLargeValueCase2() {
         chunkId2.segmentNumber,
         chunkId2.messageSequenceNumber,
         VALUE_SCHEMA_ID_2,
-        totalChunkCount1);
+        values1.size());

-    int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values2.size() - 2) + 1;
+    int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values2.size() - 3) + 2;
     values2.remove(indexOfMissingChunk); // Remove a chunk from the second sequence
     List allValues = new ArrayList<>();
     allValues.addAll(values2);
@@ -469,7 +470,7 @@ public void testRegularValueAtTheEndWithIncompleteLargeValue() {
         VALUE_SCHEMA_ID,
         0);
     // Randomly remove a value to simulate the incomplete large value
-    values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1);
+    values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2);
     byte[] regularValueBytes = createChunkBytes(100, 23);
     values.add(0, createRegularValue(regularValueBytes, VALUE_SCHEMA_ID_2, totalChunkCount + 1, MapperValueType.PUT));

@@ -581,7 +582,7 @@ public void testDeleteValueAtTheEndWithIncompleteLargeValue() {
         VALUE_SCHEMA_ID,
         0);
     // Randomly remove a value to simulate the incomplete large value
-    values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1);
+    values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2);
     // "Delete value" at the end
     values.add(0, createRegularValue(new byte[0], -1, totalChunkCount + 1, MapperValueType.DELETE));

@@ -635,7 +636,7 @@ public void testRegularValueAndIncompleteLargeValue() {
         // regular value wins
         VALUE_SCHEMA_ID,
         1));
     // Randomly remove a value chunk to simulate the incomplete large value
-    values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1);
+    values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2);
     byte[] regularValueBytes = createChunkBytes(100, 23);
     values.add(createRegularValue(regularValueBytes, VALUE_SCHEMA_ID_2, 0, MapperValueType.PUT));
@@ -731,7 +732,7 @@ public void testDeleteValueAndIncompleteLargeValue() {
         VALUE_SCHEMA_ID,
         1));
     // Randomly remove a chunk value to simulate the incomplete large value
-    values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1);
+    values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2);
     values.add(createRegularValue(new byte[0], -1, 0, MapperValueType.DELETE));

     ChunkAssembler.ValueBytesAndSchemaId assembledValue =
@@ -755,6 +756,7 @@ private BytesWritable createRegularValue(byte[] valueBytes, int schemaId, int of

   /**
    * Create a sequence of chunks and a manifest. E.g. chunk_0, chunk_1, ..., chunk_N, chunk_manifest
+   * This function will also add a chunk cleanup message after manifest.
    */
   private List createKafkaInputMapperValues(
       byte[] serializedKey,
@@ -824,19 +826,35 @@ private List createKafkaInputMapperValues(
       chunkedRmdManifest.schemaId = valueSchemaID;
       chunkedRmdManifest.size = rmdChunkInfo.totalChunkCount * rmdChunkInfo.eachCountSizeInBytes;
     }
-    KafkaInputMapperValue lastMapperValue = new KafkaInputMapperValue();
-    lastMapperValue.valueType = MapperValueType.PUT;
-    lastMapperValue.offset = currOffset;
-    lastMapperValue.schemaId = CHUNK_MANIFEST_SCHEMA_ID;
-    lastMapperValue.value = ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedValueManifest));
-    lastMapperValue.chunkedKeySuffix = ByteBuffer
+    KafkaInputMapperValue mapperValueForManifest = new KafkaInputMapperValue();
+    mapperValueForManifest.valueType = MapperValueType.PUT;
+    mapperValueForManifest.offset = currOffset++;
+    mapperValueForManifest.schemaId = CHUNK_MANIFEST_SCHEMA_ID;
+    mapperValueForManifest.value =
+        ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedValueManifest));
+    mapperValueForManifest.chunkedKeySuffix = ByteBuffer
         .wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", KeyWithChunkingSuffixSerializer.NON_CHUNK_KEY_SUFFIX));
-    lastMapperValue.replicationMetadataPayload = (rmdChunkInfo == null)
+    mapperValueForManifest.replicationMetadataPayload = (rmdChunkInfo == null)
         ? ByteBuffer.wrap(new byte[0])
         : ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedRmdManifest));
-    lastMapperValue.replicationMetadataVersionId = 1;
+    mapperValueForManifest.replicationMetadataVersionId = 1;
+    values.add(serialize(mapperValueForManifest));
+
+    // Add one chunk cleanup message
+    KafkaInputMapperValue valueChunkCleanupMessage = new KafkaInputMapperValue();
+    valueChunkCleanupMessage.valueType = MapperValueType.DELETE;
+    valueChunkCleanupMessage.offset = currOffset++;
+    valueChunkCleanupMessage.schemaId = CHUNK_VALUE_SCHEMA_ID;
+    valueChunkCleanupMessage.value = VeniceWriter.EMPTY_BYTE_BUFFER;
+
+    ChunkedKeySuffix chunkedKeySuffix =
+        createChunkedKeySuffix(segmentNumber, sequenceNumber, valueChunkInfo.totalChunkCount * 2);
+    valueChunkCleanupMessage.chunkedKeySuffix =
+        ByteBuffer.wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", chunkedKeySuffix));
+    valueChunkCleanupMessage.replicationMetadataVersionId = VeniceWriter.VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID;
+    valueChunkCleanupMessage.replicationMetadataPayload = VeniceWriter.EMPTY_BYTE_BUFFER;
+    values.add(serialize(valueChunkCleanupMessage));
-    values.add(serialize(lastMapperValue));

     // The offset of the messages will be in descending order.
     Collections.reverse(values);
     return values;
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
index 83cd31806e..f76e333f75 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
@@ -190,9 +190,9 @@ public class VeniceWriter extends AbstractVeniceWriter {
    */
   public static final int VENICE_DEFAULT_VALUE_SCHEMA_ID = -1;

-  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

-  private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY);
+  public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY);

   public static final LeaderMetadataWrapper DEFAULT_LEADER_METADATA_WRAPPER =
       new LeaderMetadataWrapper(DEFAULT_UPSTREAM_OFFSET, DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID);
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
index 0a9813b69e..abd7399e1e 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
@@ -475,8 +475,8 @@ public void testNonAAPartialUpdateChunkDeletion() throws IOException {
    * This integration test performs a few actions to test RMD chunking logic:
    * (1) Send a bunch of large UPDATE messages to make sure eventually the key's value + RMD size greater than 1MB and
    * thus trigger chunking / RMD chunking.
-   * (2) Run a KIF repush to make sure it handles RMD chunks correctly.
-   * (3) Send a DELETE message to partially delete some of the items in the map field.
+   * (2) Run a KIF re-push to make sure it handles RMD chunks correctly.
+   * (3) Send a DELETE message to partially delete some items in the map field.
    * (4) Send a DELETE message to fully delete the record.
    */
   @Test(timeOut = TEST_TIMEOUT_MS * 4)
@@ -651,7 +651,6 @@ public void testReplicationMetadataChunkingE2E() throws IOException {
         throw new VeniceException(e);
       }
     });

-    // Validate RMD bytes after PUT requests.
     String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2);
     validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
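
For reference, the behavioral core of this patch is that a DELETE whose schema id equals the CHUNK protocol version is treated as a chunk cleanup marker rather than a delete of the whole key: KafkaInputRecordReader now surfaces the schema id from the Delete payload instead of returning -1, and ChunkAssembler skips such records during assembly. The standalone sketch below is not part of the patch; the class name and the hard-coded constant are illustrative stand-ins for AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(), and it only isolates the filtering predicate under those assumptions.

// Illustrative sketch only; mirrors the skip logic added to ChunkAssembler under the
// assumption that chunk cleanup messages are DELETEs tagged with the CHUNK schema id.
public final class ChunkCleanupFilterSketch {
  // Hypothetical stand-in for AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion().
  private static final int CHUNK_SCHEMA_ID = -10;

  private ChunkCleanupFilterSketch() {
  }

  // A DELETE carrying the CHUNK schema id is a chunk cleanup marker and should be
  // ignored during chunk assembly; a DELETE with any other schema id is a real delete.
  public static boolean isChunkCleanupMessage(boolean isDelete, int schemaId) {
    return isDelete && schemaId == CHUNK_SCHEMA_ID;
  }

  public static void main(String[] args) {
    System.out.println(isChunkCleanupMessage(true, -1)); // false: regular delete, keep it
    System.out.println(isChunkCleanupMessage(true, CHUNK_SCHEMA_ID)); // true: cleanup marker, skip it
  }
}

In the reducer path the same distinction is only reachable because getSchemaIdFromValue() now returns delete.schemaId rather than a hard-coded -1, so downstream code can tell cleanup markers apart from ordinary deletes.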