Skip to content

Commit

Permalink
feat(scout): add scout command to the cli (#83)
Browse files Browse the repository at this point in the history
* feat(scout): add scout command to the cli

* test(cli): fixed by adding scout lines

Signed-off-by: Tarik Zegmott <[email protected]>

* style(scout): style change

Signed-off-by: Tarik Zegmott <[email protected]>

---------

Signed-off-by: Tarik Zegmott <[email protected]>
  • Loading branch information
tjzegmott authored May 31, 2024
1 parent e7e70e8 commit 5cb811f
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 4 deletions.
7 changes: 4 additions & 3 deletions dtcli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pkg_resources import get_distribution
from rich import console, pretty

from dtcli import clear, config, ls, ps, pull
from dtcli import clear, config, ls, ps, pull, scout
from dtcli.utilities import utilities

pretty.install()
Expand Down Expand Up @@ -36,11 +36,12 @@ def version():
)


cli.add_command(clear.clear)
cli.add_command(config.config)
cli.add_command(ls.list, aliases=["ls"])
cli.add_command(ps.ps)
cli.add_command(pull.pull)
cli.add_command(clear.clear)
cli.add_command(config.config)
cli.add_command(scout.scout)


def check_version() -> None:
Expand Down
2 changes: 1 addition & 1 deletion dtcli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def procure(config: Path = CONFIG, key: Optional[str] = None) -> Any:
Dict[str, Any]: Configuration.
"""
try:
with open(CONFIG.as_posix()) as stream:
with open(config.as_posix()) as stream:
configuration = yaml.safe_load(stream)
if key:
return configuration[key]
Expand Down
158 changes: 158 additions & 0 deletions dtcli/scout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Datatrail Scout Command."""

import logging
from typing import List

import click
import requests
from cadcutils.exceptions import BadRequestException
from rich.console import Console
from rich.table import Table

from dtcli.config import procure
from dtcli.ls import list as ls
from dtcli.utilities import cadcclient
from dtcli.utilities.utilities import set_log_level, validate_scope

logger = logging.getLogger("scout")

console = Console()
error_console = Console(stderr=True, style="bold red")


@click.command(name="scout", help="Scout a dataset.")
@click.argument("dataset", required=True, type=click.STRING, nargs=1)
@click.argument("scopes", required=False, type=click.STRING, nargs=-1)
@click.option("-v", "--verbose", count=True, help="Verbosity: v=INFO, vv=DEBUG.")
@click.option("-q", "--quiet", is_flag=True, help="Set log level to ERROR.")
@click.pass_context
def scout(
ctx: click.Context,
dataset: str,
scopes: List[str],
verbose: int,
quiet: bool,
):
"""Scout a dataset.
Args:
ctx (click.Context): Click context.
dataset (str): Name of dataset.
scopes (List[str]): Scopes of dataset.
verbose (int): Verbosity: v=INFO, vv=DUBUG.
quiet (bool): Set log level to ERROR.
Returns:
None
"""
# Set logging level.
set_log_level(logger, verbose, quiet)
logger.debug("`scout` called with:")
logger.debug(f"dataset: {dataset} [{type(dataset)}]")
logger.debug(f"scopes: {scopes} [{type(scopes)}]")
logger.debug(f"verbose: {verbose} [{type(verbose)}]")
logger.debug(f"quiet: {quiet} [{type(quiet)}]")

# Check if scopes are valid.
if scopes:
logger.debug(f"Scopes limited to: {list(scopes)}")
try:
if not all([validate_scope(scope) for scope in scopes]):
error_console.print("A scope is invalid.")
console.print("Valid scopes are:")
ctx.invoke(ls)
return None
except Exception as e:
error_console.print(e)
return None

# Load configuration.
try:
config = procure()
server = config["server"]
logger.debug("Configuration loaded successfully.")
except Exception:
logger.error(
"No configuration file found. Create one with `datatrail config init`."
)
return {"error": "No config. Create one with `datatrail config init`."}

# Scout dataset.
endpoint = (
f"/query/dataset/scout?name={dataset}"
if not scopes
else f"/query/dataset/scout?name={dataset}&{'&'.join([f'scopes={s}' for s in scopes])}" # noqa: E501
)
url = server + endpoint
logger.debug(f"URL: {url}")
response = requests.get(url)
data = response.json()
logger.debug(f"Data: {data}")

for scope in data.keys():
basepath = data.get(scope).get("basepath")
query = f"select count(*) from inventory.Artifact where uri like 'cadc:CHIMEFRB/{basepath}%'" # noqa: E501
try:
count, _ = cadcclient.query(query)
count = int(count[0])
except BadRequestException as error:
error_console.print("Query failed.")
error_console.print(error)
return None
data[scope]["observed"]["minoc"] = count

keys_missing_in_observed = list(
set(data[scope]["expected"].keys()) - set(data[scope]["observed"].keys())
)
keys_missing_in_expected = list(
set(data[scope]["observed"].keys()) - set(data[scope]["expected"].keys())
)

