Skip to content

Commit

Permalink
AVRO references support
Browse files Browse the repository at this point in the history
  • Loading branch information
libretto committed Jul 29, 2024
1 parent 8c50eb0 commit fd811e3
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 3 deletions.
14 changes: 13 additions & 1 deletion karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def normalize_schema_str(
except JSONDecodeError as e:
LOG.info("Schema is not valid JSON")
raise e

elif schema_type == SchemaType.PROTOBUF:
if schema:
schema_str = str(schema)
Expand Down Expand Up @@ -180,6 +181,17 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
return parsed_typed_schema.schema


def avro_schema_merge(schema_str: str, dependencies: Mapping[str, Dependency]) -> str:
"""To support references in AVRO we recursively merge all referenced schemas with current schema"""
if dependencies:
merged_schema = ""
for dependency in dependencies.values():
merged_schema += avro_schema_merge(dependency.schema.schema_str, dependency.schema.dependencies) + ",\n"
merged_schema += schema_str
return "[\n" + merged_schema + "\n]"
return schema_str


def parse(
schema_type: SchemaType,
schema_str: str,
Expand All @@ -196,7 +208,7 @@ def parse(
if schema_type is SchemaType.AVRO:
try:
parsed_schema = parse_avro_schema_definition(
schema_str,
avro_schema_merge(schema_str, dependencies),
validate_enum_symbols=validate_avro_enum_symbols,
validate_names=validate_avro_names,
)
Expand Down
14 changes: 13 additions & 1 deletion karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,19 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema | None = None
resolved_dependencies: dict[str, Dependency] | None = None
if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
if schema_type_parsed == SchemaType.AVRO:
try:
if schema_references:
candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError:
LOG.warning("Schema is not valid JSON")
return
except InvalidReferences:
LOG.exception("Invalid AVRO references")
return
elif schema_type_parsed == SchemaType.JSONSCHEMA:
try:
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError:
Expand Down
2 changes: 1 addition & 1 deletion karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ def _validate_references(
content_type=content_type,
status=HTTPStatus.BAD_REQUEST,
)
if references and schema_type != SchemaType.PROTOBUF:
if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO:
self.r(
body={
"error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value,
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/test_schema_avro_references.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
karapace - schema tests
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
import json

from karapace.client import Client

baseurl = "http://localhost:8081"


async def test_avro_references(registry_async_client: Client) -> None:
schema_country = {
"type": "record",
"name": "Country",
"namespace": "com.netapp",
"fields": [
{"name": "name", "type": "string"},
{"name": "code", "type": "string"}
]
}

schema_address = {
"type": "record",
"name": "Address",
"namespace": "com.netapp",
"fields": [
{"name": "street", "type": "string"},
{"name": "city", "type": "string"},
{"name": "postalCode", "type": "string"},
{"name": "country", "type": "Country"}
]

}

res = await registry_async_client.post(
f"subjects/country/versions", json={"schema": json.dumps(schema_country)}
)
assert res.status_code == 200
assert "id" in res.json()
country_references = [{"name": "country.proto", "subject": "country", "version": 1}]

res = await registry_async_client.post(
"subjects/address/versions",
json={"schemaType": "AVRO", "schema": json.dumps(schema_address), "references": country_references},
)
assert res.status_code == 200
assert "id" in res.json()
address_id = res.json()["id"]

# Check if the schema has now been registered under the subject

res = await registry_async_client.post(
"subjects/address",
json={"schemaType": "AVRO", "schema": json.dumps(schema_address), "references": country_references},
)
assert res.status_code == 200
assert "subject" in res.json()
assert "id" in res.json()
assert address_id == res.json()["id"]
assert "version" in res.json()
assert "schema" in res.json()

0 comments on commit fd811e3

Please sign in to comment.