From 989be6fc0fa714f34c58368ac0af383130b0061b Mon Sep 17 00:00:00 2001 From: ketiltrout Date: Wed, 13 Nov 2024 11:47:40 -0800 Subject: [PATCH] fix(CLI): re-implement "node clean" (#223) A re-implementation of "node clean" (or just "clean" in alpenhorn-1). The following changes have been made to the options available for this command: * `--acq`: Can be used multiple times to define a set of ACQs to clean * `--archive-ok`, `--force`: The `--force` flag used to do two different things: (1) skip the confirmation prompt (2) allow cleaning on archive nodes. Now `--force` only does (1), and a new flag `--archive-ok` can be used for (2). Also see the operations discussion below for `--force`. * `--check`. Added. See the operations discussion below. * `--days`. Re-implemented using the new-ish `ArchiveFile.registered` field. This makes it behave differently from `alpenhorn-1`, where it looked at the times in the Info tables, but we don't generally use it in CHIME, so this change shouldn't be a problem. Closes #89 * `--size`. Ported from alpenhorn-1. Closes #92. * `--target`. Can be used multiple times. Files must be in ALL target groups to be cleaned. Closes #65 The clean operation runs twice, which is why it's in a separate helper function: once to find and print what would happen and then ask for confirmation, and then a second time to actually do the update. These are run completely independently because we don't want to hold the `database_proxy.atomic` lock while waiting for the user to confirm the operation. (Hence the state of the database may change while we aren't in a transaction.) The first run can be skipped with `--force`. The second run can be skipped with `--check`, or by declining the confirmation. --- alpenhorn/cli/node/__init__.py | 214 +-------------- alpenhorn/cli/node/clean.py | 443 +++++++++++++++++++++++++++++ tests/cli/node/test_clean.py | 489 +++++++++++++++++++++++++++++++++ 3 files changed, 935 insertions(+), 211 deletions(-) create mode 100644 alpenhorn/cli/node/clean.py create mode 100644 tests/cli/node/test_clean.py diff --git a/alpenhorn/cli/node/__init__.py b/alpenhorn/cli/node/__init__.py index cb8a99c96..bbf32810d 100644 --- a/alpenhorn/cli/node/__init__.py +++ b/alpenhorn/cli/node/__init__.py @@ -13,8 +13,9 @@ from ...db import ArchiveAcq, ArchiveFile, ArchiveFileCopy, StorageGroup, StorageNode from .activate import activate -from .deactivate import deactivate +from .clean import clean from .create import create +from .deactivate import deactivate from .list import list_ from .modify import modify from .rename import rename @@ -29,6 +30,7 @@ def cli(): cli.add_command(activate, "activate") +cli.add_command(clean, "clean") cli.add_command(create, "create") cli.add_command(deactivate, "deactivate") cli.add_command(list_, "list") @@ -449,213 +451,3 @@ def verify(node_name, md5, fixdb, acq): status += 2 if missing_files else 0 exit(status) - - -@cli.command() -@click.argument("node_name", metavar="NODE") -@click.option( - "--days", "-d", help="Clean files older than .", type=int, default=None ) -@click.option("--cancel", help="Cancel files marked for cleaning", is_flag=True) -@click.option("--force", "-f", help="Force cleaning on an archive node.", is_flag=True) -@click.option("--now", "-n", help="Force immediate removal.", is_flag=True) -@click.option( - "--target", - metavar="TARGET_GROUP", - default=None, - type=str, - help="Only clean files already available in this group.", ) -@click.option( - "--acq", metavar="ACQ", default=None, type=str, help="Limit removal to 
acquisition." -) -def clean(node_name, days, cancel, force, now, target, acq): - """Clean up NODE by marking older files as potentially removable. - - Files will never be removed until they are available on at least two - archival nodes. - - Normally, files are marked to be removed only if the disk space on the node - is running low. With the --now flag, they will be made available for - immediate removal. Either way, they will *never* be actually removed until - there are sufficient archival copies. - - Using the --cancel option undoes previous cleaning operations by marking - files that are still on the node and that were marked as available for - removal as "must keep". - - If --target is specified, the command will only affect files already - available in the TARGET_GROUP. This is useful for cleaning out intermediate - locations such as transport disks. - - Using the --days flag will only clean correlator and housekeeping - files which have a timestamp associated with them. It will not - touch other types. If no --days flag is given, all files will be - considered for removal. - """ - - if cancel and now: - print("Options --cancel and --now are mutually exclusive.") - exit(1) - - config_connect() - - try: - this_node = StorageNode.get(StorageNode.name == node_name) - except pw.DoesNotExist: - print('Storage node "%s" does not exist.' % node_name) - exit(1) - - # Check to see if we are on an archive node - if this_node.storage_type == "A": - if force or click.confirm( - 'DANGER: run clean on archive node "%s"?' % node_name - ): - print('"%s" is an archive node. Forcing clean.' % node_name) - else: - print('Cannot clean archive node "%s" without forcing.' % node_name) - exit(1) - - # Select FileCopys on this node. - files = ArchiveFileCopy.select(ArchiveFileCopy.id).where( - ArchiveFileCopy.node == this_node, ArchiveFileCopy.has_file == "Y" - ) - - if now: - # In 'now' cleaning, every copy will be set to wants_file="No", if it - # wasn't already - files = files.where(ArchiveFileCopy.wants_file != "N") - elif cancel: - # Undo any "Maybe" and "No" want_files and reset them to "Yes" - files = files.where(ArchiveFileCopy.wants_file != "Y") - else: - # In regular cleaning, we only mark as "Maybe" want_files that are - # currently "Yes", but leave "No" unchanged - files = files.where(ArchiveFileCopy.wants_file == "Y") - - # Limit to acquisition - if acq is not None: - try: - acq = ArchiveAcq.get(name=acq) - except pw.DoesNotExit: - raise RuntimeError("Specified acquisition %s does not exist" % acq) - - files_in_acq = ArchiveFile.select().where(ArchiveFile.acq == acq) - - files = files.where(ArchiveFileCopy.file << files_in_acq) - - # If the target option has been specified, only clean files also available there... - if target is not None: - # Fetch a reference to the target group - try: - target_group = StorageGroup.get(name=target) - except pw.DoesNotExist: - raise RuntimeError('Target group "%s" does not exist in the DB.' % target) - - # First get the nodes at the destination... - nodes_at_target = StorageNode.select().where(StorageNode.group == target_group) - - # Then use this to get a list of all files at the destination... 
- files_at_target = ( - ArchiveFile.select() - .join(ArchiveFileCopy) - .where( - ArchiveFileCopy.node << nodes_at_target, - ArchiveFileCopy.has_file == "Y", - ) - ) - - # Only match files that are also available at the target - files = files.where(ArchiveFileCopy.file << files_at_target) - - # If --days has been set we need to restrict to files older than the given - # time. This only works for a few particular file types - if days is not None and days > 0: - # TODO: how to handle file types now? - raise "'--days' feature has not been implemented yet" - - # # Get the time for the oldest files to keep - # oldest = datetime.datetime.now() - datetime.timedelta(days) - # oldest_unix = ephemeris.ensure_unix(oldest) - # - # # List of filetypes we want to update, needs a human readable name and a - # # FileInfo table. - # filetypes = [ ['correlation', di.CorrFileInfo], - # ['housekeeping', di.HKFileInfo] ] - # - # file_ids = [] - # - # # Iterate over file types for cleaning - # for name, infotable in filetypes: - # - # # Filter to fetch only ones with a start time older than `oldest` - # oldfiles = files.join(ac.ArchiveFile).join(infotable)\ - # .where(infotable.start_time < oldest_unix) - # - # local_file_ids = list(oldfiles) - # - # # Get number of correlation files - # count = oldfiles.count() - # - # if count > 0: - # size_bytes = ar.ArchiveFileCopy.select().where(ar.ArchiveFileCopy.id << local_file_ids)\ - # .join(ac.ArchiveFile).aggregate(pw.fn.Sum(ac.ArchiveFile.size_b)) - # - # size_gb = int(size_bytes) / 2**30.0 - # - # print "Cleaning up %i %s files (%.1f GB) from %s " % (count, name, size_gb, node_name) - # - # file_ids += local_file_ids - - # If days is not set, then just select all files that meet the requirements so far - else: - file_ids = [f for f in files] - count = files.count() - - if count > 0: - size_bytes = ( - ArchiveFileCopy.select() - .where(ArchiveFileCopy.id << file_ids) - .join(ArchiveFile) - .select(pw.fn.Sum(ArchiveFile.size_b)) - .scalar() - ) - - size_gb = int(size_bytes) / 1073741824.0 - - print( - 'Mark %i files (%.1f GB) from "%s" %s.' - % ( - count, - size_gb, - node_name, - "for keeping" if cancel else "available for removal", - ) - ) - - # If there are any files to clean, ask for confirmation and the mark them in - # the database for removal - if len(file_ids) > 0: - if force or click.confirm(" Are you sure?"): - print(" Marking...") - - if cancel: - state = "Y" - else: - state = "N" if now else "M" - - update = ArchiveFileCopy.update(wants_file=state).where( - ArchiveFileCopy.id << file_ids - ) - - n = update.execute() - - if cancel: - print("Marked %i files for keeping." % n) - else: - print("Marked %i files available for removal." % n) - - else: - print(" Cancelled. Exit without changes.") - else: - print("No files selected for cleaning on %s." 
% node_name) diff --git a/alpenhorn/cli/node/clean.py b/alpenhorn/cli/node/clean.py new file mode 100644 index 000000000..ccf001daf --- /dev/null +++ b/alpenhorn/cli/node/clean.py @@ -0,0 +1,443 @@ +"""alpenhorn node clean""" + +import click +import datetime +import peewee as pw + +from ...db import ( + StorageGroup, + StorageNode, + ArchiveAcq, + ArchiveFile, + ArchiveFileCopy, + database_proxy, + utcnow, +) +from ...common.util import pretty_bytes +from ..options import not_both +from ..cli import echo + + +def _run_query( + update, + ctx, + name, + acq, + archive_ok, + days: datetime.datetime, + size: int, + target, + clean_goal: str, + warn: bool = True, +) -> None: + """Actually do the clean query, either in check mode or update mode. + + We separate this from check() because we typically have to do the + whole thing twice, once at the start to get the results to present + to the user, and then again to actually do the update. + + We can't do it all at once because we don't want to be in the + middle of a transaction wile waiting for the user to confirm the clean. + + Parameters + ---------- + Most parameters are simply the CLI options passed in by click to `clean`. + Differing ones are: + + update: + True if we're performing the update; False otherwise. + clean_goal: + One of 'Y', 'M', 'N'. Indicating what we're setting wants_file to. + Replaces the `now` and `cancel` parameters. + days: + Converted to datetime + size: + Converted to bytes + warn: + True the first time this function is called (and we should print + warnings). + """ + + with database_proxy.atomic(): + # Check name + try: + node = StorageNode.get(name=name) + except pw.DoesNotExist: + raise click.ClickException("no such node: " + name) + + # Check to see if we are on an archive node and whether + # --archive-ok was given + if node.archive: + if archive_ok: + if warn: + echo(f'DANGER: "{name}" is an archive node. Forcing clean.') + else: + raise click.ClickException(f'Cannot clean archive node "{name}".') + + # Resolve targets + target_files = None + for gname in target: + # Resolve group name + try: + group = StorageGroup.get(name=gname) + except pw.DoesNotExist: + raise click.ClickException("No such target group: " + gname) + + # Get files in target + query = ( + ArchiveFile.select(ArchiveFile.id) + .join(ArchiveFileCopy) + .join(StorageNode) + .where( + StorageNode.group == group, + ArchiveFileCopy.has_file == "Y", + ArchiveFileCopy.wants_file == "Y", + ) + ) + + # If we already have a list of files on the target, intersect that + # list with this node + if target_files: + query = query.where(ArchiveFile.id << target_files) + + # Execute the query and record the result + target_files = set(query.scalars()) + + # Are there any target files left? + if not target_files: + echo("Nothing to do: no matching files in target.") + ctx.exit() + + # Resolve acqs + acqs = [] + for acqname in acq: + try: + acqs.append(ArchiveAcq.get(name=acqname)) + except pw.DoesNotExist: + raise click.ClickException("No such acquisition: " + acqname) + + # Find all candidate file copies + query = ArchiveFileCopy.select().where( + ArchiveFileCopy.node == node, ArchiveFileCopy.has_file == "Y" + ) + + # Join to ArchiveFile, if necessary + if acqs or days or size: + query = query.join(ArchiveFile) + + # Add a wants_file constraint, if appropriate (with --size) + # we need to skip this: we need to scan through all the files + # so the totals are correct. In that case, this constraint + # is handled when looping through copies. 
+ if not size: + # Without a size, we filter out everything that's not a + # candidate for update + if clean_goal == "M": + # deferred: only mark as "M" want_files that are + # currently "Y", but leave "N" unchanged + query = query.where(ArchiveFileCopy.wants_file == "Y") + else: + # Otherwise, we're setting everything that's different + # from our goal to our goal (Y or N) + query = query.where(ArchiveFileCopy.wants_file != clean_goal) + + # Limit to acquisitions, if any + if acqs: + query = query.where(ArchiveFile.acq << acqs) + + # Limit to registration time, if requested. This will implicitly + # drop any file copies where the registration time is unknown. + if days: + query = query.where(ArchiveFile.registered < days) + + # This will contain stats of what we're doing, so we can summarise + # later. The outer key is the current value of `wants_file` for a + # file, or "S" for files already cleaned in --size mode + results = { + "Y": {"count": 0, "size": 0}, + "M": {"count": 0, "size": 0}, + "N": {"count": 0, "size": 0}, + "S": {"count": 0, "size": 0}, + } + update_ids = [] + + # For --size check + total = 0 + + # Now loop over files, gathering candidates and statistics + for copy in query.order_by(ArchiveFileCopy.id).execute(): + # Skip if we have --targets and this isn't one of them + if target_files and copy.file.id not in target_files: + continue + + # Handle null file size + file_size = 0 if copy.file.size_b is None else copy.file.size_b + + # If we have a size, we need to check whether the copy + # already has the right wants_file. If it does, we + # add its size to the total, but then skip it. + if size: + total += file_size + if copy.wants_file == clean_goal or ( + clean_goal == "M" and copy.wants_file == "N" + ): + results["S"]["count"] += 1 + results["S"]["size"] += file_size + + # If we're over size, we can stop + if size and total >= size: + break + + # Otherwise, skip to the next file + continue + + # Add to results list + results[copy.wants_file]["count"] += 1 + results[copy.wants_file]["size"] += file_size + + # Also save the copy id itself, if we're updating + if update: + update_ids.append(copy.id) + + # If we're over size, we can stop + if size and total >= size: + break + + # Sum everything + count = results["Y"]["count"] + results["M"]["count"] + results["N"]["count"] + size = results["Y"]["size"] + results["M"]["size"] + results["N"]["size"] + + # Is there anything to do? + if not count: + # Is it because a size constraint was already satisfied? + if results["S"]["count"] > 0: + echo("No files to clean (size constraint already satisfied).") + else: + echo("No files to clean.") + ctx.exit() + + # Now report + + # Do we need to print a break-down? + if clean_goal == "M": + breakdown = False + elif clean_goal == "Y": + breakdown = ( + count != results["M"]["count"] and count != results["N"]["count"] + ) + else: + breakdown = ( + count != results["M"]["count"] and count != results["Y"]["count"] + ) + + # Grammar + files = "files" if count != 1 else "file" + stop = ":" if breakdown else "." 
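+ # Phrase the report for the chosen goal (cancel, deferred mark, or immediate release), + # using "Would ..." wording when this is only the pre-confirmation check pass.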
+ + if clean_goal == "Y": + verb = "Cancelling" if update else "Would cancel" + gerund = " for cleaning" + elif clean_goal == "M": + verb = "Marking" if update else "Would mark" + gerund = " for deferred cleaning" + else: # clean_goal == 'N' + verb = "Releasing" if update else "Would release" + gerund = "" + + echo(f"{verb} {count} {files} ({pretty_bytes(size)}){gerund}{stop}") + + # Files already satisfying --size + if size and results["S"]["count"]: + files = "files" if results["S"]["count"] != 1 else "file" + echo( + f'{results["S"]["count"]} {files} ({pretty_bytes(results["S"]["size"])}) already contributing to size constraint.' + ) + + if breakdown: + if results["Y"]["count"]: + files = "files" if results["Y"]["count"] != 1 else "file" + echo( + f' {results["Y"]["count"]} present {files} ({pretty_bytes(results["Y"]["size"])})' + ) + if results["M"]["count"]: + files = "files" if results["M"]["count"] != 1 else "file" + echo( + f' {results["M"]["count"]} marked {files} ({pretty_bytes(results["M"]["size"])})' + ) + if results["N"]["count"]: + files = "files" if results["N"]["count"] != 1 else "file" + echo( + f' {results["N"]["count"]} released {files} ({pretty_bytes(results["N"]["size"])})' + ) + echo("") + + # Now update, if needed + if update: + count = ( + ArchiveFileCopy.update(wants_file=clean_goal) + .where(ArchiveFileCopy.id << update_ids) + .execute() + ) + files = "files" if count != 1 else "file" + echo(f"Updated {count} {files}.") + + +@click.command() +@click.argument("name", metavar="NODE") +@click.option( + "--acq", + metavar="ACQ", + default=None, + multiple=True, + help="Only clean files in acquisition ACQ. May be specified multiple times.", +) +@click.option( + "--archive-ok", help="Run the clean, even if NODE is an archive node.", is_flag=True +) +@click.option("--cancel", "-x", help="Cancel files marked for cleaning", is_flag=True) +@click.option( + "--check", + "-c", + help="Only check (and print) what would be cleaned, don't actually do anything." + " Incompatible with --force", + is_flag=True, +) +@click.option( + "--days", + "-d", + metavar="COUNT", + type=int, + help="Only clean files registered more than COUNT days ago.", +) +@click.option( + "--force", + "-f", + help="Force cleaning (skips confirmation). Incompatible with --check", + is_flag=True, +) +@click.option("--now", "-n", help="Force immediate removal.", is_flag=True) +@click.option( + "--size", + "-s", + metavar="SIZE", + type=float, + help="Stop cleaning files once the total size cleaned reaches SIZE GiB.", +) +@click.option( + "--target", + metavar="GROUP", + multiple=True, + help="Clean only files already in GROUP. If specified multiple times, clean only " + "files in all GROUPs.", +) +@click.pass_context +def clean(ctx, name, acq, archive_ok, cancel, check, days, force, now, size, target): + """Remove files from a Storage Node. + + There are two ways alpenhorn can schedule a file for cleaning: + + * Normally, files are cleaned by *marking* them for discretionary removal. + This tells the alpenhorn daemon that it may remove the marked files to + stay above the minimum free space set for NODE. If NODE has no minimum + free space set, then these marked files will never be deleted. The + minimum free space can be set using the "--min-avail" option to the + "node modify" command. + + * The other option, which can be selected with the "--now" flag, is to + *release* files for immediate removal. Released files will be removed + by the daemon as soon as possible. 
In this case, files marked for + discretionary removal will also be released for immediate removal if + they satisfy the specified criteria. + + Either way, if scheduled for cleaning, files will not be removed from a + Storage Node until they are available on at least two (other) + archive nodes. + + By default, alpenhorn will refuse to run this command on an archive node. + This restriction may be overridden with the "--archive-ok" flag. This + restriction is also ignored when cancelling cleaning. + + In normal operation, this command will schedule *all* files on NODE for + cleaning, but the operation may be limited with the "--acq", "--days", + "--size", and "--target" options. + + Multiple --acq options may be given to provide a list of acquisitions to + restrict cleaning to. Files not in one of the specified acquisitions will + not be considered by this command. + + If --days is specified, only files registered more than COUNT days ago will + be considered for cleaning. + + If --target is specified one or more times, the command will only affect + files already available in the target GROUP(s). This is useful for cleaning + out intermediate locations such as transport disks. If multiple GROUPs are + given, a file must be in _all_ GROUPs to be considered for cleaning. + + A SIZE given with --size indicates a target quantity of data to clean. + This restriction is applied _after_ the limits imposed by --acq and --target. + alpenhorn will ensure at least SIZE GiB of files are scheduled for cleaning. + Files are considered in registration order (the order in which they were first + added to the data index). In this mode, alpenhorn takes into consideration + files previously scheduled for cleaning but not yet removed: if more than + SIZE GiB of files are already scheduled for cleaning, no new files will be + added. + + \b + CANCELLING CLEANING + ------------------- + + You may unschedule files for cleaning using the "--cancel" flag, which can + be similarly restricted with "--acq", "--days", and "--target", but not + "--size". + + Both kinds of cleaning (discretionary and immediate) will be cancelled, + but only for files not yet removed from the NODE by the daemon. + """ + # usage checks. + not_both(cancel, "cancel", now, "now") + not_both(cancel, "cancel", size is not None, "size") + not_both(check, "check", force, "force") + if days is not None and days <= 0: + raise click.UsageError("--days must be positive") + if size is not None and size <= 0: + raise click.UsageError("--size must be positive") + + # Convert days to a datetime + if days: + days = utcnow() - datetime.timedelta(days=days) + + # Convert size to bytes + if size: + size = int(size * 2**30) + + # What's the user's goal? i.e. what are we going to be setting + # wants_file to? 
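+ # Map the flags onto the wants_file value to set: --now releases copies for + # immediate removal ("N"), --cancel keeps them ("Y"), and the default marks + # them for discretionary cleaning ("M").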
+ if now: + clean_goal = "N" + elif cancel: + clean_goal = "Y" + else: + clean_goal = "M" + + # If we're forcing, skip the check phase + if not force: + # Doesn't return if there's nothing to clean + _run_query( + False, ctx, name, acq, archive_ok, days, size, target, clean_goal, warn=True + ) + + # With --check, we're done + if check: + ctx.exit() + + # Ask for confirmation + if not click.confirm("Continue?"): + echo("\nCancelled.") + ctx.exit() + echo() + + # Execute the query, if not in --check mode + if not check: + _run_query( + True, ctx, name, acq, archive_ok, days, size, target, clean_goal, warn=force + ) diff --git a/tests/cli/node/test_clean.py b/tests/cli/node/test_clean.py new file mode 100644 index 000000000..e4511ae35 --- /dev/null +++ b/tests/cli/node/test_clean.py @@ -0,0 +1,489 @@ +"""Test CLI: alpenhorn node clean""" + +import pytest +from datetime import timedelta + +from alpenhorn.db import ( + StorageGroup, + StorageNode, + ArchiveAcq, + ArchiveFile, + ArchiveFileCopy, + utcnow, +) + + +def test_no_node(clidb, cli): + """Test clean with no node.""" + + cli(1, ["node", "clean", "TEST"]) + + +def test_cancel_now(clidb, cli): + """Test --cancel --now fails.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + cli(2, ["node", "clean", "NODE", "--cancel", "--now"]) + + +def test_cancel_size(clidb, cli): + """Test --cancel --size fails.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + cli(2, ["node", "clean", "NODE", "--cancel", "--size=3"]) + + +def test_check_force(clidb, cli): + """Test --check --force fails.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + cli(2, ["node", "clean", "NODE", "--check", "--force"]) + + +def test_bad_days(clidb, cli): + """Test non-positive --days.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + cli(2, ["node", "clean", "NODE", "--days=0"]) + cli(2, ["node", "clean", "NODE", "--days=-1"]) + + +def test_bad_size(clidb, cli): + """Test non-positive --size.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + cli(2, ["node", "clean", "NODE", "--size=0"]) + cli(2, ["node", "clean", "NODE", "--size=-1"]) + + +def test_decline(clidb, cli): + """Test declining a clean at the confirmation prompt.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + acq = ArchiveAcq.create(name="Acq") + file = ArchiveFile.create(name="File", acq=acq, size_b=1234) + ArchiveFileCopy.create(node=node, file=file, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE"], input="N\n") + + # Clean should not have run + assert ArchiveFileCopy.get(node=node, file=file).wants_file == "Y" + + +def test_run(clidb, cli): + """Test a simple clean.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + acq = ArchiveAcq.create(name="Acq") + fileY = ArchiveFile.create(name="FileY", acq=acq, size_b=2345) + 
ArchiveFileCopy.create(node=node, file=fileY, has_file="Y", wants_file="Y") + fileN = ArchiveFile.create(name="FileN", acq=acq, size_b=3456) + ArchiveFileCopy.create(node=node, file=fileN, has_file="Y", wants_file="N") + + cli(0, ["node", "clean", "NODE"], input="Y\n") + + # Clean should have run, but only on the 'Y' file + assert ArchiveFileCopy.get(node=node, file=fileY).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=fileN).wants_file == "N" + + +def test_now(clidb, cli): + """Test clean --now.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + acq = ArchiveAcq.create(name="Acq") + fileY = ArchiveFile.create(name="FileY", acq=acq, size_b=4567) + ArchiveFileCopy.create(node=node, file=fileY, has_file="Y", wants_file="Y") + fileM = ArchiveFile.create(name="FileM", acq=acq, size_b=5678) + ArchiveFileCopy.create(node=node, file=fileM, has_file="Y", wants_file="M") + + cli(0, ["node", "clean", "NODE", "--now"], input="Y\n") + + # Clean should have run + assert ArchiveFileCopy.get(node=node, file=fileY).wants_file == "N" + + +def test_cancel(clidb, cli): + """Test clean --cancel.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + acq = ArchiveAcq.create(name="Acq") + fileM = ArchiveFile.create(name="FileM", acq=acq, size_b=6789) + ArchiveFileCopy.create(node=node, file=fileM, has_file="Y", wants_file="M") + fileN = ArchiveFile.create(name="FileN", acq=acq, size_b=1234) + ArchiveFileCopy.create(node=node, file=fileN, has_file="Y", wants_file="N") + + cli(0, ["node", "clean", "NODE", "--cancel"], input="Y\n") + + # Clean should have run + assert ArchiveFileCopy.get(node=node, file=fileM).wants_file == "Y" + assert ArchiveFileCopy.get(node=node, file=fileN).wants_file == "Y" + + +def test_check(clidb, cli): + """Test clean --check.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + acq = ArchiveAcq.create(name="Acq") + file = ArchiveFile.create(name="File", acq=acq, size_b=2345) + ArchiveFileCopy.create(node=node, file=file, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--check"]) + + # Clean should not have run + assert ArchiveFileCopy.get(node=node, file=file).wants_file == "Y" + + +def test_force(clidb, cli): + """Test clean --force.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + acq = ArchiveAcq.create(name="Acq") + file = ArchiveFile.create(name="File", acq=acq, size_b=3456) + ArchiveFileCopy.create(node=node, file=file, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--force"]) + + # Clean should have run + assert ArchiveFileCopy.get(node=node, file=file).wants_file == "M" + + +def test_archive(clidb, cli): + """Test clean on an archive node.""" + + group = StorageGroup.create(name="Group") + node = StorageNode.create(name="NODE", group=group, storage_type="A") + acq = ArchiveAcq.create(name="Acq") + file = ArchiveFile.create(name="File", acq=acq, size_b=4567) + ArchiveFileCopy.create(node=node, file=file, has_file="Y", wants_file="Y") + + cli(1, ["node", "clean", "NODE"], input="Y\n") + + # Clean should not have run + assert ArchiveFileCopy.get(node=node, file=file).wants_file == "Y" + + +def test_archive_ok(clidb, cli): + """Test clean on an archive node with --archive-ok.""" + + group = StorageGroup.create(name="Group") + node = 
StorageNode.create(name="NODE", group=group, storage_type="A") + acq = ArchiveAcq.create(name="Acq") + file = ArchiveFile.create(name="File", acq=acq, size_b=5678) + ArchiveFileCopy.create(node=node, file=file, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--archive-ok"], input="Y\n") + + # Clean should have run + assert ArchiveFileCopy.get(node=node, file=file).wants_file == "M" + + +def test_bad_target(clidb, cli): + """Test clean with a non-exsitent target.""" + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + acq = ArchiveAcq.create(name="Acq") + file = ArchiveFile.create(name="File", acq=acq, size_b=6789) + ArchiveFileCopy.create(node=node, file=file, has_file="Y", wants_file="Y") + + cli(1, ["node", "clean", "NODE", "--force", "--target=Target"]) + + # File wasn't cleaned + assert ArchiveFileCopy.get(node=node, file=file).wants_file == "Y" + + +def test_target(clidb, cli): + """Test clean with some targets.""" + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + target1 = StorageGroup.create(name="Target1") + tnode1 = StorageNode.create(name="TNode1", group=target1) + target2 = StorageGroup.create(name="Target2") + tnode2 = StorageNode.create(name="TNode2", group=target2) + target3 = StorageGroup.create(name="Target3") + tnode3 = StorageNode.create(name="TNode3", group=target3) + + acq = ArchiveAcq.create(name="Acq") + + # File1 is in Target1 + file1 = ArchiveFile.create(name="File1", acq=acq, size_b=1234) + ArchiveFileCopy.create(node=node, file=file1, has_file="Y", wants_file="Y") + ArchiveFileCopy.create(node=tnode1, file=file1, has_file="Y", wants_file="Y") + + # File2 is in Target2 + file2 = ArchiveFile.create(name="File2", acq=acq, size_b=2345) + ArchiveFileCopy.create(node=node, file=file2, has_file="Y", wants_file="Y") + ArchiveFileCopy.create(node=tnode2, file=file2, has_file="Y", wants_file="Y") + + # File3 is in Target1 and Target2 + file3 = ArchiveFile.create(name="File3", acq=acq, size_b=3456) + ArchiveFileCopy.create(node=node, file=file3, has_file="Y", wants_file="Y") + ArchiveFileCopy.create(node=tnode1, file=file3, has_file="Y", wants_file="Y") + ArchiveFileCopy.create(node=tnode2, file=file3, has_file="Y", wants_file="Y") + + # File4 is in Target3, but that's not one of the ones we're using as a target + file4 = ArchiveFile.create(name="File4", acq=acq, size_b=4567) + ArchiveFileCopy.create(node=node, file=file4, has_file="Y", wants_file="Y") + ArchiveFileCopy.create(node=tnode3, file=file4, has_file="Y", wants_file="Y") + + # File5 is in no target + file5 = ArchiveFile.create(name="File5", acq=acq, size_b=5678) + ArchiveFileCopy.create(node=node, file=file5, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--force", "--target=Target1", "--target=Target2"]) + + # Only File3 was cleaned + assert ArchiveFileCopy.get(node=node, file=file1).wants_file == "Y" + assert ArchiveFileCopy.get(node=node, file=file2).wants_file == "Y" + assert ArchiveFileCopy.get(node=node, file=file3).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file4).wants_file == "Y" + assert ArchiveFileCopy.get(node=node, file=file5).wants_file == "Y" + + +def test_empty_target(clidb, cli): + """Test clean with no files in the target.""" + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + target1 = StorageGroup.create(name="Target1") + tnode1 = 
StorageNode.create(name="TNode1", group=target1) + target2 = StorageGroup.create(name="Target2") + tnode2 = StorageNode.create(name="TNode2", group=target2) + target3 = StorageGroup.create(name="Target3") + tnode3 = StorageNode.create(name="TNode3", group=target3) + + acq = ArchiveAcq.create(name="Acq") + + # File1 is in Target1 + file1 = ArchiveFile.create(name="File1", acq=acq, size_b=1234) + ArchiveFileCopy.create(node=node, file=file1, has_file="Y", wants_file="Y") + ArchiveFileCopy.create(node=tnode1, file=file1, has_file="Y", wants_file="Y") + + # File2 is in Target2 + file2 = ArchiveFile.create(name="File2", acq=acq, size_b=2345) + ArchiveFileCopy.create(node=node, file=file2, has_file="Y", wants_file="Y") + ArchiveFileCopy.create(node=tnode2, file=file2, has_file="Y", wants_file="Y") + + # File3 is in Target3, but that's not one of the ones we're using as a target + file3 = ArchiveFile.create(name="File3", acq=acq, size_b=3456) + ArchiveFileCopy.create(node=node, file=file3, has_file="Y", wants_file="Y") + ArchiveFileCopy.create(node=tnode3, file=file3, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--force", "--target=Target1", "--target=Target2"]) + + # No files were cleaned + assert ArchiveFileCopy.get(node=node, file=file1).wants_file == "Y" + assert ArchiveFileCopy.get(node=node, file=file2).wants_file == "Y" + assert ArchiveFileCopy.get(node=node, file=file3).wants_file == "Y" + + +def test_bad_acq(clidb, cli): + """Test clean with a non-exsitent acq.""" + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + acq = ArchiveAcq.create(name="Acq") + file = ArchiveFile.create(name="File", acq=acq, size_b=4567) + ArchiveFileCopy.create(node=node, file=file, has_file="Y", wants_file="Y") + + cli(1, ["node", "clean", "NODE", "--force", "--acq=BadAcq"]) + + # File wasn't cleaned + assert ArchiveFileCopy.get(node=node, file=file).wants_file == "Y" + + +def test_acq(clidb, cli): + """Test clean with some acq.""" + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + # File1 is in Acq1 + acq1 = ArchiveAcq.create(name="Acq1") + file1 = ArchiveFile.create(name="File", acq=acq1, size_b=5678) + ArchiveFileCopy.create(node=node, file=file1, has_file="Y", wants_file="Y") + + # File2 is in Acq2 + acq2 = ArchiveAcq.create(name="Acq2") + file2 = ArchiveFile.create(name="File", acq=acq2, size_b=6789) + ArchiveFileCopy.create(node=node, file=file2, has_file="Y", wants_file="Y") + + # File3 is in Acq3 + acq3 = ArchiveAcq.create(name="Acq3") + file3 = ArchiveFile.create(name="File", acq=acq3, size_b=1234) + ArchiveFileCopy.create(node=node, file=file3, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--force", "--acq=Acq1", "--acq=Acq2"]) + + # Files 1 and 2 were cleaned + assert ArchiveFileCopy.get(node=node, file=file1).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file2).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file3).wants_file == "Y" + + +def test_empty_acq(clidb, cli): + """Test clean with no files in target acq.""" + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + # File1 is in Acq1 + acq1 = ArchiveAcq.create(name="Acq1") + file1 = ArchiveFile.create(name="File", acq=acq1, size_b=2345) + ArchiveFileCopy.create(node=node, file=file1, has_file="Y", wants_file="Y") + + # File2 is in Acq2 + acq2 = 
ArchiveAcq.create(name="Acq2") + file2 = ArchiveFile.create(name="File", acq=acq2, size_b=3456) + ArchiveFileCopy.create(node=node, file=file2, has_file="Y", wants_file="Y") + + # Acq3 is empty + acq3 = ArchiveAcq.create(name="Acq3") + + cli(0, ["node", "clean", "NODE", "--force", "--acq=Acq3"]) + + # No files were cleaned + assert ArchiveFileCopy.get(node=node, file=file1).wants_file == "Y" + assert ArchiveFileCopy.get(node=node, file=file2).wants_file == "Y" + + +def test_days(clidb, cli): + """Test clean with --days""" + + now = utcnow() + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + acq = ArchiveAcq.create(name="Acq") + + file1 = ArchiveFile.create( + name="File1", acq=acq, size_b=4567, registered=now - timedelta(days=-5) + ) + ArchiveFileCopy.create(node=node, file=file1, has_file="Y", wants_file="Y") + file2 = ArchiveFile.create( + name="File2", acq=acq, size_b=5678, registered=now - timedelta(days=-4) + ) + ArchiveFileCopy.create(node=node, file=file2, has_file="Y", wants_file="Y") + file3 = ArchiveFile.create( + name="File3", acq=acq, size_b=6789, registered=now - timedelta(days=-2) + ) + ArchiveFileCopy.create(node=node, file=file3, has_file="Y", wants_file="Y") + file4 = ArchiveFile.create( + name="File4", acq=acq, size_b=1234, registered=now - timedelta(days=-1) + ) + ArchiveFileCopy.create(node=node, file=file4, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--force", "--days=3"]) + + # Only files 1 and 2 were cleaned + assert ArchiveFileCopy.get(node=node, file=file1).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file2).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file3).wants_file == "Y" + assert ArchiveFileCopy.get(node=node, file=file4).wants_file == "Y" + + +def test_size(clidb, cli): + """Test clean with --size""" + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + acq = ArchiveAcq.create(name="Acq") + + file1 = ArchiveFile.create(name="File1", acq=acq, size_b=1 * 2**30) + ArchiveFileCopy.create(node=node, file=file1, has_file="Y", wants_file="Y") + file2 = ArchiveFile.create(name="File2", acq=acq, size_b=2 * 2**30) + ArchiveFileCopy.create(node=node, file=file2, has_file="Y", wants_file="Y") + file3 = ArchiveFile.create(name="File3", acq=acq, size_b=3 * 2**30) + ArchiveFileCopy.create(node=node, file=file3, has_file="Y", wants_file="Y") + file4 = ArchiveFile.create(name="File4", acq=acq, size_b=4 * 2**30) + ArchiveFileCopy.create(node=node, file=file4, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--force", "--size=5."]) + + # Only files 1, 2, 3 were cleaned (6 GiB) + assert ArchiveFileCopy.get(node=node, file=file1).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file2).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file3).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file4).wants_file == "Y" + + +def test_size_part(clidb, cli): + """Test clean with --size partially satisfied""" + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + acq = ArchiveAcq.create(name="Acq") + + file1 = ArchiveFile.create(name="File1", acq=acq, size_b=1 * 2**30) + ArchiveFileCopy.create(node=node, file=file1, has_file="Y", wants_file="N") + file2 = ArchiveFile.create(name="File2", acq=acq, size_b=2 * 2**30) + ArchiveFileCopy.create(node=node, file=file2, 
has_file="Y", wants_file="Y") + file3 = ArchiveFile.create(name="File3", acq=acq, size_b=3 * 2**30) + ArchiveFileCopy.create(node=node, file=file3, has_file="Y", wants_file="Y") + file4 = ArchiveFile.create(name="File4", acq=acq, size_b=4 * 2**30) + ArchiveFileCopy.create(node=node, file=file4, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--force", "--size=5."]) + + # Only files 2, 3 were cleaned (6 GiB) + assert ArchiveFileCopy.get(node=node, file=file1).wants_file == "N" + assert ArchiveFileCopy.get(node=node, file=file2).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file3).wants_file == "M" + assert ArchiveFileCopy.get(node=node, file=file4).wants_file == "Y" + + +def test_size_part(clidb, cli): + """Test clean with --size already satisfied""" + + group = StorageGroup.create(name="Group1") + node = StorageNode.create(name="NODE", group=group, storage_type="F") + + acq = ArchiveAcq.create(name="Acq") + + file1 = ArchiveFile.create(name="File1", acq=acq, size_b=1 * 2**30) + ArchiveFileCopy.create(node=node, file=file1, has_file="Y", wants_file="N") + file2 = ArchiveFile.create(name="File2", acq=acq, size_b=2 * 2**30) + ArchiveFileCopy.create(node=node, file=file2, has_file="Y", wants_file="N") + file3 = ArchiveFile.create(name="File3", acq=acq, size_b=3 * 2**30) + ArchiveFileCopy.create(node=node, file=file3, has_file="Y", wants_file="Y") + file4 = ArchiveFile.create(name="File4", acq=acq, size_b=4 * 2**30) + ArchiveFileCopy.create(node=node, file=file4, has_file="Y", wants_file="Y") + + cli(0, ["node", "clean", "NODE", "--force", "--size=2."]) + + # No additional files were cleaned + assert ArchiveFileCopy.get(node=node, file=file1).wants_file == "N" + assert ArchiveFileCopy.get(node=node, file=file2).wants_file == "N" + assert ArchiveFileCopy.get(node=node, file=file3).wants_file == "Y" + assert ArchiveFileCopy.get(node=node, file=file4).wants_file == "Y"