Skip to content

Commit

Permalink
#374 multiple schemas apicurio error (#382)
Browse files Browse the repository at this point in the history
Release 5.6.2

- Fixed an issue related to Apicurio when trying to get more than one schema from the registry from Jmeter.
- Added new documentation on using the schema registry in the wiki.
  • Loading branch information
AdrianLagartera authored May 19, 2023
1 parent 0464bbc commit 4bf87e9
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 99 deletions.
1 change: 1 addition & 0 deletions docs/_Sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- [Producer configuration](producer-configuration.md)
- [Consumer configuration](consumer-configuration.md)
- [Schemas](schemas.md)
- [Schema Registry](schema-registry.md)

# Knowledge center
- [How to run a test plan](how-to-run.md)
Expand Down
Binary file added docs/images/SchemaRegistrySelection.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
32 changes: 32 additions & 0 deletions docs/schema-registry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Configuration Schema Registry

Although we support the use of schemas without external dependencies such as Schema Registry, it is common to use it, since it allows to
have control of the schemas with versions.

To choose the use of Schema Registry it is necessary to do it in the following way introducing the basic properties that appear in the
image:
![](images/SchemaRegistrySelection.png)

The properties are supported by the schema registry available to us (At the moment are Confluent Schema Registry, and Apicurio Schema Registry)

A list of the properties explained here:
* `schema.registry.auth.enabled` If your schema registry have auth enabled or not
* `schema.registry.auth.method` The method to connect with your schema registry
* `schema.registry.username` Username
* `schema.registry.password` Password
* `schema.registry.bearer` Token for SASL/OAUTHBEARER

## Some Recommendations

To avoid having unclear schemas, when use Apicurio they recommend using this functionality instead of auto-generating.
https://www.apicur.io/registry/docs/apicurio-registry/2.1.x/assets-attachments/registry-rest-api.htm#tag/Artifacts/operation/createArtifact

However, in Confluent Schema Registry, they take the SubjectName as ID, so remember it if you see a different name in your Schema List.








2 changes: 1 addition & 1 deletion pom-maven-central.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.6.1</version>
<version>5.6.2</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.6.1</version>
<version>5.6.2</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down
2 changes: 1 addition & 1 deletion schema_registry_docker/docker-compose-noauth.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ services:
apicurio-registry:
image: 'apicurio/apicurio-registry-mem:latest-release'
ports:
- '8080:8080'
- '8080:8080'
environment:
AUTH_ENABLED: false
kafka-manager:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,22 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;

import com.google.protobuf.Message;
import com.sngular.kloadgen.common.SchemaTypeEnum;
import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.sampler.schemaregistry.SchemaRegistryAdapter;
import com.sngular.kloadgen.sampler.schemaregistry.SchemaRegistryConstants;
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ApicurioParsedSchemaMetadata;
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ApicurioSchemaMetadata;
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseParsedSchema;
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseSchemaMetadata;
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ParsedSchemaAdapter;
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.SchemaMetadataAdapter;
import com.sngular.kloadgen.util.SchemaRegistryKeyHelper;
import io.apicurio.registry.resolver.SchemaParser;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.client.RegistryClientFactory;
Expand All @@ -30,147 +26,127 @@
import io.apicurio.registry.rest.v2.beans.SearchedArtifact;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroSchemaParser;
import io.apicurio.registry.serde.jsonschema.JsonSchema;
import io.apicurio.registry.serde.jsonschema.JsonSchemaParser;
import io.apicurio.registry.serde.protobuf.ProtobufSchemaParser;
import io.apicurio.registry.types.ArtifactState;
import org.apache.tika.io.IOUtils;
import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;
import org.apache.avro.Schema;

