Custom serializer for avro raw format #1860

Open · wants to merge 8 commits into base: dev
29 changes: 29 additions & 0 deletions docs/docs/configuration/avro.md
@@ -29,4 +29,33 @@ akhq:
value-schema-file: "Value.avsc"
```

# Avro serialization

Avro messages using the Schema Registry are automatically encoded if the registry is configured (see [Kafka cluster](../configuration/brokers.md)).

You can also encode raw binary Avro messages, i.e. messages that must be decoded directly with [DatumReader](https://avro.apache.org/docs/current/api/java/org/apache/avro/io/DatumReader.html), without any header.
You must provide a `schemas-folder` and mappings that associate a `topic-regex` with a schema file name. The schema can be
specified for message keys with `key-schema-file` and/or for values with `value-schema-file`.

Here is an example configuration:

```yaml
akhq:
connections:
kafka:
properties:
# standard kafka properties
serialization:
avro-raw:
schemas-folder: "/app/avro_schemas"
topics-mapping:
- topic-regex: "album.*"
value-schema-file: "Album.avsc"
- topic-regex: "film.*"
value-schema-file: "Film.avsc"
- topic-regex: "test.*"
key-schema-file: "Key.avsc"
value-schema-file: "Value.avsc"
```

Examples can be found in [tests](https://github.com/tchiotludo/akhq/tree/dev/src/main/java/org/akhq/utils).
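
For clarity on what "raw" means here: the payload is just the datum bytes, with no Avro container file and no schema-registry wire header. Below is a minimal, self-contained sketch of a producer-side round trip — the inline `Album` schema is illustrative; AKHQ itself loads e.g. `Album.avsc` from the configured `schemas-folder`:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class RawAvroExample {
    public static void main(String[] args) throws Exception {
        // Illustrative schema; not part of the PR
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Album\",\"fields\":" +
            "[{\"name\":\"title\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("title", "Abbey Road");

        // Encode: raw datum bytes only -- no container file, no registry header
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        byte[] kafkaValue = out.toByteArray(); // what avro-raw mode expects on the topic

        // Decode: a bare DatumReader suffices, as the docs above state
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(kafkaValue, null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println(decoded); // {"title": "Abbey Road"}
    }
}
```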
14 changes: 14 additions & 0 deletions src/main/java/org/akhq/configs/Connection.java
@@ -20,6 +20,7 @@ public class Connection extends AbstractProperties {
List<Connect> connect;
List<KsqlDb> ksqldb;
Deserialization deserialization = new Deserialization();
Serialization serialization = new Serialization();
UiOptions uiOptions = new UiOptions();

public Connection(@Parameter String name) {
@@ -60,6 +61,19 @@ public static class AvroDeserializationTopicsMapping {
}
}

@Getter
@ConfigurationProperties("serialization")
public static class Serialization {
AvroSerializationTopicsMapping avroRaw;

@Data
@ConfigurationProperties("avro-raw")
public static class AvroSerializationTopicsMapping {
String schemasFolder;
List<AvroTopicsMapping> topicsMapping = new ArrayList<>();
}
}

@Data
@ConfigurationProperties("ui-options")
public static class UiOptions {
25 changes: 25 additions & 0 deletions src/main/java/org/akhq/repositories/CustomSerializerRepository.java
@@ -0,0 +1,25 @@
package org.akhq.repositories;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.HashMap;
import java.util.Map;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.JsonToAvroSerializer;

@Singleton
public class CustomSerializerRepository {
@Inject
private KafkaModule kafkaModule;
private final Map<String, JsonToAvroSerializer> jsonToAvroSerializers = new HashMap<>();

public JsonToAvroSerializer getJsonToAvroSerializer(String clusterId) {
if (!this.jsonToAvroSerializers.containsKey(clusterId)) {
this.jsonToAvroSerializers.put(
clusterId,
new JsonToAvroSerializer(this.kafkaModule.getConnection(clusterId).getSerialization().getAvroRaw())
);
}
return this.jsonToAvroSerializers.get(clusterId);
}
}
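
A side note on the cache in `getJsonToAvroSerializer`: the repository is a `@Singleton`, so concurrent requests can race on the plain `HashMap`'s check-then-put. A thread-safe sketch of the same idea (an editorial suggestion, not what the PR does):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Same caching contract, but computeIfAbsent is atomic per key, so two
// concurrent callers for one cluster cannot both construct a serializer.
private final Map<String, JsonToAvroSerializer> jsonToAvroSerializers = new ConcurrentHashMap<>();

public JsonToAvroSerializer getJsonToAvroSerializer(String clusterId) {
    return this.jsonToAvroSerializers.computeIfAbsent(
        clusterId,
        id -> new JsonToAvroSerializer(this.kafkaModule.getConnection(id).getSerialization().getAvroRaw())
    );
}
```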
43 changes: 31 additions & 12 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -25,6 +25,7 @@
import org.akhq.modules.schemaregistry.RecordWithSchemaSerializerFactory;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.Debug;
import org.akhq.utils.JsonToAvroSerializer;
import org.akhq.utils.MaskingUtils;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
@@ -44,7 +45,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.inject.Inject;
@@ -75,6 +75,9 @@ public class RecordRepository extends AbstractRepository {
@Inject
private CustomDeserializerRepository customDeserializerRepository;

@Inject
private CustomSerializerRepository customSerializerRepository;

@Inject
private AvroWireFormatConverter avroWireFormatConverter;

@@ -612,19 +615,17 @@ public RecordMetadata produce(
Optional<String> valueSchema
) throws ExecutionException, InterruptedException, RestClientException, IOException {
byte[] keyAsBytes = null;
byte[] valueAsBytes;
byte[] valueAsBytes = null;

if (key.isPresent()) {
if (keySchema.isPresent() && StringUtils.isNotEmpty(keySchema.get())) {
Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, keySchema.get());
SchemaSerializer keySerializer = serializerFactory.createSerializer(clusterId, schema.getId());
keyAsBytes = keySerializer.serialize(key.get());
keyAsBytes = getBytesBySchemaRegistry(clusterId, key.get(), keySchema.get());
} else {
keyAsBytes = key.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
keyAsBytes = getBytesByAvroSerializer(clusterId, topic, key.get(), true);
}
} else {
try {
if (Topic.isCompacted(configRepository.findByTopic(clusterId, value.isEmpty() ? null : value.get()))) {
if (Topic.isCompacted(configRepository.findByTopic(clusterId, value.orElse(null)))) {
throw new IllegalArgumentException("Key missing for produce onto compacted topic");
}
} catch (ExecutionException ex) {
@@ -633,16 +634,34 @@ public RecordMetadata produce(
}

if (value.isPresent() && valueSchema.isPresent() && StringUtils.isNotEmpty(valueSchema.get())) {
Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, valueSchema.get());
SchemaSerializer valueSerializer = serializerFactory.createSerializer(clusterId, schema.getId());
valueAsBytes = valueSerializer.serialize(value.get());
} else {
valueAsBytes = value.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
valueAsBytes = getBytesBySchemaRegistry(clusterId, value.get(), valueSchema.get());
} else if (value.isPresent()) {
valueAsBytes = getBytesByAvroSerializer(clusterId, topic, value.get(), false);
}

return produce(clusterId, topic, valueAsBytes, headers, keyAsBytes, partition, timestamp);
}

private byte[] getBytesBySchemaRegistry(String clusterId, String data, String subject) throws IOException, RestClientException {
Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, subject);
SchemaSerializer serializer = serializerFactory.createSerializer(clusterId, schema.getId());
return serializer.serialize(data);
}

private byte[] getBytesByAvroSerializer(String clusterId, String topic, String data, boolean isKey) {
JsonToAvroSerializer jsonToAvroSerializer = customSerializerRepository.getJsonToAvroSerializer(clusterId);

if (jsonToAvroSerializer == null) {
return data.isEmpty() ? null : data.getBytes();
}

try {
return jsonToAvroSerializer.serialize(topic, data, isKey);
} catch (Exception exception) {
return data.isEmpty() ? null : data.getBytes();
}
}
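
Read together, the new helpers give `produce()` a clear precedence: an explicit schema-registry subject wins, then the avro-raw topic mapping is tried, and finally the payload falls through as plain UTF-8 bytes (or `null` for an empty or absent value). A condensed, hypothetical mirror of the value path — `registryBytes` and `rawAvroBytes` stand in for `getBytesBySchemaRegistry` and `getBytesByAvroSerializer` above:

```java
import java.util.Optional;
import java.util.function.Function;

class ProducePrecedenceSketch {
    static byte[] valueToBytes(Optional<String> value,
                               Optional<String> valueSchema,
                               Function<String, byte[]> registryBytes,
                               Function<String, byte[]> rawAvroBytes) {
        if (value.isPresent() && valueSchema.filter(s -> !s.isEmpty()).isPresent()) {
            return registryBytes.apply(value.get()); // 1. schema-registry subject wins
        }
        if (value.isPresent()) {
            return rawAvroBytes.apply(value.get());  // 2. otherwise try the avro-raw mapping,
        }                                            //    which itself falls back to UTF-8 bytes
        return null;                                 // 3. absent value -> null (tombstone)
    }
}
```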

public RecordMetadata delete(String clusterId, String topic, Integer partition, byte[] key) throws ExecutionException, InterruptedException {
return kafkaModule.getProducer(clusterId).send(new ProducerRecord<>(
topic,
85 changes: 4 additions & 81 deletions src/main/java/org/akhq/utils/AvroToJsonDeserializer.java
@@ -2,84 +2,34 @@

import io.micronaut.core.serialize.exceptions.SerializationException;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.AvroTopicsMapping;
import org.akhq.configs.Connection;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* Class for deserialization of messages in Avro raw data binary format using topics mapping config.
*/
@Slf4j
public class AvroToJsonDeserializer {
private final DecoderFactory decoderFactory = DecoderFactory.get();
private final Map<String, Schema> keySchemas;
private final Map<String, Schema> valueSchemas;
private final List<AvroTopicsMapping> topicsMapping;
private final String avroSchemasFolder;
private final RawAvroUtils rawAvroUtils;
private final AvroToJsonSerializer avroToJsonSerializer;

public AvroToJsonDeserializer(Connection.Deserialization.AvroDeserializationTopicsMapping avroDeserializationTopicsMapping, AvroToJsonSerializer avroToJsonSerializer) {
if (avroDeserializationTopicsMapping == null) {
this.keySchemas = new HashMap<>();
this.valueSchemas = new HashMap<>();
this.topicsMapping = new ArrayList<>();
this.avroSchemasFolder = null;
this.rawAvroUtils = null;
this.avroToJsonSerializer = null;
} else {
this.avroSchemasFolder = avroDeserializationTopicsMapping.getSchemasFolder();
this.topicsMapping = avroDeserializationTopicsMapping.getTopicsMapping();
this.keySchemas = buildSchemas(AvroTopicsMapping::getKeySchemaFile);
this.valueSchemas = buildSchemas(AvroTopicsMapping::getValueSchemaFile);
this.rawAvroUtils = new RawAvroUtils(avroDeserializationTopicsMapping.getTopicsMapping(), avroDeserializationTopicsMapping.getSchemasFolder());
this.avroToJsonSerializer = avroToJsonSerializer;
}
}

/**
* Load Avro schemas from schema folder
*
* @return map where keys are topic regexes and value is Avro schema
*/
private Map<String, Schema> buildSchemas(Function<AvroTopicsMapping, String> schemaFileMapper) {
Map<String, Schema> allSchemas = new HashMap<>();
for (AvroTopicsMapping mapping : topicsMapping) {
String schemaFile = schemaFileMapper.apply(mapping);

if (schemaFile != null) {
try {
Schema schema = loadSchemaFile(mapping, schemaFile);
allSchemas.put(mapping.getTopicRegex(), schema);
} catch (IOException e) {
throw new RuntimeException(String.format("Cannot get a schema file for the topics regex [%s]", mapping.getTopicRegex()), e);
}
}
}
return allSchemas;
}

Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOException {
if (avroSchemasFolder != null && Files.exists(Path.of(avroSchemasFolder))) {
String fullPath = avroSchemasFolder + File.separator + schemaFile;
return new Schema.Parser().parse(Path.of(fullPath).toFile());
}
throw new FileNotFoundException("Avro schema file is not found for topic regex [" +
mapping.getTopicRegex() + "]. Folder is not specified or doesn't exist.");
}

/**
* Deserialize from Avro raw data binary format to Json.
* Messages must have been encoded directly with {@link org.apache.avro.io.DatumWriter}, not {@link org.apache.avro.file.DataFileWriter} or {@link org.apache.avro.message.BinaryMessageEncoder}.
@@ -92,23 +42,7 @@ Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOException {
* @return {@code null} if cannot deserialize or configuration is not matching, return decoded string otherwise
*/
public String deserialize(String topic, byte[] buffer, boolean isKey) {
AvroTopicsMapping matchingConfig = findMatchingConfig(topic);
if (matchingConfig == null) {
log.debug("Avro deserialization config is not found for topic [{}]", topic);
return null;
}

if (matchingConfig.getKeySchemaFile() == null && matchingConfig.getValueSchemaFile() == null) {
throw new SerializationException(String.format("Avro deserialization is configured for topic [%s], " +
"but schema is not specified neither for a key, nor for a value.", topic));
}

Schema schema;
if (isKey) {
schema = keySchemas.get(matchingConfig.getTopicRegex());
} else {
schema = valueSchemas.get(matchingConfig.getTopicRegex());
}
        Schema schema = rawAvroUtils == null ? null : rawAvroUtils.getSchema(topic, isKey);

if (schema == null) {
return null;
@@ -124,17 +58,6 @@ public String deserialize(String topic, byte[] buffer, boolean isKey) {
return result;
}

private AvroTopicsMapping findMatchingConfig(String topic) {
for (AvroTopicsMapping mapping : topicsMapping) {
if (topic.matches(mapping.getTopicRegex())) {
return new AvroTopicsMapping(
mapping.getTopicRegex(),
mapping.getKeySchemaFile(), mapping.getValueSchemaFile());
}
}
return null;
}

private String tryToDeserializeWithSchemaFile(byte[] buffer, Schema schema) throws IOException {
DatumReader<?> reader = new GenericDatumReader<>(schema);
Object result = reader.read(null, decoderFactory.binaryDecoder(buffer, null));
70 changes: 70 additions & 0 deletions src/main/java/org/akhq/utils/JsonToAvroSerializer.java
@@ -0,0 +1,70 @@
package org.akhq.utils;

import io.micronaut.core.serialize.exceptions.SerializationException;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.Connection;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.*;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
* Class for serialization of messages in Json to Avro raw data binary format using topics mapping config.
*/
@Slf4j
public class JsonToAvroSerializer {
private final EncoderFactory encoderFactory = EncoderFactory.get();
private final DecoderFactory decoderFactory = DecoderFactory.get();
private final RawAvroUtils rawAvroUtils;

public JsonToAvroSerializer(Connection.Serialization.AvroSerializationTopicsMapping avroSerializationTopicsMapping) {
if (avroSerializationTopicsMapping == null) {
this.rawAvroUtils = null;
} else {
this.rawAvroUtils = new RawAvroUtils(avroSerializationTopicsMapping.getTopicsMapping(), avroSerializationTopicsMapping.getSchemasFolder());
}
}

/**
* Serialize from Json to Avro raw data binary format.
* Messages must be decoded directly with {@link org.apache.avro.io.DatumReader}, not {@link org.apache.avro.file.DataFileReader} or {@link org.apache.avro.message.BinaryMessageDecoder}.
 * The topic name must match a topic-regex from the {@code akhq.connections.[clusterName].serialization.avro-raw.topics-mapping} config
 * and a schema must be set for the key or the value in that config.
*
* @param topic current topic name
* @param data Json data to encode
 * @param isKey whether this data represents a key or a value
 * @return {@code null} if no configuration matches for the topic, the Avro-encoded bytes otherwise
*/
    public byte[] serialize(String topic, String data, boolean isKey) {
        // No serialization config for this cluster: rawAvroUtils stays null, treat as "no match"
        Schema schema = rawAvroUtils == null ? null : rawAvroUtils.getSchema(topic, isKey);

if (schema == null) {
return null;
}

byte[] result;
try {
result = tryToSerializeWithSchemaFile(data, schema);
} catch (Exception e) {
            throw new SerializationException(String.format("Cannot serialize message with Avro serializer " +
                "for topic [%s] and schema [%s]", topic, schema.getFullName()), e);
}
return result;
}

private byte[] tryToSerializeWithSchemaFile(String json, Schema schema) throws IOException {
DatumReader<Object> reader = new GenericDatumReader<>(schema);
GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema);
ByteArrayOutputStream output = new ByteArrayOutputStream();
Decoder decoder = decoderFactory.jsonDecoder(schema, json);
Encoder encoder = encoderFactory.binaryEncoder(output, null);
Object datum = reader.read(null, decoder);
writer.write(datum, encoder);
encoder.flush();
return output.toByteArray();
}
}
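
To see the pipeline of `tryToSerializeWithSchemaFile` end to end — JSON text → `jsonDecoder` → generic datum → `binaryEncoder` → raw bytes — here is a self-contained round trip using the same Avro calls. The inline `Film` schema is illustrative; in AKHQ it would be loaded from `schemas-folder` via `RawAvroUtils`:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class JsonAvroRoundTrip {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Film\",\"fields\":[" +
            "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"year\",\"type\":\"int\"}]}");

        // JSON -> Avro datum -> raw binary: the jsonDecoder/binaryEncoder pairing used above
        String json = "{\"name\": \"Alien\", \"year\": 1979}";
        DatumReader<Object> jsonReader = new GenericDatumReader<>(schema);
        Object datum = jsonReader.read(null, DecoderFactory.get().jsonDecoder(schema, json));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<Object>(schema).write(datum, encoder);
        encoder.flush();

        // The bytes decode with a bare DatumReader -- the format AvroToJsonDeserializer reads back
        Object decoded = new GenericDatumReader<>(schema)
            .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(decoded); // {"name": "Alien", "year": 1979}
    }
}
```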