[admin-tool] Handle DELETE messages for deprecated chunks in KafkaTopicDumper (#599)

Recently, we added support for emitting DELETE messages to clean up leaked chunks (in PR #535), but we didn't handle them in the topic dumper tool.
nisargthakkar authored Aug 22, 2023
1 parent 489180d commit 6b6c72c
Showing 2 changed files with 43 additions and 53 deletions.
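
The crux of the change: a DELETE for a chunked store carries its schema ID in the Delete payload, just as a PUT does in the Put payload, and that ID is what lets the dumper classify the key as a full value, a value chunk, or a chunk manifest. Below is a minimal sketch of that dispatch, simplified from the getChunkMetadataLog diff that follows; the Put and MessageType import paths are assumptions, since they are not visible in this diff.

    import java.io.IOException;

    import com.linkedin.venice.kafka.protocol.Delete;
    import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
    import com.linkedin.venice.kafka.protocol.Put;
    import com.linkedin.venice.kafka.protocol.enums.MessageType;

    final class SchemaIdResolution {
      // Pull the schema ID out of whichever payload-union branch is populated.
      // Before this commit the DELETE branch was hard-coded to -1, so DELETEs
      // emitted to clean up leaked chunks could not be classified by key type.
      static int resolveSchemaId(MessageType messageType, KafkaMessageEnvelope envelope) throws IOException {
        switch (messageType) {
          case PUT:
            return ((Put) envelope.payloadUnion).schemaId;
          case DELETE:
            return ((Delete) envelope.payloadUnion).schemaId;
          default:
            throw new IOException("Unexpected message type: " + messageType);
        }
      }
    }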
@@ -13,6 +13,7 @@
 import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.guid.GuidUtils;
 import com.linkedin.venice.kafka.protocol.ControlMessage;
+import com.linkedin.venice.kafka.protocol.Delete;
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
 import com.linkedin.venice.kafka.protocol.LeaderMetadata;
 import com.linkedin.venice.kafka.protocol.ProducerMetadata;
@@ -28,7 +29,6 @@
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
 import com.linkedin.venice.serializer.AvroSpecificDeserializer;
-import com.linkedin.venice.storage.protocol.ChunkId;
 import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
 import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
 import com.linkedin.venice.utils.ByteUtils;
@@ -354,7 +354,7 @@ String getChunkMetadataLog(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> r
           schemaId = ((Put) kafkaMessageEnvelope.payloadUnion).schemaId;
           break;
         case DELETE:
-          schemaId = -1;
+          schemaId = ((Delete) kafkaMessageEnvelope.payloadUnion).schemaId;
           break;
         default:
           throw new IOException(
@@ -363,58 +363,48 @@ String getChunkMetadataLog(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> r
       }

       final ChunkKeyValueTransformer.KeyType keyType = ChunkKeyValueTransformer.getKeyType(messageType, schemaId);
-      ChunkId firstChunkId = getFirstChunkId(keyType, kafkaKey, kafkaMessageEnvelope);
-
-      if (firstChunkId == null) {
-        String chunkMetadataFormatWithoutFirstChunkMd = " ChunkMd=(type:%s)";
-        return String.format(chunkMetadataFormatWithoutFirstChunkMd, keyType);
-      } else {
-        String chunkMetadataFormatWithFirstChunkMd = " ChunkMd=(type:%s, FirstChunkMd=(guid:%s,seg:%d,seq:%d))";
-        return String.format(
-            chunkMetadataFormatWithFirstChunkMd,
-            keyType,
-            GuidUtils.getHexFromGuid(firstChunkId.producerGUID),
-            firstChunkId.segmentNumber,
-            firstChunkId.messageSequenceNumber);
+      switch (keyType) {
+        case WITH_FULL_VALUE:
+          return String.format(" ChunkMd=(type:%s)", keyType);
+        case WITH_VALUE_CHUNK:
+          final RawKeyBytesAndChunkedKeySuffix rawKeyBytesAndChunkedKeySuffix =
+              chunkKeyValueTransformer.splitChunkedKey(kafkaKey.getKey(), keyType);
+          final ByteBuffer chunkedKeySuffixBytes = rawKeyBytesAndChunkedKeySuffix.getChunkedKeySuffixBytes();
+          final ChunkedKeySuffix chunkedKeySuffix = chunkedKeySuffixDeserializer.deserialize(chunkedKeySuffixBytes);
+          return String.format(
+              " ChunkMd=(type:%s, ChunkIndex: %d, FirstChunkMd=(guid:%s,seg:%d,seq:%d))",
+              keyType,
+              chunkedKeySuffix.chunkId.chunkIndex,
+              GuidUtils.getHexFromGuid(chunkedKeySuffix.chunkId.producerGUID),
+              chunkedKeySuffix.chunkId.segmentNumber,
+              chunkedKeySuffix.chunkId.messageSequenceNumber);
+        case WITH_CHUNK_MANIFEST:
+          Put putMessage = (Put) kafkaMessageEnvelope.payloadUnion;
+          ChunkedValueManifest chunkedValueManifest =
+              manifestSerializer.deserialize(ByteUtils.extractByteArray(putMessage.putValue), putMessage.schemaId);
+
+          ByteBuffer firstChunkKeyWithChunkIdSuffix = chunkedValueManifest.keysWithChunkIdSuffix.get(0);
+          final RawKeyBytesAndChunkedKeySuffix firstChunkRawKeyBytesAndChunkedKeySuffix = chunkKeyValueTransformer
+              .splitChunkedKey(ByteUtils.extractByteArray(firstChunkKeyWithChunkIdSuffix), WITH_VALUE_CHUNK);
+          final ByteBuffer firstChunkKeySuffixBytes =
+              firstChunkRawKeyBytesAndChunkedKeySuffix.getChunkedKeySuffixBytes();
+          final ChunkedKeySuffix firstChunkedKeySuffix =
+              chunkedKeySuffixDeserializer.deserialize(firstChunkKeySuffixBytes);
+
+          return String.format(
+              " ChunkMd=(type:%s, FirstChunkMd=(guid:%s,seg:%d,seq:%d))",
+              keyType,
+              GuidUtils.getHexFromGuid(firstChunkedKeySuffix.chunkId.producerGUID),
+              firstChunkedKeySuffix.chunkId.segmentNumber,
+              firstChunkedKeySuffix.chunkId.messageSequenceNumber);
+        default:
+          throw new VeniceException("Unexpected key type: " + keyType);
       }
     } else {
       return "";
     }
   }

-  private ChunkId getFirstChunkId(
-      ChunkKeyValueTransformer.KeyType keyType,
-      KafkaKey kafkaKey,
-      KafkaMessageEnvelope kafkaMessageEnvelope) {
-    final RawKeyBytesAndChunkedKeySuffix rawKeyBytesAndChunkedKeySuffix =
-        chunkKeyValueTransformer.splitChunkedKey(kafkaKey.getKey(), keyType);
-
-    final ByteBuffer chunkedKeySuffixBytes = rawKeyBytesAndChunkedKeySuffix.getChunkedKeySuffixBytes();
-    final ChunkedKeySuffix chunkedKeySuffix = chunkedKeySuffixDeserializer.deserialize(chunkedKeySuffixBytes);
-
-    switch (keyType) {
-      case WITH_VALUE_CHUNK:
-        return chunkedKeySuffix.chunkId;
-      case WITH_CHUNK_MANIFEST:
-        Put putMessage = (Put) kafkaMessageEnvelope.payloadUnion;
-        ChunkedValueManifest chunkedValueManifest =
-            manifestSerializer.deserialize(ByteUtils.extractByteArray(putMessage.putValue), putMessage.schemaId);
-
-        ByteBuffer firstChunkKeyWithChunkIdSuffix = chunkedValueManifest.keysWithChunkIdSuffix.get(0);
-        final RawKeyBytesAndChunkedKeySuffix firstChunkRawKeyBytesAndChunkedKeySuffix = chunkKeyValueTransformer
-            .splitChunkedKey(ByteUtils.extractByteArray(firstChunkKeyWithChunkIdSuffix), WITH_VALUE_CHUNK);
-        final ByteBuffer firstChunkKeySuffixBytes = firstChunkRawKeyBytesAndChunkedKeySuffix.getChunkedKeySuffixBytes();
-        final ChunkedKeySuffix firstChunkedKeySuffix =
-            chunkedKeySuffixDeserializer.deserialize(firstChunkKeySuffixBytes);
-
-        return firstChunkedKeySuffix.chunkId;
-      case WITH_FULL_VALUE:
-        return null;
-      default:
-        throw new VeniceException("Unexpected key type: " + keyType);
-    }
-  }
-
   @Override
   public void close() throws Exception {
     if (dataFileWriter != null) {
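
For reference, the chunk metadata suffix appended to a dumped record's log line now takes one of three shapes, per the format strings above. The GUID, segment, and sequence values below are illustrative, matching the fixtures in the test diff that follows:

     ChunkMd=(type:WITH_FULL_VALUE)
     ChunkMd=(type:WITH_VALUE_CHUNK, ChunkIndex: 0, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))
     ChunkMd=(type:WITH_CHUNK_MANIFEST, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))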
@@ -82,24 +82,24 @@ public void testAdminToolConsumptionForChunkedData() throws IOException {
     int firstChunkSequenceNumber = 1;
     PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage1 =
         getChunkedRecord(serializedKey, firstChunkSegmentNumber, firstChunkSequenceNumber, 0, 0, pubSubTopicPartition);
-    String dirstChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage1);
+    String firstChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage1);
     Assert.assertEquals(
-        dirstChunkMetadataLog,
-        " ChunkMd=(type:WITH_VALUE_CHUNK, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))");
+        firstChunkMetadataLog,
+        " ChunkMd=(type:WITH_VALUE_CHUNK, ChunkIndex: 0, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))");

     PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage2 =
         getChunkedRecord(serializedKey, firstChunkSegmentNumber, firstChunkSequenceNumber, 1, 0, pubSubTopicPartition);
     String secondChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage2);
     Assert.assertEquals(
         secondChunkMetadataLog,
-        " ChunkMd=(type:WITH_VALUE_CHUNK, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))");
+        " ChunkMd=(type:WITH_VALUE_CHUNK, ChunkIndex: 1, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))");

     PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage3 =
         getChunkedRecord(serializedKey, firstChunkSegmentNumber, firstChunkSequenceNumber, 2, 0, pubSubTopicPartition);
     String thirdChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage3);
     Assert.assertEquals(
         thirdChunkMetadataLog,
-        " ChunkMd=(type:WITH_VALUE_CHUNK, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))");
+        " ChunkMd=(type:WITH_VALUE_CHUNK, ChunkIndex: 2, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))");

     PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage4 =
         getChunkValueManifestRecord(serializedKey, pubSubMessage1, firstChunkSequenceNumber, pubSubTopicPartition);
