Skip to content

Commit

Permalink
add a way to manually queue tasks for a dataset from idle to waiting
Browse files: browse the repository at this point in the history
  • Loading branch information
dsschult committed Oct 9, 2024
1 parent 4becdab commit a723b83
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
2 changes: 1 addition & 1 deletion iceprod/client_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
def add_auth_to_argparse(parser):
"""Add auth args to argparse."""
config = from_environment({
'API_URL': 'https://iceprod2-api.icecube.wisc.edu',
'API_URL': 'https://api.iceprod.icecube.aq',
'OAUTH_URL': 'https://keycloak.icecube.wisc.edu/auth/realms/IceCube',
'OAUTH_CLIENT_ID': 'iceprod',
'OAUTH_CLIENT_SECRET': '',
Expand Down
36 changes: 25 additions & 11 deletions iceprod/scheduled_tasks/queue_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
NTASKS_PER_CYCLE = 1000


async def run(rest_client, ntasks=NTASKS, ntasks_per_cycle=NTASKS_PER_CYCLE, debug=False):
async def run(rest_client, dataset_id=None, ntasks=NTASKS, ntasks_per_cycle=NTASKS_PER_CYCLE, debug=False):
"""
Actual runtime / loop.
Expand All @@ -29,7 +29,11 @@ async def run(rest_client, ntasks=NTASKS, ntasks_per_cycle=NTASKS_PER_CYCLE, deb
try:
num_tasks_waiting = 0
num_tasks_queued = 0
tasks = await rest_client.request('GET', '/task_counts/status')
if dataset_id:
route = f'/datasets/{dataset_id}/task_counts/status'
else:
route = '/task_counts/status'
tasks = await rest_client.request('GET', route)
if 'idle' in tasks:
num_tasks_waiting = tasks['idle']
if 'waiting' in tasks:
Expand All @@ -40,13 +44,22 @@ async def run(rest_client, ntasks=NTASKS, ntasks_per_cycle=NTASKS_PER_CYCLE, deb
logger.warning(f'tasks to waiting: {tasks_to_queue}')

if tasks_to_queue > 0:
args = {
'status': 'idle',
'keys': 'task_id|depends',
'sort': 'priority=-1',
'limit': 5*tasks_to_queue,
}
ret = await rest_client.request('GET', '/tasks', args)
if dataset_id:
route = f'/datasets/{dataset_id}/tasks'
args = {
'status': 'idle',
'keys': 'task_id|depends',
}
else:
route = '/tasks'
args = {
'status': 'idle',
'keys': 'task_id|depends',
'sort': 'priority=-1',
'limit': 5*tasks_to_queue,
}
ret = await rest_client.request('GET', route, args)
idle_tasks = ret.values() if dataset_id else ret['tasks']
queue_tasks = []
tasks_queue_pending = 0
deps_futures = set()
Expand All @@ -59,7 +72,7 @@ async def check_deps(task):
await rest_client.request('PATCH', f'/tasks/{task["task_id"]}', {'priority': 0})
return None
return task
for task in ret['tasks']:
for task in idle_tasks:
tasks_queue_pending += 1
if not task.get('depends', None):
logger.info('queueing task %s', task['task_id'])
Expand Down Expand Up @@ -116,6 +129,7 @@ async def check_deps(task):
def main():
parser = argparse.ArgumentParser(description='run a scheduled task once')
add_auth_to_argparse(parser)
parser.add_argument('--dataset-id', help='dataset id')
parser.add_argument('--ntasks', type=int, default=os.environ.get('NTASKS', NTASKS),
help='number of tasks to keep queued')
parser.add_argument('--ntasks_per_cycle', type=int, default=os.environ.get('NTASKS_PER_CYCLE', NTASKS_PER_CYCLE),
Expand All @@ -130,7 +144,7 @@ def main():

rest_client = create_rest_client(args)

asyncio.run(run(rest_client, ntasks=args.ntasks, ntasks_per_cycle=args.ntasks_per_cycle, debug=args.debug))
asyncio.run(run(rest_client, dataset_id=args.dataset_id, ntasks=args.ntasks, ntasks_per_cycle=args.ntasks_per_cycle, debug=args.debug))


if __name__ == '__main__':
Expand Down

0 comments on commit a723b83

Please sign in to comment.