From 4f715e62479879a3b25ecce87e479fc1d72bed66 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Thu, 19 Oct 2023 14:44:16 -0700 Subject: [PATCH] fix linter issues --- merlin/router.py | 12 +++++++----- merlin/study/celeryadapter.py | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/merlin/router.py b/merlin/router.py index 1b8f8295e..92202829a 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -37,7 +37,6 @@ """ import logging import os -import subprocess import time from datetime import datetime from typing import Dict, List, Optional @@ -260,7 +259,7 @@ def get_active_queues(task_server: str) -> Dict[str, List[str]]: return active_queues -def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"): +def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"): # noqa """ Wait on workers to start up. Check on worker start 10 times with `sleep` seconds between each check. If no workers are started in time, raise an error to kill the monitor (there @@ -305,14 +304,17 @@ def check_workers_processing(queues_in_spec: List[str], task_server: str, app_na :param `app_name`: The name of the app we're querying :returns: True if workers are still processing tasks, False otherwise """ + result = False + if task_server == "celery": - return check_celery_workers_processing(queues_in_spec, app_name) + result = check_celery_workers_processing(queues_in_spec, app_name) else: LOG.error("Celery is not specified as the task server!") - return False + + return result -def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optional[str] = "merlin") -> bool: +def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optional[str] = "merlin") -> bool: # noqa """ Function to check merlin workers and queues to keep the allocation alive diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 11139c2e3..053de60f5 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -298,8 +298,8 @@ def query_celery_queues(queues: List[str]) -> Dict[str, List[str]]: # Get a dict of active queues by querying the celery app active_queues = app.control.inspect().active_queues() if active_queues is not None: - # Loop through each worker in the output - for worker, active_queue_list in active_queues.items(): + # Loop through each active queue that was found + for active_queue_list in active_queues.values(): # Loop through each queue that each worker is watching for active_queue in active_queue_list: # If this is a queue we're looking for, increment the consumer count