Skip to content

Commit

Permalink
Rework cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bra-fsn committed May 31, 2024
1 parent d7415c9 commit 4c95d46
Showing 1 changed file with 62 additions and 22 deletions.
84 changes: 62 additions & 22 deletions inspector/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
preserve memory for running the benchmarks.
All other modules should be imported lazily, where needed.
"""
from concurrent.futures import ThreadPoolExecutor
import click
import lib
import logging
Expand Down Expand Up @@ -126,32 +127,71 @@ def start(ctx, exclude, start_only):
runner.create(vendor, {}, RESOURCE_OPTS.get(vendor) | dict(instance=server, instance_opts=instance_opts))


@cli.command()
@click.pass_context
def cleanup(ctx):
def cleanup_task(vendor, server, data_dir):
from datetime import datetime
from sc_runner import runner

for srv in servers():
vendor = srv.vendor_id
server = srv.api_reference
data_dir = os.path.join(ctx.parent.params["repo_path"], "data", vendor, server)
tasks = list(lib.get_tasks(vendor))
if not tasks:
tasks = list(lib.get_tasks(vendor))
if not tasks:
return

# see if we have to destroy the resources in the Pulumi stack
destroy = ""

start_times = []
already_ended = []
for task in tasks:
meta = lib.load_task_meta(task, data_dir=data_dir)
if not meta.start:
continue
start_times = []
# get the newest start time
for task in tasks:
meta = lib.load_task_meta(task, data_dir=data_dir)
if not meta.start:
continue
start_times.append(meta.start)
if start_times:
last_start = max(start_times)
# destroy the stack after a given amount of time
if datetime.now() - lib.DESTROY_AFTER >= last_start:
logging.info(f"Destroying {vendor}/{server}, last task start date: {last_start}")
runner.destroy(vendor, {}, RESOURCE_OPTS.get(vendor) | dict(instance=server))
start_times.append(meta.start)
if meta.end:
if meta.end >= meta.start:
already_ended.append(True)
else:
already_ended.append(False)

# if all tasks have already finished, we can destroy the stack
if already_ended and all(already_ended):
destroy = f"Destroying {vendor}/{server}, all tasks have finished"

# if DESTROY_AFTER has already passed since the newest start time, we should destroy the stack/instance
if start_times:
last_start = max(start_times)
if datetime.now() - lib.DESTROY_AFTER >= last_start:
destroy = f"Destroying {vendor}/{server}, last task start date: {last_start}"

if destroy:
# In order not to cause unnecessary locks in Pulumi, we first get the stack's resources to see if
# it's already empty, and in that case, we don't destroy it.
try:
stack = runner.get_stack(vendor, {}, RESOURCE_OPTS.get(vendor, {}) | dict(instance=server))
except AttributeError:
# this vendor is not yet supported
return
resources = stack.export_stack().deployment.get("resources", [])
if len(resources) <= 1:
# a non-existent stack will have zero, a clean (already destroyed) stack should have exactly one
# resource (the Pulumi Stack itself). If we can see either of these, we have nothing to clean up.
logging.debug(f"Pulumi stack for {vendor}/{server} has {len(resources)} resources, no cleanup needed")
return
logging.info(destroy)
runner.destroy(vendor, {}, RESOURCE_OPTS.get(vendor) | dict(instance=server))


@cli.command()
@click.pass_context
@click.option("--threads", type=int, default=32, show_default=True,
help="Number of threads to run Pulumi concurrently. Each thread consumes around 60 MiB of RAM.")
def cleanup(ctx, threads):
from sc_runner import runner
with ThreadPoolExecutor(max_workers=threads) as executor:
for srv in servers():
vendor = srv.vendor_id
server = srv.api_reference
data_dir = os.path.join(ctx.parent.params["repo_path"], "data", vendor, server)
# process this in a thread as getting Pulumi state is very slow
executor.submit(cleanup_task, vendor, server, data_dir)


@cli.command()
Expand Down

0 comments on commit 4c95d46

Please sign in to comment.