for key in keys_missing_in_observed:
data[scope]["observed"][key] = 0

for key in keys_missing_in_expected:
data[scope]["expected"][key] = 0

show_scout_results(dataset, data)


def show_scout_results(dataset: str, data: dict):
"""Create and display a table with scout results.
Args:
dataset: Name of dataset.
data: Data to display.
"""
# Display results.
scopes = list(data.keys())
storage_elements = list(data[scopes[0]]["observed"].keys())
table = Table(
title=f"Scout Results for {dataset}",
header_style="magenta",
title_style="bold magenta",
)
table.add_column("Scope", style="bold")
for se in storage_elements:
table.add_column(se, style="bold")

for scope in scopes:
# Observed
row = [scope]
for se in storage_elements:
row.append(str(data[scope]["observed"][se]))
table.add_row(*row, style="blue")

# Expected
row = [scope]
for se in storage_elements:
row.append(str(data[scope]["expected"][se]))
table.add_row(*row, style="yellow", end_section=True)

console.print(table)
console.print("Legend: [blue]Observed[/blue], [yellow]Expected[/yellow]")
console.print(
"NOTE: In the case where more files are expected at a site other than \
minoc, that this may be due to the file type filtering when querying each site. This \
is a known limitation of the current implementation.",
)
112 changes: 112 additions & 0 deletions dtcli/utilities/cadcclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,115 @@ def size(directory: str, namespace: str = "cadc:CHIMEFRB", timeout: int = 60) ->
content = buffer.getvalue()
sys.stdout = sys.__stdout__
return float(content.split("\n")[0])


def dataset_md5s(
directory: str,
namespace: str = "cadc:CHIMEFRB",
timeout: int = 60,
verbose: int = 0,
) -> Dict[str, str]:
"""Get list of files in a directory.
Args:
directory (str): Directory to get the size of.
certfile (str, optional): Certificate file. Defaults to None.
namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
timeout (int, optional): Timeout. Defaults to 60.
verbose (int, optional): Verbosity. Defaults to 0.
Returns:
Dict[str, str]: Dictionary of file paths and their md5 checksums.
Example:
>>> dataset_md5s("data/gbo/baseband/raw/2024/01/10/astro_350955086")
"""
# Set logging level.
logger.setLevel("WARNING")
if verbose == 1:
logger.setLevel("INFO")
elif verbose > 1:
logger.setLevel("DEBUG")

query = f"select uri,contentChecksum from inventory.Artifact where uri like '{namespace}/{directory}%'" # noqa
query = query.replace("//", "/")
logger.info(f"Running query: {query}")
buffer = StringIO()
sys.stdout = buffer
_, _, queryClient = _connect()
queryClient.query( # type: ignore
query=query,
output_file=None,
response_format="csv",
tmptable=None,
lang="ADQL",
timeout=timeout,
data_only=True,
no_column_names=True,
)
content = buffer.getvalue()
sys.stdout = sys.__stdout__
paths = []
md5s = []
for line in content.split("\n"):
if line == "":
continue
path = line.split(",")[0].replace(namespace + "/", "")
try:
md5 = line.split(",")[1].replace("md5:", "")
except IndexError:
md5 = ""
paths.append(path)
md5s.append(md5)
data: Dict[str, str] = {}
for p, m in zip(paths, md5s):
data[p] = m
return data


def query(
query: str,
namespace: str = "cadc:CHIMEFRB",
timeout: int = 60,
verbose: int = 0,
) -> List[Any]:
"""Get list of files in a directory.
Args:
query (str): SQL query.
certfile (str, optional): Certificate file. Defaults to None.
namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
timeout (int, optional): Timeout. Defaults to 60.
verbose (int, optional): Verbosity. Defaults to 0.
Returns:
List[str]: List of files in the directory.
Example:
>>> size("/data/chime/intensity/raw/2023/01/01/")
"""
# Set logging level.
logger.setLevel("WARNING")
if verbose == 1:
logger.setLevel("INFO")
elif verbose > 1:
logger.setLevel("DEBUG")

query = query.replace("//", "/")
logger.info(f"Running query: {query}")
buffer = StringIO()
sys.stdout = buffer
_, _, queryClient = _connect()
queryClient.query( # type: ignore
query=query,
output_file=None,
response_format="csv",
tmptable=None,
lang="ADQL",
timeout=timeout,
data_only=True,
no_column_names=True,
)
content = buffer.getvalue()
sys.stdout = sys.__stdout__
return [line.split(",") for line in content.split("\n")]
1 change: 1 addition & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def test_cli_help(runner: CliRunner) -> None:
list (ls) List scopes & datasets
ps Details of a dataset.
pull Download a dataset.
scout Scout a dataset.
version Show versions.
"""
assert result.exit_code == 0
Expand Down

0 comments on commit 5cb811f

Please sign in to comment.