diff --git a/.gitignore b/.gitignore index c272cda7..e06bc42b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ __pycache__ dist .coverage* coverage.xml +/cfr /foo /tmp /DOWNLOAD diff --git a/CHANGES.md b/CHANGES.md index 9ec95a6c..335cd695 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,7 +2,7 @@ ## Unreleased -- Add `cratedb-wtf` diagnostics program +- Add `ctk cfr` and `ctk wtf` diagnostics programs ## 2024/04/10 v0.0.10 - Dependencies: Unpin upper version bound of `dask`. Otherwise, compatibility diff --git a/cratedb_toolkit/cfr/README.md b/cratedb_toolkit/cfr/README.md new file mode 100644 index 00000000..e07d637a --- /dev/null +++ b/cratedb_toolkit/cfr/README.md @@ -0,0 +1,28 @@ +# CrateDB Cluster Flight Recorder (CFR) + +Collect required cluster information for support requests +and self-service debugging. + + +## Synopsis + +Define CrateDB database cluster address. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://localhost/ +``` + +Export system table information into timestamped file, +by default into the `cfr/sys` directory. +```shell +ctk cfr sys-export +``` + +Export system table information into given directory. +```shell +ctk cfr sys-export file:///var/ctk/cfr/sys +``` + +Import system table information from given directory. +```shell +ctk cfr sys-import file://./cfr/sys/2024-04-16T05-43-37 +``` diff --git a/cratedb_toolkit/cfr/__init__.py b/cratedb_toolkit/cfr/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/cfr/backlog.md b/cratedb_toolkit/cfr/backlog.md new file mode 100644 index 00000000..8435c2c6 --- /dev/null +++ b/cratedb_toolkit/cfr/backlog.md @@ -0,0 +1,6 @@ +# CFR Backlog + +## Iteration +1 +- sys-export: Does the program need capabilities to **LIMIT** cardinality + on `sys-export` operations, for example, when they are super large? +- sys-import: Accept target database schema. diff --git a/cratedb_toolkit/cfr/cli.py b/cratedb_toolkit/cfr/cli.py new file mode 100644 index 00000000..18256211 --- /dev/null +++ b/cratedb_toolkit/cfr/cli.py @@ -0,0 +1,67 @@ +# Copyright (c) 2021-2024, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +import logging +import sys + +import click +from click_aliases import ClickAliasedGroup + +from cratedb_toolkit.cfr.systable import SystemTableExporter, SystemTableImporter +from cratedb_toolkit.util.cli import ( + boot_click, + error_logger, + make_command, +) +from cratedb_toolkit.util.data import jd, path_from_url + +logger = logging.getLogger(__name__) + + +cratedb_sqlalchemy_option = click.option( + "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" +) + + +@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] +@cratedb_sqlalchemy_option +@click.option("--verbose", is_flag=True, required=False, help="Turn on logging") +@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") +@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information") +@click.version_option() +@click.pass_context +def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool): + """ + Diagnostics and informational utilities. + """ + if not cratedb_sqlalchemy_url: + logger.error("Unable to operate without database address") + sys.exit(1) + ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub}) + return boot_click(ctx, verbose, debug) + + +@make_command(cli, "sys-export") +@click.argument("target", envvar="CFR_TARGET", type=str, required=False, default="file://./cfr/sys") +@click.pass_context +def sys_export(ctx: click.Context, target: str): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + try: + stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=path_from_url(target)) + path = stc.save() + jd({"path": str(path)}) + except Exception as ex: + error_logger(ctx)(ex) + sys.exit(1) + + +@make_command(cli, "sys-import") +@click.argument("source", envvar="CFR_SOURCE", type=str, required=True) +@click.pass_context +def sys_import(ctx: click.Context, source: str): + cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] + try: + stc = SystemTableImporter(dburi=cratedb_sqlalchemy_url, source=path_from_url(source)) + stc.load() + except Exception as ex: + error_logger(ctx)(ex) + sys.exit(1) diff --git a/cratedb_toolkit/cfr/systable.py b/cratedb_toolkit/cfr/systable.py new file mode 100644 index 00000000..7d77407a --- /dev/null +++ b/cratedb_toolkit/cfr/systable.py @@ -0,0 +1,210 @@ +""" +CrateDB Diagnostics: System Tables Exporter and Importer. + +Schemas and results of following queries should be included: +```sql +SELECT * FROM sys.cluster +SELECT * FROM sys.nodes +SELECT * FROM sys.shards +SELECT * FROM sys.allocations +SELECT * FROM sys.jobs_log +SELECT * FROM sys.operations_log +``` + +https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/inspection-reflection.html +https://docs.sqlalchemy.org/en/20/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string +""" + +import datetime as dt +import logging +import typing as t +from pathlib import Path + +import polars as pl +import sqlalchemy as sa +from tqdm import tqdm + +from cratedb_toolkit.sqlalchemy.patch import patch_encoder +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.cli import error_logger + +logger = logging.getLogger(__name__) + + +DataFormat = t.Literal["csv", "jsonl", "ndjson", "parquet"] + + +class SystemTableKnowledge: + """ + Manage a few bits of knowledge about CrateDB internals. + """ + + # Name of CrateDB's schema for system tables. + SYS_SCHEMA = "sys" + + # TODO: Reflecting the `summits` table raises an error. + # AttributeError: 'UserDefinedType' object has no attribute 'get_col_spec' + REFLECTION_BLOCKLIST = ["summits"] + + +class ExportSettings: + """ + Manage a few bits of knowledge about how to export system tables from CrateDB. + """ + + # Subdirectories where to store schema vs. data information. + SCHEMA_PATH = "schema" + DATA_PATH = "data" + + # The filename prefix when storing tables to disk. + TABLE_FILENAME_PREFIX = "sys-" + + +class SystemTableInspector: + """ + Reflect schema information from CrateDB system tables. + """ + + def __init__(self, dburi: str): + self.dburi = dburi + self.adapter = DatabaseAdapter(dburi=self.dburi) + self.engine = self.adapter.engine + self.inspector = sa.inspect(self.engine) + + def table_names(self): + return self.inspector.get_table_names(schema=SystemTableKnowledge.SYS_SCHEMA) + + def ddl(self, tablename_in: str, tablename_out: str, out_schema: str = None, with_drop_table: bool = False) -> str: + meta = sa.MetaData(schema=SystemTableKnowledge.SYS_SCHEMA) + table = sa.Table(tablename_in, meta, autoload_with=self.engine) + table.schema = out_schema + table.name = tablename_out + sql = "" + if with_drop_table: + sql += sa.schema.DropTable(table, if_exists=True).compile(self.engine).string.strip() + ";\n" + sql += sa.schema.CreateTable(table, if_not_exists=True).compile(self.engine).string.strip() + ";\n" + return sql + + +class SystemTableExporter: + """ + Export schema and data from CrateDB system tables. + """ + + def __init__(self, dburi: str, target: t.Union[Path], data_format: DataFormat = "jsonl"): + self.dburi = dburi + self.target = target + self.data_format = data_format + self.adapter = DatabaseAdapter(dburi=self.dburi) + self.engine = self.adapter.engine + self.inspector = SystemTableInspector(dburi=self.dburi) + self.target.mkdir(exist_ok=True, parents=True) + + def dump_table(self, tablename: str, file: t.Union[t.TextIO, None] = None): + sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"' # noqa: S608 + # logger.info(f"Running SQL: {sql}") # noqa: ERA001 + df: pl.DataFrame = pl.read_database( + query=sql, # noqa: S608 + connection=self.engine, + ) + if self.data_format == "csv": + # polars.exceptions.ComputeError: CSV format does not support nested data + # return df.write_csv() # noqa: ERA001 + return df.to_pandas().to_csv(file) + elif self.data_format in ["jsonl", "ndjson"]: + return df.write_ndjson(file and file.buffer) # type: ignore[arg-type] + elif self.data_format in ["parquet", "pq"]: + return df.write_parquet(file and file.buffer) # type: ignore[arg-type] + else: + raise NotImplementedError(f"Output format not implemented: {self.data_format}") + + def save(self) -> Path: + timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") + path = self.target / timestamp + logger.info(f"Exporting system tables to: {path}") + system_tables = self.inspector.table_names() + path_schema = path / ExportSettings.SCHEMA_PATH + path_data = path / ExportSettings.DATA_PATH + path_schema.mkdir(parents=True, exist_ok=True) + path_data.mkdir(parents=True, exist_ok=True) + table_count = 0 + for tablename in tqdm(system_tables, disable=None): + if tablename in SystemTableKnowledge.REFLECTION_BLOCKLIST: + continue + path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql" + path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}" + tablename_out = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}" + with open(path_table_schema, "w") as fh_schema: + print(self.inspector.ddl(tablename_in=tablename, tablename_out=tablename_out), file=fh_schema) + mode = "w" + if self.data_format in ["parquet", "pq"]: + mode = "wb" + with open(path_table_data, mode) as fh_data: + self.dump_table(tablename=tablename, file=t.cast(t.TextIO, fh_data)) + table_count += 1 + logger.info(f"Successfully exported {table_count} system tables") + return path + + +class SystemTableImporter: + """ + Import schema and data about CrateDB system tables. + """ + + def __init__(self, dburi: str, source: Path, data_format: DataFormat = "jsonl", debug: bool = False): + self.dburi = dburi + self.source = source + self.data_format = data_format + self.debug = debug + self.adapter = DatabaseAdapter(dburi=self.dburi) + + def table_names(self): + path_schema = self.source / ExportSettings.SCHEMA_PATH + names = [] + for item in path_schema.glob("*.sql"): + name = item.name.replace(ExportSettings.TABLE_FILENAME_PREFIX, "").replace(".sql", "") + names.append(name) + return names + + def load(self): + path_schema = self.source / ExportSettings.SCHEMA_PATH + path_data = self.source / ExportSettings.DATA_PATH + + if not path_schema.exists(): + raise FileNotFoundError(f"Path does not exist: {path_schema}") + + logger.info(f"Importing system tables from: {self.source}") + + for tablename in tqdm(self.table_names()): + tablename_restored = ExportSettings.TABLE_FILENAME_PREFIX + tablename + + path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql" + path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}" + + # Skip empty files. + if Path(path_table_data).stat().st_size == 0: + continue + + # Invoke SQL DDL. + schema_sql = path_table_schema.read_text() + self.adapter.run_sql(schema_sql) + + # Load data. + try: + df: pl.DataFrame = self.load_table(path_table_data) + df.write_database(table_name=tablename_restored, connection=self.dburi, if_table_exists="append") + except Exception as ex: + error_logger(self.debug)(f"Importing table failed: {tablename}. Reason: {ex}") + + # df.to_pandas().to_sql(name=tablename, con=self.adapter.engine, if_exists="append", index=False) # noqa: ERA001, E501 + + def load_table(self, path: Path) -> pl.DataFrame: + if path.suffix in [".jsonl"]: + return pl.read_ndjson(path) + elif path.suffix in [".parquet", ".pq"]: + return pl.read_parquet(path) + else: + raise NotImplementedError(f"Input format not implemented: {path.suffix}") + + +patch_encoder() diff --git a/cratedb_toolkit/cli.py b/cratedb_toolkit/cli.py index 9315a701..661b7631 100644 --- a/cratedb_toolkit/cli.py +++ b/cratedb_toolkit/cli.py @@ -3,6 +3,7 @@ from cratedb_toolkit.util.cli import boot_click +from .cfr.cli import cli as cfr_cli from .cluster.cli import cli as cloud_cli from .io.cli import cli as io_cli from .job.cli import cli_list_jobs @@ -19,6 +20,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): return boot_click(ctx, verbose, debug) +cli.add_command(cfr_cli, name="cfr") cli.add_command(cloud_cli, name="cluster") cli.add_command(io_cli, name="load") cli.add_command(shell_cli, name="shell") diff --git a/cratedb_toolkit/sqlalchemy/patch.py b/cratedb_toolkit/sqlalchemy/patch.py index 78c89770..67613c29 100644 --- a/cratedb_toolkit/sqlalchemy/patch.py +++ b/cratedb_toolkit/sqlalchemy/patch.py @@ -1,5 +1,11 @@ +import calendar +import datetime as dt +import json import typing as t +from decimal import Decimal +from uuid import UUID +import numpy as np import sqlalchemy as sa @@ -32,3 +38,37 @@ def get_table_names(self, connection: sa.Connection, schema: t.Optional[str] = N return get_table_names_dist(self, connection=connection, schema=schema, **kw) CrateDialect.get_table_names = get_table_names # type: ignore + + +def patch_encoder(): + import crate.client.http + + crate.client.http.CrateJsonEncoder = CrateJsonEncoderWithNumPy + + +class CrateJsonEncoderWithNumPy(json.JSONEncoder): + epoch_aware = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc) + epoch_naive = dt.datetime(1970, 1, 1) + + def default(self, o): + # Vanilla CrateDB Python. + if isinstance(o, (Decimal, UUID)): + return str(o) + if isinstance(o, dt.datetime): + if o.tzinfo is not None: + delta = o - self.epoch_aware + else: + delta = o - self.epoch_naive + return int(delta.microseconds / 1000.0 + (delta.seconds + delta.days * 24 * 3600) * 1000.0) + if isinstance(o, dt.date): + return calendar.timegm(o.timetuple()) * 1000 + + # NumPy ndarray and friends. + # https://stackoverflow.com/a/49677241 + if isinstance(o, np.integer): + return int(o) + elif isinstance(o, np.floating): + return float(o) + elif isinstance(o, np.ndarray): + return o.tolist() + return json.JSONEncoder.default(self, o) diff --git a/cratedb_toolkit/util/cli.py b/cratedb_toolkit/util/cli.py index f5956dea..f5ebc3ea 100644 --- a/cratedb_toolkit/util/cli.py +++ b/cratedb_toolkit/util/cli.py @@ -108,3 +108,26 @@ def decorator(f): return f return decorator + + +def error_level_by_debug(debug: bool): + if debug: + return logger.exception + else: + return logger.error + + +def running_with_debug(ctx: click.Context) -> bool: + return ( + (ctx.parent and ctx.parent.params.get("debug", False)) + or (ctx.parent and ctx.parent.parent and ctx.parent.parent.params.get("debug", False)) + or False + ) + + +def error_logger(about: t.Union[click.Context, bool]) -> t.Callable: + if isinstance(about, click.Context): + return error_level_by_debug(running_with_debug(about)) + if isinstance(about, bool): + return error_level_by_debug(about) + raise TypeError(f"Unknown type for argument: {about}") diff --git a/cratedb_toolkit/util/data.py b/cratedb_toolkit/util/data.py index 62cdbb33..b7547bfe 100644 --- a/cratedb_toolkit/util/data.py +++ b/cratedb_toolkit/util/data.py @@ -2,6 +2,9 @@ import json import sys import typing as t +from pathlib import Path + +from yarl import URL def jd(data: t.Any): @@ -29,3 +32,9 @@ def default(self, o): return o.isoformat() return json.JSONEncoder.default(self, o) + + +def path_from_url(url: str): + url_obj = URL(url) + path = Path(url_obj.host or "" + url_obj.path or "") + return path.absolute() diff --git a/pyproject.toml b/pyproject.toml index 479cb2bd..ec125da8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,10 +93,12 @@ dependencies = [ "crate[sqlalchemy]>=0.34", "fastapi<0.105", 'importlib-metadata; python_version <= "3.7"', + "polars<0.21", "python-dotenv<2", "python-slugify<9", "sqlalchemy", "sqlparse<0.5", + "tqdm<5", 'typing-extensions<5; python_version <= "3.7"', "uvicorn<0.25", ] diff --git a/tests/cfr/__init__.py b/tests/cfr/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cfr/test_cli.py b/tests/cfr/test_cli.py new file mode 100644 index 00000000..fc60705d --- /dev/null +++ b/tests/cfr/test_cli.py @@ -0,0 +1,40 @@ +import json +import re +from pathlib import Path + +from click.testing import CliRunner + +from cratedb_toolkit.cfr.cli import cli + + +def filenames(path: Path): + return sorted([item.name for item in path.iterdir()]) + + +def test_cfr_cli_export(cratedb, tmp_path, caplog): + """ + Verify `ctk cfr sys-export` works. + """ + + # Invoke command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}) + result = runner.invoke( + cli, + args="--debug sys-export", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify log output. + assert "Exporting system tables to" in caplog.text + assert re.search(r"Successfully exported \d+ system tables", caplog.text), "Log message missing" + + # Verify outcome. + path = Path(json.loads(result.output)["path"]) + assert filenames(path) == ["data", "schema"] + + schema_files = filenames(path / "schema") + data_files = filenames(path / "schema") + + assert len(schema_files) >= 19 + assert len(data_files) >= 19