From 01960de193d15edb70f27a518b538fa281d728c1 Mon Sep 17 00:00:00 2001 From: Sebastian Utz Date: Fri, 24 May 2024 14:06:36 +0200 Subject: [PATCH] cfr: Add support to export systables to a tarfile --- cratedb_toolkit/cfr/cli.py | 16 +++++++++++-- cratedb_toolkit/cfr/systable.py | 35 +++++++++++++++++++++++++---- tests/cfr/test_cli.py | 40 +++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 6 deletions(-) diff --git a/cratedb_toolkit/cfr/cli.py b/cratedb_toolkit/cfr/cli.py index 4aab7450..e39b0b98 100644 --- a/cratedb_toolkit/cfr/cli.py +++ b/cratedb_toolkit/cfr/cli.py @@ -6,7 +6,7 @@ import click from click_aliases import ClickAliasedGroup -from cratedb_toolkit.cfr.systable import SystemTableExporter, SystemTableImporter +from cratedb_toolkit.cfr.systable import Archive, SystemTableExporter, SystemTableImporter from cratedb_toolkit.util.cli import ( boot_click, error_logger, @@ -46,8 +46,20 @@ def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: b def sys_export(ctx: click.Context, target: str): cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] try: - stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=path_from_url(target)) + target_path = path_from_url(target) + stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=target_path) + + archive = None + if target_path.name.endswith(".tgz") or target_path.name.endswith(".tar.gz"): + archive = Archive(stc) + path = stc.save() + + if archive is not None: + path = archive.make_tarfile() + archive.close() + logger.info(f"Created archive file {target}") + jd({"path": str(path)}) except Exception as ex: error_logger(ctx)(ex) diff --git a/cratedb_toolkit/cfr/systable.py b/cratedb_toolkit/cfr/systable.py index 9fb122a4..051549c3 100644 --- a/cratedb_toolkit/cfr/systable.py +++ b/cratedb_toolkit/cfr/systable.py @@ -17,6 +17,9 @@ import datetime as dt import logging +import os +import tarfile +import tempfile import typing as t from pathlib import Path @@ -87,19 +90,42 @@ def ddl(self, tablename_in: str, tablename_out: str, out_schema: str = None, wit return sql -class SystemTableExporter: +class PathProvider: + + def __init__(self, path: t.Union[Path]): + self.path = path + + +class Archive: + + def __init__(self, path_provider: PathProvider): + self.path_provider = path_provider + self.temp_dir = tempfile.TemporaryDirectory() + self.target_path = self.path_provider.path + self.path_provider.path = Path(self.temp_dir.name) + + def close(self): + self.temp_dir.cleanup() + + def make_tarfile(self) -> Path: + source_path = self.path_provider.path + with tarfile.open(self.target_path, "x:gz") as tar: + tar.add(source_path.absolute(), arcname=os.path.basename(source_path)) + return self.target_path + + +class SystemTableExporter(PathProvider): """ Export schema and data from CrateDB system tables. """ def __init__(self, dburi: str, target: t.Union[Path], data_format: DataFormat = "jsonl"): + super().__init__(target) self.dburi = dburi - self.target = target self.data_format = data_format self.adapter = DatabaseAdapter(dburi=self.dburi) self.info = InfoContainer(adapter=self.adapter) self.inspector = SystemTableInspector(dburi=self.dburi) - self.target.mkdir(exist_ok=True, parents=True) def read_table(self, tablename: str) -> pl.DataFrame: sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"' # noqa: S608 @@ -123,8 +149,9 @@ def dump_table(self, frame: pl.DataFrame, file: t.Union[t.TextIO, None] = None): raise NotImplementedError(f"Output format not implemented: {self.data_format}") def save(self) -> Path: + self.path.mkdir(exist_ok=True, parents=True) timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") - path = self.target / self.info.cluster_name / timestamp / "sys" + path = self.path / self.info.cluster_name / timestamp / "sys" logger.info(f"Exporting system tables to: {path}") system_tables = self.inspector.table_names() path_schema = path / ExportSettings.SCHEMA_PATH diff --git a/tests/cfr/test_cli.py b/tests/cfr/test_cli.py index 2ef639e9..412b096f 100644 --- a/tests/cfr/test_cli.py +++ b/tests/cfr/test_cli.py @@ -1,7 +1,9 @@ import json +import os.path import re import shutil import sys +import tarfile import tests @@ -49,6 +51,44 @@ def test_cfr_cli_export_success(cratedb, tmp_path, caplog): assert len(data_files) >= 10 +def test_cfr_cli_export_to_archive_file(cratedb, tmp_path, caplog): + """ + Verify `ctk cfr sys-export some-file.tgz` works. + """ + + target = os.path.join(tmp_path, "cluster-data.tgz") + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}) + result = runner.invoke( + cli, + args=f"--debug sys-export {target}", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify log output. + assert "Exporting system tables to" in caplog.text + assert re.search(r"Successfully exported \d+ system tables", caplog.text), "Log message missing" + + # Verify outcome. + path = Path(json.loads(result.output)["path"]) + assert "cluster-data.tgz" in path.name + + data_files = [] + schema_files = [] + with tarfile.open(path, "r") as tar: + name_list = tar.getnames() + for name in name_list: + if "data" in name: + data_files.append(name) + elif "schema" in name: + schema_files.append(name) + + assert len(schema_files) >= 19 + assert len(data_files) >= 10 + + def test_cfr_cli_export_failure(cratedb, tmp_path, caplog): """ Verify `ctk cfr sys-export` failure.