diff --git a/inspector/inspector.py b/inspector/inspector.py
index ca65a08..717bdcb 100644
--- a/inspector/inspector.py
+++ b/inspector/inspector.py
@@ -3,6 +3,7 @@
 preserve memory for running the benchmarks.
 All other modules should be imported lazily, where needed.
 """
+from concurrent.futures import ThreadPoolExecutor
 import click
 import lib
 import logging
@@ -126,32 +127,71 @@ def start(ctx, exclude, start_only):
         runner.create(vendor, {}, RESOURCE_OPTS.get(vendor) | dict(instance=server, instance_opts=instance_opts))
 
 
-@cli.command()
-@click.pass_context
-def cleanup(ctx):
+def cleanup_task(vendor, server, data_dir):
     from datetime import datetime
     from sc_runner import runner
-    for srv in servers():
-        vendor = srv.vendor_id
-        server = srv.api_reference
-        data_dir = os.path.join(ctx.parent.params["repo_path"], "data", vendor, server)
-        tasks = list(lib.get_tasks(vendor))
-        if not tasks:
+    tasks = list(lib.get_tasks(vendor))
+    if not tasks:
+        return
+
+    # see if we have to destroy the resources in the Pulumi stack
+    destroy = ""
+
+    start_times = []
+    already_ended = []
+    for task in tasks:
+        meta = lib.load_task_meta(task, data_dir=data_dir)
+        if not meta.start:
             continue
-        start_times = []
-        # get the newest start time
-        for task in tasks:
-            meta = lib.load_task_meta(task, data_dir=data_dir)
-            if not meta.start:
-                continue
-            start_times.append(meta.start)
-        if start_times:
-            last_start = max(start_times)
-            # destroy the stack after a given amount of time
-            if datetime.now() - lib.DESTROY_AFTER >= last_start:
-                logging.info(f"Destroying {vendor}/{server}, last task start date: {last_start}")
-                runner.destroy(vendor, {}, RESOURCE_OPTS.get(vendor) | dict(instance=server))
+        start_times.append(meta.start)
+        # a task counts as finished only if its end date exists and is not older than its start
+        if meta.end and meta.end >= meta.start:
+            already_ended.append(True)
+        else:
+            already_ended.append(False)
+
+    # if all tasks have already finished, we can destroy the stack
+    if already_ended and all(already_ended):
+        destroy = f"Destroying {vendor}/{server}, all tasks have finished"
+
+    # if DESTROY_AFTER has already passed since the newest start time, destroy the stack/instance
+    if start_times:
+        last_start = max(start_times)
+        if datetime.now() - lib.DESTROY_AFTER >= last_start:
+            destroy = f"Destroying {vendor}/{server}, last task start date: {last_start}"
+
+    if destroy:
+        # To avoid taking unnecessary locks in Pulumi, we first fetch the stack's resources
+        # and skip the destroy if the stack is already empty.
+        try:
+            stack = runner.get_stack(vendor, {}, RESOURCE_OPTS.get(vendor, {}) | dict(instance=server))
+        except AttributeError:
+            # this vendor is not yet supported
+            return
+        resources = stack.export_stack().deployment.get("resources", [])
+        if len(resources) <= 1:
+            # a non-existent stack has zero resources and a clean (already destroyed) stack
+            # exactly one (the Pulumi Stack itself); in either case there is nothing to clean up
+            logging.debug(f"Pulumi stack for {vendor}/{server} has {len(resources)} resources, no cleanup needed")
+            return
+        logging.info(destroy)
+        runner.destroy(vendor, {}, RESOURCE_OPTS.get(vendor, {}) | dict(instance=server))
+
+
+@cli.command()
+@click.pass_context
+@click.option("--threads", type=int, default=32, show_default=True,
+              help="Number of threads to run Pulumi concurrently. Each thread consumes around 60 MiB of RAM.")
+def cleanup(ctx, threads):
+    from sc_runner import runner
+    with ThreadPoolExecutor(max_workers=threads) as executor:
+        for srv in servers():
+            vendor = srv.vendor_id
+            server = srv.api_reference
+            data_dir = os.path.join(ctx.parent.params["repo_path"], "data", vendor, server)
+            # process this in a thread as getting Pulumi state is very slow
+            executor.submit(cleanup_task, vendor, server, data_dir)
 
 
 @cli.command()
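
Reviewer note on the ThreadPoolExecutor fan-out: a bare executor.submit() stores any exception raised inside cleanup_task on the returned Future and never re-raises it, so a failed cleanup passes silently. Below is a minimal sketch of how the loop could collect the futures and surface failures. The cleanup_all() wrapper and the repo_path variable are hypothetical stand-ins for the Click command and ctx.parent.params["repo_path"]; servers() and cleanup_task() are the functions from the patch.

    import logging
    import os
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def cleanup_all(threads=32):
        # same fan-out as the cleanup command above, but keeping the futures around
        with ThreadPoolExecutor(max_workers=threads) as executor:
            futures = {}
            for srv in servers():
                data_dir = os.path.join(repo_path, "data", srv.vendor_id, srv.api_reference)
                future = executor.submit(cleanup_task, srv.vendor_id, srv.api_reference, data_dir)
                futures[future] = (srv.vendor_id, srv.api_reference)
            # as_completed() yields each future as it finishes; result() re-raises
            # any exception that cleanup_task raised in its worker thread
            for future in as_completed(futures):
                vendor, server = futures[future]
                try:
                    future.result()
                except Exception:
                    logging.exception(f"Cleanup of {vendor}/{server} failed")

Exiting the with block already waits for all submitted tasks, so the explicit as_completed() loop only adds the error reporting, not extra synchronization.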