diff --git a/src/karapace/backup/api.py b/src/karapace/backup/api.py index d06c99ebe..3da9a2304 100644 --- a/src/karapace/backup/api.py +++ b/src/karapace/backup/api.py @@ -373,13 +373,20 @@ def _handle_restore_topic( instruction: RestoreTopic, config: Config, skip_topic_creation: bool = False, + override_replication_factor: int | None = None, ) -> None: if skip_topic_creation: return + repl_factor = instruction.replication_factor + if override_replication_factor is not None: + LOG.info( + "Overriding replication factor with: %d (was: %d)", override_replication_factor, instruction.replication_factor + ) + repl_factor = override_replication_factor if not _maybe_create_topic( config=config, name=instruction.topic_name, - replication_factor=instruction.replication_factor, + replication_factor=repl_factor, topic_configs=instruction.topic_configs, ): raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists") @@ -426,6 +433,7 @@ def restore_backup( backup_location: ExistingFile, topic_name: TopicName, skip_topic_creation: bool = False, + override_replication_factor: int | None = None, ) -> None: """Restores a backup from the specified location into the configured topic. 
@@ -475,7 +483,7 @@ def _check_producer_exception() -> None: _handle_restore_topic_legacy(instruction, config, skip_topic_creation) producer = stack.enter_context(_producer(config, instruction.topic_name)) elif isinstance(instruction, RestoreTopic): - _handle_restore_topic(instruction, config, skip_topic_creation) + _handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor) producer = stack.enter_context(_producer(config, instruction.topic_name)) elif isinstance(instruction, ProducerSend): if producer is None: diff --git a/src/karapace/backup/cli.py b/src/karapace/backup/cli.py index 5e3d72854..7125b1e04 100644 --- a/src/karapace/backup/cli.py +++ b/src/karapace/backup/cli.py @@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace: ), ) + parser_restore.add_argument( + "--override-replication-factor", + help=( + "Override the replication factor that is saved in the backup. This is needed when restoring a backup from a " + "downsized cluster (like scaling down from 6 to 3 nodes). This has effect only for V3 backups." 
+ ), + type=int, + ) + return parser.parse_args() @@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None: backup_location=api.locate_backup_file(location), topic_name=api.normalize_topic_name(args.topic, config), skip_topic_creation=args.skip_topic_creation, + override_replication_factor=args.override_replication_factor, ) except BackupDataRestorationError: traceback.print_exc() diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 332b09f0a..6f2e5df35 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from aiokafka.errors import UnknownTopicOrPartitionError +from aiokafka.errors import InvalidReplicationFactorError, UnknownTopicOrPartitionError from collections.abc import Iterator from confluent_kafka import Message, TopicPartition from confluent_kafka.admin import NewTopic @@ -698,6 +698,56 @@ def __exit__(self, exc_type, exc_value, exc_traceback): ) +def test_backup_restoration_override_replication_factor( + admin_client: KafkaAdminClient, + kafka_servers: KafkaServers, + producer: KafkaProducer, + new_topic: NewTopic, +) -> None: + backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / new_topic.topic + metadata_path = backup_directory / f"{new_topic.topic}.metadata" + config = set_config_defaults( + { + "bootstrap_uri": kafka_servers.bootstrap_servers, + } + ) + + # populate the topic and create a backup + for i in range(10): + producer.send( + new_topic.topic, + key=f"message-key-{i}", + value=f"message-value-{i}-" + 1000 * "X", + ) + producer.flush() + api.create_backup( + config=config, + backup_location=backup_directory, + topic_name=TopicName(new_topic.topic), + version=BackupVersion.V3, + replication_factor=6, + ) + + # make sure topic doesn't exist beforehand. 
+ _delete_topic(admin_client, new_topic.topic) + + # assert that the restore would fail without the replication factor override + with pytest.raises(InvalidReplicationFactorError): + api.restore_backup( + config=config, + backup_location=metadata_path, + topic_name=TopicName(new_topic.topic), + ) + + # finally restore the backup with override + api.restore_backup( + config=config, + backup_location=metadata_path, + topic_name=TopicName(new_topic.topic), + override_replication_factor=1, + ) + + def no_color_env() -> dict[str, str]: env = os.environ.copy() try: