Skip to content

Commit

Permalink
MINOR: Fix MessageFormatters (apache#18266)
Browse files Browse the repository at this point in the history
While looking at the message formatters in apache#18261, I have noticed at few incorrect test cases.
* We should not log anything when the record type is unknown because the formatters have clear goals.
* We should not parse the value when the key is null or when the key cannot be parsed. While it works in the tests, in practice, this is wrong because we cannot assume that type of the value if the type of the key is not defined. The key drives the type of the entire record.

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
dajac authored Dec 19, 2024
1 parent 64279d2 commit b31aa65
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

public abstract class ApiMessageFormatter implements MessageFormatter {

private static final String TYPE = "type";
private static final String VERSION = "version";
private static final String DATA = "data";
private static final String KEY = "key";
Expand All @@ -46,22 +47,22 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
byte[] key = consumerRecord.key();
if (Objects.nonNull(key)) {
short keyVersion = ByteBuffer.wrap(key).getShort();
JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key), keyVersion);
JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key));

if (dataNode instanceof NullNode) {
return;
}
json.putObject(KEY)
.put(VERSION, keyVersion)
.put(TYPE, keyVersion)
.set(DATA, dataNode);
} else {
json.set(KEY, NullNode.getInstance());
return;
}

byte[] value = consumerRecord.value();
if (Objects.nonNull(value)) {
short valueVersion = ByteBuffer.wrap(value).getShort();
JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value), valueVersion);
JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value));

json.putObject(VALUE)
.put(VERSION, valueVersion)
Expand All @@ -77,6 +78,6 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
}
}

protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer, short version);
protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer, short version);
protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer);
protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,34 @@
*/
package org.apache.kafka.tools.consumer;

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;

import java.nio.ByteBuffer;
import java.util.Optional;

public class GroupMetadataMessageFormatter extends ApiMessageFormatter {

@Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
return readToGroupMetadataKey(byteBuffer)
.map(logKey -> transferKeyMessageToJsonNode(logKey, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}

@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) {
return readToGroupMetadataValue(byteBuffer)
.map(logValue -> GroupMetadataValueJsonConverter.write(logValue, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}

private Optional<ApiMessage> readToGroupMetadataKey(ByteBuffer byteBuffer) {
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION
&& version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version));
} else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
return GroupMetadataKeyJsonConverter.write(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version), version);
}
return NullNode.getInstance();
}

private JsonNode transferKeyMessageToJsonNode(ApiMessage message, short version) {
if (message instanceof OffsetCommitKey) {
return NullNode.getInstance();
} else if (message instanceof GroupMetadataKey) {
return GroupMetadataKeyJsonConverter.write((GroupMetadataKey) message, version);
} else {
return new TextNode(UNKNOWN);
}
}

private Optional<GroupMetadataValue> readToGroupMetadataValue(ByteBuffer byteBuffer) {
@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION
&& version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
return GroupMetadataValueJsonConverter.write(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version), version);
}
return new TextNode(UNKNOWN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
*/
package org.apache.kafka.tools.consumer;

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
Expand All @@ -29,56 +27,26 @@
import com.fasterxml.jackson.databind.node.TextNode;

import java.nio.ByteBuffer;
import java.util.Optional;

/**
* Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
*/
public class OffsetsMessageFormatter extends ApiMessageFormatter {

@Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
return readToGroupMetadataKey(byteBuffer)
.map(logKey -> transferKeyMessageToJsonNode(logKey, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}

@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) {
return readToOffsetMessageValue(byteBuffer)
.map(logValue -> OffsetCommitValueJsonConverter.write(logValue, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}

private Optional<ApiMessage> readToGroupMetadataKey(ByteBuffer byteBuffer) {
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION
&& version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version));
} else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
return OffsetCommitKeyJsonConverter.write(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version), version);
}
return NullNode.getInstance();
}

private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersion) {
if (logKey instanceof OffsetCommitKey) {
return OffsetCommitKeyJsonConverter.write((OffsetCommitKey) logKey, keyVersion);
} else if (logKey instanceof GroupMetadataKey) {
return NullNode.getInstance();
} else {
return new TextNode(UNKNOWN);
}
}

private Optional<OffsetCommitValue> readToOffsetMessageValue(ByteBuffer byteBuffer) {
@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION
&& version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
return OffsetCommitValueJsonConverter.write(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version), version);
}
return new TextNode(UNKNOWN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,27 @@
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;

import java.nio.ByteBuffer;
import java.util.Optional;

public class TransactionLogMessageFormatter extends ApiMessageFormatter {

@Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
return readToTransactionLogKey(byteBuffer)
.map(logKey -> TransactionLogKeyJsonConverter.write(logKey, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}

@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) {
return readToTransactionLogValue(byteBuffer)
.map(logValue -> TransactionLogValueJsonConverter.write(logValue, version))
.orElseGet(() -> new TextNode(UNKNOWN));
}

private Optional<TransactionLogKey> readToTransactionLogKey(ByteBuffer byteBuffer) {
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
return TransactionLogKeyJsonConverter.write(new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version), version);
}
return NullNode.getInstance();
}

private Optional<TransactionLogValue> readToTransactionLogValue(ByteBuffer byteBuffer) {
@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
&& version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
return TransactionLogValueJsonConverter.write(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version), version);
}
return new TextNode(UNKNOWN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ private static Stream<Arguments> parameters() {
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":10,\"data\":\"unknown\"}}"
""
),
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0," +
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," +
"\"clientHost\":\"host-1\",\"sessionTimeout\":1500,\"subscription\":\"AAE=\"," +
Expand All @@ -90,7 +90,7 @@ private static Stream<Arguments> parameters() {
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 1, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1," +
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," +
"\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000,\"sessionTimeout\":1500," +
Expand All @@ -99,7 +99,7 @@ private static Stream<Arguments> parameters() {
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2," +
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
"\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000," +
Expand All @@ -108,7 +108,7 @@ private static Stream<Arguments> parameters() {
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 3, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3," +
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
"\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," +
Expand All @@ -117,7 +117,7 @@ private static Stream<Arguments> parameters() {
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4," +
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4," +
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
"\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," +
Expand All @@ -126,16 +126,12 @@ private static Stream<Arguments> parameters() {
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
null,
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
Arguments.of(
null,
MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
"{\"key\":null,\"value\":{\"version\":4,\"data\":{\"protocolType\":\"consumer\",\"generation\":1," +
"\"protocol\":\"range\",\"leader\":\"leader\",\"currentStateTimestamp\":1234," +
"\"members\":[{\"memberId\":\"member-1\",\"groupInstanceId\":\"group-instance-1\"," +
"\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000," +
"\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}"),
Arguments.of(null, null, "{\"key\":null,\"value\":null}"),
""),
Arguments.of(null, null, ""),
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
Expand Down
Loading

0 comments on commit b31aa65

Please sign in to comment.