From a723b83669491aca6a4adb8b9dac9c3113e77a8e Mon Sep 17 00:00:00 2001 From: David Schultz Date: Wed, 9 Oct 2024 12:55:46 -0500 Subject: [PATCH] add a way to manually queue tasks for a dataset from idle to waiting --- iceprod/client_auth.py | 2 +- iceprod/scheduled_tasks/queue_tasks.py | 36 ++++++++++++++++++-------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/iceprod/client_auth.py b/iceprod/client_auth.py index 4f545675c..c7463a02d 100644 --- a/iceprod/client_auth.py +++ b/iceprod/client_auth.py @@ -7,7 +7,7 @@ def add_auth_to_argparse(parser): """Add auth args to argparse.""" config = from_environment({ - 'API_URL': 'https://iceprod2-api.icecube.wisc.edu', + 'API_URL': 'https://api.iceprod.icecube.aq', 'OAUTH_URL': 'https://keycloak.icecube.wisc.edu/auth/realms/IceCube', 'OAUTH_CLIENT_ID': 'iceprod', 'OAUTH_CLIENT_SECRET': '', diff --git a/iceprod/scheduled_tasks/queue_tasks.py b/iceprod/scheduled_tasks/queue_tasks.py index ebcc82442..71fb64867 100644 --- a/iceprod/scheduled_tasks/queue_tasks.py +++ b/iceprod/scheduled_tasks/queue_tasks.py @@ -18,7 +18,7 @@ NTASKS_PER_CYCLE = 1000 -async def run(rest_client, ntasks=NTASKS, ntasks_per_cycle=NTASKS_PER_CYCLE, debug=False): +async def run(rest_client, dataset_id=None, ntasks=NTASKS, ntasks_per_cycle=NTASKS_PER_CYCLE, debug=False): """ Actual runtime / loop. @@ -29,7 +29,11 @@ async def run(rest_client, ntasks=NTASKS, ntasks_per_cycle=NTASKS_PER_CYCLE, deb try: num_tasks_waiting = 0 num_tasks_queued = 0 - tasks = await rest_client.request('GET', '/task_counts/status') + if dataset_id: + route = f'/datasets/{dataset_id}/task_counts/status' + else: + route = '/task_counts/status' + tasks = await rest_client.request('GET', route) if 'idle' in tasks: num_tasks_waiting = tasks['idle'] if 'waiting' in tasks: @@ -40,13 +44,22 @@ async def run(rest_client, ntasks=NTASKS, ntasks_per_cycle=NTASKS_PER_CYCLE, deb logger.warning(f'tasks to waiting: {tasks_to_queue}') if tasks_to_queue > 0: - args = { - 'status': 'idle', - 'keys': 'task_id|depends', - 'sort': 'priority=-1', - 'limit': 5*tasks_to_queue, - } - ret = await rest_client.request('GET', '/tasks', args) + if dataset_id: + route = f'/datasets/{dataset_id}/tasks' + args = { + 'status': 'idle', + 'keys': 'task_id|depends', + } + else: + route = '/tasks' + args = { + 'status': 'idle', + 'keys': 'task_id|depends', + 'sort': 'priority=-1', + 'limit': 5*tasks_to_queue, + } + ret = await rest_client.request('GET', route, args) + idle_tasks = ret.values() if dataset_id else ret['tasks'] queue_tasks = [] tasks_queue_pending = 0 deps_futures = set() @@ -59,7 +72,7 @@ async def check_deps(task): await rest_client.request('PATCH', f'/tasks/{task["task_id"]}', {'priority': 0}) return None return task - for task in ret['tasks']: + for task in idle_tasks: tasks_queue_pending += 1 if not task.get('depends', None): logger.info('queueing task %s', task['task_id']) @@ -116,6 +129,7 @@ async def check_deps(task): def main(): parser = argparse.ArgumentParser(description='run a scheduled task once') add_auth_to_argparse(parser) + parser.add_argument('--dataset-id', help='dataset id') parser.add_argument('--ntasks', type=int, default=os.environ.get('NTASKS', NTASKS), help='number of tasks to keep queued') parser.add_argument('--ntasks_per_cycle', type=int, default=os.environ.get('NTASKS_PER_CYCLE', NTASKS_PER_CYCLE), @@ -130,7 +144,7 @@ def main(): rest_client = create_rest_client(args) - asyncio.run(run(rest_client, ntasks=args.ntasks, ntasks_per_cycle=args.ntasks_per_cycle, debug=args.debug)) + asyncio.run(run(rest_client, dataset_id=args.dataset_id, ntasks=args.ntasks, ntasks_per_cycle=args.ntasks_per_cycle, debug=args.debug)) if __name__ == '__main__':