[vpj] Fixed KIF repush to handle chunk cleanup messages (#603)
The regression behaves as follows:
1. The chunk delete (cleanup) message uses `-10` as its schema id.
2. When KIF encounters a delete message, it ignores the schema id, treats the key as a
   top-level key, and uses the non-chunk suffix to extract the key portion from the
   composite key, which is wrong (see the sketch after this list).
3. During the shuffle and sort phase, if there is no collision between the regular keys
   and the wrong key extracted in step 2, there is no issue; if there is a collision (which
   is rare), the colliding entry is dropped from the repush, which results in data loss.
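
To make the failure mode concrete, here is a minimal, self-contained sketch. It is not Venice code: the class, the suffix layout, and the suffix lengths are invented purely for illustration. It shows how stripping the non-chunk suffix from a chunk cleanup key produces a bogus top-level key that can then collide with a real key:

    import java.util.Arrays;

    // Minimal sketch, NOT Venice code: the suffix layout and lengths are hypothetical.
    public class ChunkCleanupKeySketch {
      // Assumed schema-id convention from the commit description: regular deletes carry -1,
      // chunk cleanup deletes carry the CHUNK protocol schema id (-10).
      static final int REGULAR_DELETE_SCHEMA_ID = -1;
      static final int CHUNK_SCHEMA_ID = -10;

      // Hypothetical suffix lengths, for illustration only.
      static final int NON_CHUNK_SUFFIX_LEN = 1;
      static final int CHUNK_SUFFIX_LEN = 4;

      // Buggy extraction: every DELETE key is assumed to end with the non-chunk suffix,
      // so a chunk key (top-level key + chunk suffix) is truncated at the wrong boundary.
      static byte[] extractKeyIgnoringSchemaId(byte[] compositeKey) {
        return Arrays.copyOfRange(compositeKey, 0, compositeKey.length - NON_CHUNK_SUFFIX_LEN);
      }

      // Fixed extraction: consult the schema id first to pick the right suffix length.
      static byte[] extractKeyBySchemaId(int schemaId, byte[] compositeKey) {
        int suffixLen = (schemaId == CHUNK_SCHEMA_ID) ? CHUNK_SUFFIX_LEN : NON_CHUNK_SUFFIX_LEN;
        return Arrays.copyOfRange(compositeKey, 0, compositeKey.length - suffixLen);
      }

      public static void main(String[] args) {
        byte[] chunkCleanupKey = {1, 2, 3, 9, 9, 9, 9}; // top-level key {1, 2, 3} + 4-byte chunk suffix
        // Bug: prints [1, 2, 3, 9, 9, 9], a key that never existed and can collide with a real key.
        System.out.println(Arrays.toString(extractKeyIgnoringSchemaId(chunkCleanupKey)));
        // Fix: prints [1, 2, 3], the real top-level key (and repush additionally skips the message).
        System.out.println(Arrays.toString(extractKeyBySchemaId(CHUNK_SCHEMA_ID, chunkCleanupKey)));
        byte[] regularDeleteKey = {1, 2, 3, 0}; // top-level key + 1-byte non-chunk suffix
        // Regular deletes are unaffected: prints [1, 2, 3] either way.
        System.out.println(Arrays.toString(extractKeyBySchemaId(REGULAR_DELETE_SCHEMA_ID, regularDeleteKey)));
      }
    }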

This code change fixes the regression in the following way:
1. Use `delete.schema_id` to decide how to handle the delete key.
2. Skip chunk cleanup messages when assembling large messages in repush (see the sketch
   below this list).
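
A rough sketch of point 2, assuming a simplified stand-in for the mapper value record (the real change lives in ChunkAssembler, which walks KafkaInputMapperValue entries sorted by offset in descending order):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    // Rough sketch, not the real ChunkAssembler: a simplified stand-in for
    // KafkaInputMapperValue, just to show where the skip happens.
    public class SkipChunkCleanupSketch {
      static final int CHUNK_SCHEMA_ID = -10; // assumed CHUNK protocol schema id

      enum ValueType { PUT, DELETE }

      record MapperValue(ValueType type, int schemaId, long offset) {}

      // Values for one key arrive sorted by offset in descending order (latest first).
      // A chunk cleanup message is a DELETE tagged with the CHUNK schema id; it is not a
      // real top-level delete, so it must be skipped before picking the latest event.
      static MapperValue latestRelevantValue(Iterator<MapperValue> values) {
        while (values.hasNext()) {
          MapperValue value = values.next();
          if (value.schemaId() == CHUNK_SCHEMA_ID && value.type() == ValueType.DELETE) {
            continue; // ignore the chunk cleanup message
          }
          return value;
        }
        return null;
      }

      public static void main(String[] args) {
        List<MapperValue> values = Arrays.asList(
            new MapperValue(ValueType.DELETE, CHUNK_SCHEMA_ID, 12), // chunk cleanup, skipped
            new MapperValue(ValueType.PUT, 1, 11),                  // latest real event
            new MapperValue(ValueType.DELETE, -1, 10));             // older regular delete
        System.out.println(latestRelevantValue(values.iterator())); // the PUT at offset 11
      }
    }
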
There is an existing E2E test for this scenario:
PartialUpdateTest#testReplicationMetadataChunkingE2E
The reason it didn't fail is that the exact condition described above was never triggered.
gaojieliu authored Aug 22, 2023
1 parent 854b014 commit 49a32c6
Showing 6 changed files with 51 additions and 25 deletions.
@@ -244,7 +244,12 @@ private int getSchemaIdFromValue(KafkaMessageEnvelope kafkaMessageEnvelope) thro
Put put = (Put) kafkaMessageEnvelope.payloadUnion;
return put.schemaId;
case DELETE:
- return -1;
+ /**
+  * The chunk cleanup message will use schema id: {@link com.linkedin.venice.serialization.avro.AvroProtocolDefinition#CHUNK},
+  * so we will extract the schema id from the payload, instead of -1.
+  */
+ Delete delete = (Delete) kafkaMessageEnvelope.payloadUnion;
+ return delete.schemaId;
default:
throw new IOException("Unexpected '" + messageType + "' message from Kafka topic partition: " + topicPartition);
}
@@ -130,7 +130,7 @@ protected DuplicateKeyPrinter initDuplicateKeyPrinter(JobConf job) {
protected VeniceWriterMessage extract(BytesWritable key, Iterator<BytesWritable> valueIterator, Reporter reporter) {
KafkaInputMapperKey mapperKey = KAFKA_INPUT_MAPPER_KEY_AVRO_SPECIFIC_DESERIALIZER
.deserialize(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()));
- byte[] keyBytes = mapperKey.key.array();
+ byte[] keyBytes = ByteUtils.extractByteArray(mapperKey.key);
if (!valueIterator.hasNext()) {
throw new VeniceException("There is no value corresponding to key bytes: " + ByteUtils.toHexString(keyBytes));
}
@@ -86,6 +86,10 @@ public ValueBytesAndSchemaId assembleAndGetValue(final byte[] keyBytes, final It
// Ignore older entries since a more recent manifest is discovered.
continue;
}
+ if (reusedMapperValue.schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
+   // Ignore all the chunk cleanup messages.
+   continue;
+ }
/**
* The latest event is a delete event.
*/
@@ -16,6 +16,7 @@
import com.linkedin.venice.storage.protocol.ChunkId;
import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
+ import com.linkedin.venice.writer.VeniceWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -120,7 +121,7 @@ public void testNoCompleteLargeValueWithMissingManifest() {
messageSequenceNumber,
VALUE_SCHEMA_ID,
0);
- values.remove(0); // Remove the first value which should be a manifest
+ values.remove(1); // Remove the second value which should be a manifest

ChunkAssembler.ValueBytesAndSchemaId assembledValue =
chunkAssembler.assembleAndGetValue(serializedKey, values.iterator());
@@ -145,7 +146,7 @@ public void testNoCompleteLargeValueWithMissingChunk() {
messageSequenceNumber,
VALUE_SCHEMA_ID,
0);
- int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values.size() - 2) + 1;
+ int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values.size() - 3) + 2;
values.remove(indexOfMissingChunk); // Remove a chunk
chunkAssembler.assembleAndGetValue(serializedKey, values.iterator());
}
@@ -188,7 +189,7 @@ public void testOneCompleteLargeValueAndOneIncompleteLargeValue() {
VALUE_SCHEMA_ID_2,
totalChunkCount1 + 1);

- values2.remove(0); // Remove the manifest from the second sequence
+ values2.remove(1); // Remove the manifest from the second sequence
List<BytesWritable> allValues = new ArrayList<>();
allValues.addAll(values2);
allValues.addAll(values1);
@@ -240,9 +241,9 @@ public void testOneCompleteLargeValueAndOneIncompleteLargeValueCase2() {
chunkId2.segmentNumber,
chunkId2.messageSequenceNumber,
VALUE_SCHEMA_ID_2,
- totalChunkCount1);
+ values1.size());

- int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values2.size() - 2) + 1;
+ int indexOfMissingChunk = ThreadLocalRandom.current().nextInt(values2.size() - 3) + 2;
values2.remove(indexOfMissingChunk); // Remove a chunk from the second sequence
List<BytesWritable> allValues = new ArrayList<>();
allValues.addAll(values2);
@@ -469,7 +470,7 @@ public void testRegularValueAtTheEndWithIncompleteLargeValue() {
VALUE_SCHEMA_ID,
0);
// Randomly remove a value to simulate the incomplete large value
- values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1);
+ values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2);

byte[] regularValueBytes = createChunkBytes(100, 23);
values.add(0, createRegularValue(regularValueBytes, VALUE_SCHEMA_ID_2, totalChunkCount + 1, MapperValueType.PUT));
@@ -581,7 +582,7 @@ public void testDeleteValueAtTheEndWithIncompleteLargeValue() {
VALUE_SCHEMA_ID,
0);
// Randomly remove a value to simulate the incomplete large value
- values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1);
+ values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2);

// "Delete value" at the end
values.add(0, createRegularValue(new byte[0], -1, totalChunkCount + 1, MapperValueType.DELETE));
@@ -635,7 +636,7 @@ public void testRegularValueAndIncompleteLargeValue() { // regular value wins
VALUE_SCHEMA_ID,
1));
// Randomly remove a value chunk to simulate the incomplete large value
- values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1);
+ values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2);
byte[] regularValueBytes = createChunkBytes(100, 23);
values.add(createRegularValue(regularValueBytes, VALUE_SCHEMA_ID_2, 0, MapperValueType.PUT));

@@ -731,7 +732,7 @@ public void testDeleteValueAndIncompleteLargeValue() {
VALUE_SCHEMA_ID,
1));
// Randomly remove a chunk value to simulate the incomplete large value
- values.remove(ThreadLocalRandom.current().nextInt(values.size() - 2) + 1);
+ values.remove(ThreadLocalRandom.current().nextInt(values.size() - 3) + 2);
values.add(createRegularValue(new byte[0], -1, 0, MapperValueType.DELETE));

ChunkAssembler.ValueBytesAndSchemaId assembledValue =
@@ -755,6 +756,7 @@ private BytesWritable createRegularValue(byte[] valueBytes, int schemaId, int of

/**
* Create a sequence of chunks and a manifest. E.g. chunk_0, chunk_1, ..., chunk_N, chunk_manifest
+ * This function will also add a chunk cleanup message after the manifest.
*/
private List<BytesWritable> createKafkaInputMapperValues(
byte[] serializedKey,
@@ -824,19 +826,35 @@ private List<BytesWritable> createKafkaInputMapperValues(
chunkedRmdManifest.schemaId = valueSchemaID;
chunkedRmdManifest.size = rmdChunkInfo.totalChunkCount * rmdChunkInfo.eachCountSizeInBytes;
}
- KafkaInputMapperValue lastMapperValue = new KafkaInputMapperValue();
- lastMapperValue.valueType = MapperValueType.PUT;
- lastMapperValue.offset = currOffset;
- lastMapperValue.schemaId = CHUNK_MANIFEST_SCHEMA_ID;
- lastMapperValue.value = ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedValueManifest));
- lastMapperValue.chunkedKeySuffix = ByteBuffer
+ KafkaInputMapperValue mapperValueForManifest = new KafkaInputMapperValue();
+ mapperValueForManifest.valueType = MapperValueType.PUT;
+ mapperValueForManifest.offset = currOffset++;
+ mapperValueForManifest.schemaId = CHUNK_MANIFEST_SCHEMA_ID;
+ mapperValueForManifest.value =
+     ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedValueManifest));
+ mapperValueForManifest.chunkedKeySuffix = ByteBuffer
.wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", KeyWithChunkingSuffixSerializer.NON_CHUNK_KEY_SUFFIX));
- lastMapperValue.replicationMetadataPayload = (rmdChunkInfo == null)
+ mapperValueForManifest.replicationMetadataPayload = (rmdChunkInfo == null)
? ByteBuffer.wrap(new byte[0])
: ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedRmdManifest));
- lastMapperValue.replicationMetadataVersionId = 1;
+ mapperValueForManifest.replicationMetadataVersionId = 1;
+ values.add(serialize(mapperValueForManifest));

+ // Add one chunk cleanup message
+ KafkaInputMapperValue valueChunkCleanupMessage = new KafkaInputMapperValue();
+ valueChunkCleanupMessage.valueType = MapperValueType.DELETE;
+ valueChunkCleanupMessage.offset = currOffset++;
+ valueChunkCleanupMessage.schemaId = CHUNK_VALUE_SCHEMA_ID;
+ valueChunkCleanupMessage.value = VeniceWriter.EMPTY_BYTE_BUFFER;

+ ChunkedKeySuffix chunkedKeySuffix =
+     createChunkedKeySuffix(segmentNumber, sequenceNumber, valueChunkInfo.totalChunkCount * 2);
+ valueChunkCleanupMessage.chunkedKeySuffix =
+     ByteBuffer.wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", chunkedKeySuffix));
+ valueChunkCleanupMessage.replicationMetadataVersionId = VeniceWriter.VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID;
+ valueChunkCleanupMessage.replicationMetadataPayload = VeniceWriter.EMPTY_BYTE_BUFFER;
+ values.add(serialize(valueChunkCleanupMessage));

- values.add(serialize(lastMapperValue));
// The offset of the messages will be in descending order.
Collections.reverse(values);
return values;
@@ -190,9 +190,9 @@ public class VeniceWriter<K, V, U> extends AbstractVeniceWriter<K, V, U> {
*/
public static final int VENICE_DEFAULT_VALUE_SCHEMA_ID = -1;

- private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

- private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY);
+ public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY);

public static final LeaderMetadataWrapper DEFAULT_LEADER_METADATA_WRAPPER =
new LeaderMetadataWrapper(DEFAULT_UPSTREAM_OFFSET, DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID);
@@ -475,8 +475,8 @@ public void testNonAAPartialUpdateChunkDeletion() throws IOException {
* This integration test performs a few actions to test RMD chunking logic:
* (1) Send a bunch of large UPDATE messages to make sure eventually the key's value + RMD size greater than 1MB and
* thus trigger chunking / RMD chunking.
- * (2) Run a KIF repush to make sure it handles RMD chunks correctly.
- * (3) Send a DELETE message to partially delete some of the items in the map field.
+ * (2) Run a KIF re-push to make sure it handles RMD chunks correctly.
+ * (3) Send a DELETE message to partially delete some items in the map field.
* (4) Send a DELETE message to fully delete the record.
*/
@Test(timeOut = TEST_TIMEOUT_MS * 4)
Expand Down Expand Up @@ -651,7 +651,6 @@ public void testReplicationMetadataChunkingE2E() throws IOException {
throw new VeniceException(e);
}
});

// Validate RMD bytes after PUT requests.
String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2);
validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
