diff --git a/docs/docs/configuration/avro.md b/docs/docs/configuration/avro.md
index 80fc74581..30b552b38 100644
--- a/docs/docs/configuration/avro.md
+++ b/docs/docs/configuration/avro.md
@@ -29,4 +29,33 @@ akhq:
           value-schema-file: "Value.avsc"
 ```
 
+# Avro serialization
+
+Avro messages using the Schema Registry are automatically encoded if the registry is configured (see [Kafka cluster](../configuration/brokers.md)).
+
+You can also encode raw binary Avro messages, that is, messages that must then be decoded directly with [DatumReader](https://avro.apache.org/docs/current/api/java/org/apache/avro/io/DatumReader.html), without any header.
+You must provide a `schemas-folder` and mappings which associate a `topic-regex` and a schema file name. The schema can be
+specified either for message keys with `key-schema-file` and/or for values with `value-schema-file`.
+
+Here is an example of configuration:
+
+```
+akhq:
+  connections:
+    kafka:
+      properties:
+        # standard kafka properties
+      serialization:
+        avro-raw:
+          schemas-folder: "/app/avro_schemas"
+          topics-mapping:
+            - topic-regex: "album.*"
+              value-schema-file: "Album.avsc"
+            - topic-regex: "film.*"
+              value-schema-file: "Film.avsc"
+            - topic-regex: "test.*"
+              key-schema-file: "Key.avsc"
+              value-schema-file: "Value.avsc"
+```
+
 Examples can be found in [tests](https://github.com/tchiotludo/akhq/tree/dev/src/main/java/org/akhq/utils).
diff --git a/src/main/java/org/akhq/configs/Connection.java b/src/main/java/org/akhq/configs/Connection.java
index cf6109bc0..77cb96f3c 100644
--- a/src/main/java/org/akhq/configs/Connection.java
+++ b/src/main/java/org/akhq/configs/Connection.java
@@ -20,6 +20,7 @@ public class Connection extends AbstractProperties {
     List<Connect> connect;
     List<KsqlDb> ksqldb;
     Deserialization deserialization = new Deserialization();
+    Serialization serialization = new Serialization();
     UiOptions uiOptions = new UiOptions();
 
     public Connection(@Parameter String name) {
@@ -60,6 +61,19 @@ public static class AvroDeserializationTopicsMapping {
         }
     }
 
+    @Getter
+    @ConfigurationProperties("serialization")
+    public static class Serialization {
+        AvroSerializationTopicsMapping avroRaw;
+
+        @Data
+        @ConfigurationProperties("avro-raw")
+        public static class AvroSerializationTopicsMapping {
+            String schemasFolder;
+            List<AvroTopicsMapping> topicsMapping = new ArrayList<>();
+        }
+    }
+
     @Data
     @ConfigurationProperties("ui-options")
     public static class UiOptions {
diff --git a/src/main/java/org/akhq/repositories/CustomSerializerRepository.java b/src/main/java/org/akhq/repositories/CustomSerializerRepository.java
new file mode 100644
index 000000000..8cd1a17b9
--- /dev/null
+++ b/src/main/java/org/akhq/repositories/CustomSerializerRepository.java
@@ -0,0 +1,25 @@
+package org.akhq.repositories;
+
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+import java.util.HashMap;
+import java.util.Map;
+import org.akhq.modules.KafkaModule;
+import org.akhq.utils.JsonToAvroSerializer;
+
+@Singleton
+public class CustomSerializerRepository {
+    @Inject
+    private KafkaModule kafkaModule;
+    private final Map<String, JsonToAvroSerializer> jsonToAvroSerializers = new HashMap<>();
+
+    public JsonToAvroSerializer getJsonToAvroSerializer(String clusterId) {
+        if (!this.jsonToAvroSerializers.containsKey(clusterId)) {
+            this.jsonToAvroSerializers.put(
+                clusterId,
+                new JsonToAvroSerializer(this.kafkaModule.getConnection(clusterId).getSerialization().getAvroRaw())
+            );
+        }
+        return this.jsonToAvroSerializers.get(clusterId);
+    }
+}
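A note on the serializer cache above: the check-then-put on a plain `HashMap` is not atomic, so two concurrent calls for the same cluster could build the serializer twice. A minimal sketch of the same per-cluster caching pattern made thread-safe with `computeIfAbsent`; the `PerClusterCache` name is hypothetical and this is not part of the patch:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative, thread-safe variant of the per-cluster serializer cache.
class PerClusterCache<T> {
    private final Map<String, T> cache = new ConcurrentHashMap<>();
    private final Function<String, T> factory;

    PerClusterCache(Function<String, T> factory) {
        this.factory = factory;
    }

    T get(String clusterId) {
        // computeIfAbsent builds the value at most once per key, atomically
        return cache.computeIfAbsent(clusterId, factory);
    }
}
```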
diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index 83714fc08..c26cd1839 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -25,6 +25,7 @@
 import org.akhq.modules.schemaregistry.RecordWithSchemaSerializerFactory;
 import org.akhq.utils.AvroToJsonSerializer;
 import org.akhq.utils.Debug;
+import org.akhq.utils.JsonToAvroSerializer;
 import org.akhq.utils.MaskingUtils;
 import org.apache.kafka.clients.admin.DeletedRecords;
 import org.apache.kafka.clients.admin.RecordsToDelete;
@@ -44,7 +45,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import jakarta.inject.Inject;
@@ -75,6 +75,9 @@ public class RecordRepository extends AbstractRepository {
     @Inject
     private CustomDeserializerRepository customDeserializerRepository;
 
+    @Inject
+    private CustomSerializerRepository customSerializerRepository;
+
     @Inject
     private AvroWireFormatConverter avroWireFormatConverter;
 
@@ -612,19 +615,17 @@ public RecordMetadata produce(
         Optional<String> valueSchema
     ) throws ExecutionException, InterruptedException, RestClientException, IOException {
         byte[] keyAsBytes = null;
-        byte[] valueAsBytes;
+        byte[] valueAsBytes = null;
 
         if (key.isPresent()) {
             if (keySchema.isPresent() && StringUtils.isNotEmpty(keySchema.get())) {
-                Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, keySchema.get());
-                SchemaSerializer keySerializer = serializerFactory.createSerializer(clusterId, schema.getId());
-                keyAsBytes = keySerializer.serialize(key.get());
+                keyAsBytes = getBytesBySchemaRegistry(clusterId, key.get(), keySchema.get());
             } else {
-                keyAsBytes = key.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
+                keyAsBytes = getBytesByAvroSerializer(clusterId, topic, key.get(), true);
             }
         } else {
             try {
-                if (Topic.isCompacted(configRepository.findByTopic(clusterId, value.isEmpty() ? null : value.get()))) {
+                if (Topic.isCompacted(configRepository.findByTopic(clusterId, value.orElse(null)))) {
                     throw new IllegalArgumentException("Key missing for produce onto compacted topic");
                 }
             } catch (ExecutionException ex) {
@@ -633,16 +634,36 @@ public RecordMetadata produce(
         }
 
         if (value.isPresent() && valueSchema.isPresent() && StringUtils.isNotEmpty(valueSchema.get())) {
-            Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, valueSchema.get());
-            SchemaSerializer valueSerializer = serializerFactory.createSerializer(clusterId, schema.getId());
-            valueAsBytes = valueSerializer.serialize(value.get());
-        } else {
-            valueAsBytes = value.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
+            valueAsBytes = getBytesBySchemaRegistry(clusterId, value.get(), valueSchema.get());
+        } else if (value.isPresent()) {
+            valueAsBytes = getBytesByAvroSerializer(clusterId, topic, value.get(), false);
         }
 
         return produce(clusterId, topic, valueAsBytes, headers, keyAsBytes, partition, timestamp);
     }
 
+    private byte[] getBytesBySchemaRegistry(String clusterId, String data, String subject) throws IOException, RestClientException {
+        Schema schema = schemaRegistryRepository.getLatestVersion(clusterId, subject);
+        SchemaSerializer serializer = serializerFactory.createSerializer(clusterId, schema.getId());
+        return serializer.serialize(data);
+    }
+
+    private byte[] getBytesByAvroSerializer(String clusterId, String topic, String data, boolean isKey) {
+        JsonToAvroSerializer jsonToAvroSerializer = customSerializerRepository.getJsonToAvroSerializer(clusterId);
+
+        if (jsonToAvroSerializer == null) {
+            return data.isEmpty() ? null : data.getBytes();
+        }
+
+        try {
+            byte[] serialized = jsonToAvroSerializer.serialize(topic, data, isKey);
+            // serialize() returns null when no topic mapping matches: fall back to the plain string bytes
+            return serialized != null ? serialized : (data.isEmpty() ? null : data.getBytes());
+        } catch (Exception exception) {
+            return data.isEmpty() ? null : data.getBytes();
+        }
+    }
+
     public RecordMetadata delete(String clusterId, String topic, Integer partition, byte[] key) throws ExecutionException, InterruptedException {
         return kafkaModule.getProducer(clusterId).send(new ProducerRecord<>(
             topic,
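For context on the two helpers above: `getBytesBySchemaRegistry` goes through the registry serializer, which frames the payload in the Confluent wire format, while `getBytesByAvroSerializer` emits the bare Avro body (or falls back to plain string bytes). A sketch of that framing, assuming the standard Confluent convention of a zero magic byte plus a big-endian schema id; the `WireFormat` class is purely illustrative:

```java
import java.nio.ByteBuffer;

// Illustrative only: the wire format produced by the Schema Registry path.
// The raw path sends avroBody alone, which is why raw messages must be
// decoded with a DatumReader and no header handling.
class WireFormat {
    static byte[] frame(int schemaId, byte[] avroBody) {
        return ByteBuffer.allocate(1 + 4 + avroBody.length)
            .put((byte) 0)     // magic byte
            .putInt(schemaId)  // 4-byte schema id, big-endian
            .put(avroBody)
            .array();
    }
}
```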
diff --git a/src/main/java/org/akhq/utils/AvroToJsonDeserializer.java b/src/main/java/org/akhq/utils/AvroToJsonDeserializer.java
index c6059fef1..4ee35b3c0 100644
--- a/src/main/java/org/akhq/utils/AvroToJsonDeserializer.java
+++ b/src/main/java/org/akhq/utils/AvroToJsonDeserializer.java
@@ -2,7 +2,6 @@
 
 import io.micronaut.core.serialize.exceptions.SerializationException;
 import lombok.extern.slf4j.Slf4j;
-import org.akhq.configs.AvroTopicsMapping;
 import org.akhq.configs.Connection;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
@@ -10,16 +9,7 @@
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
 
-import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
 
 /**
  * Class for deserialization of messages in Avro raw data binary format using topics mapping config.
@@ -27,59 +17,19 @@
 @Slf4j
 public class AvroToJsonDeserializer {
     private final DecoderFactory decoderFactory = DecoderFactory.get();
-    private final Map<String, Schema> keySchemas;
-    private final Map<String, Schema> valueSchemas;
-    private final List<AvroTopicsMapping> topicsMapping;
-    private final String avroSchemasFolder;
+    private final RawAvroUtils rawAvroUtils;
     private final AvroToJsonSerializer avroToJsonSerializer;
 
     public AvroToJsonDeserializer(Connection.Deserialization.AvroDeserializationTopicsMapping avroDeserializationTopicsMapping, AvroToJsonSerializer avroToJsonSerializer) {
         if (avroDeserializationTopicsMapping == null) {
-            this.keySchemas = new HashMap<>();
-            this.valueSchemas = new HashMap<>();
-            this.topicsMapping = new ArrayList<>();
-            this.avroSchemasFolder = null;
+            this.rawAvroUtils = null;
             this.avroToJsonSerializer = null;
         } else {
-            this.avroSchemasFolder = avroDeserializationTopicsMapping.getSchemasFolder();
-            this.topicsMapping = avroDeserializationTopicsMapping.getTopicsMapping();
-            this.keySchemas = buildSchemas(AvroTopicsMapping::getKeySchemaFile);
-            this.valueSchemas = buildSchemas(AvroTopicsMapping::getValueSchemaFile);
+            this.rawAvroUtils = new RawAvroUtils(avroDeserializationTopicsMapping.getTopicsMapping(), avroDeserializationTopicsMapping.getSchemasFolder());
             this.avroToJsonSerializer = avroToJsonSerializer;
         }
     }
 
-    /**
-     * Load Avro schemas from schema folder
-     *
-     * @return map where keys are topic regexes and value is Avro schema
-     */
-    private Map<String, Schema> buildSchemas(Function<AvroTopicsMapping, String> schemaFileMapper) {
-        Map<String, Schema> allSchemas = new HashMap<>();
-        for (AvroTopicsMapping mapping : topicsMapping) {
-            String schemaFile = schemaFileMapper.apply(mapping);
-
-            if (schemaFile != null) {
-                try {
-                    Schema schema = loadSchemaFile(mapping, schemaFile);
-                    allSchemas.put(mapping.getTopicRegex(), schema);
-                } catch (IOException e) {
-                    throw new RuntimeException(String.format("Cannot get a schema file for the topics regex [%s]", mapping.getTopicRegex()), e);
-                }
-            }
-        }
-        return allSchemas;
-    }
-
-    Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOException {
-        if (avroSchemasFolder != null && Files.exists(Path.of(avroSchemasFolder))) {
-            String fullPath = avroSchemasFolder + File.separator + schemaFile;
-            return new Schema.Parser().parse(Path.of(fullPath).toFile());
-        }
-        throw new FileNotFoundException("Avro schema file is not found for topic regex [" +
-            mapping.getTopicRegex() + "]. Folder is not specified or doesn't exist.");
-    }
-
     /**
      * Deserialize from Avro raw data binary format to Json.
      * Messages must have been encoded directly with {@link org.apache.avro.io.DatumWriter}, not {@link org.apache.avro.file.DataFileWriter} or {@link org.apache.avro.message.BinaryMessageEncoder}.
@@ -92,23 +42,7 @@ Schema loadSchemaFile(AvroTopicsMapping mapping, String schemaFile) throws IOExc
      * @return {@code null} if cannot deserialize or configuration is not matching, return decoded string otherwise
      */
     public String deserialize(String topic, byte[] buffer, boolean isKey) {
-        AvroTopicsMapping matchingConfig = findMatchingConfig(topic);
-        if (matchingConfig == null) {
-            log.debug("Avro deserialization config is not found for topic [{}]", topic);
-            return null;
-        }
-
-        if (matchingConfig.getKeySchemaFile() == null && matchingConfig.getValueSchemaFile() == null) {
-            throw new SerializationException(String.format("Avro deserialization is configured for topic [%s], " +
-                "but schema is not specified neither for a key, nor for a value.", topic));
-        }
-
-        Schema schema;
-        if (isKey) {
-            schema = keySchemas.get(matchingConfig.getTopicRegex());
-        } else {
-            schema = valueSchemas.get(matchingConfig.getTopicRegex());
-        }
+        Schema schema = rawAvroUtils.getSchema(topic, isKey);
 
         if (schema == null) {
             return null;
@@ -124,17 +58,6 @@ public String deserialize(String topic, byte[] buffer, boolean isKey) {
         return result;
     }
 
-    private AvroTopicsMapping findMatchingConfig(String topic) {
-        for (AvroTopicsMapping mapping : topicsMapping) {
-            if (topic.matches(mapping.getTopicRegex())) {
-                return new AvroTopicsMapping(
-                    mapping.getTopicRegex(),
-                    mapping.getKeySchemaFile(), mapping.getValueSchemaFile());
-            }
-        }
-        return null;
-    }
-
     private String tryToDeserializeWithSchemaFile(byte[] buffer, Schema schema) throws IOException {
         DatumReader<Object> reader = new GenericDatumReader<>(schema);
         Object result = reader.read(null, decoderFactory.binaryDecoder(buffer, null));
diff --git a/src/main/java/org/akhq/utils/JsonToAvroSerializer.java b/src/main/java/org/akhq/utils/JsonToAvroSerializer.java
new file mode 100644
index 000000000..c52dd6c8b
--- /dev/null
+++ b/src/main/java/org/akhq/utils/JsonToAvroSerializer.java
@@ -0,0 +1,70 @@
+package org.akhq.utils;
+
+import io.micronaut.core.serialize.exceptions.SerializationException;
+import lombok.extern.slf4j.Slf4j;
+import org.akhq.configs.Connection;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Class for serialization of messages from Json to Avro raw data binary format using topics mapping config.
+ */
+@Slf4j
+public class JsonToAvroSerializer {
+    private final EncoderFactory encoderFactory = EncoderFactory.get();
+    private final DecoderFactory decoderFactory = DecoderFactory.get();
+    private final RawAvroUtils rawAvroUtils;
+
+    public JsonToAvroSerializer(Connection.Serialization.AvroSerializationTopicsMapping avroSerializationTopicsMapping) {
+        if (avroSerializationTopicsMapping == null) {
+            this.rawAvroUtils = null;
+        } else {
+            this.rawAvroUtils = new RawAvroUtils(avroSerializationTopicsMapping.getTopicsMapping(), avroSerializationTopicsMapping.getSchemasFolder());
+        }
+    }
+
+    /**
+     * Serialize from Json to Avro raw data binary format.
+     * Messages must be decoded directly with {@link org.apache.avro.io.DatumReader}, not {@link org.apache.avro.file.DataFileReader} or {@link org.apache.avro.message.BinaryMessageDecoder}.
+     * The topic name should match a topic-regex from the {@code akhq.connections.[clusterName].serialization.avro-raw.topics-mapping} config
+     * and a schema should be set for the key or the value in that config.
+     *
+     * @param topic current topic name
+     * @param data  Json data to encode
+     * @param isKey whether the data represents a key or a value
+     * @return {@code null} if the configuration does not match, the encoded bytes otherwise
+     */
+    public byte[] serialize(String topic, String data, boolean isKey) {
+        Schema schema = rawAvroUtils.getSchema(topic, isKey);
+
+        if (schema == null) {
+            return null;
+        }
+
+        byte[] result;
+        try {
+            result = tryToSerializeWithSchemaFile(data, schema);
+        } catch (Exception e) {
+            throw new SerializationException(String.format("Cannot serialize message with Avro serializer " +
+                "for topic [%s] and schema [%s]", topic, schema.getFullName()), e);
+        }
+        return result;
+    }
+
+    private byte[] tryToSerializeWithSchemaFile(String json, Schema schema) throws IOException {
+        DatumReader<Object> reader = new GenericDatumReader<>(schema);
+        GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema);
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        Decoder decoder = decoderFactory.jsonDecoder(schema, json);
+        Encoder encoder = encoderFactory.binaryEncoder(output, null);
+        Object datum = reader.read(null, decoder);
+        writer.write(datum, encoder);
+        encoder.flush();
+        return output.toByteArray();
+    }
+}
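To make the serializer's contract concrete, here is a hypothetical usage sketch. The `JsonToAvroExample` class, the schemas folder path, and the exact field list of `Album.avsc` are assumptions (the fields mirror the test further below), not part of the patch:

```java
import java.util.Collections;
import org.akhq.configs.AvroTopicsMapping;
import org.akhq.configs.Connection;
import org.akhq.utils.JsonToAvroSerializer;

// Hypothetical usage: JSON in, header-less Avro bytes out.
public class JsonToAvroExample {
    public static void main(String[] args) {
        AvroTopicsMapping mapping = new AvroTopicsMapping();
        mapping.setTopicRegex("album.*");
        mapping.setValueSchemaFile("Album.avsc");

        Connection.Serialization.AvroSerializationTopicsMapping config =
            new Connection.Serialization.AvroSerializationTopicsMapping();
        config.setSchemasFolder("/app/avro_schemas");
        config.setTopicsMapping(Collections.singletonList(mapping));

        JsonToAvroSerializer serializer = new JsonToAvroSerializer(config);

        // Matches "album.*", so the value is encoded with Album.avsc
        byte[] raw = serializer.serialize(
            "album.topic.name",
            "{\"title\":\"Origins\",\"artist\":[],\"releaseYear\":2018,\"songTitle\":[]}",
            false
        );

        // No mapping matches this topic, so the serializer returns null
        byte[] none = serializer.serialize("other.topic", "{}", false);
    }
}
```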
+ "]. Folder is not specified or doesn't exist."); + } + + Schema getSchema(String topic, boolean isKey) { + AvroTopicsMapping matchingConfig = findMatchingConfig(topic); + if (matchingConfig == null) { + log.debug("Avro raw config is not found for topic [{}]", topic); + return null; + } + + if (matchingConfig.getKeySchemaFile() == null && matchingConfig.getValueSchemaFile() == null) { + throw new SerializationException(String.format("Avro serialization is configured for topic [%s], " + + "but schema is not specified neither for a key, nor for a value.", topic)); + } + + Schema schema; + if (isKey) { + return keySchemas.get(matchingConfig.getTopicRegex()); + } else { + return valueSchemas.get(matchingConfig.getTopicRegex()); + } + } + + private AvroTopicsMapping findMatchingConfig(String topic) { + for (AvroTopicsMapping mapping : topicsMapping) { + if (topic.matches(mapping.getTopicRegex())) { + return new AvroTopicsMapping( + mapping.getTopicRegex(), + mapping.getKeySchemaFile(), mapping.getValueSchemaFile()); + } + } + return null; + } +} diff --git a/src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java b/src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java new file mode 100644 index 000000000..b0c7318dd --- /dev/null +++ b/src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java @@ -0,0 +1,180 @@ +package org.akhq.utils; + +import org.akhq.AlbumAvro; +import org.akhq.FilmAvro; +import org.akhq.configs.AvroTopicsMapping; +import org.akhq.configs.Connection; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class JsonToAvroSerializerTest { + Connection.Serialization.AvroSerializationTopicsMapping avroSerializationTopicsMapping; + AlbumAvro albumAvro; + FilmAvro filmAvro; + + @BeforeEach + void before() throws URISyntaxException { + createTopicAvroSerializationMapping(); + createAlbumObject(); + createFilmObject(); + } + + private void createTopicAvroSerializationMapping() throws URISyntaxException { + avroSerializationTopicsMapping = new Connection.Serialization.AvroSerializationTopicsMapping(); + + URI uri = ClassLoader.getSystemResource("avro").toURI(); + String avroSchemaFolder = Paths.get(uri).toString(); + avroSerializationTopicsMapping.setSchemasFolder(avroSchemaFolder); + + AvroTopicsMapping albumTopicsMapping = new AvroTopicsMapping(); + albumTopicsMapping.setTopicRegex("album.*"); + albumTopicsMapping.setValueSchemaFile("Album.avsc"); + + AvroTopicsMapping filmTopicsMapping = new AvroTopicsMapping(); + filmTopicsMapping.setTopicRegex("film.*"); + filmTopicsMapping.setValueSchemaFile("Film.avsc"); + + // Do not specify schema neither for a key, nor for a value + AvroTopicsMapping incorrectTopicsMapping = new AvroTopicsMapping(); + incorrectTopicsMapping.setTopicRegex("incorrect.*"); + + avroSerializationTopicsMapping.setTopicsMapping( + Arrays.asList(albumTopicsMapping, filmTopicsMapping, incorrectTopicsMapping)); + } + + private void createAlbumObject() { + List artists = Collections.singletonList("Imagine Dragons"); + List 
diff --git a/src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java b/src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java
new file mode 100644
index 000000000..b0c7318dd
--- /dev/null
+++ b/src/test/java/org/akhq/utils/JsonToAvroSerializerTest.java
@@ -0,0 +1,180 @@
+package org.akhq.utils;
+
+import org.akhq.AlbumAvro;
+import org.akhq.FilmAvro;
+import org.akhq.configs.AvroTopicsMapping;
+import org.akhq.configs.Connection;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class JsonToAvroSerializerTest {
+    Connection.Serialization.AvroSerializationTopicsMapping avroSerializationTopicsMapping;
+    AlbumAvro albumAvro;
+    FilmAvro filmAvro;
+
+    @BeforeEach
+    void before() throws URISyntaxException {
+        createTopicAvroSerializationMapping();
+        createAlbumObject();
+        createFilmObject();
+    }
+
+    private void createTopicAvroSerializationMapping() throws URISyntaxException {
+        avroSerializationTopicsMapping = new Connection.Serialization.AvroSerializationTopicsMapping();
+
+        URI uri = ClassLoader.getSystemResource("avro").toURI();
+        String avroSchemaFolder = Paths.get(uri).toString();
+        avroSerializationTopicsMapping.setSchemasFolder(avroSchemaFolder);
+
+        AvroTopicsMapping albumTopicsMapping = new AvroTopicsMapping();
+        albumTopicsMapping.setTopicRegex("album.*");
+        albumTopicsMapping.setValueSchemaFile("Album.avsc");
+
+        AvroTopicsMapping filmTopicsMapping = new AvroTopicsMapping();
+        filmTopicsMapping.setTopicRegex("film.*");
+        filmTopicsMapping.setValueSchemaFile("Film.avsc");
+
+        // No schema is specified for either the key or the value
+        AvroTopicsMapping incorrectTopicsMapping = new AvroTopicsMapping();
+        incorrectTopicsMapping.setTopicRegex("incorrect.*");
+
+        avroSerializationTopicsMapping.setTopicsMapping(
+            Arrays.asList(albumTopicsMapping, filmTopicsMapping, incorrectTopicsMapping));
+    }
+
+    private void createAlbumObject() {
+        List<String> artists = Collections.singletonList("Imagine Dragons");
+        List<String> songTitles = Arrays.asList("Birds", "Zero", "Natural", "Machine");
+        Album album = new Album("Origins", artists, 2018, songTitles);
+        albumAvro = AlbumAvro.newBuilder()
+            .setTitle(album.getTitle())
+            .setArtist(album.getArtists())
+            .setReleaseYear(album.getReleaseYear())
+            .setSongTitle(album.getSongsTitles())
+            .build();
+    }
+
+    private void createFilmObject() {
+        List<String> starring = Arrays.asList("Harrison Ford", "Mark Hamill", "Carrie Fisher", "Adam Driver", "Daisy Ridley");
+        Film film = new Film("Star Wars: The Force Awakens", "J. J. Abrams", 2015, 135, starring);
+        filmAvro = FilmAvro.newBuilder()
+            .setName(film.getName())
+            .setProducer(film.getProducer())
+            .setReleaseYear(film.getReleaseYear())
+            .setDuration(film.getDuration())
+            .setStarring(film.getStarring())
+            .build();
+    }
+
+    @Test
+    void serializeAlbum() throws IOException, URISyntaxException {
+        JsonToAvroSerializer jsonToAvroSerializer = new JsonToAvroSerializer(avroSerializationTopicsMapping);
+        final String jsonAlbum = "{" +
+            "\"title\":\"Origins\"," +
+            "\"artist\":[\"Imagine Dragons\"]," +
+            "\"releaseYear\":2018," +
+            "\"songTitle\":[\"Birds\",\"Zero\",\"Natural\",\"Machine\"]" +
+            "}";
+        byte[] encodedAlbum = jsonToAvroSerializer.serialize("album.topic.name", jsonAlbum, false);
+        byte[] expectedAlbum = toByteArray("Album.avsc", albumAvro);
+        assertArrayEquals(expectedAlbum, encodedAlbum);
+    }
+
+    @Test
+    void serializeFilm() throws IOException, URISyntaxException {
+        JsonToAvroSerializer jsonToAvroSerializer = new JsonToAvroSerializer(avroSerializationTopicsMapping);
+        final String jsonFilm = "{" +
+            "\"name\":\"Star Wars: The Force Awakens\"," +
+            "\"producer\":\"J. J. Abrams\"," +
+            "\"releaseYear\":2015," +
+            "\"duration\":135," +
+            "\"starring\":[\"Harrison Ford\",\"Mark Hamill\",\"Carrie Fisher\",\"Adam Driver\",\"Daisy Ridley\"]" +
+            "}";
+        byte[] encodedFilm = jsonToAvroSerializer.serialize("film.topic.name", jsonFilm, false);
+        byte[] expectedFilm = toByteArray("Film.avsc", filmAvro);
+        assertArrayEquals(expectedFilm, encodedFilm);
+    }
+
+    @Test
+    void serializeForNotMatchingTopic() {
+        JsonToAvroSerializer jsonToAvroSerializer = new JsonToAvroSerializer(avroSerializationTopicsMapping);
+        final String jsonFilm = "{" +
+            "\"name\":\"Star Wars: The Force Awakens\"," +
+            "\"producer\":\"J. J. Abrams\"," +
+            "\"releaseYear\":2015," +
+            "\"duration\":135," +
+            "\"starring\":[\"Harrison Ford\",\"Mark Hamill\",\"Carrie Fisher\",\"Adam Driver\",\"Daisy Ridley\"]" +
+            "}";
+        byte[] encodedFilm = jsonToAvroSerializer.serialize("random.topic.name", jsonFilm, false);
+        assertNull(encodedFilm);
+    }
+
+    @Test
+    void serializeForKeyWhenItsTypeNotSet() {
+        JsonToAvroSerializer jsonToAvroSerializer = new JsonToAvroSerializer(avroSerializationTopicsMapping);
+        final String jsonFilm = "{" +
+            "\"name\":\"Star Wars: The Force Awakens\"," +
+            "\"producer\":\"J. J. Abrams\"," +
+            "\"releaseYear\":2015," +
+            "\"duration\":135," +
+            "\"starring\":[\"Harrison Ford\",\"Mark Hamill\",\"Carrie Fisher\",\"Adam Driver\",\"Daisy Ridley\"]" +
+            "}";
+        // "film.*" maps only a value schema, so serializing a key must return null
+        byte[] encodedFilm = jsonToAvroSerializer.serialize("film.topic.name", jsonFilm, true);
+        assertNull(encodedFilm);
+    }
Abrams\"," + + "\"releaseYear\":2015," + + "\"duration\":135," + + "\"starring\":[\"Harrison Ford\",\"Mark Hamill\",\"Carrie Fisher\",\"Adam Driver\",\"Daisy Ridley\"]" + + "}"; + Exception exception = assertThrows(RuntimeException.class, () -> { + jsonToAvroSerializer.serialize("incorrect.topic.name", jsonFilm, true); + }); + String expectedMessage = "schema is not specified neither for a key, nor for a value"; + String actualMessage = exception.getMessage(); + assertTrue(actualMessage.contains(expectedMessage)); + } + + private byte[] toByteArray(String schemaName, T datum) throws IOException, URISyntaxException { + Schema schema = resolveSchema(schemaName); + + DatumWriter writer = new GenericDatumWriter<>(schema); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(bos, null); + writer.write(datum, encoder); + encoder.flush(); + bos.close(); + + return bos.toByteArray(); + } + + private Schema resolveSchema(String schemaName) throws URISyntaxException, IOException { + URI uri = ClassLoader.getSystemResource("avro").toURI(); + File schemaFile = Paths.get(uri).resolve(schemaName).toFile(); + + return new Schema.Parser().parse(schemaFile); + } +}