From 71ff3bbb8dba8671c517a8fa8c064ff30e3a30cd Mon Sep 17 00:00:00 2001
From: Elia Migliore
Date: Mon, 29 Jul 2024 14:43:42 +0200
Subject: [PATCH 1/8] feat: enable the customization of the kafka properties

---
 karapace/kafka_utils.py                 |   1 +
 .../backup/test_session_timeout.py      | 109 ++++++++++++++++++
 tests/integration/conftest.py           |  33 +++++-
 tests/integration/utils/kafka_server.py |  11 +-
 4 files changed, 144 insertions(+), 10 deletions(-)
 create mode 100644 tests/integration/backup/test_session_timeout.py

diff --git a/karapace/kafka_utils.py b/karapace/kafka_utils.py
index 4b92d8ec6..129ad96d4 100644
--- a/karapace/kafka_utils.py
+++ b/karapace/kafka_utils.py
@@ -40,6 +40,7 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons
         sasl_plain_username=config["sasl_plain_username"],
         sasl_plain_password=config["sasl_plain_password"],
         auto_offset_reset="earliest",
+        session_timeout_ms=config["session_timeout_ms"],
         metadata_max_age_ms=config["metadata_max_age_ms"],
     )
     try:
diff --git a/tests/integration/backup/test_session_timeout.py b/tests/integration/backup/test_session_timeout.py
new file mode 100644
index 000000000..b585e759a
--- /dev/null
+++ b/tests/integration/backup/test_session_timeout.py
@@ -0,0 +1,109 @@
+"""
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+from aiokafka.errors import NoBrokersAvailable
+from confluent_kafka.admin import NewTopic
+from karapace.backup.api import BackupVersion, create_backup
+from karapace.config import Config, DEFAULTS, set_config_defaults
+from karapace.kafka.admin import KafkaAdminClient
+from karapace.kafka_utils import kafka_producer_from_config
+from pathlib import Path
+from tests.integration.conftest import create_kafka_server
+from tests.integration.utils.config import KafkaDescription
+from tests.integration.utils.kafka_server import KafkaServers
+from tests.integration.utils.network import PortRangeInclusive
+
+import pytest
+
+SESSION_TIMEOUT_MS = 65000
+GROUP_MIN_SESSION_TIMEOUT_MS = 60000
+GROUP_MAX_SESSION_TIMEOUT_MS = 70000
+
+
+# use a dedicated kafka server with specific values for
+# group.min.session.timeout.ms and group.max.session.timeout.ms
+@pytest.fixture(scope="function", name="kafka_server_session_timeout")
+def fixture_kafka_server(
+    kafka_description: KafkaDescription,
+    port_range: PortRangeInclusive,
+    tmp_path_factory: pytest.TempPathFactory,
+):
+    # use custom data and log dir to avoid conflict with other kafka servers
+    session_datadir = tmp_path_factory.mktemp("kafka_server_min_data")
+    session_logdir = tmp_path_factory.mktemp("kafka_server_min_log")
+    kafka_config_extra = {
+        "group.min.session.timeout.ms": GROUP_MIN_SESSION_TIMEOUT_MS,
+        "group.max.session.timeout.ms": GROUP_MAX_SESSION_TIMEOUT_MS,
+    }
+    yield from create_kafka_server(
+        session_datadir,
+        session_logdir,
+        kafka_description,
+        port_range,
+        kafka_config_extra,
+    )
+
+
+def test_producer_with_custom_kafka_properties_does_not_fail(
+    kafka_server_session_timeout: KafkaServers,
+    new_topic: NewTopic,
+    tmp_path: Path,
+) -> None:
+    """
+    This test checks whether the custom properties are accepted by kafka.
+    We know by the implementation of the consumer startup code that if
+    `group.min.session.timeout.ms` > `session.timeout.ms` the consumer
+    will raise an exception during the startup.
+    This test ensures that the `session.timeout.ms` can be injected in
+    the kafka config so that the exception isn't raised
+    """
+    config = set_config_defaults(
+        Config(bootstrap_uri=kafka_server_session_timeout.bootstrap_servers, session_timeout_ms=SESSION_TIMEOUT_MS)
+    )
+
+    admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
+    admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)
+
+    with kafka_producer_from_config(config) as producer:
+        producer.send(
+            new_topic.topic,
+            key=b"foo",
+            value=b"bar",
+            partition=0,
+            headers=[
+                ("some-header", b"some header value"),
+                ("other-header", b"some other header value"),
+            ],
+            timestamp=1683474657,
+        )
+        producer.flush()
+
+    # without performing the backup the exception isn't raised.
+    create_backup(
+        config=config,
+        backup_location=tmp_path / "backup",
+        topic_name=new_topic.topic,
+        version=BackupVersion.V3,
+        replication_factor=1,
+    )
+
+
+def test_producer_with_custom_kafka_properties_fail(
+    kafka_server_session_timeout: KafkaServers,
+    new_topic: NewTopic,
+) -> None:
+    """
+    This test checks whether the custom properties are accepted by kafka.
+    We know by the implementation of the consumer startup code that if
+    `group.min.session.timeout.ms` > `session.timeout.ms` the consumer
+    will raise an exception during the startup.
+    This test ensures that the `session.timeout.ms` can be injected in
+    the kafka config so that the exception isn't raised
+    """
+    admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
+    admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)
+
+    with pytest.raises(NoBrokersAvailable):
+        with kafka_producer_from_config(DEFAULTS) as producer:
+            _ = producer
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 6ed4f4b22..cc86a3c36 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -4,6 +4,8 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
+from __future__ import annotations
+
 from _pytest.fixtures import SubRequest
 from aiohttp.pytest_plugin import AiohttpClient
 from aiohttp.test_utils import TestClient
@@ -32,7 +34,7 @@
 from tests.integration.utils.synchronization import lock_path_for
 from tests.integration.utils.zookeeper import configure_and_start_zk
 from tests.utils import repeat_until_successful_request
-from typing import AsyncIterator, Iterator, List, Optional
+from typing import AsyncIterator, Iterator
 from urllib.parse import urlparse
 
 import asyncio
@@ -120,6 +122,24 @@ def fixture_kafka_server(
         yield kafka_servers
         return
 
+    yield from create_kafka_server(
+        session_datadir,
+        session_logdir,
+        kafka_description,
+        port_range,
+    )
+
+
+def create_kafka_server(
+    session_datadir: Path,
+    session_logdir: Path,
+    kafka_description: KafkaDescription,
+    port_range: PortRangeInclusive,
+    kafka_properties: dict[str, int | str] | None = None,
+) -> Iterator[KafkaServers]:
+    if kafka_properties is None:
+        kafka_properties = {}
+
     zk_dir = session_logdir / "zk"
 
     # File used to share data among test runners, including the dynamic
@@ -170,6 +190,7 @@
             kafka_config=kafka_config,
             kafka_description=kafka_description,
             log4j_config=KAFKA_LOG4J,
+            kafka_properties=kafka_properties,
         )
         stack.callback(stop_process, kafka_proc)
 
@@ -269,7 +290,7 @@ async def fixture_rest_async(
     tmp_path: Path,
     kafka_servers: KafkaServers,
     registry_async_client: Client,
-) -> AsyncIterator[Optional[KafkaRest]]:
+) ->
AsyncIterator[KafkaRest | None]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that @@ -342,7 +363,7 @@ async def fixture_rest_async_novalidation( tmp_path: Path, kafka_servers: KafkaServers, registry_async_client: Client, -) -> AsyncIterator[Optional[KafkaRest]]: +) -> AsyncIterator[KafkaRest | None]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that @@ -415,7 +436,7 @@ async def fixture_rest_async_registry_auth( loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument kafka_servers: KafkaServers, registry_async_client_auth: Client, -) -> AsyncIterator[Optional[KafkaRest]]: +) -> AsyncIterator[KafkaRest | None]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that @@ -486,7 +507,7 @@ async def fixture_registry_async_pair( session_logdir: Path, kafka_servers: KafkaServers, port_range: PortRangeInclusive, -) -> AsyncIterator[List[str]]: +) -> AsyncIterator[list[str]]: """Starts a cluster of two Schema Registry servers and returns their URL endpoints.""" config1: Config = {"bootstrap_uri": kafka_servers.bootstrap_servers} @@ -701,7 +722,7 @@ async def fixture_registry_async_auth_pair( session_logdir: Path, kafka_servers: KafkaServers, port_range: PortRangeInclusive, -) -> AsyncIterator[List[str]]: +) -> AsyncIterator[list[str]]: """Starts a cluster of two Schema Registry servers with authentication enabled and returns their URL endpoints.""" config1: Config = { diff --git a/tests/integration/utils/kafka_server.py b/tests/integration/utils/kafka_server.py index fb6fff7e4..520315b73 100644 --- a/tests/integration/utils/kafka_server.py +++ b/tests/integration/utils/kafka_server.py @@ -2,6 +2,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from aiokafka.errors import AuthenticationFailedError, NoBrokersAvailable from dataclasses import dataclass from karapace.kafka.admin import KafkaAdminClient @@ -11,7 +13,6 @@ from tests.integration.utils.config import KafkaConfig, KafkaDescription, ZKConfig from tests.integration.utils.process import get_java_process_configuration from tests.utils import write_ini -from typing import Dict, List import logging import os @@ -24,7 +25,7 @@ @dataclass class KafkaServers: - bootstrap_servers: List[str] + bootstrap_servers: list[str] def __post_init__(self) -> None: is_bootstrap_uris_valid = ( @@ -100,7 +101,7 @@ def kafka_java_args( logs_dir: str, log4j_properties_path: str, kafka_description: KafkaDescription, -) -> List[str]: +) -> list[str]: msg = f"Couldn't find kafka installation at {kafka_description.install_dir} to run integration tests." 
assert kafka_description.install_dir.exists(), msg java_args = [ @@ -121,6 +122,7 @@ def configure_and_start_kafka( kafka_config: KafkaConfig, kafka_description: KafkaDescription, log4j_config: str, + kafka_properties: dict[str, str | int], ) -> Popen: config_path = Path(kafka_config.logdir) / "server.properties" @@ -167,6 +169,7 @@ def configure_and_start_kafka( "zookeeper.connection.timeout.ms": 6000, "zookeeper.connect": f"127.0.0.1:{zk_config.client_port}", } + kafka_ini.update(kafka_properties) write_ini(config_path, kafka_ini) @@ -179,6 +182,6 @@ def configure_and_start_kafka( kafka_description=kafka_description, ), ) - env: Dict[bytes, bytes] = {} + env: dict[bytes, bytes] = {} proc = Popen(kafka_cmd, env=env) # pylint: disable=consider-using-with return proc From e3899bdc15601e83984c5010a14eb01235bcc8b6 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Wed, 28 Aug 2024 14:44:29 +0200 Subject: [PATCH 2/8] tests,schema-reader: kafka message (`_schemas` topic) handling error tests We add tests to validate the possible error cases while parsing the message key: - we test for invalid JSON for key - missing `keytype` in the key data - we test for invalid `keytype` - we test for invalid config value - we test for invalid subject delete value - we test for invalid version number within schema value - we test for generic invalid protobuf schema --- karapace/protobuf/exception.py | 18 +-- karapace/schema_reader.py | 8 +- tests/unit/test_schema_reader.py | 197 +++++++++++++++++++++++++++++++ tests/utils.py | 16 +++ 4 files changed, 226 insertions(+), 13 deletions(-) diff --git a/karapace/protobuf/exception.py b/karapace/protobuf/exception.py index f37686256..58569bac9 100644 --- a/karapace/protobuf/exception.py +++ b/karapace/protobuf/exception.py @@ -12,14 +12,6 @@ from karapace.protobuf.schema import ProtobufSchema -class IllegalStateException(Exception): - pass - - -class IllegalArgumentException(Exception): - pass - - class Error(Exception): """Base class for errors in this module.""" @@ -28,10 +20,18 @@ class ProtobufException(Error): """Generic Protobuf schema error.""" -class ProtobufTypeException(Error): +class ProtobufTypeException(ProtobufException): """Generic Protobuf type error.""" +class IllegalStateException(ProtobufException): + pass + + +class IllegalArgumentException(ProtobufException): + pass + + class ProtobufUnresolvedDependencyException(ProtobufException): """a Protobuf schema has unresolved dependency""" diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 9ded95651..85e822ed1 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -28,7 +28,7 @@ from karapace.config import Config from karapace.coordinator.master_coordinator import MasterCoordinator from karapace.dependency import Dependency -from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException +from karapace.errors import InvalidReferences, InvalidSchema, InvalidVersion, ShutdownException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.common import translate_from_kafkaerror @@ -399,7 +399,7 @@ def handle_messages(self) -> None: try: self.handle_msg(key, value) - except (InvalidSchema, TypeError) as exc: + except (InvalidSchema, InvalidVersion, TypeError) as exc: self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc) continue finally: @@ -486,8 +486,8 @@ def _handle_msg_config(self, key: dict, value: dict | None) -> None: def 
_handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument if value is None: - LOG.warning("DELETE_SUBJECT record doesnt have a value, should have") - return + LOG.warning("DELETE_SUBJECT record does not have a value, should have") + raise ValueError("DELETE_SUBJECT record does not have a value, should have") subject = value["subject"] version = Version(value["version"]) diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index afbcbb976..7026ea853 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -10,6 +10,7 @@ from confluent_kafka import Message from dataclasses import dataclass from karapace.config import DEFAULTS +from karapace.errors import CorruptKafkaRecordException, ShutdownException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import KeyFormatter @@ -18,11 +19,15 @@ KafkaSchemaReader, MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP, MAX_MESSAGES_TO_CONSUME_ON_STARTUP, + MessageType, OFFSET_EMPTY, OFFSET_UNINITIALIZED, ) +from karapace.schema_type import SchemaType from karapace.typing import SchemaId, Version from tests.base_testcase import BaseTestCase +from tests.utils import schema_protobuf_invalid +from typing import Callable, List, Tuple from unittest.mock import Mock import confluent_kafka @@ -318,3 +323,195 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture) -> None: assert log.name == "karapace.schema_reader" assert log.levelname == "WARNING" assert log.message == "Hard delete: version: Version(2) for subject: 'test-subject' did not exist, should have" + + +@dataclass +class KafkaMessageHandlingErrorTestCase(BaseTestCase): + key: bytes + value: bytes + schema_type: SchemaType + message_type: MessageType + expected_error: ShutdownException + expected_log_message: str + + +@pytest.fixture(name="schema_reader_with_consumer_messages_factory") +def fixture_schema_reader_with_consumer_messages_factory() -> Callable[[Tuple[List[Message]]], KafkaSchemaReader]: + def factory(consumer_messages: Tuple[List[Message]]) -> KafkaSchemaReader: + key_formatter_mock = Mock(spec=KeyFormatter) + consumer_mock = Mock(spec=KafkaConsumer) + + consumer_mock.consume.side_effect = consumer_messages + # Return tuple (beginning, end), end offset is the next upcoming record offset + consumer_mock.get_watermark_offsets.return_value = (0, 4) + + # Update the config to run the schema reader in strict mode so errors can be raised + config = DEFAULTS.copy() + config["kafka_schema_reader_strict_mode"] = True + + offset_watcher = OffsetWatcher() + schema_reader = KafkaSchemaReader( + config=config, + offset_watcher=offset_watcher, + key_formatter=key_formatter_mock, + master_coordinator=None, + database=InMemoryDatabase(), + ) + schema_reader.consumer = consumer_mock + schema_reader.offset = 0 + assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP + return schema_reader + + return factory + + +@pytest.fixture(name="message_factory") +def fixture_message_factory() -> Callable[[bytes, bytes, int], Message]: + def factory(key: bytes, value: bytes, offset: int = 1) -> Message: + message = Mock(spec=Message) + message.key.return_value = key + message.value.return_value = value + message.offset.return_value = offset + message.error.return_value = None + return message + + return factory + + +@pytest.mark.parametrize( + "test_case", + [ + KafkaMessageHandlingErrorTestCase( + 
test_name="Message key is not valid JSON", + key=b'{subject1::::"test""version":1"magic":1}', + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Invalid JSON in msg.key() at offset 1", + ), + KafkaMessageHandlingErrorTestCase( + test_name="Keytype is missing from message key", + key=b'{"subject":"test","version":1,"magic":1}', + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'subject': 'test', 'version': 1, 'magic': 1}-" + "{'value': 'value does not matter at this stage, just correct JSON'} " + "has been discarded because doesn't contain the `keytype` key in the key" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Keytype is invalid on message key", + key=b'{"keytype":"NOT_A_VALID_KEY_TYPE","subject":"test","version":1,"magic":1}', + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=None, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'NOT_A_VALID_KEY_TYPE', 'subject': 'test', 'version': 1, 'magic': 1}-" + "{'value': 'value does not matter at this stage, just correct JSON'} " + "has been discarded because the NOT_A_VALID_KEY_TYPE is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Config message value is not valid JSON", + key=b'{"keytype":"CONFIG","subject":null,"magic":0}', + value=(b'no-valid-jason"compatibilityLevel": "BACKWARD""'), + schema_type=None, + message_type=MessageType.config, + expected_error=CorruptKafkaRecordException, + expected_log_message="Invalid JSON in msg.value() at offset 1", + ), + KafkaMessageHandlingErrorTestCase( + test_name="Config message value is not valid config setting", + key=b'{"keytype":"CONFIG","subject":null,"magic":0}', + value=b'{"not_the_key_name":"INVALID_CONFIG"}', + schema_type=None, + message_type=MessageType.config, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'CONFIG', 'subject': None, 'magic': 0}-" + "{'not_the_key_name': 'INVALID_CONFIG'} has been discarded because the CONFIG is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Version in schema message value is not valid", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'{"subject": "test", "version": "invalid-version", "id": 1, "deleted": false,' + b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": ' + b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}' + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'SCHEMA', 'subject': 'test', 'version': 1, 'magic': 1}-" + "{'subject': 'test', 'version': 'invalid-version', 'id': 1, 'deleted': False, 'schema': " + '\'{"name": "test", "type": "record", "fields": [{"name": "test_field", "type": ["string", "int"]}]}\'} ' + "has been discarded because the SCHEMA is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Message value is not valid JSON", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'no-valid-json"version": 1, "id": 1, "deleted": false,' + 
b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": ' + b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}' + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Invalid JSON in msg.value() at offset 1", + ), + KafkaMessageHandlingErrorTestCase( + test_name="Delete subject message value is missing `subject` field", + key=b'{"keytype":"DELETE_SUBJECT","subject":"test","version":1,"magic":1}', + value=b'{"not-subject-key":"test","version":1}', + schema_type=None, + message_type=MessageType.delete_subject, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'DELETE_SUBJECT', 'subject': 'test', 'version': 1, 'magic': 1}-" + "{'not-subject-key': 'test', 'version': 1} has been discarded because the DELETE_SUBJECT is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Protobuf schema is invalid", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false, "schema":' + + json.dumps(schema_protobuf_invalid).encode() + + b"}" + ), + schema_type=SchemaType.PROTOBUF, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Schema is not valid ProtoBuf definition", + ), + ], +) +def test_message_error_handling( + caplog: LogCaptureFixture, + test_case: KafkaMessageHandlingErrorTestCase, + schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader], + message_factory: Callable[[bytes, bytes, int], Message], +) -> None: + message = message_factory(key=test_case.key, value=test_case.value) + consumer_messages = ([message],) + schema_reader = schema_reader_with_consumer_messages_factory(consumer_messages) + + with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"): + with pytest.raises(test_case.expected_error): + schema_reader.handle_messages() + + assert schema_reader.offset == 1 + assert not schema_reader.ready + for log in caplog.records: + assert log.name == "karapace.schema_reader" + assert log.levelname == "WARNING" + assert log.message == test_case.expected_log_message diff --git a/tests/utils.py b/tests/utils.py index f38097858..352745f94 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -148,6 +148,22 @@ {"q": 3, "sensor_type": "L1", "nums": [3, 4], "order": {"item": "ABC01223"}}, ] +schema_protobuf_invalid = """ +|o3" +| +|opti -- om.codingharbour.protobuf"; +|option java_outer_classname = "TestEnumOrder"; +| +|message Message { +| int32 +| speed =; +|} +|Enum +| HIGH = 0 +| MIDDLE = ; +""" +schema_protobuf_invalid = trim_margin(schema_protobuf_invalid) + schema_data_second = {"protobuf": (schema_protobuf_second, test_objects_protobuf_second)} second_schema_json = json.dumps( From 783fbb91b106266d6ad210d222be59819bf37fbc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 18 Sep 2024 08:30:20 +0000 Subject: [PATCH 3/8] build(deps): bump flask-cors from 4.0.1 to 4.0.2 in /requirements Bumps [flask-cors](https://github.com/corydolphin/flask-cors) from 4.0.1 to 4.0.2. 
- [Release notes](https://github.com/corydolphin/flask-cors/releases) - [Changelog](https://github.com/corydolphin/flask-cors/blob/main/CHANGELOG.md) - [Commits](https://github.com/corydolphin/flask-cors/compare/4.0.1...4.0.2) --- updated-dependencies: - dependency-name: flask-cors dependency-type: indirect ... Signed-off-by: dependabot[bot] --- requirements/requirements-dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index e86c81a58..e9024431f 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -75,7 +75,7 @@ flask==3.0.3 # flask-cors # flask-login # locust -flask-cors==4.0.1 +flask-cors==4.0.2 # via locust flask-login==0.6.3 # via locust From b950e30bcbc09f4d77b5b8d318f67a912e2182e9 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Wed, 18 Sep 2024 10:07:20 +0300 Subject: [PATCH 4/8] chore: include hidden files when uploading coverage See: https://github.com/py-cov-action/python-coverage-comment-action/issues/470 --- .github/workflows/tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index fd9a63c2b..1e5dee5a4 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -56,6 +56,7 @@ jobs: with: name: "coverage-${{ matrix.python-version }}" path: ".coverage.${{ matrix.python-version }}" + include-hidden-files: true coverage: name: Coverage report From c402081eb7e48bce4e8a2423fe8b8da05a87eeb6 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Wed, 18 Sep 2024 20:47:40 +0300 Subject: [PATCH 5/8] test: delete topic and wait for removal in backup tests --- tests/integration/backup/test_v3_backup.py | 86 ++++++++++++---------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 59b788541..e9f295834 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -89,6 +89,23 @@ def _raise(exception: Exception) -> NoReturn: raise exception +def _delete_topic(admin_client: KafkaAdminClient, topic_name: str) -> None: + try: + admin_client.delete_topic(topic_name) + except UnknownTopicOrPartitionError: + logger.info("No previously existing topic.") + else: + logger.info("Deleted topic from previous run.") + + start_time = time.monotonic() + while True: + topics = admin_client.cluster_metadata().get("topics", {}) + if topic_name not in topics: + break + if time.monotonic() - start_time > 60: + raise TimeoutError(f"Topic {topic_name} still in cluster metadata topics {topics}") + + def test_roundtrip_from_kafka_state( tmp_path: Path, new_topic: NewTopic, @@ -130,6 +147,7 @@ def test_roundtrip_from_kafka_state( "karapace_schema_backup", "get", "--use-format-v3", + "--verbose", "--config", str(config_file), "--topic", @@ -154,7 +172,7 @@ def test_roundtrip_from_kafka_state( assert metadata_path.exists() # Delete the source topic. - admin_client.delete_topic(new_topic.topic) + _delete_topic(admin_client, new_topic.topic) # todo: assert new topic uuid != old topic uuid? # Execute backup restoration. 
@@ -162,6 +180,7 @@ def test_roundtrip_from_kafka_state( [ "karapace_schema_backup", "restore", + "--verbose", "--config", str(config_file), "--topic", @@ -230,6 +249,7 @@ def test_roundtrip_empty_topic( "karapace_schema_backup", "get", "--use-format-v3", + "--verbose", "--config", str(config_file), "--topic", @@ -249,13 +269,14 @@ def test_roundtrip_empty_topic( (metadata_path,) = backup_directory.iterdir() # Delete the source topic. - admin_client.delete_topic(new_topic.topic) + _delete_topic(admin_client, new_topic.topic) # Execute backup restoration. subprocess.run( [ "karapace_schema_backup", "restore", + "--verbose", "--config", str(config_file), "--topic", @@ -293,6 +314,7 @@ def test_errors_when_omitting_replication_factor(config_file: Path) -> None: [ "karapace_schema_backup", "get", + "--verbose", "--use-format-v3", f"--config={config_file!s}", "--topic=foo-bar", @@ -317,12 +339,7 @@ def test_exits_with_return_code_3_for_data_restoration_error( ) # Make sure topic doesn't exist beforehand. - try: - admin_client.delete_topic(topic_name) - except UnknownTopicOrPartitionError: - logger.info("No previously existing topic.") - else: - logger.info("Deleted topic from previous run.") + _delete_topic(admin_client, topic_name) admin_client.new_topic(topic_name) with pytest.raises(subprocess.CalledProcessError) as er: @@ -330,6 +347,7 @@ def test_exits_with_return_code_3_for_data_restoration_error( [ "karapace_schema_backup", "restore", + "--verbose", "--config", str(config_file), "--topic", @@ -357,18 +375,14 @@ def test_roundtrip_from_file( (data_file,) = metadata_path.parent.glob("*.data") # Make sure topic doesn't exist beforehand. - try: - admin_client.delete_topic(topic_name) - except UnknownTopicOrPartitionError: - logger.info("No previously existing topic.") - else: - logger.info("Deleted topic from previous run.") + _delete_topic(admin_client, topic_name) # Execute backup restoration. subprocess.run( [ "karapace_schema_backup", "restore", + "--verbose", "--config", str(config_file), "--topic", @@ -387,6 +401,7 @@ def test_roundtrip_from_file( [ "karapace_schema_backup", "get", + "--verbose", "--use-format-v3", "--config", str(config_file), @@ -450,13 +465,7 @@ def test_roundtrip_from_file_skipping_topic_creation( (data_file,) = metadata_path.parent.glob("*.data") # Create topic exactly as it was stored on backup file - try: - admin_client.delete_topic(topic_name) - except UnknownTopicOrPartitionError: - logger.info("No previously existing topic.") - else: - logger.info("Deleted topic from previous run.") - + _delete_topic(admin_client, topic_name) admin_client.new_topic(topic_name) # Execute backup restoration. @@ -464,6 +473,7 @@ def test_roundtrip_from_file_skipping_topic_creation( [ "karapace_schema_backup", "restore", + "--verbose", "--config", str(config_file), "--topic", @@ -484,6 +494,7 @@ def test_roundtrip_from_file_skipping_topic_creation( "karapace_schema_backup", "get", "--use-format-v3", + "--verbose", "--config", str(config_file), "--topic", @@ -542,12 +553,7 @@ def test_backup_restoration_fails_when_topic_does_not_exist_and_skip_creation_is metadata_path = backup_directory / f"{topic_name}.metadata" # Make sure topic doesn't exist beforehand. 
- try: - admin_client.delete_topic(topic_name) - except UnknownTopicOrPartitionError: - logger.info("No previously existing topic.") - else: - logger.info("Deleted topic from previous run.") + _delete_topic(admin_client, topic_name) config = set_config_defaults( { @@ -595,12 +601,7 @@ def test_backup_restoration_fails_when_producer_send_fails_on_unknown_topic_or_p metadata_path = backup_directory / f"{topic_name}.metadata" # Make sure topic doesn't exist beforehand. - try: - admin_client.delete_topic(topic_name) - except UnknownTopicOrPartitionError: - logger.info("No previously existing topic.") - else: - logger.info("Deleted topic from previous run.") + _delete_topic(admin_client, topic_name) config = set_config_defaults( { @@ -650,12 +651,7 @@ def test_backup_restoration_fails_when_producer_send_fails_on_buffer_error( metadata_path = backup_directory / f"{topic_name}.metadata" # Make sure topic doesn't exist beforehand. - try: - admin_client.delete_topic(topic_name) - except UnknownTopicOrPartitionError: - logger.info("No previously existing topic.") - else: - logger.info("Deleted topic from previous run.") + _delete_topic(admin_client, topic_name) config = set_config_defaults( { @@ -719,6 +715,7 @@ def test_can_inspect_v3(self) -> None: [ "karapace_schema_backup", "inspect", + "--verbose", "--location", str(metadata_path), ], @@ -769,6 +766,7 @@ def test_can_inspect_v3_with_future_checksum_algorithm(self) -> None: [ "karapace_schema_backup", "inspect", + "--verbose", "--location", str(metadata_path), ], @@ -832,6 +830,7 @@ def test_can_inspect_v1(self) -> None: [ "karapace_schema_backup", "inspect", + "--verbose", "--location", str(backup_path), ], @@ -856,6 +855,7 @@ def test_can_verify_file_integrity(self) -> None: [ "karapace_schema_backup", "verify", + "--verbose", f"--location={metadata_path!s}", "--level=file", ], @@ -883,6 +883,7 @@ def test_can_verify_record_integrity(self) -> None: [ "karapace_schema_backup", "verify", + "--verbose", f"--location={metadata_path!s}", "--level=record", ], @@ -924,6 +925,7 @@ def test_can_verify_file_integrity_from_large_topic( "karapace_schema_backup", "get", "--use-format-v3", + "--verbose", f"--config={config_file!s}", f"--topic={new_topic.topic!s}", "--replication-factor=1", @@ -938,6 +940,7 @@ def test_can_verify_file_integrity_from_large_topic( [ "karapace_schema_backup", "verify", + "--verbose", f"--location={metadata_path!s}", "--level=file", ], @@ -979,6 +982,7 @@ def test_can_verify_record_integrity_from_large_topic( "karapace_schema_backup", "get", "--use-format-v3", + "--verbose", f"--config={config_file!s}", f"--topic={new_topic.topic}", "--replication-factor=1", @@ -993,6 +997,7 @@ def test_can_verify_record_integrity_from_large_topic( [ "karapace_schema_backup", "verify", + "--verbose", f"--location={metadata_path}", "--level=record", ], @@ -1022,6 +1027,7 @@ def test_can_refute_file_integrity(self) -> None: [ "karapace_schema_backup", "verify", + "--verbose", f"--location={metadata_path!s}", "--level=file", ], @@ -1051,6 +1057,7 @@ def test_can_refute_record_integrity(self) -> None: [ "karapace_schema_backup", "verify", + "--verbose", f"--location={metadata_path!s}", "--level=record", ], @@ -1093,6 +1100,7 @@ def test_gives_non_successful_exit_code_for_legacy_backup_format( [ "karapace_schema_backup", "verify", + "--verbose", f"--location={backup_path}", "--level=file", ], From aa3aa9da5276d9e9784119a065293a2dc59eba74 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Thu, 19 Sep 2024 10:26:43 +0300 Subject: [PATCH 6/8] test: use 
unique topic name in backup test --- tests/integration/backup/test_v3_backup.py | 5 +++-- .../83bdbd2a/83bdbd2a.metadata | Bin 0 -> 204 bytes .../83bdbd2a/83bdbd2a:0.data | Bin 0 -> 112 bytes 3 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 tests/integration/test_data/backup_v3_single_partition/83bdbd2a/83bdbd2a.metadata create mode 100644 tests/integration/test_data/backup_v3_single_partition/83bdbd2a/83bdbd2a:0.data diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index e9f295834..8e01365ed 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -548,7 +548,7 @@ def test_backup_restoration_fails_when_topic_does_not_exist_and_skip_creation_is admin_client: KafkaAdminClient, kafka_servers: KafkaServers, ) -> None: - topic_name = "596ddf6b" + topic_name = "83bdbd2a" backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / topic_name metadata_path = backup_directory / f"{topic_name}.metadata" @@ -583,13 +583,14 @@ def __exit__(self, exc_type, exc_value, exc_traceback): with patch("karapace.backup.api._producer") as p: p.return_value = LowTimeoutProducer() - with pytest.raises(BackupDataRestorationError): + with pytest.raises(BackupDataRestorationError) as excinfo: api.restore_backup( config=config, backup_location=metadata_path, topic_name=TopicName(topic_name), skip_topic_creation=True, ) + excinfo.match("Error while producing restored messages") def test_backup_restoration_fails_when_producer_send_fails_on_unknown_topic_or_partition( diff --git a/tests/integration/test_data/backup_v3_single_partition/83bdbd2a/83bdbd2a.metadata b/tests/integration/test_data/backup_v3_single_partition/83bdbd2a/83bdbd2a.metadata new file mode 100644 index 0000000000000000000000000000000000000000..5a5a9e4071bb6602f1710cb876471f0e52f2c9ba GIT binary patch literal 204 zcmdN7Gv;DoU^vVskeyhRSdf^Us${HZs%M~UY^a-VXli0&kevE!@$`37liJpSNEQJL z@I9 glN`)yD+9fh#F9h?fiEIoSynumEX=~d#K6P_0GAs Date: Tue, 17 Sep 2024 12:50:17 +0300 Subject: [PATCH 7/8] chore: add retry to tests requiring forwarding --- tests/integration/conftest.py | 11 ++ tests/integration/test_master_coordinator.py | 29 +++-- .../integration/test_schema_registry_auth.py | 118 +++++++++++------- tests/integration/utils/rest_client.py | 81 ++++++++++++ 4 files changed, 180 insertions(+), 59 deletions(-) create mode 100644 tests/integration/utils/rest_client.py diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index cc86a3c36..a4d97ddbb 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -31,6 +31,7 @@ ) from tests.integration.utils.network import PortRangeInclusive from tests.integration.utils.process import stop_process, wait_for_port_subprocess +from tests.integration.utils.rest_client import RetryRestClient from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk from tests.utils import repeat_until_successful_request @@ -576,6 +577,11 @@ async def fixture_registry_async_client( await client.close() +@pytest.fixture(scope="function", name="registry_async_retry_client") +async def fixture_registry_async_retry_client(registry_async_client: Client) -> RetryRestClient: + return RetryRestClient(registry_async_client) + + @pytest.fixture(scope="function", name="credentials_folder") def fixture_credentials_folder() -> str: integration_test_folder = 
os.path.dirname(__file__) @@ -715,6 +721,11 @@ async def fixture_registry_async_client_auth( await client.close() +@pytest.fixture(scope="function", name="registry_async_retry_client_auth") +async def fixture_registry_async_retry_client_auth(registry_async_client_auth: Client) -> RetryRestClient: + return RetryRestClient(registry_async_client_auth) + + @pytest.fixture(scope="function", name="registry_async_auth_pair") async def fixture_registry_async_auth_pair( request: SubRequest, diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 225539f8d..9acdffdb4 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -8,12 +8,12 @@ from karapace.coordinator.master_coordinator import MasterCoordinator from tests.integration.utils.kafka_server import KafkaServers from tests.integration.utils.network import PortRangeInclusive +from tests.integration.utils.rest_client import RetryRestClient from tests.utils import new_random_name import asyncio import json import pytest -import requests async def init_admin(config): @@ -195,7 +195,10 @@ async def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortR await mc.close() -async def test_schema_request_forwarding(registry_async_pair): +async def test_schema_request_forwarding( + registry_async_pair, + registry_async_retry_client: RetryRestClient, +) -> None: master_url, slave_url = registry_async_pair max_tries, counter = 5, 0 wait_time = 0.5 @@ -209,11 +212,11 @@ async def test_schema_request_forwarding(registry_async_pair): else: path = "config" for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]: - resp = requests.put(f"{slave_url}/{path}", json={"compatibility": compat}) + resp = await registry_async_retry_client.put(f"{slave_url}/{path}", json={"compatibility": compat}) assert resp.ok while True: assert counter < max_tries, "Compat update not propagated" - resp = requests.get(f"{master_url}/{path}") + resp = await registry_async_retry_client.get(f"{master_url}/{path}") if not resp.ok: print(f"Invalid http status code: {resp.status_code}") continue @@ -232,14 +235,16 @@ async def test_schema_request_forwarding(registry_async_pair): # New schema updates, last compatibility is None for s in [schema, other_schema]: - resp = requests.post(f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)}) + resp = await registry_async_retry_client.post( + f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)} + ) assert resp.ok data = resp.json() assert "id" in data, data counter = 0 while True: assert counter < max_tries, "Subject schema data not propagated yet" - resp = requests.get(f"{master_url}/subjects/{subject}/versions") + resp = await registry_async_retry_client.get(f"{master_url}/subjects/{subject}/versions") if not resp.ok: print(f"Invalid http status code: {resp.status_code}") counter += 1 @@ -255,12 +260,14 @@ async def test_schema_request_forwarding(registry_async_pair): break # Schema deletions - resp = requests.delete(f"{slave_url}/subjects/{subject}/versions/1") + resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}/versions/1") assert resp.ok counter = 0 while True: assert counter < max_tries, "Subject version deletion not propagated yet" - resp = requests.get(f"{master_url}/subjects/{subject}/versions/1") + resp = await registry_async_retry_client.get( + f"{master_url}/subjects/{subject}/versions/1", expected_response_code=404 + ) if resp.ok: 
print(f"Subject {subject} still has version 1 on master") counter += 1 @@ -270,16 +277,16 @@ async def test_schema_request_forwarding(registry_async_pair): break # Subject deletion - resp = requests.get(f"{master_url}/subjects/") + resp = await registry_async_retry_client.get(f"{master_url}/subjects/") assert resp.ok data = resp.json() assert subject in data - resp = requests.delete(f"{slave_url}/subjects/{subject}") + resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}") assert resp.ok counter = 0 while True: assert counter < max_tries, "Subject deletion not propagated yet" - resp = requests.get(f"{master_url}/subjects/") + resp = await registry_async_retry_client.get(f"{master_url}/subjects/") if not resp.ok: print("Could not retrieve subject list on master") counter += 1 diff --git a/tests/integration/test_schema_registry_auth.py b/tests/integration/test_schema_registry_auth.py index 1305c5cbf..5f780f3ce 100644 --- a/tests/integration/test_schema_registry_auth.py +++ b/tests/integration/test_schema_registry_auth.py @@ -4,9 +4,9 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from karapace.client import Client from karapace.kafka.admin import KafkaAdminClient from karapace.schema_models import SchemaType, ValidatedTypedSchema +from tests.integration.utils.rest_client import RetryRestClient from tests.utils import ( new_random_name, new_topic, @@ -20,7 +20,6 @@ import aiohttp import asyncio -import requests NEW_TOPIC_TIMEOUT = 10 @@ -29,22 +28,26 @@ reader = aiohttp.BasicAuth("reader", "secret") -async def test_sr_auth(registry_async_client_auth: Client) -> None: +async def test_sr_auth(registry_async_retry_client_auth: RetryRestClient) -> None: subject = new_random_name("cave-") - res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}) + res = await registry_async_retry_client_auth.post( + f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, auth=aladdin ) assert res.status_code == 200 sc_id = res.json()["id"] assert sc_id >= 0 - res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest") + res = await registry_async_retry_client_auth.get( + f"subjects/{quote(subject)}/versions/latest", expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest", auth=aladdin) + res = await registry_async_retry_client_auth.get(f"subjects/{quote(subject)}/versions/latest", auth=aladdin) assert res.status_code == 200 assert sc_id == res.json()["id"] assert ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json) == ValidatedTypedSchema.parse( @@ -52,116 +55,130 @@ async def test_sr_auth(registry_async_client_auth: Client) -> None: ) -async def test_sr_auth_endpoints(registry_async_client_auth: Client) -> None: +async def test_sr_auth_endpoints(registry_async_retry_client_auth: RetryRestClient) -> None: """Test endpoints for authorization""" subject = new_random_name("any-") - res = await registry_async_client_auth.post( - f"compatibility/subjects/{quote(subject)}/versions/1", json={"schema": schema_avro_json} + res = await registry_async_retry_client_auth.post( + f"compatibility/subjects/{quote(subject)}/versions/1", + 
json={"schema": schema_avro_json}, + expected_response_code=401, ) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"config/{quote(subject)}") + res = await registry_async_retry_client_auth.get(f"config/{quote(subject)}", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.put(f"config/{quote(subject)}", json={"compatibility": "NONE"}) + res = await registry_async_retry_client_auth.put( + f"config/{quote(subject)}", + json={"compatibility": "NONE"}, + expected_response_code=401, + ) assert res.status_code == 401 - res = await registry_async_client_auth.get("config") + res = await registry_async_retry_client_auth.get("config", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.put("config", json={"compatibility": "NONE"}) + res = await registry_async_retry_client_auth.put("config", json={"compatibility": "NONE"}, expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get("schemas/ids/1/versions") + res = await registry_async_retry_client_auth.get("schemas/ids/1/versions", expected_response_code=401) assert res.status_code == 401 # This is an exception that does not require authorization - res = await registry_async_client_auth.get("schemas/types") + res = await registry_async_retry_client_auth.get("schemas/types") assert res.status_code == 200 # but let's verify it answers normally if sending authorization header - res = await registry_async_client_auth.get("schemas/types", auth=admin) + res = await registry_async_retry_client_auth.get("schemas/types", auth=admin) assert res.status_code == 200 - res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}) + res = await registry_async_retry_client_auth.post( + f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}/versions/1") + res = await registry_async_retry_client_auth.delete(f"subjects/{quote(subject)}/versions/1", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/1/schema") + res = await registry_async_retry_client_auth.get( + f"subjects/{quote(subject)}/versions/1/schema", expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/1/referencedby") + res = await registry_async_retry_client_auth.get( + f"subjects/{quote(subject)}/versions/1/referencedby", expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}") + res = await registry_async_retry_client_auth.delete(f"subjects/{quote(subject)}", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get("mode") + res = await registry_async_retry_client_auth.get("mode", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"mode/{quote(subject)}") + res = await registry_async_retry_client_auth.get(f"mode/{quote(subject)}", expected_response_code=401) assert res.status_code == 401 -async def test_sr_list_subjects(registry_async_client_auth: Client) -> None: +async def test_sr_list_subjects(registry_async_retry_client_auth: RetryRestClient) -> None: cavesubject = 
new_random_name("cave-") carpetsubject = new_random_name("carpet-") - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin ) assert res.status_code == 200 sc_id = res.json()["id"] assert sc_id >= 0 - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(carpetsubject)}/versions", json={"schema": schema_avro_json}, auth=admin ) assert res.status_code == 200 - res = await registry_async_client_auth.get("subjects", auth=admin) + res = await registry_async_retry_client_auth.get("subjects", auth=admin) assert res.status_code == 200 assert [cavesubject, carpetsubject] == res.json() - res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions") + res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=admin) + res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=admin) assert res.status_code == 200 assert [sc_id] == res.json() - res = await registry_async_client_auth.get("subjects", auth=aladdin) + res = await registry_async_retry_client_auth.get("subjects", auth=aladdin) assert res.status_code == 200 assert [cavesubject] == res.json() - res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=aladdin) + res = await registry_async_retry_client_auth.get( + f"subjects/{quote(carpetsubject)}/versions", auth=aladdin, expected_response_code=403 + ) assert res.status_code == 403 - res = await registry_async_client_auth.get("subjects", auth=reader) + res = await registry_async_retry_client_auth.get("subjects", auth=reader) assert res.status_code == 200 assert [carpetsubject] == res.json() - res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=reader) + res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=reader) assert res.status_code == 200 assert [1] == res.json() -async def test_sr_ids(registry_async_client_auth: Client) -> None: +async def test_sr_ids(registry_async_retry_client_auth: RetryRestClient) -> None: cavesubject = new_random_name("cave-") carpetsubject = new_random_name("carpet-") - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin ) assert res.status_code == 200 avro_sc_id = res.json()["id"] assert avro_sc_id >= 0 - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(carpetsubject)}/versions", json={"schemaType": "JSON", "schema": schema_jsonschema_json}, auth=admin, @@ -170,36 +187,39 @@ async def test_sr_ids(registry_async_client_auth: Client) -> None: jsonschema_sc_id = res.json()["id"] assert jsonschema_sc_id >= 0 - res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=aladdin) + res = await registry_async_retry_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=aladdin) assert res.status_code == 200 - res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=aladdin) + res = await registry_async_retry_client_auth.get( + f"schemas/ids/{jsonschema_sc_id}", 
auth=aladdin, expected_response_code=404 + ) assert res.status_code == 404 assert {"error_code": 40403, "message": "Schema not found"} == res.json() - res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=reader) + res = await registry_async_retry_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=reader, expected_response_code=404) assert res.status_code == 404 assert {"error_code": 40403, "message": "Schema not found"} == res.json() - res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=reader) + res = await registry_async_retry_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=reader) assert res.status_code == 200 -async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None: - auth = requests.auth.HTTPBasicAuth("admin", "admin") +async def test_sr_auth_forwarding( + registry_async_auth_pair: List[str], registry_async_retry_client_auth: RetryRestClient +) -> None: + auth = aiohttp.BasicAuth("admin", "admin") # Test primary/replica forwarding with global config setting primary_url, replica_url = registry_async_auth_pair max_tries, counter = 5, 0 wait_time = 0.5 for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]: - resp = requests.put(f"{replica_url}/config", json={"compatibility": compat}, auth=auth) + resp = await registry_async_retry_client_auth.put(f"{replica_url}/config", json={"compatibility": compat}, auth=auth) assert resp.ok while True: assert counter < max_tries, "Compat update not propagated" - resp = requests.get(f"{primary_url}/config", auth=auth) - if not resp.ok: - continue + resp = await registry_async_retry_client_auth.get(f"{primary_url}/config", auth=auth) + assert resp.ok data = resp.json() if "compatibilityLevel" not in data: counter += 1 @@ -213,7 +233,9 @@ async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None: # Test that Kafka REST API works when configured with Schema Registry requiring authorization -async def test_rest_api_with_sr_auth(rest_async_client_registry_auth: Client, admin_client: KafkaAdminClient) -> None: +async def test_rest_api_with_sr_auth( + rest_async_client_registry_auth: RetryRestClient, admin_client: KafkaAdminClient +) -> None: client = rest_async_client_registry_auth topic = new_topic(admin_client, prefix="cave-rest-") diff --git a/tests/integration/utils/rest_client.py b/tests/integration/utils/rest_client.py new file mode 100644 index 000000000..05539a34f --- /dev/null +++ b/tests/integration/utils/rest_client.py @@ -0,0 +1,81 @@ +""" +karapace - Test rest client with retries + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from aiohttp import BasicAuth +from collections.abc import Mapping +from karapace.client import Client, Headers, Path, Result +from karapace.typing import JsonData +from tenacity import retry, stop_after_attempt, wait_fixed +from typing import Final + +RETRY_WAIT_SECONDS: Final = 0.5 + + +class UnexpectedResponseStatus(Exception): + pass + + +class RetryRestClient: + def __init__(self, client: Client): + self._karapace_client = client + + @retry(stop=stop_after_attempt(5), wait=wait_fixed(RETRY_WAIT_SECONDS)) + async def get( + self, + path: Path, + json: JsonData | None = None, + headers: Headers | None = None, + auth: BasicAuth | None = None, + params: Mapping[str, str] | None = None, + expected_response_code: int = 200, + ) -> Result: + response: Result = await self._karapace_client.get(path, headers=headers, json=json, auth=auth, params=params) + if 
response.status_code != expected_response_code: + raise UnexpectedResponseStatus(f"Unexpected status code: {response!r}") + return response + + @retry(stop=stop_after_attempt(5), wait=wait_fixed(RETRY_WAIT_SECONDS)) + async def delete( + self, + path: Path, + headers: Headers | None = None, + auth: BasicAuth | None = None, + expected_response_code: int = 200, + ) -> Result: + response: Result = await self._karapace_client.delete(path=path, headers=headers, auth=auth) + if response.status_code != expected_response_code: + raise UnexpectedResponseStatus(f"Unexpected status code: {response!r}") + return response + + @retry(stop=stop_after_attempt(5), wait=wait_fixed(RETRY_WAIT_SECONDS)) + async def post( + self, + path: Path, + json: JsonData, + headers: Headers | None = None, + auth: BasicAuth | None = None, + expected_response_code: int = 200, + ) -> Result: + response: Result = await self._karapace_client.post(path=path, json=json, headers=headers, auth=auth) + if response.status_code != expected_response_code: + raise UnexpectedResponseStatus(f"Unexpected status code: {response!r}") + return response + + @retry(stop=stop_after_attempt(5), wait=wait_fixed(RETRY_WAIT_SECONDS)) + async def put( + self, + path: Path, + json: JsonData, + headers: Headers | None = None, + auth: BasicAuth | None = None, + expected_response_code: int = 200, + ) -> Result: + response: Result = await self._karapace_client.put(path=path, json=json, headers=headers, auth=auth) + if response.status_code != expected_response_code: + raise UnexpectedResponseStatus(f"Unexpected status code: {response!r}") + return response From 8c097332e04d8178a90ac828f5e05e551c418824 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Wed, 18 Sep 2024 13:12:32 +0300 Subject: [PATCH 8/8] chore: add --numprocesses to GH action pytest args --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1e5dee5a4..029a3a011 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -39,11 +39,11 @@ jobs: - run: make unit-tests env: COVERAGE_FILE: ".coverage.${{ matrix.python-version }}" - PYTEST_ARGS: "--cov=karapace --cov-append" + PYTEST_ARGS: "--cov=karapace --cov-append --numprocesses 4" - run: make integration-tests env: COVERAGE_FILE: ".coverage.${{ matrix.python-version }}" - PYTEST_ARGS: "--cov=karapace --cov-append --random-order" + PYTEST_ARGS: "--cov=karapace --cov-append --random-order --numprocesses 4" - name: Archive logs uses: actions/upload-artifact@v4