Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(CLI): "file find" command #261

Merged
merged 1 commit into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions alpenhorn/cli/file/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from .clean import clean
from .create import create
from .find import find
from .import_ import import_
from .list import list_
from .modify import modify
Expand All @@ -19,6 +20,7 @@ def cli():

cli.add_command(clean, "clean")
cli.add_command(create, "create")
cli.add_command(find, "find")
cli.add_command(import_, "import")
cli.add_command(list_, "list")
cli.add_command(modify, "modify")
Expand Down
6 changes: 3 additions & 3 deletions alpenhorn/cli/file/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
cli_option,
not_both,
requires_other,
resolve_acqs,
resolve_acq,
validate_md5,
)

Expand Down Expand Up @@ -101,7 +101,7 @@ def create(name, acq_name, from_file, md5, prefix, size):
# database consistency).
with database_proxy.atomic():
# Resolve acq
acq = resolve_acqs([acq_name])[0]
acq = resolve_acq(acq_name)

# Check that "name" isn't already a file in acq
try:
Expand Down Expand Up @@ -144,7 +144,7 @@ def create(name, acq_name, from_file, md5, prefix, size):
# Create the ArchiveFile
with database_proxy.atomic():
# Resolve acq
acq = resolve_acqs([acq_name])[0]
acq = resolve_acq(acq_name)

# Check that "name" isn't already a file in acq. We _may_ have already
# done this once, but we need to do it again inside this transaction.
Expand Down
91 changes: 91 additions & 0 deletions alpenhorn/cli/file/find.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""alpenhorn file list command."""

import click
import peewee as pw
from tabulate import tabulate

from ...common.util import pretty_bytes
from ...db import (
ArchiveAcq,
ArchiveFile,
ArchiveFileCopy,
ArchiveFileCopyRequest,
StorageGroup,
StorageNode,
)
from ..cli import echo, pretty_time
from ..options import (
cli_option,
not_both,
both_or_neither,
files_in_nodes,
files_in_groups,
resolve_acq,
resolve_group,
resolve_node,
state_constraint,
)


@click.command()
@cli_option("acq")
@click.option("--corrupt", is_flag=True, help="Find corrupt files.")
@click.option(
"--group",
metavar="GROUP",
multiple=True,
help="May be specified multiple times. Limit search to GROUP(s).",
)
@click.option("--healthy", is_flag=True, help="Find healthy files.")
@click.option("--missing", is_flag=True, help="Find missing files.")
@click.option(
"--node",
metavar="NODE",
multiple=True,
help="May be specified multiple times. Limit search to NODE(s).",
)
@click.option("--suspect", is_flag=True, help="Find suspect files.")
@click.pass_context
def find(ctx, acq, corrupt, group, healthy, missing, node, suspect):
"""Find Files on Nodes.

