Skip to content

Commit

Permalink
fix(CLI): re-implement "node clean" (#223)
Browse files Browse the repository at this point in the history
A re-implementation of "node clean" (or just "clean" in alpenhorn-1).

The following changes have been made to the options available for this
command:

* `--acq`: Can be used multiple times to define a set of ACQs to clean
* `--archive-ok`, `--force`: The `--force` flag used to do two different
things: (1) skip the confirmation prompt (2) allow cleaning on archive
nodes. Now `--force` only does (1), and a new flag `--archive-ok` can be
used for (2). Also see the operations discussion below for `--force`.
* `--check`.  Added.  See the operations discussion below.
* `--days`. Re-implemented using the new-ish `ArchiveFile.registered`
field. This makes it behave differently from `alpenhorn-1`, where it
looks at the times in the Info tables, but we don't generally use it in
CHIME, so this change shouldn't be a problem. Closes #89
* `--size`.  Ported from alpenhorn-1.  Closes #92.
* `--target`. Can be used multiple times. Files must be in ALL target
groups to be cleaned. Closes #65

The clean operation runs twice, which is why it's in a separate helper
function: once to find and print what would happen and then ask for
confirmation, and then a second time to actually do the update.

These are run completely independently because we don't want to hold the
`database_proxy.atomic` lock while waiting for the user to confirm the
operation. (Hence the state of the database may change while we aren't
in a transaction.)

The first run can be skipped with `--force`. The second run can be
skipped with `--check`, or by declining the confirmation.
  • Loading branch information
ketiltrout authored Nov 13, 2024
1 parent cfdbeda commit 989be6f
Show file tree
Hide file tree
Showing 3 changed files with 935 additions and 211 deletions.
214 changes: 3 additions & 211 deletions alpenhorn/cli/node/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
from ...db import ArchiveAcq, ArchiveFile, ArchiveFileCopy, StorageGroup, StorageNode

from .activate import activate
from .deactivate import deactivate
from .clean import clean
from .create import create
from .deactivate import deactivate
from .list import list_
from .modify import modify
from .rename import rename
Expand All @@ -29,6 +30,7 @@ def cli():


cli.add_command(activate, "activate")
cli.add_command(clean, "clean")
cli.add_command(create, "create")
cli.add_command(deactivate, "deactivate")
cli.add_command(list_, "list")
Expand Down Expand Up @@ -449,213 +451,3 @@ def verify(node_name, md5, fixdb, acq):
status += 2 if missing_files else 0

exit(status)


@cli.command()
@click.argument("node_name", metavar="NODE")
@click.option(
    "--days", "-d", help="Clean files older than <days>.", type=int, default=None
)
@click.option("--cancel", help="Cancel files marked for cleaning", is_flag=True)
@click.option("--force", "-f", help="Force cleaning on an archive node.", is_flag=True)
@click.option("--now", "-n", help="Force immediate removal.", is_flag=True)
@click.option(
    "--target",
    metavar="TARGET_GROUP",
    default=None,
    type=str,
    help="Only clean files already available in this group.",
)
@click.option(
    "--acq", metavar="ACQ", default=None, type=str, help="Limit removal to acquisition."
)
def clean(node_name, days, cancel, force, now, target, acq):
    """Clean up NODE by marking older files as potentially removable.

    Files will never be removed until they are available on at least two
    archival nodes.

    Normally, files are marked to be removed only if the disk space on the node
    is running low. With the --now flag, they will be made available for
    immediate removal. Either way, they will *never* be actually removed until
    there are sufficient archival copies.

    Using the --cancel option undoes previous cleaning operations by marking
    files that are still on the node and that were marked as available for
    removal as "must keep".

    If --target is specified, the command will only affect files already
    available in the TARGET_GROUP. This is useful for cleaning out intermediate
    locations such as transport disks.

    Using the --days flag will only clean correlator and housekeeping
    files which have a timestamp associated with them. It will not
    touch other types. If no --days flag is given, all files will be
    considered for removal.
    """

    # --cancel restores files while --now schedules them for immediate
    # removal; doing both at once is contradictory.
    if cancel and now:
        print("Options --cancel and --now are mutually exclusive.")
        exit(1)

    config_connect()

    try:
        this_node = StorageNode.get(StorageNode.name == node_name)
    except pw.DoesNotExist:
        print('Storage node "%s" does not exist.' % node_name)
        exit(1)

    # Cleaning an archive node risks removing the last archival copies, so
    # require --force or an explicit interactive confirmation.
    if this_node.storage_type == "A":
        if force or click.confirm(
            'DANGER: run clean on archive node "%s"?' % node_name
        ):
            print('"%s" is an archive node. Forcing clean.' % node_name)
        else:
            print('Cannot clean archive node "%s" without forcing.' % node_name)
            exit(1)

    # Select all file copies currently present on this node.
    files = ArchiveFileCopy.select(ArchiveFileCopy.id).where(
        ArchiveFileCopy.node == this_node, ArchiveFileCopy.has_file == "Y"
    )

    if now:
        # In 'now' cleaning, every copy will be set to wants_file="No", if it
        # wasn't already
        files = files.where(ArchiveFileCopy.wants_file != "N")
    elif cancel:
        # Undo any "Maybe" and "No" want_files and reset them to "Yes"
        files = files.where(ArchiveFileCopy.wants_file != "Y")
    else:
        # In regular cleaning, we only mark as "Maybe" want_files that are
        # currently "Yes", but leave "No" unchanged
        files = files.where(ArchiveFileCopy.wants_file == "Y")

    # Limit removal to a single acquisition, if requested.
    if acq is not None:
        try:
            acq = ArchiveAcq.get(name=acq)
        except pw.DoesNotExist:
            # BUGFIX: this previously caught "pw.DoesNotExit" (a typo), which
            # would raise AttributeError instead of reporting the missing acq.
            raise RuntimeError("Specified acquisition %s does not exist" % acq)

        files_in_acq = ArchiveFile.select().where(ArchiveFile.acq == acq)

        files = files.where(ArchiveFileCopy.file << files_in_acq)

    # If the target option has been specified, only clean files also available there...
    if target is not None:
        # Fetch a reference to the target group
        try:
            target_group = StorageGroup.get(name=target)
        except pw.DoesNotExist:
            raise RuntimeError('Target group "%s" does not exist in the DB.' % target)

        # First get the nodes at the destination...
        nodes_at_target = StorageNode.select().where(StorageNode.group == target_group)

        # Then use this to get a list of all files at the destination...
        files_at_target = (
            ArchiveFile.select()
            .join(ArchiveFileCopy)
            .where(
                ArchiveFileCopy.node << nodes_at_target,
                ArchiveFileCopy.has_file == "Y",
            )
        )

        # Only match files that are also available at the target
        files = files.where(ArchiveFileCopy.file << files_at_target)

    # If --days has been set we need to restrict to files older than the given
    # time.  This only worked for a few particular file types in alpenhorn-1
    # and has not yet been ported.
    if days is not None and days > 0:
        # BUGFIX: this previously did `raise "..."` (raising a str), which in
        # Python 3 produces "TypeError: exceptions must derive from
        # BaseException" rather than the intended message.
        # TODO: re-implement; alpenhorn-1 filtered on the start_time of the
        # per-type FileInfo tables (correlation, housekeeping).
        raise NotImplementedError("'--days' feature has not been implemented yet")

    # Select all files that meet the requirements so far.  (The --days branch
    # above always raises, so no else is needed.)
    file_ids = [f for f in files]
    count = files.count()

    if count > 0:
        size_bytes = (
            ArchiveFileCopy.select()
            .where(ArchiveFileCopy.id << file_ids)
            .join(ArchiveFile)
            .select(pw.fn.Sum(ArchiveFile.size_b))
            .scalar()
        )

        # Guard against a NULL SQL sum (e.g. all size_b values NULL), which
        # previously crashed int(None).
        size_gb = int(size_bytes or 0) / 1073741824.0

        print(
            'Mark %i files (%.1f GB) from "%s" %s.'
            % (
                count,
                size_gb,
                node_name,
                "for keeping" if cancel else "available for removal",
            )
        )

    # If there are any files to clean, ask for confirmation and then mark them
    # in the database for removal
    if len(file_ids) > 0:
        if force or click.confirm(" Are you sure?"):
            print(" Marking...")

            if cancel:
                state = "Y"
            else:
                state = "N" if now else "M"

            update = ArchiveFileCopy.update(wants_file=state).where(
                ArchiveFileCopy.id << file_ids
            )

            n = update.execute()

            if cancel:
                print("Marked %i files for keeping." % n)
            else:
                print("Marked %i files available for removal." % n)

        else:
            print(" Cancelled. Exit without changes.")
    else:
        print("No files selected for cleaning on %s." % node_name)
Loading

0 comments on commit 989be6f

Please sign in to comment.