diff --git a/docs/_Sidebar.md b/docs/_Sidebar.md index 8ec94323..d6dc8f20 100644 --- a/docs/_Sidebar.md +++ b/docs/_Sidebar.md @@ -15,6 +15,7 @@ - [Producer configuration](producer-configuration.md) - [Consumer configuration](consumer-configuration.md) - [Schemas](schemas.md) + - [Schema Registry](schema-registry.md) # Knowledge center - [How to run a test plan](how-to-run.md) diff --git a/docs/images/SchemaRegistrySelection.png b/docs/images/SchemaRegistrySelection.png new file mode 100644 index 00000000..d4032876 Binary files /dev/null and b/docs/images/SchemaRegistrySelection.png differ diff --git a/docs/schema-registry.md b/docs/schema-registry.md new file mode 100644 index 00000000..57e8c292 --- /dev/null +++ b/docs/schema-registry.md @@ -0,0 +1,32 @@ +# Configuration Schema Registry + +Although we support the use of schemas without external dependencies such as Schema Registry, it is common to use it, since it allows to +have control of the schemas with versions. + +To choose the use of Schema Registry it is necessary to do it in the following way introducing the basic properties that appear in the +image: +![](images/SchemaRegistrySelection.png) + +The properties are supported by the schema registry available to us (At the moment are Confluent Schema Registry, and Apicurio Schema Registry) + +A list of the properties explained here: +* `schema.registry.auth.enabled` If your schema registry have auth enabled or not +* `schema.registry.auth.method` The method to connect with your schema registry +* `schema.registry.username` Username +* `schema.registry.password` Password +* `schema.registry.bearer` Token for SASL/OAUTHBEARER + +## Some Recommendations + +To avoid having unclear schemas, when use Apicurio they recommend using this functionality instead of auto-generating. +https://www.apicur.io/registry/docs/apicurio-registry/2.1.x/assets-attachments/registry-rest-api.htm#tag/Artifacts/operation/createArtifact + +However, in Confluent Schema Registry, they take the SubjectName as ID, so remember it if you see a different name in your Schema List. + + + + + + + + diff --git a/pom-maven-central.xml b/pom-maven-central.xml index 7426a07d..f684ab91 100644 --- a/pom-maven-central.xml +++ b/pom-maven-central.xml @@ -7,7 +7,7 @@ kloadgen - 5.6.1 + 5.6.2 KLoadGen Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial diff --git a/pom.xml b/pom.xml index 62a41161..7f8eecd9 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ kloadgen - 5.6.1 + 5.6.2 KLoadGen Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial diff --git a/schema_registry_docker/docker-compose-noauth.yml b/schema_registry_docker/docker-compose-noauth.yml index d8976b99..168db483 100644 --- a/schema_registry_docker/docker-compose-noauth.yml +++ b/schema_registry_docker/docker-compose-noauth.yml @@ -33,7 +33,7 @@ services: apicurio-registry: image: 'apicurio/apicurio-registry-mem:latest-release' ports: - - '8080:8080' + - '8080:8080' environment: AUTH_ENABLED: false kafka-manager: diff --git a/src/main/java/com/sngular/kloadgen/sampler/schemaregistry/impl/ApicurioSchemaRegistry.java b/src/main/java/com/sngular/kloadgen/sampler/schemaregistry/impl/ApicurioSchemaRegistry.java index 4b4051e0..aa3ab1cc 100644 --- a/src/main/java/com/sngular/kloadgen/sampler/schemaregistry/impl/ApicurioSchemaRegistry.java +++ b/src/main/java/com/sngular/kloadgen/sampler/schemaregistry/impl/ApicurioSchemaRegistry.java @@ -2,26 +2,22 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.Objects; +import com.google.protobuf.Message; import com.sngular.kloadgen.common.SchemaTypeEnum; import com.sngular.kloadgen.exception.KLoadGenException; import com.sngular.kloadgen.sampler.schemaregistry.SchemaRegistryAdapter; -import com.sngular.kloadgen.sampler.schemaregistry.SchemaRegistryConstants; import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ApicurioParsedSchemaMetadata; import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ApicurioSchemaMetadata; import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseParsedSchema; import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseSchemaMetadata; -import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ParsedSchemaAdapter; import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.SchemaMetadataAdapter; -import com.sngular.kloadgen.util.SchemaRegistryKeyHelper; import io.apicurio.registry.resolver.SchemaParser; import io.apicurio.registry.rest.client.RegistryClient; import io.apicurio.registry.rest.client.RegistryClientFactory; @@ -30,147 +26,127 @@ import io.apicurio.registry.rest.v2.beans.SearchedArtifact; import io.apicurio.registry.serde.SerdeConfig; import io.apicurio.registry.serde.avro.AvroSchemaParser; +import io.apicurio.registry.serde.jsonschema.JsonSchema; import io.apicurio.registry.serde.jsonschema.JsonSchemaParser; import io.apicurio.registry.serde.protobuf.ProtobufSchemaParser; import io.apicurio.registry.types.ArtifactState; -import org.apache.tika.io.IOUtils; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; +import org.apache.avro.Schema; public class ApicurioSchemaRegistry implements SchemaRegistryAdapter { private RegistryClient schemaRegistryClient; - private Map propertiesMap; - - private ApicurioParsedSchemaMetadata apicurioParsedSchemaMetadata; - - public ApicurioSchemaRegistry() { - this.propertiesMap = Map.of(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME, SchemaRegistryConstants.SCHEMA_REGISTRY_APICURIO, - SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL_KEY, SerdeConfig.REGISTRY_URL); - } - @Override - public String getSchemaRegistryUrlKey() { + public final String getSchemaRegistryUrlKey() { return SerdeConfig.REGISTRY_URL; } @Override - public void setSchemaRegistryClient(String url, Map properties) { + public final void setSchemaRegistryClient(final String url, final Map properties) { this.schemaRegistryClient = RegistryClientFactory.create(url); } @Override - public void setSchemaRegistryClient(Map properties) { - String url = properties.get(this.getSchemaRegistryUrlKey()).toString(); + public final void setSchemaRegistryClient(final Map properties) { + final String url = Objects.toString(properties.get(this.getSchemaRegistryUrlKey()), ""); this.schemaRegistryClient = RegistryClientFactory.create(url); } @Override - public Collection getAllSubjects() throws KLoadGenException { - Collection subjects = new ArrayList<>(); + public final Collection getAllSubjects() throws KLoadGenException { + final Collection subjects = new ArrayList<>(); try { - List artifacts = this.schemaRegistryClient.searchArtifacts(null, null, null, - null, null, null, null, null, null).getArtifacts(); + final List artifacts = this.schemaRegistryClient.searchArtifacts(null, null, null, + null, null, null, null, null, null).getArtifacts(); - for (SearchedArtifact searchedArtifact : artifacts) { - subjects.add(searchedArtifact.getName()); + for (final SearchedArtifact searchedArtifact : artifacts) { + subjects.add(searchedArtifact.getId()); } return subjects; - } catch (RestClientException e) { + } catch (final RestClientException e) { throw new KLoadGenException(e.getMessage()); } } @Override - public BaseSchemaMetadata getLatestSchemaMetadata(String subjectName) throws KLoadGenException { + public final BaseSchemaMetadata getLatestSchemaMetadata(final String artifactId) throws KLoadGenException { try { - final SearchedArtifact searchedArtifact = getLastestSearchedArtifact(subjectName); + final SearchedArtifact searchedArtifact = getLastestSearchedArtifact(artifactId); final ArtifactMetaData artifactMetaData = this.schemaRegistryClient.getArtifactMetaData(searchedArtifact.getGroupId(), searchedArtifact.getId()); - return new BaseSchemaMetadata(new ApicurioSchemaMetadata(artifactMetaData)); - } catch (RestClientException e) { + return new BaseSchemaMetadata<>(new ApicurioSchemaMetadata(artifactMetaData)); + } catch (final RestClientException e) { throw new KLoadGenException(e.getMessage()); } } @Override - public BaseParsedSchema getSchemaBySubject(String subjectName) { + public final BaseParsedSchema getSchemaBySubject(final String artifactId) { final ApicurioParsedSchemaMetadata schema = new ApicurioParsedSchemaMetadata(); try { - List artifacts = this.schemaRegistryClient.searchArtifacts(null, subjectName, null, - null, null, null, null, null, null).getArtifacts(); - if (artifacts.isEmpty()) { - throw new KLoadGenException(String.format("Schema %s not found", subjectName)); - } else { - SearchedArtifact searchedArtifact = artifacts.get(0); - InputStream inputStream = this.schemaRegistryClient.getLatestArtifact(searchedArtifact.getGroupId(), searchedArtifact.getId()); - String result = IOUtils.toString(inputStream, String.valueOf(StandardCharsets.UTF_8)); - - String searchedArtifactType = searchedArtifact.getType().toString(); - if (SchemaTypeEnum.AVRO.name().equalsIgnoreCase(searchedArtifactType)) { - SchemaParser parserAvro = new AvroSchemaParser(null); - schema.setSchema(parserAvro.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>())); - } else if (SchemaTypeEnum.JSON.name().equalsIgnoreCase(searchedArtifactType)) { - SchemaParser parserJson = new JsonSchemaParser(); - schema.setSchema(parserJson.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>())); - } else if (SchemaTypeEnum.PROTOBUF.name().equalsIgnoreCase(searchedArtifactType)) { - SchemaParser parserProtobuf = new ProtobufSchemaParser(); - schema.setSchema(parserProtobuf.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>())); - } else { - throw new KLoadGenException(String.format("Schema type not supported %s", searchedArtifactType)); - } - schema.setType(searchedArtifactType); - ParsedSchemaAdapter parsedSchemaAdapter = schema; - return new BaseParsedSchema(parsedSchemaAdapter); - } - } catch (IOException e) { + final SearchedArtifact searchedArtifact = getLastestSearchedArtifact(artifactId); + final InputStream inputStream = this.schemaRegistryClient.getLatestArtifact(searchedArtifact.getGroupId(), searchedArtifact.getId()); + final String searchedArtifactType = searchedArtifact.getType(); + setSchemaBySchemaType(schema, inputStream.readAllBytes(), searchedArtifactType); + schema.setType(searchedArtifactType); + return new BaseParsedSchema<>(schema); + } catch (final IOException e) { throw new KLoadGenException(e); } } - public BaseParsedSchema getSchemaBySubjectAndId(String subjectName, BaseSchemaMetadata metadata) { + private static void setSchemaBySchemaType(final ApicurioParsedSchemaMetadata schema, final byte[] result, final String searchedArtifactType) { + + switch (SchemaTypeEnum.valueOf(searchedArtifactType)) { + case AVRO: + final SchemaParser parserAvro = new AvroSchemaParser<>(null); + schema.setSchema(parserAvro.parseSchema(result, new HashMap<>())); + break; + case PROTOBUF: + final SchemaParser parserProtobuf = new ProtobufSchemaParser<>(); + schema.setSchema(parserProtobuf.parseSchema(result, new HashMap<>())); + break; + case JSON: + final SchemaParser parserJson = new JsonSchemaParser<>(); + schema.setSchema(parserJson.parseSchema(result, new HashMap<>())); + break; + default: + throw new KLoadGenException(String.format("Schema type not supported %s", searchedArtifactType)); + + } + } + + @Override + public final BaseParsedSchema getSchemaBySubjectAndId( + final String subjectName, final BaseSchemaMetadata metadata) { final ApicurioParsedSchemaMetadata schema = new ApicurioParsedSchemaMetadata(); - SchemaMetadataAdapter schemaMetadataAdapter = metadata.getSchemaMetadataAdapter(); + final SchemaMetadataAdapter schemaMetadataAdapter = metadata.getSchemaMetadataAdapter(); try { - InputStream inputStream = this.schemaRegistryClient.getContentByGlobalId(schemaMetadataAdapter.getGlobalId()); - String result = IOUtils.toString(inputStream, String.valueOf(StandardCharsets.UTF_8)); - - String searchedArtifactType = schemaMetadataAdapter.getType(); - if (SchemaTypeEnum.AVRO.name().equalsIgnoreCase(searchedArtifactType)) { - SchemaParser parserAvro = new AvroSchemaParser(null); - schema.setSchema(parserAvro.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>())); - } else if (SchemaTypeEnum.JSON.name().equalsIgnoreCase(searchedArtifactType)) { - SchemaParser parserJson = new JsonSchemaParser(); - schema.setSchema(parserJson.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>())); - } else if (SchemaTypeEnum.PROTOBUF.name().equalsIgnoreCase(searchedArtifactType)) { - SchemaParser parserProtobuf = new ProtobufSchemaParser(); - schema.setSchema(parserProtobuf.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>())); - } else { - throw new KLoadGenException(String.format("Schema type not supported %s", searchedArtifactType)); - } - } catch (IOException e) { + final InputStream inputStream = this.schemaRegistryClient.getContentByGlobalId(schemaMetadataAdapter.getGlobalId()); + + final String searchedArtifactType = schemaMetadataAdapter.getType(); + setSchemaBySchemaType(schema, inputStream.readAllBytes(), searchedArtifactType); + } catch (final IOException e) { throw new RuntimeException(e); } - ParsedSchemaAdapter parsedSchemaAdapter = schema; - return new BaseParsedSchema(parsedSchemaAdapter); + return new BaseParsedSchema<>(schema); } - private SearchedArtifact getLastestSearchedArtifact(final String subjectName) { - boolean found = false; - SearchedArtifact best = null; - final Comparator comparator = Comparator.comparing(SearchedArtifact::getModifiedOn); - for (SearchedArtifact artifact : this.schemaRegistryClient.searchArtifacts(null, null, null, null, null, null, null, null, - null, - null, null).getArtifacts()) { - if (artifact.getName().equals(subjectName) && ArtifactState.ENABLED.equals(artifact.getState()) && !found || comparator.compare(artifact, best) > 0) { - found = true; - best = artifact; + private SearchedArtifact getLastestSearchedArtifact(final String artifactId) { + SearchedArtifact searchedArtifact = null; + + for (final SearchedArtifact artifact : this.schemaRegistryClient.searchArtifacts(null, null, null, null, null, null, + null, null, null, null, null).getArtifacts()) { + if (artifact.getId().equals(artifactId) && ArtifactState.ENABLED.equals(artifact.getState())) { + searchedArtifact = artifact; } } - SearchedArtifact searchedArtifact = (found ? Optional.of(best) : Optional.empty()) - .orElseThrow(() -> new KLoadGenException(String.format("Does not exist any enabled" + - "artifact " + - "registered with name %s", - subjectName))); + + if (searchedArtifact == null) { + throw new KLoadGenException(String.format("Does not exist any enabled artifact registered with id %s", artifactId)); + } + return searchedArtifact; } } diff --git a/src/test/java/com/sngular/kloadgen/loadgen/impl/AvroSRLoadGeneratorTest.java b/src/test/java/com/sngular/kloadgen/loadgen/impl/AvroSRLoadGeneratorTest.java index 04566831..c1c62f4d 100644 --- a/src/test/java/com/sngular/kloadgen/loadgen/impl/AvroSRLoadGeneratorTest.java +++ b/src/test/java/com/sngular/kloadgen/loadgen/impl/AvroSRLoadGeneratorTest.java @@ -56,7 +56,7 @@ void testAvroLoadGenerator(final WireMockRuntimeInfo wmRuntimeInfo) throws KLoad originals.put(schemaRegistryManager.getSchemaRegistryUrlKey(), wmRuntimeInfo.getHttpBaseUrl()); final AvroSRLoadGenerator avroLoadGenerator = new AvroSRLoadGenerator(); - avroLoadGenerator.setUpGenerator(originals, "simple-schema", fieldValueMappingList); + avroLoadGenerator.setUpGenerator(originals, "dad37185-782b-4bed-9cf6-678d1d4587d9", fieldValueMappingList); final Object message = avroLoadGenerator.nextMessage(); Assertions.assertThat(message).isNotNull().isInstanceOf(EnrichedRecord.class);