Without options, lists every healthy File on every Node in the
Data Index. Options can be used to change and limit the list.
"""

nodes = resolve_node(node)
groups = resolve_group(group)
acqs = resolve_acq(acq)

state_expr = state_constraint(
corrupt=corrupt, healthy=healthy, missing=missing, suspect=suspect
)
if state_expr is None:
state_expr = state_constraint(healthy=True)

# Query
query = ArchiveFileCopy.select().join(ArchiveFile).where(state_expr)

# Add limits, when given
if acqs:
query = query.where(ArchiveFile.acq << acqs)
if groups:
query = query.switch(ArchiveFileCopy).join(StorageNode)
# Groups and Nodes need to be or'd together if both provided
if nodes:
query = query.where(
(ArchiveFileCopy.node << nodes) | (StorageNode.group << groups)
)
else:
query = query.where(StorageNode.group << groups)
elif nodes:
query = query.where(ArchiveFileCopy.node << nodes)

data = []
for copy in query.execute():
data.append((copy.file.path, copy.node.name, copy.state))

if data:
echo(tabulate(sorted(data), headers=["File", "Node", "State"]))
else:
echo("No match.")
8 changes: 4 additions & 4 deletions alpenhorn/cli/file/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
both_or_neither,
files_in_nodes,
files_in_groups,
resolve_acqs,
resolve_acq,
resolve_group,
resolve_node,
state_constraint,
Expand Down Expand Up @@ -221,9 +221,9 @@ def list_(
detail_group = None
if details and not from_:
if node and not group and len(nodes) == 1:
detail_node = nodes[0]
detail_node = nodes.pop()
elif group and not node and len(groups) == 1:
detail_group = groups[0]
detail_group = groups.pop()

# The negative selection list
if absent_node and absent_group:
Expand Down Expand Up @@ -300,7 +300,7 @@ def list_(

# Apply the --acq limit, if given
if acq:
query = query.where(ArchiveFile.acq << resolve_acqs(acq))
query = query.where(ArchiveFile.acq << resolve_acq(acq))

# Apply syncability, if present
if syncable_files is not None:
Expand Down
4 changes: 2 additions & 2 deletions alpenhorn/cli/group/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
files_in_groups,
not_both,
requires_other,
resolve_acqs,
resolve_acq,
resolve_group,
)

Expand Down Expand Up @@ -383,7 +383,7 @@ def run_query(
raise click.ClickException("No such group: " + node_name)

# Resolve acqs
acqs = resolve_acqs(acq)
acqs = resolve_acq(acq)

# Cancel and sync are different enough that they've been broken up into two functions:
if cancel:
Expand Down
4 changes: 2 additions & 2 deletions alpenhorn/cli/node/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
cli_option,
files_in_groups,
not_both,
resolve_acqs,
resolve_acq,
resolve_node,
resolve_group,
)
Expand Down Expand Up @@ -97,7 +97,7 @@ def _run_query(
ctx.exit()

# Resolve acqs
acqs = resolve_acqs(acq)
acqs = resolve_acq(acq)

# Find all candidate file copies
query = ArchiveFileCopy.select().where(ArchiveFileCopy.node == node, has_file)
Expand Down
4 changes: 2 additions & 2 deletions alpenhorn/cli/node/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from ...common.util import pretty_bytes
from ...db import ArchiveAcq, ArchiveFile, ArchiveFileCopy, database_proxy
from ..options import cli_option, not_both, resolve_acqs, resolve_node, state_constraint
from ..options import cli_option, not_both, resolve_acq, resolve_node, state_constraint
from ..cli import check_then_update, echo


Expand Down Expand Up @@ -48,7 +48,7 @@ def _run_query(
node = resolve_node(name)

# Resolve acqs
acqs = resolve_acqs(acq)
acqs = resolve_acq(acq)

# Find all candidate file copies
query = ArchiveFileCopy.select(ArchiveFileCopy.id).where(
Expand Down
38 changes: 24 additions & 14 deletions alpenhorn/cli/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,70 +262,80 @@ def requires_other(
raise click.UsageError(f"--{opt1_name} may only be used with --{opt2_name}")


def resolve_group(group: str | list[str]) -> StorageGroup | list[StorageGroup]:
def resolve_group(group: str | list[str]) -> StorageGroup | set[StorageGroup]:
"""Convert group name(s) into StorageGroup(s).

If given a single `str`, returns a single `StorageGroup`.
Otherwise, should be given a list of str and will return a
list of StorageGroups.
set of StorageGroups.

If any name can't be resolved, raises ClickException.
"""
one_group = isinstance(group, str)
if one_group:
group = [group]

groups = []
groups = set()
for name in group:
try:
groups.append(StorageGroup.get(name=name))
groups.add(StorageGroup.get(name=name))
except pw.DoesNotExist:
raise click.ClickException("no such group: " + name)

if one_group:
return groups[0]
return groups.pop()
return groups


def resolve_node(node: str | list[str]) -> StorageNode | list[StorageNode]:
def resolve_node(node: str | list[str]) -> StorageNode | set[StorageNode]:
"""Convert node name(s) into StorageNode(s).

If given a single `str`, returns a single `StorageNode`.
Otherwise, should be given a list of str and will return a
list of StorageNodes.
set of StorageNodes.

If any name can't be resolved, raises ClickException.
"""
one_node = isinstance(node, str)
if one_node:
node = [node]

nodes = []
nodes = set()
for name in node:
try:
nodes.append(StorageNode.get(name=name))
nodes.add(StorageNode.get(name=name))
except pw.DoesNotExist:
raise click.ClickException("no such node: " + name)

if one_node:
return nodes[0]
return nodes.pop()
return nodes


def resolve_acqs(acq: list[str]) -> list[ArchiveAcq]:
def resolve_acq(acq: str | list[str]) -> ArchiveAcq | set[ArchiveAcq]:
"""Convert --acq list to ArchiveAcq list.

If the input list is empty, so is the output list.
If given a single `str`, returns a single `ArchiveAcq`.
Otherwise, should be given a list of str and will return a
set of ArchiveAcqs.

Raises `click.ClickException` if a non-existent acqusition was
provided.
"""
acqs = []

one_acq = isinstance(acq, str)
if one_acq:
acq = [acq]

acqs = set()
for acqname in acq:
try:
acqs.append(ArchiveAcq.get(name=acqname))
acqs.add(ArchiveAcq.get(name=acqname))
except pw.DoesNotExist:
raise click.ClickException("No such acquisition: " + acqname)

if one_acq:
return acqs.pop()
return acqs


Expand Down
Loading