Skip to content

Commit

Permalink
cfr: Add ctk cfr diagnostics program
Browse files Browse the repository at this point in the history
Add basic implementation for `sys-export` and `sys-import` subcommands.

It is about exporting system tables of CrateDB into SQL DDL and JSONL
files, and re-importing them for later analysis.
  • Loading branch information
amotl committed Apr 17, 2024
1 parent f3db9b3 commit f0902b2
Show file tree
Hide file tree
Showing 14 changed files with 429 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ __pycache__
dist
.coverage*
coverage.xml
/cfr
/foo
/tmp
/DOWNLOAD
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


## Unreleased
- Add `cratedb-wtf` diagnostics program
- Add `ctk cfr` and `ctk wtf` diagnostics programs

## 2024/04/10 v0.0.10
- Dependencies: Unpin upper version bound of `dask`. Otherwise, compatibility
Expand Down
28 changes: 28 additions & 0 deletions cratedb_toolkit/cfr/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# CrateDB Cluster Flight Recorder (CFR)

Collect required cluster information for support requests
and self-service debugging.


## Synopsis

Define CrateDB database cluster address.
```shell
export CRATEDB_SQLALCHEMY_URL=crate://localhost/
```

Export system table information into timestamped file,
by default into the `cfr/sys` directory.
```shell
ctk cfr sys-export
```

Export system table information into given directory.
```shell
ctk cfr sys-export file:///var/ctk/cfr/sys
```

Import system table information from given directory.
```shell
ctk cfr sys-import file://./cfr/sys/2024-04-16T05-43-37
```
Empty file added cratedb_toolkit/cfr/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions cratedb_toolkit/cfr/backlog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# CFR Backlog

## Iteration +1
- sys-export: Does the program need capabilities to **LIMIT** cardinality
on `sys-export` operations, for example, when they are super large?
- sys-import: Accept target database schema.
67 changes: 67 additions & 0 deletions cratedb_toolkit/cfr/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging
import sys

import click
from click_aliases import ClickAliasedGroup

from cratedb_toolkit.cfr.systable import SystemTableExporter, SystemTableImporter
from cratedb_toolkit.util.cli import (
boot_click,
error_logger,
make_command,
)
from cratedb_toolkit.util.data import jd, path_from_url

logger = logging.getLogger(__name__)


cratedb_sqlalchemy_option = click.option(
"--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
)


@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
@cratedb_sqlalchemy_option
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information")
@click.version_option()
@click.pass_context
def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool):
"""
Diagnostics and informational utilities.
"""
if not cratedb_sqlalchemy_url:
logger.error("Unable to operate without database address")
sys.exit(1)
ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub})
return boot_click(ctx, verbose, debug)


@make_command(cli, "sys-export")
@click.argument("target", envvar="CFR_TARGET", type=str, required=False, default="file://./cfr/sys")
@click.pass_context
def sys_export(ctx: click.Context, target: str):
cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"]
try:
stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=path_from_url(target))
path = stc.save()
jd({"path": str(path)})
except Exception as ex:
error_logger(ctx)(ex)
sys.exit(1)


