From 380b3e05c9ffb82bc4618e3d092bc3aae096573f Mon Sep 17 00:00:00 2001
From: Erik Rohwedder
Date: Fri, 12 Jul 2024 22:54:18 +0200
Subject: [PATCH 1/8] implement avro serializer with raw-avro schema

---
 .../java/org/akhq/configs/Connection.java     |  14 ++
 .../CustomSerializerRepository.java           |  25 +++
 .../akhq/repositories/RecordRepository.java   |  24 ++-
 .../org/akhq/utils/JsonToAvroSerializer.java  | 146 ++++++++++++++++++
 4 files changed, 205 insertions(+), 4 deletions(-)
 create mode 100644 src/main/java/org/akhq/repositories/CustomSerializerRepository.java
 create mode 100644 src/main/java/org/akhq/utils/JsonToAvroSerializer.java

diff --git a/src/main/java/org/akhq/configs/Connection.java b/src/main/java/org/akhq/configs/Connection.java
index cf6109bc0..14bd5f434 100644
--- a/src/main/java/org/akhq/configs/Connection.java
+++ b/src/main/java/org/akhq/configs/Connection.java
@@ -20,6 +20,7 @@ public class Connection extends AbstractProperties {
     List<Connect> connect;
     List<KsqlDb> ksqldb;
     Deserialization deserialization = new Deserialization();
+    Serialization serialization = new Serialization();
     UiOptions uiOptions = new UiOptions();
 
     public Connection(@Parameter String name) {
@@ -60,6 +61,19 @@ public static class AvroDeserializationTopicsMapping {
         }
     }
 
+    @Getter
+    @ConfigurationProperties("serialization") // TODO use other key
+    public static class Serialization {
+        AvroSerializationTopicsMapping avroRaw;
+
+        @Data
+        @ConfigurationProperties("avro-raw")
+        public static class AvroSerializationTopicsMapping {
+            String schemasFolder;
+            List<AvroTopicsMapping> topicsMapping = new ArrayList<>();
+        }
+    }
+
     @Data
     @ConfigurationProperties("ui-options")
     public static class UiOptions {
diff --git a/src/main/java/org/akhq/repositories/CustomSerializerRepository.java b/src/main/java/org/akhq/repositories/CustomSerializerRepository.java
new file mode 100644
index 000000000..8cd1a17b9
--- /dev/null
+++ b/src/main/java/org/akhq/repositories/CustomSerializerRepository.java
@@ -0,0 +1,25 @@
+package org.akhq.repositories;
+
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+import java.util.HashMap;
+import java.util.Map;
+import org.akhq.modules.KafkaModule;
+import org.akhq.utils.JsonToAvroSerializer;
+
+@Singleton
+public class CustomSerializerRepository {
+    @Inject
+    private KafkaModule kafkaModule;
+    private final Map<String, JsonToAvroSerializer> jsonToAvroSerializers = new HashMap<>();
+
+    public JsonToAvroSerializer getJsonToAvroSerializer(String clusterId) {
+        if (!this.jsonToAvroSerializers.containsKey(clusterId)) {
+            this.jsonToAvroSerializers.put(
+                clusterId,
+                new JsonToAvroSerializer(this.kafkaModule.getConnection(clusterId).getSerialization().getAvroRaw())
+            );
+        }
+        return this.jsonToAvroSerializers.get(clusterId);
+    }
+}
diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index 83714fc08..c83793d4a 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -25,6 +25,7 @@ import org.akhq.modules.schemaregistry.RecordWithSchemaSerializerFactory;
 import org.akhq.utils.AvroToJsonSerializer;
 import org.akhq.utils.Debug;
+import org.akhq.utils.JsonToAvroSerializer;
 import org.akhq.utils.MaskingUtils;
 import org.apache.kafka.clients.admin.DeletedRecords;
 import org.apache.kafka.clients.admin.RecordsToDelete;
@@ -75,6 +76,9 @@ public class RecordRepository extends AbstractRepository {
     @Inject
     private CustomDeserializerRepository customDeserializerRepository;
 
+    @Inject
+    private CustomSerializerRepository customSerializerRepository;
+
     @Inject
     private AvroWireFormatConverter avroWireFormatConverter;
 
@@ -612,14 +616,14 @@ public RecordMetadata produce(
         Optional<String> valueSchema
     ) throws ExecutionException, InterruptedException, RestClientException, IOException {
         byte[] keyAsBytes = null;
-        byte[] valueAsBytes;
+        byte[] valueAsBytes = null;
 
         if (key.isPresent()) {
             if (keySchema.isPresent() && StringUtils.isNotEmpty(keySchema.get())) {
                 Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, keySchema.get());
                 SchemaSerializer keySerializer = serializerFactory.createSerializer(clusterId, schema.getId());
                 keyAsBytes = keySerializer.serialize(key.get());
-            } else {
+            } else { // TODO same for key + test
                 keyAsBytes = key.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
             }
         } else {
@@ -636,8 +640,20 @@ public RecordMetadata produce(
             Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, valueSchema.get());
             SchemaSerializer valueSerializer = serializerFactory.createSerializer(clusterId, schema.getId());
             valueAsBytes = valueSerializer.serialize(value.get());
-        } else {
-            valueAsBytes = value.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
+        } else if (value.isPresent()) { // TODO test
+            JsonToAvroSerializer jsonToAvroSerializer = customSerializerRepository.getJsonToAvroSerializer(clusterId);
+            if (jsonToAvroSerializer != null) {
+                try {
+                    byte[] buffer = jsonToAvroSerializer.serialize(topic, value.get(), false);
+                    if (buffer != null) {
+                        valueAsBytes = buffer;
+                    }
+                } catch (Exception exception) {
+                    valueAsBytes = value.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
+                }
+            } else {
+                valueAsBytes = value.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
+            }
         }
 
         return produce(clusterId, topic, valueAsBytes, headers, keyAsBytes, partition, timestamp);
diff --git a/src/main/java/org/akhq/utils/JsonToAvroSerializer.java b/src/main/java/org/akhq/utils/JsonToAvroSerializer.java
new file mode 100644
index 000000000..a2c67f3d0
--- /dev/null
+++ b/src/main/java/org/akhq/utils/JsonToAvroSerializer.java
@@ -0,0 +1,146 @@
+package org.akhq.utils;
+
+import io.micronaut.core.serialize.exceptions.SerializationException;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
+import org.akhq.configs.AvroTopicsMapping;
+import org.akhq.configs.Connection;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.*;
+
+// TODO test + unify
+/**
+ * Class for serialization of messages in Json to Avro raw data binary format using topics mapping config.
+ */
+@Slf4j
+public class JsonToAvroSerializer {
+    private final EncoderFactory encoderFactory = EncoderFactory.get();
+    private final Map<String, Schema> keySchemas;
+    private final Map<String, Schema> valueSchemas;
+    private final List<AvroTopicsMapping> topicsMapping;
+    private final String avroSchemasFolder;
+
+    public JsonToAvroSerializer(Connection.Serialization.AvroSerializationTopicsMapping avroSerializationTopicsMapping) {
+        if (avroSerializationTopicsMapping == null) {
+            this.keySchemas = new HashMap<>();
+            this.valueSchemas = new HashMap<>();
+            this.topicsMapping = new ArrayList<>();
+            this.avroSchemasFolder = null;
+        } else {
+            this.avroSchemasFolder = avroSerializationTopicsMapping.getSchemasFolder();
+            this.topicsMapping = avroSerializationTopicsMapping.getTopicsMapping();
+            this.keySchemas = buildSchemas(AvroTopicsMapping::getKeySchemaFile);
+            this.valueSchemas = buildSchemas(AvroTopicsMapping::getValueSchemaFile);
+        }
+    }
+
+    /**
+     * Load Avro schemas from schema folder
+     *
+     * @return map where keys are topic regexes and value is Avro schema
+     */
+    private Map<String, Schema> buildSchemas(Function<AvroTopicsMapping, String> schemaFileMapper) {
+        Map<String, Schema> allSchemas = new HashMap<>();
+        for (AvroTopicsMapping mapping : topicsMapping) {
+            String schemaFile = schemaFileMapper.apply(mapping);
+
+            if (schemaFile != null) {
+                try {
+                    Schema schema = loadSchemaFile(mapping, schemaFile);
+                    allSchemas.put(mapping.getTopicRegex(), schema);
+                } catch (IOException e) {
+                    throw new RuntimeException(String.format("Cannot get a schema file for the topics regex [%s]", mapping.getTopicRegex()), e);
+                }
+            }
+        }
+        return allSchemas;
+    }
+
+    Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOException {
+        if (avroSchemasFolder != null && Files.exists(Path.of(avroSchemasFolder))) {
+            String fullPath = avroSchemasFolder + File.separator + schemaFile;
+            return new Schema.Parser().parse(Path.of(fullPath).toFile());
+        }
+        throw new FileNotFoundException("Avro schema file is not found for topic regex [" +
+            mapping.getTopicRegex() + "]. Folder is not specified or doesn't exist.");
+    }
+
+    /**
+     * Serialize from Json to Avro raw data binary format.
+     * Messages must be decoded directly with {@link org.apache.avro.io.DatumReader}, not {@link org.apache.avro.file.DataFileReader} or {@link org.apache.avro.message.BinaryMessageDecoder}.
+     * Topic name should match topic-regex from {@code akhq.connections.[clusterName].serialization.avro-raw.topics-mapping} config
+     * and schema should be set for key or value in that config.
+     *
+     * @param topic current topic name
+     * @param data Json data to encode
+     * @param isKey whether this data represents a key or a value
+     * @return {@code null} if cannot serialize or configuration is not matching, return encoded bytes otherwise
+     */
+    public byte[] serialize(String topic, String data, boolean isKey) {
+        AvroTopicsMapping matchingConfig = findMatchingConfig(topic);
+        if (matchingConfig == null) {
+            log.debug("Avro serialization config is not found for topic [{}]", topic);
+            return null;
+        }
+
+        if (matchingConfig.getKeySchemaFile() == null && matchingConfig.getValueSchemaFile() == null) {
+            throw new SerializationException(String.format("Avro serialization is configured for topic [%s], " +
+                "but schema is not specified neither for a key, nor for a value.", topic));
+        }
+
+        Schema schema;
+        if (isKey) {
+            schema = keySchemas.get(matchingConfig.getTopicRegex());
+        } else {
+            schema = valueSchemas.get(matchingConfig.getTopicRegex());
+        }
+
+        if (schema == null) {
+            return null;
+        }
+
+        byte[] result;
+        try {
+            result = tryToSerializeWithSchemaFile(data, schema);
+        } catch (Exception e) {
+            throw new SerializationException(String.format("Cannot serialize message with Avro serializer " +
+                "for topic [%s] and schema [%s]", topic, schema.getFullName()), e);
+        }
+        return result;
+    }
+
+    private AvroTopicsMapping findMatchingConfig(String topic) {
+        for (AvroTopicsMapping mapping : topicsMapping) {
+            if (topic.matches(mapping.getTopicRegex())) {
+                return new AvroTopicsMapping(
+                    mapping.getTopicRegex(),
+                    mapping.getKeySchemaFile(), mapping.getValueSchemaFile());
+            }
+        }
+        return null;
+    }
+
+    private byte[] tryToSerializeWithSchemaFile(String json, Schema schema) throws IOException {
+        DatumReader<Object> reader = new GenericDatumReader<>(schema);
+        GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema);
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
+        Encoder encoder = EncoderFactory.get().binaryEncoder(output, null);
+        Object datum = reader.read(null, decoder);
+        writer.write(datum, encoder);
+        encoder.flush();
+        return output.toByteArray();
+    }
+}
From ebc9911c3a7429e3167f8d1c597e92cab50a0927 Mon Sep 17 00:00:00 2001
From: Erik Rohwedder
Date: Fri, 12 Jul 2024 23:15:00 +0200
Subject: [PATCH 2/8] extract method

---
 .../akhq/repositories/RecordRepository.java   | 48 ++++++++++---------
 1 file changed, 26 insertions(+), 22 deletions(-)

diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index c83793d4a..a0d7b2677 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -620,15 +620,13 @@ public RecordMetadata produce(
 
         if (key.isPresent()) {
             if (keySchema.isPresent() && StringUtils.isNotEmpty(keySchema.get())) {
-                Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, keySchema.get());
-                SchemaSerializer keySerializer = serializerFactory.createSerializer(clusterId, schema.getId());
-                keyAsBytes = keySerializer.serialize(key.get());
+                keyAsBytes = getBytesBySchemaRegistry(clusterId, key.get(), keySchema.get());
-            } else { // TODO same for key + test
-                keyAsBytes = key.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
+            } else { // TODO test
+                keyAsBytes = getBytesByAvroSerializer(clusterId, topic, key.get());
             }
         } else {
             try {
-                if (Topic.isCompacted(configRepository.findByTopic(clusterId, value.isEmpty() ?
null : value.get()))) { + if (Topic.isCompacted(configRepository.findByTopic(clusterId, value.orElse(null)))) { throw new IllegalArgumentException("Key missing for produce onto compacted topic"); } } catch (ExecutionException ex) { @@ -637,28 +635,34 @@ public RecordMetadata produce( } if (value.isPresent() && valueSchema.isPresent() && StringUtils.isNotEmpty(valueSchema.get())) { - Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, valueSchema.get()); - SchemaSerializer valueSerializer = serializerFactory.createSerializer(clusterId, schema.getId()); - valueAsBytes = valueSerializer.serialize(value.get()); + valueAsBytes = getBytesBySchemaRegistry(clusterId, value.get(), valueSchema.get()); } else if (value.isPresent()) { // TODO test - JsonToAvroSerializer jsonToAvroSerializer = customSerializerRepository.getJsonToAvroSerializer(clusterId); - if (jsonToAvroSerializer != null) { - try { - byte[] buffer = jsonToAvroSerializer.serialize(topic, value.get(), false); - if (buffer != null) { - valueAsBytes = buffer; - } - } catch (Exception exception) { - valueAsBytes = value.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null); - } - } else { - valueAsBytes = value.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null); - } + valueAsBytes = getBytesByAvroSerializer(clusterId, topic, value.get()); } return produce(clusterId, topic, valueAsBytes, headers, keyAsBytes, partition, timestamp); } + private byte[] getBytesBySchemaRegistry(String clusterId, String data, String subject) throws IOException, RestClientException { + Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, subject); + SchemaSerializer serializer = serializerFactory.createSerializer(clusterId, schema.getId()); + return serializer.serialize(data); + } + + private byte[] getBytesByAvroSerializer(String clusterId, String topic, String data) { + JsonToAvroSerializer jsonToAvroSerializer = customSerializerRepository.getJsonToAvroSerializer(clusterId); + + if (jsonToAvroSerializer == null) { + return data.isEmpty() ? null : data.getBytes(); + } + + try { + return jsonToAvroSerializer.serialize(topic, data, false); + } catch (Exception exception) { + return data.isEmpty() ? 
null : data.getBytes(); + } + } + public RecordMetadata delete(String clusterId, String topic, Integer partition, byte[] key) throws ExecutionException, InterruptedException { return kafkaModule.getProducer(clusterId).send(new ProducerRecord<>( topic, From 50e89a18ad3b73c46107c3bb341f49b052f7c136 Mon Sep 17 00:00:00 2001 From: Erik Rohwedder Date: Fri, 12 Jul 2024 23:26:45 +0200 Subject: [PATCH 3/8] repair --- src/main/java/org/akhq/repositories/RecordRepository.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index a0d7b2677..4414b0b3b 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -622,7 +622,7 @@ public RecordMetadata produce( if (keySchema.isPresent() && StringUtils.isNotEmpty(keySchema.get())) { keyAsBytes = getBytesBySchemaRegistry(clusterId, key.get(), keySchema.get()); } else { // TODO test - keyAsBytes = getBytesByAvroSerializer(clusterId, topic, key.get()); + keyAsBytes = getBytesByAvroSerializer(clusterId, topic, key.get(), true); } } else { try { @@ -637,7 +637,7 @@ public RecordMetadata produce( if (value.isPresent() && valueSchema.isPresent() && StringUtils.isNotEmpty(valueSchema.get())) { valueAsBytes = getBytesBySchemaRegistry(clusterId, value.get(), valueSchema.get()); } else if (value.isPresent()) { // TODO test - valueAsBytes = getBytesByAvroSerializer(clusterId, topic, value.get()); + valueAsBytes = getBytesByAvroSerializer(clusterId, topic, value.get(), false); } return produce(clusterId, topic, valueAsBytes, headers, keyAsBytes, partition, timestamp); @@ -649,7 +649,7 @@ private byte[] getBytesBySchemaRegistry(String clusterId, String data, String su return serializer.serialize(data); } - private byte[] getBytesByAvroSerializer(String clusterId, String topic, String data) { + private byte[] getBytesByAvroSerializer(String clusterId, String topic, String data, boolean isKey) { JsonToAvroSerializer jsonToAvroSerializer = customSerializerRepository.getJsonToAvroSerializer(clusterId); if (jsonToAvroSerializer == null) { @@ -657,7 +657,7 @@ private byte[] getBytesByAvroSerializer(String clusterId, String topic, String d } try { - return jsonToAvroSerializer.serialize(topic, data, false); + return jsonToAvroSerializer.serialize(topic, data, isKey); } catch (Exception exception) { return data.isEmpty() ? 
null : data.getBytes(); } From 70a973096b43952ff2eacace511bf6f99f07640c Mon Sep 17 00:00:00 2001 From: Erik Rohwedder Date: Sat, 13 Jul 2024 00:15:28 +0200 Subject: [PATCH 4/8] introduce RawAvroUtils --- .../akhq/utils/AvroToJsonDeserializer.java | 85 +--------------- .../org/akhq/utils/JsonToAvroSerializer.java | 98 +++---------------- .../java/org/akhq/utils/RawAvroUtils.java | 93 ++++++++++++++++++ 3 files changed, 110 insertions(+), 166 deletions(-) create mode 100644 src/main/java/org/akhq/utils/RawAvroUtils.java diff --git a/src/main/java/org/akhq/utils/AvroToJsonDeserializer.java b/src/main/java/org/akhq/utils/AvroToJsonDeserializer.java index c6059fef1..4ee35b3c0 100644 --- a/src/main/java/org/akhq/utils/AvroToJsonDeserializer.java +++ b/src/main/java/org/akhq/utils/AvroToJsonDeserializer.java @@ -2,7 +2,6 @@ import io.micronaut.core.serialize.exceptions.SerializationException; import lombok.extern.slf4j.Slf4j; -import org.akhq.configs.AvroTopicsMapping; import org.akhq.configs.Connection; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -10,16 +9,7 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; /** * Class for deserialization of messages in Avro raw data binary format using topics mapping config. @@ -27,59 +17,19 @@ @Slf4j public class AvroToJsonDeserializer { private final DecoderFactory decoderFactory = DecoderFactory.get(); - private final Map keySchemas; - private final Map valueSchemas; - private final List topicsMapping; - private final String avroSchemasFolder; + private final RawAvroUtils rawAvroUtils; private final AvroToJsonSerializer avroToJsonSerializer; public AvroToJsonDeserializer(Connection.Deserialization.AvroDeserializationTopicsMapping avroDeserializationTopicsMapping, AvroToJsonSerializer avroToJsonSerializer) { if (avroDeserializationTopicsMapping == null) { - this.keySchemas = new HashMap<>(); - this.valueSchemas = new HashMap<>(); - this.topicsMapping = new ArrayList<>(); - this.avroSchemasFolder = null; + this.rawAvroUtils = null; this.avroToJsonSerializer = null; } else { - this.avroSchemasFolder = avroDeserializationTopicsMapping.getSchemasFolder(); - this.topicsMapping = avroDeserializationTopicsMapping.getTopicsMapping(); - this.keySchemas = buildSchemas(AvroTopicsMapping::getKeySchemaFile); - this.valueSchemas = buildSchemas(AvroTopicsMapping::getValueSchemaFile); + this.rawAvroUtils = new RawAvroUtils(avroDeserializationTopicsMapping.getTopicsMapping(), avroDeserializationTopicsMapping.getSchemasFolder()); this.avroToJsonSerializer = avroToJsonSerializer; } } - /** - * Load Avro schemas from schema folder - * - * @return map where keys are topic regexes and value is Avro schema - */ - private Map buildSchemas(Function schemaFileMapper) { - Map allSchemas = new HashMap<>(); - for (AvroTopicsMapping mapping : topicsMapping) { - String schemaFile = schemaFileMapper.apply(mapping); - - if (schemaFile != null) { - try { - Schema schema = loadSchemaFile(mapping, schemaFile); - allSchemas.put(mapping.getTopicRegex(), schema); - } catch (IOException e) { - throw new RuntimeException(String.format("Cannot get a schema file for the topics regex [%s]", mapping.getTopicRegex()), e); - } - } - } - return 
allSchemas; - } - - Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOException { - if (avroSchemasFolder != null && Files.exists(Path.of(avroSchemasFolder))) { - String fullPath = avroSchemasFolder + File.separator + schemaFile; - return new Schema.Parser().parse(Path.of(fullPath).toFile()); - } - throw new FileNotFoundException("Avro schema file is not found for topic regex [" + - mapping.getTopicRegex() + "]. Folder is not specified or doesn't exist."); - } - /** * Deserialize from Avro raw data binary format to Json. * Messages must have been encoded directly with {@link org.apache.avro.io.DatumWriter}, not {@link org.apache.avro.file.DataFileWriter} or {@link org.apache.avro.message.BinaryMessageEncoder}. @@ -92,23 +42,7 @@ Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOExc * @return {@code null} if cannot deserialize or configuration is not matching, return decoded string otherwise */ public String deserialize(String topic, byte[] buffer, boolean isKey) { - AvroTopicsMapping matchingConfig = findMatchingConfig(topic); - if (matchingConfig == null) { - log.debug("Avro deserialization config is not found for topic [{}]", topic); - return null; - } - - if (matchingConfig.getKeySchemaFile() == null && matchingConfig.getValueSchemaFile() == null) { - throw new SerializationException(String.format("Avro deserialization is configured for topic [%s], " + - "but schema is not specified neither for a key, nor for a value.", topic)); - } - - Schema schema; - if (isKey) { - schema = keySchemas.get(matchingConfig.getTopicRegex()); - } else { - schema = valueSchemas.get(matchingConfig.getTopicRegex()); - } + Schema schema = rawAvroUtils.getSchema(topic, isKey); if (schema == null) { return null; @@ -124,17 +58,6 @@ public String deserialize(String topic, byte[] buffer, boolean isKey) { return result; } - private AvroTopicsMapping findMatchingConfig(String topic) { - for (AvroTopicsMapping mapping : topicsMapping) { - if (topic.matches(mapping.getTopicRegex())) { - return new AvroTopicsMapping( - mapping.getTopicRegex(), - mapping.getKeySchemaFile(), mapping.getValueSchemaFile()); - } - } - return null; - } - private String tryToDeserializeWithSchemaFile(byte[] buffer, Schema schema) throws IOException { DatumReader reader = new GenericDatumReader<>(schema); Object result = reader.read(null, decoderFactory.binaryDecoder(buffer, null)); diff --git a/src/main/java/org/akhq/utils/JsonToAvroSerializer.java b/src/main/java/org/akhq/utils/JsonToAvroSerializer.java index a2c67f3d0..d15ab0c71 100644 --- a/src/main/java/org/akhq/utils/JsonToAvroSerializer.java +++ b/src/main/java/org/akhq/utils/JsonToAvroSerializer.java @@ -2,81 +2,36 @@ import io.micronaut.core.serialize.exceptions.SerializationException; import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; import lombok.extern.slf4j.Slf4j; -import org.akhq.configs.AvroTopicsMapping; import org.akhq.configs.Connection; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.io.*; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; 
+import org.apache.avro.io.EncoderFactory; -// TODO test + unify +// TODO test /** * Class for serialization of messages in Json to Avro raw data binary format using topics mapping config. */ @Slf4j public class JsonToAvroSerializer { private final EncoderFactory encoderFactory = EncoderFactory.get(); - private final Map keySchemas; - private final Map valueSchemas; - private final List topicsMapping; - private final String avroSchemasFolder; + private final DecoderFactory decoderFactory = DecoderFactory.get(); + private final RawAvroUtils rawAvroUtils; public JsonToAvroSerializer(Connection.Serialization.AvroSerializationTopicsMapping avroSerializationTopicsMapping) { if (avroSerializationTopicsMapping == null) { - this.keySchemas = new HashMap<>(); - this.valueSchemas = new HashMap<>(); - this.topicsMapping = new ArrayList<>(); - this.avroSchemasFolder = null; + this.rawAvroUtils = null; } else { - this.avroSchemasFolder = avroSerializationTopicsMapping.getSchemasFolder(); - this.topicsMapping = avroSerializationTopicsMapping.getTopicsMapping(); - this.keySchemas = buildSchemas(AvroTopicsMapping::getKeySchemaFile); - this.valueSchemas = buildSchemas(AvroTopicsMapping::getValueSchemaFile); + this.rawAvroUtils = new RawAvroUtils(avroSerializationTopicsMapping.getTopicsMapping(), avroSerializationTopicsMapping.getSchemasFolder()); } } - /** - * Load Avro schemas from schema folder - * - * @return map where keys are topic regexes and value is Avro schema - */ - private Map buildSchemas(Function schemaFileMapper) { - Map allSchemas = new HashMap<>(); - for (AvroTopicsMapping mapping : topicsMapping) { - String schemaFile = schemaFileMapper.apply(mapping); - - if (schemaFile != null) { - try { - Schema schema = loadSchemaFile(mapping, schemaFile); - allSchemas.put(mapping.getTopicRegex(), schema); - } catch (IOException e) { - throw new RuntimeException(String.format("Cannot get a schema file for the topics regex [%s]", mapping.getTopicRegex()), e); - } - } - } - return allSchemas; - } - - Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOException { - if (avroSchemasFolder != null && Files.exists(Path.of(avroSchemasFolder))) { - String fullPath = avroSchemasFolder + File.separator + schemaFile; - return new Schema.Parser().parse(Path.of(fullPath).toFile()); - } - throw new FileNotFoundException("Avro schema file is not found for topic regex [" + - mapping.getTopicRegex() + "]. Folder is not specified or doesn't exist."); - } - /** * Serialize from Json to Avro raw data binary format. * Messages must be decoded directly with {@link org.apache.avro.io.DatumReader}, not {@link org.apache.avro.file.DataFileReader} or {@link org.apache.avro.message.BinaryMessageDecoder}. 
@@ -89,23 +44,7 @@ Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOExc
      * @return {@code null} if cannot serialize or configuration is not matching, return encoded bytes otherwise
      */
     public byte[] serialize(String topic, String data, boolean isKey) {
-        AvroTopicsMapping matchingConfig = findMatchingConfig(topic);
-        if (matchingConfig == null) {
-            log.debug("Avro serialization config is not found for topic [{}]", topic);
-            return null;
-        }
-
-        if (matchingConfig.getKeySchemaFile() == null && matchingConfig.getValueSchemaFile() == null) {
-            throw new SerializationException(String.format("Avro serialization is configured for topic [%s], " +
-                "but schema is not specified neither for a key, nor for a value.", topic));
-        }
-
-        Schema schema;
-        if (isKey) {
-            schema = keySchemas.get(matchingConfig.getTopicRegex());
-        } else {
-            schema = valueSchemas.get(matchingConfig.getTopicRegex());
-        }
+        Schema schema = rawAvroUtils.getSchema(topic, isKey);
 
         if (schema == null) {
             return null;
         }
@@ -121,23 +60,12 @@ public byte[] serialize(String topic, String data, boolean isKey) {
         return result;
     }
 
-    private AvroTopicsMapping findMatchingConfig(String topic) {
-        for (AvroTopicsMapping mapping : topicsMapping) {
-            if (topic.matches(mapping.getTopicRegex())) {
-                return new AvroTopicsMapping(
-                    mapping.getTopicRegex(),
-                    mapping.getKeySchemaFile(), mapping.getValueSchemaFile());
-            }
-        }
-        return null;
-    }
-
     private byte[] tryToSerializeWithSchemaFile(String json, Schema schema) throws IOException {
         DatumReader<Object> reader = new GenericDatumReader<>(schema);
         GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema);
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
-        Encoder encoder = EncoderFactory.get().binaryEncoder(output, null);
+        Decoder decoder = decoderFactory.jsonDecoder(schema, json);
+        Encoder encoder = encoderFactory.binaryEncoder(output, null);
         Object datum = reader.read(null, decoder);
         writer.write(datum, encoder);
         encoder.flush();
         return output.toByteArray();
     }
 }
diff --git a/src/main/java/org/akhq/utils/RawAvroUtils.java b/src/main/java/org/akhq/utils/RawAvroUtils.java
new file mode 100644
index 000000000..e2b64e67b
--- /dev/null
+++ b/src/main/java/org/akhq/utils/RawAvroUtils.java
@@ -0,0 +1,93 @@
+package org.akhq.utils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import io.micronaut.core.serialize.exceptions.SerializationException;
+import lombok.extern.slf4j.Slf4j;
+import org.akhq.configs.AvroTopicsMapping;
+import org.apache.avro.Schema;
+
+@Slf4j
+public class RawAvroUtils {
+    private final Map<String, Schema> keySchemas;
+    private final Map<String, Schema> valueSchemas;
+    private final List<AvroTopicsMapping> topicsMapping;
+    private final String avroSchemasFolder;
+
+    public RawAvroUtils(List<AvroTopicsMapping> topicsMapping, String avroSchemasFolder) {
+        this.topicsMapping = topicsMapping;
+        this.avroSchemasFolder = avroSchemasFolder;
+        this.keySchemas = buildSchemas(AvroTopicsMapping::getKeySchemaFile);
+        this.valueSchemas = buildSchemas(AvroTopicsMapping::getValueSchemaFile);
+    }
+
+    /**
+     * Load Avro schemas from schema folder
+     *
+     * @return map where keys are topic regexes and value is Avro schema
+     */
+    private Map<String, Schema> buildSchemas(Function<AvroTopicsMapping, String> schemaFileMapper) {
+        Map<String, Schema> allSchemas = new HashMap<>();
+        for (AvroTopicsMapping mapping : topicsMapping) {
+            String schemaFile = schemaFileMapper.apply(mapping);
+
+            if (schemaFile != null) {
+                try {
+                    Schema schema = loadSchemaFile(mapping, schemaFile);
+                    allSchemas.put(mapping.getTopicRegex(), schema);
+                } catch (IOException e) {
+                    throw new RuntimeException(String.format("Cannot get a schema file for the topics regex [%s]", mapping.getTopicRegex()), e);
+                }
+            }
+        }
+        return allSchemas;
+    }
+
+    private Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOException {
+        if (avroSchemasFolder != null && Files.exists(Path.of(avroSchemasFolder))) {
+            String fullPath = avroSchemasFolder + File.separator + schemaFile;
+            return new Schema.Parser().parse(Path.of(fullPath).toFile());
+        }
+        throw new FileNotFoundException("Avro schema file is not found for topic regex [" +
+            mapping.getTopicRegex() + "]. Folder is not specified or doesn't exist.");
+    }
+
+    Schema getSchema(String topic, boolean isKey) {
+        AvroTopicsMapping matchingConfig = findMatchingConfig(topic);
+        if (matchingConfig == null) {
+            log.debug("Avro serialization config is not found for topic [{}]", topic);
+            return null;
+        }
+
+        if (matchingConfig.getKeySchemaFile() == null && matchingConfig.getValueSchemaFile() == null) {
+            throw new SerializationException(String.format("Avro serialization is configured for topic [%s], " +
+                "but schema is not specified neither for a key, nor for a value.", topic));
+        }
+
+        // Return the schema loaded for the matching topic regex, for either the key or the value side.
+        if (isKey) {
+            return keySchemas.get(matchingConfig.getTopicRegex());
+        } else {
+            return valueSchemas.get(matchingConfig.getTopicRegex());
+        }
+    }
+
+    private AvroTopicsMapping findMatchingConfig(String topic) {
+        for (AvroTopicsMapping mapping : topicsMapping) {
+            if (topic.matches(mapping.getTopicRegex())) {
+                return new AvroTopicsMapping(
+                    mapping.getTopicRegex(),
+                    mapping.getKeySchemaFile(), mapping.getValueSchemaFile());
+            }
+        }
+        return null;
+    }
+}
From eed332d304045397a21c83900607d37b4e611964 Mon Sep 17 00:00:00 2001
From: Erik Rohwedder
Date: Sat, 13 Jul 2024 00:30:31 +0200
Subject: [PATCH 5/8] add test

---
 .../akhq/repositories/RecordRepository.java   |   3 +-
 .../org/akhq/utils/JsonToAvroSerializer.java  |  12 +-
 .../java/org/akhq/utils/RawAvroUtils.java     |   2 +-
 .../akhq/utils/JsonToAvroSerializerTest.java  | 180 ++++++++++++++++++
 4 files changed, 186 insertions(+), 11 deletions(-)
 create mode 100644 src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java

diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index 4414b0b3b..3f660d0c1 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -45,7 +45,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import jakarta.inject.Inject;
@@ -636,7 +635,7 @@ public RecordMetadata produce(
 
         if (value.isPresent() && valueSchema.isPresent() && StringUtils.isNotEmpty(valueSchema.get())) {
             valueAsBytes = getBytesBySchemaRegistry(clusterId, value.get(), valueSchema.get());
-        } else if (value.isPresent()) { // TODO test
+        } else if (value.isPresent()) {
             valueAsBytes = getBytesByAvroSerializer(clusterId, topic, value.get(), false);
         }
 
diff --git a/src/main/java/org/akhq/utils/JsonToAvroSerializer.java b/src/main/java/org/akhq/utils/JsonToAvroSerializer.java
index d15ab0c71..c52dd6c8b 100644
---
a/src/main/java/org/akhq/utils/JsonToAvroSerializer.java +++ b/src/main/java/org/akhq/utils/JsonToAvroSerializer.java @@ -1,20 +1,16 @@ package org.akhq.utils; import io.micronaut.core.serialize.exceptions.SerializationException; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import lombok.extern.slf4j.Slf4j; import org.akhq.configs.Connection; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.EncoderFactory; +import org.apache.avro.io.*; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; -// TODO test /** * Class for serialization of messages in Json to Avro raw data binary format using topics mapping config. */ diff --git a/src/main/java/org/akhq/utils/RawAvroUtils.java b/src/main/java/org/akhq/utils/RawAvroUtils.java index e2b64e67b..e8f114fba 100644 --- a/src/main/java/org/akhq/utils/RawAvroUtils.java +++ b/src/main/java/org/akhq/utils/RawAvroUtils.java @@ -63,7 +63,7 @@ private Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) thro Schema getSchema(String topic, boolean isKey) { AvroTopicsMapping matchingConfig = findMatchingConfig(topic); if (matchingConfig == null) { - log.debug("Avro serialization config is not found for topic [{}]", topic); + log.debug("Avro raw config is not found for topic [{}]", topic); return null; } diff --git a/src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java b/src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java new file mode 100644 index 000000000..b0c7318dd --- /dev/null +++ b/src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java @@ -0,0 +1,180 @@ +package org.akhq.utils; + +import org.akhq.AlbumAvro; +import org.akhq.FilmAvro; +import org.akhq.configs.AvroTopicsMapping; +import org.akhq.configs.Connection; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class JsonToAvroSerializerTest { + Connection.Serialization.AvroSerializationTopicsMapping avroSerializationTopicsMapping; + AlbumAvro albumAvro; + FilmAvro filmAvro; + + @BeforeEach + void before() throws URISyntaxException { + createTopicAvroSerializationMapping(); + createAlbumObject(); + createFilmObject(); + } + + private void createTopicAvroSerializationMapping() throws URISyntaxException { + avroSerializationTopicsMapping = new Connection.Serialization.AvroSerializationTopicsMapping(); + + URI uri = ClassLoader.getSystemResource("avro").toURI(); + String avroSchemaFolder = Paths.get(uri).toString(); + avroSerializationTopicsMapping.setSchemasFolder(avroSchemaFolder); + + AvroTopicsMapping albumTopicsMapping = new AvroTopicsMapping(); + albumTopicsMapping.setTopicRegex("album.*"); + albumTopicsMapping.setValueSchemaFile("Album.avsc"); + + AvroTopicsMapping filmTopicsMapping = new AvroTopicsMapping(); + 
filmTopicsMapping.setTopicRegex("film.*");
+        filmTopicsMapping.setValueSchemaFile("Film.avsc");
+
+        // Do not specify a schema, neither for a key nor for a value
+        AvroTopicsMapping incorrectTopicsMapping = new AvroTopicsMapping();
+        incorrectTopicsMapping.setTopicRegex("incorrect.*");
+
+        avroSerializationTopicsMapping.setTopicsMapping(
+            Arrays.asList(albumTopicsMapping, filmTopicsMapping, incorrectTopicsMapping));
+    }
+
+    private void createAlbumObject() {
+        List<String> artists = Collections.singletonList("Imagine Dragons");
+        List<String> songTitles = Arrays.asList("Birds", "Zero", "Natural", "Machine");
+        Album album = new Album("Origins", artists, 2018, songTitles);
+        albumAvro = AlbumAvro.newBuilder()
+            .setTitle(album.getTitle())
+            .setArtist(album.getArtists())
+            .setReleaseYear(album.getReleaseYear())
+            .setSongTitle(album.getSongsTitles())
+            .build();
+    }
+
+    private void createFilmObject() {
+        List<String> starring = Arrays.asList("Harrison Ford", "Mark Hamill", "Carrie Fisher", "Adam Driver", "Daisy Ridley");
+        Film film = new Film("Star Wars: The Force Awakens", "J. J. Abrams", 2015, 135, starring);
+        filmAvro = FilmAvro.newBuilder()
+            .setName(film.getName())
+            .setProducer(film.getProducer())
+            .setReleaseYear(film.getReleaseYear())
+            .setDuration(film.getDuration())
+            .setStarring(film.getStarring())
+            .build();
+    }
+
+    @Test
+    void serializeAlbum() throws IOException, URISyntaxException {
+        JsonToAvroSerializer jsonToAvroSerializer = new JsonToAvroSerializer(avroSerializationTopicsMapping);
+        final String jsonAlbum = "{" +
+            "\"title\":\"Origins\"," +
+            "\"artist\":[\"Imagine Dragons\"]," +
+            "\"releaseYear\":2018," +
+            "\"songTitle\":[\"Birds\",\"Zero\",\"Natural\",\"Machine\"]" +
+            "}";
+        byte[] encodedAlbum = jsonToAvroSerializer.serialize("album.topic.name", jsonAlbum, false);
+        byte[] expectedAlbum = toByteArray("Album.avsc", albumAvro);
+        assertArrayEquals(expectedAlbum, encodedAlbum);
+    }
+
+    @Test
+    void serializeFilm() throws IOException, URISyntaxException {
+        JsonToAvroSerializer jsonToAvroSerializer = new JsonToAvroSerializer(avroSerializationTopicsMapping);
+        final String jsonFilm = "{" +
+            "\"name\":\"Star Wars: The Force Awakens\"," +
+            "\"producer\":\"J. J. Abrams\"," +
+            "\"releaseYear\":2015," +
+            "\"duration\":135," +
+            "\"starring\":[\"Harrison Ford\",\"Mark Hamill\",\"Carrie Fisher\",\"Adam Driver\",\"Daisy Ridley\"]" +
+            "}";
+        byte[] encodedFilm = jsonToAvroSerializer.serialize("film.topic.name", jsonFilm, false);
+        byte[] expectedFilm = toByteArray("Film.avsc", filmAvro);
+        assertArrayEquals(expectedFilm, encodedFilm);
+    }
+
+    @Test
+    void serializeForNotMatchingTopic() {
+        JsonToAvroSerializer jsonToAvroSerializer = new JsonToAvroSerializer(avroSerializationTopicsMapping);
+        final String jsonFilm = "{" +
+            "\"name\":\"Star Wars: The Force Awakens\"," +
+            "\"producer\":\"J. J. Abrams\"," +
+            "\"releaseYear\":2015," +
+            "\"duration\":135," +
+            "\"starring\":[\"Harrison Ford\",\"Mark Hamill\",\"Carrie Fisher\",\"Adam Driver\",\"Daisy Ridley\"]" +
+            "}";
+        byte[] encodedFilm = jsonToAvroSerializer.serialize("random.topic.name", jsonFilm, false);
+        assertNull(encodedFilm);
+    }
+
+    @Test
+    void serializeForKeyWhenItsTypeNotSet() {
+        JsonToAvroSerializer jsonToAvroSerializer = new JsonToAvroSerializer(avroSerializationTopicsMapping);
+        final String jsonFilm = "{" +
+            "\"name\":\"Star Wars: The Force Awakens\"," +
+            "\"producer\":\"J. J. Abrams\"," +
+            "\"releaseYear\":2015," +
+            "\"duration\":135," +
+            "\"starring\":[\"Harrison Ford\",\"Mark Hamill\",\"Carrie Fisher\",\"Adam Driver\",\"Daisy Ridley\"]" +
+            "}";
+        byte[] encodedFilm = jsonToAvroSerializer.serialize("random.topic.name", jsonFilm, true);
+        assertNull(encodedFilm);
+    }
+
+    @Test
+    void serializeWhenTypeNotSetForKeyAndValue() {
+        JsonToAvroSerializer jsonToAvroSerializer = new JsonToAvroSerializer(avroSerializationTopicsMapping);
+        final String jsonFilm = "{" +
+            "\"name\":\"Star Wars: The Force Awakens\"," +
+            "\"producer\":\"J. J. Abrams\"," +
+            "\"releaseYear\":2015," +
+            "\"duration\":135," +
+            "\"starring\":[\"Harrison Ford\",\"Mark Hamill\",\"Carrie Fisher\",\"Adam Driver\",\"Daisy Ridley\"]" +
+            "}";
+        Exception exception = assertThrows(RuntimeException.class, () -> {
+            jsonToAvroSerializer.serialize("incorrect.topic.name", jsonFilm, true);
+        });
+        String expectedMessage = "schema is not specified neither for a key, nor for a value";
+        String actualMessage = exception.getMessage();
+        assertTrue(actualMessage.contains(expectedMessage));
+    }
+
+    private <T> byte[] toByteArray(String schemaName, T datum) throws IOException, URISyntaxException {
+        Schema schema = resolveSchema(schemaName);
+
+        DatumWriter<T> writer = new GenericDatumWriter<>(schema);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        Encoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
+        writer.write(datum, encoder);
+        encoder.flush();
+        bos.close();
+
+        return bos.toByteArray();
+    }
+
+    private Schema resolveSchema(String schemaName) throws URISyntaxException, IOException {
+        URI uri = ClassLoader.getSystemResource("avro").toURI();
+        File schemaFile = Paths.get(uri).resolve(schemaName).toFile();
+
+        return new Schema.Parser().parse(schemaFile);
+    }
+}
From 5812725f37d1f3995a08c0c1132ae4b222fc9c3a Mon Sep 17 00:00:00 2001
From: Erik Rohwedder
Date: Sat, 13 Jul 2024 00:32:28 +0200
Subject: [PATCH 6/8] remove todo

---
 src/main/java/org/akhq/configs/Connection.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/java/org/akhq/configs/Connection.java b/src/main/java/org/akhq/configs/Connection.java
index 14bd5f434..77cb96f3c 100644
--- a/src/main/java/org/akhq/configs/Connection.java
+++ b/src/main/java/org/akhq/configs/Connection.java
@@ -62,7 +62,7 @@ public static class AvroDeserializationTopicsMapping {
     }
 
     @Getter
-    @ConfigurationProperties("serialization") // TODO use other key
+    @ConfigurationProperties("serialization")
     public static class Serialization {
         AvroSerializationTopicsMapping avroRaw;
 
From f3b923c7a75217dd9ae0d99939658aa3a2de11f5 Mon Sep 17 00:00:00 2001
From: Erik Rohwedder
Date: Sat, 13 Jul 2024 00:44:54 +0200
Subject: [PATCH 7/8] add documentation

---
 docs/docs/configuration/avro.md | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/docs/docs/configuration/avro.md b/docs/docs/configuration/avro.md
index 80fc74581..30b552b38 100644
--- a/docs/docs/configuration/avro.md
+++ b/docs/docs/configuration/avro.md
@@ -29,4 +29,33 @@ akhq:
           value-schema-file: "Value.avsc"
 ```
 
+# Avro serialization
+
+Avro messages using a Schema registry are automatically encoded if the registry is configured (see [Kafka cluster](../configuration/brokers.md)).
+
+You can also encode raw binary Avro messages; such messages must then be decoded directly with [DatumReader](https://avro.apache.org/docs/current/api/java/org/apache/avro/io/DatumReader.html), without any header, as in the sketch below.
+You must provide a `schemas-folder` and mappings which associate a `topic-regex` and a schema file name. The schema can be +specified either for message keys with `key-schema-file` and/or for values with `value-schema-file`. + +Here is an example of configuration: + +``` +akhq: + connections: + kafka: + properties: + # standard kafka properties + serialization: + avro-raw: + schemas-folder: "/app/avro_schemas" + topics-mapping: + - topic-regex: "album.*" + value-schema-file: "Album.avsc" + - topic-regex: "film.*" + value-schema-file: "Film.avsc" + - topic-regex: "test.*" + key-schema-file: "Key.avsc" + value-schema-file: "Value.avsc" +``` + Examples can be found in [tests](https://github.com/tchiotludo/akhq/tree/dev/src/main/java/org/akhq/utils). From 9cb08c39d815b460a8a89e2874ffb3908a831a2d Mon Sep 17 00:00:00 2001 From: Erik Rohwedder Date: Sat, 13 Jul 2024 00:51:27 +0200 Subject: [PATCH 8/8] remove todo --- src/main/java/org/akhq/repositories/RecordRepository.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index 3f660d0c1..c26cd1839 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -620,7 +620,7 @@ public RecordMetadata produce( if (key.isPresent()) { if (keySchema.isPresent() && StringUtils.isNotEmpty(keySchema.get())) { keyAsBytes = getBytesBySchemaRegistry(clusterId, key.get(), keySchema.get()); - } else { // TODO test + } else { keyAsBytes = getBytesByAvroSerializer(clusterId, topic, key.get(), true); } } else {