Skip to content

Commit

Permalink
KAFKA-18199; Fix size calculation for nullable tagged structs (apache…
Browse files Browse the repository at this point in the history
…#18127)

When a struct field is tagged and nullable, it is serialized as
{ varint tag; varint dataLength; nullable data }, where
the nullable data is serialized as
{ varint isNotNull; if (isNotNull) struct s; }. The length field
includes the is-not-null varint.

This patch fixes a bug in serialization where the written value of
the length field and the value used to compute the size of the length
field differ by 1. In practice this has no impact unless the
serialized length of the struct is 127 bytes, since the varint encodings
of 127 and 128 have different lengths (one byte, 0x7f, vs. two bytes, 0x80 0x01).

Reviewers: David Jacot <[email protected]>
  • Loading branch information
squah-confluent authored Dec 13, 2024
1 parent 770d64d commit b94defa
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.ObjectSerializationCache;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -98,6 +99,29 @@ public void testToStringWithNullStructs() {
message.toString();
}

/**
 * Regression test for KAFKA-18199. Verifies that the size of the varint which encodes the
 * length of a tagged nullable struct is computed correctly.
 */
@Test
public void testTaggedStructSize() {
    final short version = 2;

    // Build the tagged struct on its own so the size assertion below reads clearly.
    NullableStructMessageData.MyStruct4 struct4 = new NullableStructMessageData.MyStruct4()
        .setMyInt(4)
        .setMyString(new String(new char[121]));

    NullableStructMessageData message = new NullableStructMessageData()
        .setNullableStruct(null)
        .setNullableStruct2(null)
        .setNullableStruct3(null)
        .setNullableStruct4(struct4);

    // The struct must be exactly 127 bytes long: that is the largest size whose varint
    // encoding still fits in one byte. The nullable struct size gets one extra byte for the
    // is-not-null flag, which pushes the encoded length over into a two-byte varint.
    int structSize = message.nullableStruct4().size(new ObjectSerializationCache(), version);
    assertEquals(127, structSize);

    // roundTrip also checks the computed message size against the serialized buffer.
    NullableStructMessageData newMessage = roundTrip(message, version);
    assertEquals(message, newMessage);
}

private NullableStructMessageData deserialize(ByteBuffer buf, short version) {
NullableStructMessageData message = new NullableStructMessageData();
message.read(new ByteBufferAccessor(buf.duplicate()), version);
Expand All @@ -110,6 +134,8 @@ private ByteBuffer serialize(NullableStructMessageData message, short version) {

/**
 * Serializes {@code message} at {@code version}, asserts that the size reported by
 * {@code message.size(...)} equals the number of bytes remaining in the serialized buffer,
 * and returns the message deserialized from a duplicate of that buffer.
 */
private NullableStructMessageData roundTrip(NullableStructMessageData message, short version) {
    ByteBuffer serialized = serialize(message, version);

    // Check size calculation: the computed size must agree with what was actually written.
    int writtenBytes = serialized.remaining();
    int computedSize = message.size(new ObjectSerializationCache(), version);
    assertEquals(writtenBytes, computedSize);

    return deserialize(serialized.duplicate(), version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ private SimpleExampleMessageData roundTripSerde(
short version
) {
ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
// Check size calculation
assertEquals(buf.remaining(), message.size(new ObjectSerializationCache(), version));
return deserialize(buf.duplicate(), version);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1332,19 +1332,24 @@ private void generateVariableLengthFieldSize(FieldSpec field,
}).
generate(buffer);
} else if (field.type().isStruct()) {
// Adding a byte if the field is nullable. A byte works for both regular and tagged struct fields.
VersionConditional.forVersions(field.nullableVersions(), possibleVersions).
ifMember(__ -> {
buffer.printf("_size.addBytes(1);%n");
}).
generate(buffer);

if (tagged) {
buffer.printf("int _sizeBeforeStruct = _size.totalSize();%n", field.camelCaseName());
// Add a byte if the field is nullable.
VersionConditional.forVersions(field.nullableVersions(), possibleVersions).
ifMember(__ -> {
buffer.printf("_size.addBytes(1);%n");
}).
generate(buffer);
buffer.printf("this.%s.addSize(_size, _cache, _version);%n", field.camelCaseName());
buffer.printf("int _structSize = _size.totalSize() - _sizeBeforeStruct;%n", field.camelCaseName());
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_structSize));%n");
} else {
// Add a byte if the field is nullable.
VersionConditional.forVersions(field.nullableVersions(), possibleVersions).
ifMember(__ -> {
buffer.printf("_size.addBytes(1);%n");
}).
generate(buffer);
buffer.printf("this.%s.addSize(_size, _cache, _version);%n", field.camelCaseName());
}
} else {
Expand Down

0 comments on commit b94defa

Please sign in to comment.