@make_command(cli, "sys-import")
@click.argument("source", envvar="CFR_SOURCE", type=str, required=True)
@click.pass_context
def sys_import(ctx: click.Context, source: str):
cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"]
try:
stc = SystemTableImporter(dburi=cratedb_sqlalchemy_url, source=path_from_url(source))
stc.load()
except Exception as ex:
error_logger(ctx)(ex)
sys.exit(1)
210 changes: 210 additions & 0 deletions cratedb_toolkit/cfr/systable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""
CrateDB Diagnostics: System Tables Exporter and Importer.
Schemas and results of following queries should be included:
```sql
SELECT * FROM sys.cluster
SELECT * FROM sys.nodes
SELECT * FROM sys.shards
SELECT * FROM sys.allocations
SELECT * FROM sys.jobs_log
SELECT * FROM sys.operations_log
```
https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/inspection-reflection.html
https://docs.sqlalchemy.org/en/20/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string
"""

import datetime as dt
import logging
import typing as t
from pathlib import Path

import polars as pl
import sqlalchemy as sa
from tqdm import tqdm

from cratedb_toolkit.sqlalchemy.patch import patch_encoder
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.cli import error_logger

logger = logging.getLogger(__name__)


DataFormat = t.Literal["csv", "jsonl", "ndjson", "parquet"]


class SystemTableKnowledge:
"""
Manage a few bits of knowledge about CrateDB internals.
"""

# Name of CrateDB's schema for system tables.
SYS_SCHEMA = "sys"

# TODO: Reflecting the `summits` table raises an error.
# AttributeError: 'UserDefinedType' object has no attribute 'get_col_spec'
REFLECTION_BLOCKLIST = ["summits"]


class ExportSettings:
"""
Manage a few bits of knowledge about how to export system tables from CrateDB.
"""

# Subdirectories where to store schema vs. data information.
SCHEMA_PATH = "schema"
DATA_PATH = "data"

# The filename prefix when storing tables to disk.
TABLE_FILENAME_PREFIX = "sys-"


class SystemTableInspector:
"""
Reflect schema information from CrateDB system tables.
"""

def __init__(self, dburi: str):
self.dburi = dburi
self.adapter = DatabaseAdapter(dburi=self.dburi)
self.engine = self.adapter.engine
self.inspector = sa.inspect(self.engine)

def table_names(self):
return self.inspector.get_table_names(schema=SystemTableKnowledge.SYS_SCHEMA)

def ddl(self, tablename_in: str, tablename_out: str, out_schema: str = None, with_drop_table: bool = False) -> str:
meta = sa.MetaData(schema=SystemTableKnowledge.SYS_SCHEMA)
table = sa.Table(tablename_in, meta, autoload_with=self.engine)
table.schema = out_schema
table.name = tablename_out
sql = ""
if with_drop_table:
sql += sa.schema.DropTable(table, if_exists=True).compile(self.engine).string.strip() + ";\n"
sql += sa.schema.CreateTable(table, if_not_exists=True).compile(self.engine).string.strip() + ";\n"
return sql


class SystemTableExporter:
"""
Export schema and data from CrateDB system tables.
"""

def __init__(self, dburi: str, target: t.Union[Path], data_format: DataFormat = "jsonl"):
self.dburi = dburi
self.target = target
self.data_format = data_format
self.adapter = DatabaseAdapter(dburi=self.dburi)
self.engine = self.adapter.engine
self.inspector = SystemTableInspector(dburi=self.dburi)
self.target.mkdir(exist_ok=True, parents=True)

def dump_table(self, tablename: str, file: t.Union[t.TextIO, None] = None):
sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"' # noqa: S608
# logger.info(f"Running SQL: {sql}") # noqa: ERA001
df: pl.DataFrame = pl.read_database(
query=sql, # noqa: S608
connection=self.engine,
)
if self.data_format == "csv":
# polars.exceptions.ComputeError: CSV format does not support nested data
# return df.write_csv() # noqa: ERA001
return df.to_pandas().to_csv(file)
elif self.data_format in ["jsonl", "ndjson"]:
return df.write_ndjson(file and file.buffer) # type: ignore[arg-type]
elif self.data_format in ["parquet", "pq"]:
return df.write_parquet(file and file.buffer) # type: ignore[arg-type]
else:
raise NotImplementedError(f"Output format not implemented: {self.data_format}")

def save(self) -> Path:
timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
path = self.target / timestamp
logger.info(f"Exporting system tables to: {path}")
system_tables = self.inspector.table_names()
path_schema = path / ExportSettings.SCHEMA_PATH
path_data = path / ExportSettings.DATA_PATH
path_schema.mkdir(parents=True, exist_ok=True)
path_data.mkdir(parents=True, exist_ok=True)
table_count = 0
for tablename in tqdm(system_tables, disable=None):
if tablename in SystemTableKnowledge.REFLECTION_BLOCKLIST:
continue
path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"
tablename_out = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}"
with open(path_table_schema, "w") as fh_schema:
print(self.inspector.ddl(tablename_in=tablename, tablename_out=tablename_out), file=fh_schema)
mode = "w"
if self.data_format in ["parquet", "pq"]:
mode = "wb"
with open(path_table_data, mode) as fh_data:
self.dump_table(tablename=tablename, file=t.cast(t.TextIO, fh_data))
table_count += 1
logger.info(f"Successfully exported {table_count} system tables")
return path


class SystemTableImporter:
"""
Import schema and data about CrateDB system tables.
"""

def __init__(self, dburi: str, source: Path, data_format: DataFormat = "jsonl", debug: bool = False):
self.dburi = dburi
self.source = source
self.data_format = data_format
self.debug = debug
self.adapter = DatabaseAdapter(dburi=self.dburi)

def table_names(self):
path_schema = self.source / ExportSettings.SCHEMA_PATH
names = []
for item in path_schema.glob("*.sql"):
name = item.name.replace(ExportSettings.TABLE_FILENAME_PREFIX, "").replace(".sql", "")
names.append(name)
return names

def load(self):
path_schema = self.source / ExportSettings.SCHEMA_PATH
path_data = self.source / ExportSettings.DATA_PATH

if not path_schema.exists():
raise FileNotFoundError(f"Path does not exist: {path_schema}")

logger.info(f"Importing system tables from: {self.source}")

for tablename in tqdm(self.table_names()):
tablename_restored = ExportSettings.TABLE_FILENAME_PREFIX + tablename

path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"

# Skip empty files.
if Path(path_table_data).stat().st_size == 0:
continue

# Invoke SQL DDL.
schema_sql = path_table_schema.read_text()
self.adapter.run_sql(schema_sql)

# Load data.
try:
df: pl.DataFrame = self.load_table(path_table_data)
df.write_database(table_name=tablename_restored, connection=self.dburi, if_table_exists="append")
except Exception as ex:
error_logger(self.debug)(f"Importing table failed: {tablename}. Reason: {ex}")

# df.to_pandas().to_sql(name=tablename, con=self.adapter.engine, if_exists="append", index=False) # noqa: ERA001, E501

def load_table(self, path: Path) -> pl.DataFrame:
if path.suffix in [".jsonl"]:
return pl.read_ndjson(path)
elif path.suffix in [".parquet", ".pq"]:
return pl.read_parquet(path)
else:
raise NotImplementedError(f"Input format not implemented: {path.suffix}")


patch_encoder()
2 changes: 2 additions & 0 deletions cratedb_toolkit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from cratedb_toolkit.util.cli import boot_click

from .cfr.cli import cli as cfr_cli
from .cluster.cli import cli as cloud_cli
from .io.cli import cli as io_cli
from .job.cli import cli_list_jobs
Expand All @@ -19,6 +20,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
return boot_click(ctx, verbose, debug)


cli.add_command(cfr_cli, name="cfr")
cli.add_command(cloud_cli, name="cluster")
cli.add_command(io_cli, name="load")
cli.add_command(shell_cli, name="shell")
Expand Down
40 changes: 40 additions & 0 deletions cratedb_toolkit/sqlalchemy/patch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import calendar
import datetime as dt
import json
import typing as t
from decimal import Decimal
from uuid import UUID

import numpy as np
import sqlalchemy as sa


Expand Down Expand Up @@ -32,3 +38,37 @@ def get_table_names(self, connection: sa.Connection, schema: t.Optional[str] = N
return get_table_names_dist(self, connection=connection, schema=schema, **kw)

CrateDialect.get_table_names = get_table_names # type: ignore


def patch_encoder():
import crate.client.http

crate.client.http.CrateJsonEncoder = CrateJsonEncoderWithNumPy


class CrateJsonEncoderWithNumPy(json.JSONEncoder):
epoch_aware = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
epoch_naive = dt.datetime(1970, 1, 1)

def default(self, o):
# Vanilla CrateDB Python.
if isinstance(o, (Decimal, UUID)):
return str(o)
if isinstance(o, dt.datetime):
if o.tzinfo is not None:
delta = o - self.epoch_aware
else:
delta = o - self.epoch_naive
return int(delta.microseconds / 1000.0 + (delta.seconds + delta.days * 24 * 3600) * 1000.0)
if isinstance(o, dt.date):
return calendar.timegm(o.timetuple()) * 1000

# NumPy ndarray and friends.
# https://stackoverflow.com/a/49677241
if isinstance(o, np.integer):
return int(o)
elif isinstance(o, np.floating):
return float(o)
elif isinstance(o, np.ndarray):
return o.tolist()
return json.JSONEncoder.default(self, o)
Loading

0 comments on commit f0902b2

Please sign in to comment.