public class ApicurioSchemaRegistry implements SchemaRegistryAdapter {

private RegistryClient schemaRegistryClient;

private Map<String, String> propertiesMap;

private ApicurioParsedSchemaMetadata apicurioParsedSchemaMetadata;

public ApicurioSchemaRegistry() {
this.propertiesMap = Map.of(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME, SchemaRegistryConstants.SCHEMA_REGISTRY_APICURIO,
SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL_KEY, SerdeConfig.REGISTRY_URL);
}

@Override
public String getSchemaRegistryUrlKey() {
public final String getSchemaRegistryUrlKey() {
return SerdeConfig.REGISTRY_URL;
}

@Override
public void setSchemaRegistryClient(String url, Map<String, ?> properties) {
public final void setSchemaRegistryClient(final String url, final Map<String, ?> properties) {
this.schemaRegistryClient = RegistryClientFactory.create(url);
}

@Override
public void setSchemaRegistryClient(Map<String, ?> properties) {
String url = properties.get(this.getSchemaRegistryUrlKey()).toString();
public final void setSchemaRegistryClient(final Map<String, ?> properties) {
final String url = Objects.toString(properties.get(this.getSchemaRegistryUrlKey()), "");
this.schemaRegistryClient = RegistryClientFactory.create(url);
}

@Override
public Collection<String> getAllSubjects() throws KLoadGenException {
Collection<String> subjects = new ArrayList<>();
public final Collection<String> getAllSubjects() throws KLoadGenException {
final Collection<String> subjects = new ArrayList<>();
try {
List<SearchedArtifact> artifacts = this.schemaRegistryClient.searchArtifacts(null, null, null,
null, null, null, null, null, null).getArtifacts();
final List<SearchedArtifact> artifacts = this.schemaRegistryClient.searchArtifacts(null, null, null,
null, null, null, null, null, null).getArtifacts();

for (SearchedArtifact searchedArtifact : artifacts) {
subjects.add(searchedArtifact.getName());
for (final SearchedArtifact searchedArtifact : artifacts) {
subjects.add(searchedArtifact.getId());
}
return subjects;
} catch (RestClientException e) {
} catch (final RestClientException e) {
throw new KLoadGenException(e.getMessage());
}
}

@Override
public BaseSchemaMetadata<ApicurioSchemaMetadata> getLatestSchemaMetadata(String subjectName) throws KLoadGenException {
public final BaseSchemaMetadata<ApicurioSchemaMetadata> getLatestSchemaMetadata(final String artifactId) throws KLoadGenException {
try {
final SearchedArtifact searchedArtifact = getLastestSearchedArtifact(subjectName);
final SearchedArtifact searchedArtifact = getLastestSearchedArtifact(artifactId);
final ArtifactMetaData artifactMetaData = this.schemaRegistryClient.getArtifactMetaData(searchedArtifact.getGroupId(), searchedArtifact.getId());
return new BaseSchemaMetadata(new ApicurioSchemaMetadata(artifactMetaData));
} catch (RestClientException e) {
return new BaseSchemaMetadata<>(new ApicurioSchemaMetadata(artifactMetaData));
} catch (final RestClientException e) {
throw new KLoadGenException(e.getMessage());
}
}

@Override
public BaseParsedSchema<ApicurioParsedSchemaMetadata> getSchemaBySubject(String subjectName) {
public final BaseParsedSchema<ApicurioParsedSchemaMetadata> getSchemaBySubject(final String artifactId) {
final ApicurioParsedSchemaMetadata schema = new ApicurioParsedSchemaMetadata();
try {
List<SearchedArtifact> artifacts = this.schemaRegistryClient.searchArtifacts(null, subjectName, null,
null, null, null, null, null, null).getArtifacts();
if (artifacts.isEmpty()) {
throw new KLoadGenException(String.format("Schema %s not found", subjectName));
} else {
SearchedArtifact searchedArtifact = artifacts.get(0);
InputStream inputStream = this.schemaRegistryClient.getLatestArtifact(searchedArtifact.getGroupId(), searchedArtifact.getId());
String result = IOUtils.toString(inputStream, String.valueOf(StandardCharsets.UTF_8));

String searchedArtifactType = searchedArtifact.getType().toString();
if (SchemaTypeEnum.AVRO.name().equalsIgnoreCase(searchedArtifactType)) {
SchemaParser parserAvro = new AvroSchemaParser(null);
schema.setSchema(parserAvro.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
} else if (SchemaTypeEnum.JSON.name().equalsIgnoreCase(searchedArtifactType)) {
SchemaParser parserJson = new JsonSchemaParser();
schema.setSchema(parserJson.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
} else if (SchemaTypeEnum.PROTOBUF.name().equalsIgnoreCase(searchedArtifactType)) {
SchemaParser parserProtobuf = new ProtobufSchemaParser();
schema.setSchema(parserProtobuf.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
} else {
throw new KLoadGenException(String.format("Schema type not supported %s", searchedArtifactType));
}
schema.setType(searchedArtifactType);
ParsedSchemaAdapter parsedSchemaAdapter = schema;
return new BaseParsedSchema(parsedSchemaAdapter);
}
} catch (IOException e) {
final SearchedArtifact searchedArtifact = getLastestSearchedArtifact(artifactId);
final InputStream inputStream = this.schemaRegistryClient.getLatestArtifact(searchedArtifact.getGroupId(), searchedArtifact.getId());
final String searchedArtifactType = searchedArtifact.getType();
setSchemaBySchemaType(schema, inputStream.readAllBytes(), searchedArtifactType);
schema.setType(searchedArtifactType);
return new BaseParsedSchema<>(schema);
} catch (final IOException e) {
throw new KLoadGenException(e);
}
}

public BaseParsedSchema<ApicurioParsedSchemaMetadata> getSchemaBySubjectAndId(String subjectName, BaseSchemaMetadata<? extends SchemaMetadataAdapter> metadata) {
private static void setSchemaBySchemaType(final ApicurioParsedSchemaMetadata schema, final byte[] result, final String searchedArtifactType) {

switch (SchemaTypeEnum.valueOf(searchedArtifactType)) {
case AVRO:
final SchemaParser<Schema, Object> parserAvro = new AvroSchemaParser<>(null);
schema.setSchema(parserAvro.parseSchema(result, new HashMap<>()));
break;
case PROTOBUF:
final SchemaParser<ProtobufSchema, Message> parserProtobuf = new ProtobufSchemaParser<>();
schema.setSchema(parserProtobuf.parseSchema(result, new HashMap<>()));
break;
case JSON:
final SchemaParser<JsonSchema, Object> parserJson = new JsonSchemaParser<>();
schema.setSchema(parserJson.parseSchema(result, new HashMap<>()));
break;
default:
throw new KLoadGenException(String.format("Schema type not supported %s", searchedArtifactType));

}
}

@Override
public final BaseParsedSchema<ApicurioParsedSchemaMetadata> getSchemaBySubjectAndId(
final String subjectName, final BaseSchemaMetadata<? extends SchemaMetadataAdapter> metadata) {
final ApicurioParsedSchemaMetadata schema = new ApicurioParsedSchemaMetadata();

SchemaMetadataAdapter schemaMetadataAdapter = metadata.getSchemaMetadataAdapter();
final SchemaMetadataAdapter schemaMetadataAdapter = metadata.getSchemaMetadataAdapter();
try {
InputStream inputStream = this.schemaRegistryClient.getContentByGlobalId(schemaMetadataAdapter.getGlobalId());
String result = IOUtils.toString(inputStream, String.valueOf(StandardCharsets.UTF_8));

String searchedArtifactType = schemaMetadataAdapter.getType();
if (SchemaTypeEnum.AVRO.name().equalsIgnoreCase(searchedArtifactType)) {
SchemaParser parserAvro = new AvroSchemaParser(null);
schema.setSchema(parserAvro.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
} else if (SchemaTypeEnum.JSON.name().equalsIgnoreCase(searchedArtifactType)) {
SchemaParser parserJson = new JsonSchemaParser();
schema.setSchema(parserJson.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
} else if (SchemaTypeEnum.PROTOBUF.name().equalsIgnoreCase(searchedArtifactType)) {
SchemaParser parserProtobuf = new ProtobufSchemaParser();
schema.setSchema(parserProtobuf.parseSchema(result.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
} else {
throw new KLoadGenException(String.format("Schema type not supported %s", searchedArtifactType));
}
} catch (IOException e) {
final InputStream inputStream = this.schemaRegistryClient.getContentByGlobalId(schemaMetadataAdapter.getGlobalId());

final String searchedArtifactType = schemaMetadataAdapter.getType();
setSchemaBySchemaType(schema, inputStream.readAllBytes(), searchedArtifactType);
} catch (final IOException e) {
throw new RuntimeException(e);
}
ParsedSchemaAdapter parsedSchemaAdapter = schema;
return new BaseParsedSchema(parsedSchemaAdapter);
return new BaseParsedSchema<>(schema);
}

private SearchedArtifact getLastestSearchedArtifact(final String subjectName) {
boolean found = false;
SearchedArtifact best = null;
final Comparator<SearchedArtifact> comparator = Comparator.comparing(SearchedArtifact::getModifiedOn);
for (SearchedArtifact artifact : this.schemaRegistryClient.searchArtifacts(null, null, null, null, null, null, null, null,
null,
null, null).getArtifacts()) {
if (artifact.getName().equals(subjectName) && ArtifactState.ENABLED.equals(artifact.getState()) && !found || comparator.compare(artifact, best) > 0) {
found = true;
best = artifact;
private SearchedArtifact getLastestSearchedArtifact(final String artifactId) {
SearchedArtifact searchedArtifact = null;

for (final SearchedArtifact artifact : this.schemaRegistryClient.searchArtifacts(null, null, null, null, null, null,
null, null, null, null, null).getArtifacts()) {
if (artifact.getId().equals(artifactId) && ArtifactState.ENABLED.equals(artifact.getState())) {
searchedArtifact = artifact;
}
}
SearchedArtifact searchedArtifact = (found ? Optional.of(best) : Optional.<SearchedArtifact>empty())
.orElseThrow(() -> new KLoadGenException(String.format("Does not exist any enabled" +
"artifact " +
"registered with name %s",
subjectName)));

if (searchedArtifact == null) {
throw new KLoadGenException(String.format("Does not exist any enabled artifact registered with id %s", artifactId));
}

return searchedArtifact;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void testAvroLoadGenerator(final WireMockRuntimeInfo wmRuntimeInfo) throws KLoad
originals.put(schemaRegistryManager.getSchemaRegistryUrlKey(), wmRuntimeInfo.getHttpBaseUrl());

final AvroSRLoadGenerator avroLoadGenerator = new AvroSRLoadGenerator();
avroLoadGenerator.setUpGenerator(originals, "simple-schema", fieldValueMappingList);
avroLoadGenerator.setUpGenerator(originals, "dad37185-782b-4bed-9cf6-678d1d4587d9", fieldValueMappingList);
final Object message = avroLoadGenerator.nextMessage();
Assertions.assertThat(message).isNotNull().isInstanceOf(EnrichedRecord.class);

Expand Down

0 comments on commit 4bf87e9

Please sign in to comment.