From 5cb811f416c6fe099cf2405835ad867df125a62d Mon Sep 17 00:00:00 2001 From: Tarik Zegmott Date: Fri, 31 May 2024 15:17:28 -0400 Subject: [PATCH] feat(scout): add scout command to the cli (#83) * feat(scout): add scout command to the cli * test(cli): fixed by adding scout lines Signed-off-by: Tarik Zegmott * style(scout): style change Signed-off-by: Tarik Zegmott --------- Signed-off-by: Tarik Zegmott --- dtcli/cli.py | 7 +- dtcli/config.py | 2 +- dtcli/scout.py | 158 ++++++++++++++++++++++++++++++++++ dtcli/utilities/cadcclient.py | 112 ++++++++++++++++++++++++ tests/test_cli.py | 1 + 5 files changed, 276 insertions(+), 4 deletions(-) create mode 100644 dtcli/scout.py diff --git a/dtcli/cli.py b/dtcli/cli.py index 2110b5a..d57c099 100644 --- a/dtcli/cli.py +++ b/dtcli/cli.py @@ -5,7 +5,7 @@ from pkg_resources import get_distribution from rich import console, pretty -from dtcli import clear, config, ls, ps, pull +from dtcli import clear, config, ls, ps, pull, scout from dtcli.utilities import utilities pretty.install() @@ -36,11 +36,12 @@ def version(): ) +cli.add_command(clear.clear) +cli.add_command(config.config) cli.add_command(ls.list, aliases=["ls"]) cli.add_command(ps.ps) cli.add_command(pull.pull) -cli.add_command(clear.clear) -cli.add_command(config.config) +cli.add_command(scout.scout) def check_version() -> None: diff --git a/dtcli/config.py b/dtcli/config.py index e69b80e..d234bba 100644 --- a/dtcli/config.py +++ b/dtcli/config.py @@ -134,7 +134,7 @@ def procure(config: Path = CONFIG, key: Optional[str] = None) -> Any: Dict[str, Any]: Configuration. 
""" try: - with open(CONFIG.as_posix()) as stream: + with open(config.as_posix()) as stream: configuration = yaml.safe_load(stream) if key: return configuration[key] diff --git a/dtcli/scout.py b/dtcli/scout.py new file mode 100644 index 0000000..9df49ba --- /dev/null +++ b/dtcli/scout.py @@ -0,0 +1,158 @@ +"""Datatrail Scout Command.""" + +import logging +from typing import List + +import click +import requests +from cadcutils.exceptions import BadRequestException +from rich.console import Console +from rich.table import Table + +from dtcli.config import procure +from dtcli.ls import list as ls +from dtcli.utilities import cadcclient +from dtcli.utilities.utilities import set_log_level, validate_scope + +logger = logging.getLogger("scout") + +console = Console() +error_console = Console(stderr=True, style="bold red") + + +@click.command(name="scout", help="Scout a dataset.") +@click.argument("dataset", required=True, type=click.STRING, nargs=1) +@click.argument("scopes", required=False, type=click.STRING, nargs=-1) +@click.option("-v", "--verbose", count=True, help="Verbosity: v=INFO, vv=DEBUG.") +@click.option("-q", "--quiet", is_flag=True, help="Set log level to ERROR.") +@click.pass_context +def scout( + ctx: click.Context, + dataset: str, + scopes: List[str], + verbose: int, + quiet: bool, +): + """Scout a dataset. + + Args: + ctx (click.Context): Click context. + dataset (str): Name of dataset. + scopes (List[str]): Scopes of dataset. + verbose (int): Verbosity: v=INFO, vv=DEBUG. + quiet (bool): Set log level to ERROR. + + Returns: + None + """ + # Set logging level. + set_log_level(logger, verbose, quiet) + logger.debug("`scout` called with:") + logger.debug(f"dataset: {dataset} [{type(dataset)}]") + logger.debug(f"scopes: {scopes} [{type(scopes)}]") + logger.debug(f"verbose: {verbose} [{type(verbose)}]") + logger.debug(f"quiet: {quiet} [{type(quiet)}]") + + # Check if scopes are valid.
+ if scopes: + logger.debug(f"Scopes limited to: {list(scopes)}") + try: + if not all([validate_scope(scope) for scope in scopes]): + error_console.print("A scope is invalid.") + console.print("Valid scopes are:") + ctx.invoke(ls) + return None + except Exception as e: + error_console.print(e) + return None + + # Load configuration. + try: + config = procure() + server = config["server"] + logger.debug("Configuration loaded successfully.") + except Exception: + logger.error( + "No configuration file found. Create one with `datatrail config init`." + ) + return {"error": "No config. Create one with `datatrail config init`."} + + # Scout dataset. + endpoint = ( + f"/query/dataset/scout?name={dataset}" + if not scopes + else f"/query/dataset/scout?name={dataset}&{'&'.join([f'scopes={s}' for s in scopes])}" # noqa: E501 + ) + url = server + endpoint + logger.debug(f"URL: {url}") + response = requests.get(url) + data = response.json() + logger.debug(f"Data: {data}") + + for scope in data.keys(): + basepath = data.get(scope).get("basepath") + query = f"select count(*) from inventory.Artifact where uri like 'cadc:CHIMEFRB/{basepath}%'" # noqa: E501 + try: + count, _ = cadcclient.query(query) + count = int(count[0]) + except BadRequestException as error: + error_console.print("Query failed.") + error_console.print(error) + return None + data[scope]["observed"]["minoc"] = count + + keys_missing_in_observed = list( + set(data[scope]["expected"].keys()) - set(data[scope]["observed"].keys()) + ) + keys_missing_in_expected = list( + set(data[scope]["observed"].keys()) - set(data[scope]["expected"].keys()) + ) + + for key in keys_missing_in_observed: + data[scope]["observed"][key] = 0 + + for key in keys_missing_in_expected: + data[scope]["expected"][key] = 0 + + show_scout_results(dataset, data) + + +def show_scout_results(dataset: str, data: dict): + """Create and display a table with scout results. + + Args: + dataset: Name of dataset. + data: Data to display. 
+ """ + # Display results. + scopes = list(data.keys()) + storage_elements = list(data[scopes[0]]["observed"].keys()) + table = Table( + title=f"Scout Results for {dataset}", + header_style="magenta", + title_style="bold magenta", + ) + table.add_column("Scope", style="bold") + for se in storage_elements: + table.add_column(se, style="bold") + + for scope in scopes: + # Observed + row = [scope] + for se in storage_elements: + row.append(str(data[scope]["observed"][se])) + table.add_row(*row, style="blue") + + # Expected + row = [scope] + for se in storage_elements: + row.append(str(data[scope]["expected"][se])) + table.add_row(*row, style="yellow", end_section=True) + + console.print(table) + console.print("Legend: [blue]Observed[/blue], [yellow]Expected[/yellow]") + console.print( + "NOTE: In the case where more files are expected at a site other than \ +minoc, that this may be due to the file type filtering when querying each site. This \ +is a known limitation of the current implementation.", + ) diff --git a/dtcli/utilities/cadcclient.py b/dtcli/utilities/cadcclient.py index 44d4118..52900fd 100644 --- a/dtcli/utilities/cadcclient.py +++ b/dtcli/utilities/cadcclient.py @@ -252,3 +252,115 @@ def size(directory: str, namespace: str = "cadc:CHIMEFRB", timeout: int = 60) -> content = buffer.getvalue() sys.stdout = sys.__stdout__ return float(content.split("\n")[0]) + + +def dataset_md5s( + directory: str, + namespace: str = "cadc:CHIMEFRB", + timeout: int = 60, + verbose: int = 0, +) -> Dict[str, str]: + """Get md5 checksums of files in a directory. + + Args: + directory (str): Directory to get the checksums of. + certfile (str, optional): Certificate file. Defaults to None. + namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB". + timeout (int, optional): Timeout. Defaults to 60. + verbose (int, optional): Verbosity. Defaults to 0. + + Returns: + Dict[str, str]: Dictionary of file paths and their md5 checksums.
+ + Example: + >>> dataset_md5s("data/gbo/baseband/raw/2024/01/10/astro_350955086") + """ + # Set logging level. + logger.setLevel("WARNING") + if verbose == 1: + logger.setLevel("INFO") + elif verbose > 1: + logger.setLevel("DEBUG") + + query = f"select uri,contentChecksum from inventory.Artifact where uri like '{namespace}/{directory}%'" # noqa + query = query.replace("//", "/") + logger.info(f"Running query: {query}") + buffer = StringIO() + sys.stdout = buffer + _, _, queryClient = _connect() + queryClient.query( # type: ignore + query=query, + output_file=None, + response_format="csv", + tmptable=None, + lang="ADQL", + timeout=timeout, + data_only=True, + no_column_names=True, + ) + content = buffer.getvalue() + sys.stdout = sys.__stdout__ + paths = [] + md5s = [] + for line in content.split("\n"): + if line == "": + continue + path = line.split(",")[0].replace(namespace + "/", "") + try: + md5 = line.split(",")[1].replace("md5:", "") + except IndexError: + md5 = "" + paths.append(path) + md5s.append(md5) + data: Dict[str, str] = {} + for p, m in zip(paths, md5s): + data[p] = m + return data + + +def query( + query: str, + namespace: str = "cadc:CHIMEFRB", + timeout: int = 60, + verbose: int = 0, +) -> List[Any]: + """Run an ADQL query and return the rows of its CSV result. + + Args: + query (str): SQL query. + certfile (str, optional): Certificate file. Defaults to None. + namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB". + timeout (int, optional): Timeout. Defaults to 60. + verbose (int, optional): Verbosity. Defaults to 0. + + Returns: + List[Any]: Rows of the CSV result, each split on commas. + + Example: + >>> query("select count(*) from inventory.Artifact") + """ + # Set logging level.
+ logger.setLevel("WARNING") + if verbose == 1: + logger.setLevel("INFO") + elif verbose > 1: + logger.setLevel("DEBUG") + + query = query.replace("//", "/") + logger.info(f"Running query: {query}") + buffer = StringIO() + sys.stdout = buffer + _, _, queryClient = _connect() + queryClient.query( # type: ignore + query=query, + output_file=None, + response_format="csv", + tmptable=None, + lang="ADQL", + timeout=timeout, + data_only=True, + no_column_names=True, + ) + content = buffer.getvalue() + sys.stdout = sys.__stdout__ + return [line.split(",") for line in content.split("\n")] diff --git a/tests/test_cli.py b/tests/test_cli.py index 49fcd1b..041f3d7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -53,6 +53,7 @@ def test_cli_help(runner: CliRunner) -> None: list (ls) List scopes & datasets ps Details of a dataset. pull Download a dataset. + scout Scout a dataset. version Show versions. """ assert result.exit_code == 0