From be1806e2c8bd4f4b778bec808fb51f7200c8cb00 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Thu, 19 Oct 2023 14:29:29 -0700 Subject: [PATCH] fix consumer count bug and run fix-style --- merlin/exceptions/__init__.py | 2 +- merlin/main.py | 7 ++++- merlin/router.py | 54 +++++++++++++++++++++++++++-------- merlin/study/celeryadapter.py | 20 +++++++------ 4 files changed, 60 insertions(+), 23 deletions(-) diff --git a/merlin/exceptions/__init__.py b/merlin/exceptions/__init__.py index ef8ad632e..d745b5764 100644 --- a/merlin/exceptions/__init__.py +++ b/merlin/exceptions/__init__.py @@ -112,4 +112,4 @@ class NoJobsQueuedException(Exception): """ def __init__(self, message): - super().__init__(message) \ No newline at end of file + super().__init__(message) diff --git a/merlin/main.py b/merlin/main.py index 037fcaf51..eb244cdf6 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -354,8 +354,13 @@ def process_monitor(args): """ LOG.info("Monitor: checking queues ...") spec, _ = get_merlin_spec_with_override(args) + + # Give the user time to queue up jobs in case they haven't already + time.sleep(args.sleep) + + # Check if we still need our allocation while router.check_merlin_status(args, spec): - LOG.info("Monitor: found tasks in queues") + LOG.info("Monitor: found tasks in queues and/or tasks being processed") time.sleep(args.sleep) LOG.info("Monitor: ... stop condition met") diff --git a/merlin/router.py b/merlin/router.py index e48cf91b2..1b8f8295e 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -40,12 +40,13 @@ import subprocess import time from datetime import datetime -from typing import List, Optional +from typing import Dict, List, Optional from merlin.exceptions import NoWorkersException from merlin.study.celeryadapter import ( check_celery_workers_processing, create_celery_config, + get_active_celery_queues, get_workers_from_app, purge_celery_tasks, query_celery_queues, @@ -240,6 +241,25 @@ def create_config(task_server: str, config_dir: str, broker: str, test: str) -> LOG.error("Only celery can be configured currently.") +def get_active_queues(task_server: str) -> Dict[str, List[str]]: + """ + Get a dictionary of active queues and the workers attached to these queues. + + :param `task_server`: The task server to query for active queues + :returns: A dict where keys are queue names and values are a list of workers watching them + """ + active_queues = {} + + if task_server == "celery": + from merlin.celery import app # pylint: disable=C0415 + + active_queues, _ = get_active_celery_queues(app) + else: + LOG.error("Only celery can be configured currently.") + + return active_queues + + def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"): """ Wait on workers to start up. Check on worker start 10 times with `sleep` seconds between @@ -306,17 +326,32 @@ def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optiona # Get info about jobs and workers in our spec from celery queue_status = query_status(args.task_server, spec, args.steps, verbose=False) - LOG.info(f"Monitor: queue_status: {queue_status}") + LOG.debug(f"Monitor: queue_status: {queue_status}") - # Count the number of jobs and workers that are active - total_consumers = 0 + # Count the number of jobs that are active + # (Adding up the number of consumers in the same way is inaccurate so we won't do that) total_jobs = 0 for queue_info in queue_status.values(): total_jobs += queue_info["jobs"] - total_consumers += queue_info["consumers"] + + # Get the queues defined in the spec + queues_in_spec = spec.get_queue_list(["all"]) + LOG.debug(f"Monitor: queues_in_spec: {queues_in_spec}") + + # Get the active queues and the workers that are watching them + active_queues = get_active_queues(args.task_server) + LOG.debug(f"Monitor: active_queues: {active_queues}") + + # Count the number of workers that are active + consumers = set() + for active_queue, workers_on_queue in active_queues.items(): + if active_queue in queues_in_spec: + consumers |= set(workers_on_queue) + LOG.debug(f"Monitor: consumers found: {consumers}") + total_consumers = len(consumers) + LOG.info(f"Monitor: found {total_jobs} jobs in queues and {total_consumers} workers alive") - # If we're here, jobs should be queued # If there are no workers, wait for the workers to start if total_consumers == 0: wait_for_workers(args.sleep, args.task_server, spec) @@ -326,12 +361,7 @@ def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optiona active_tasks = True # If there are no jobs left, see if any workers are still processing them elif total_jobs == 0: - # Get the queues defined in the spec - queues_in_spec = spec.get_queue_list(["all"]) - LOG.debug(f"queues_in_spec: {queues_in_spec}") - - # Check for any tasks that are still being processed active_tasks = check_workers_processing(queues_in_spec, args.task_server, app_name) - LOG.debug(f"active_tasks: {active_tasks}") + LOG.debug(f"Monitor: active_tasks: {active_tasks}") return active_tasks diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 93ba274e0..11139c2e3 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -39,11 +39,11 @@ from contextlib import suppress from typing import Dict, List +from amqp.exceptions import ChannelError + from merlin.study.batch import batch_check_parallel, batch_worker_launch from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running -from amqp.exceptions import ChannelError - LOG = logging.getLogger(__name__) @@ -96,7 +96,7 @@ def get_running_queues(): return running_queues -def get_queues(app): +def get_active_celery_queues(app): """Get all active queues and workers for a celery application. Unlike get_running_queues, this goes through the application's server. @@ -111,7 +111,7 @@ def get_queues(app): :example: >>> from merlin.celery import app - >>> queues, workers = get_queues(app) + >>> queues, workers = get_active_celery_queues(app) >>> queue_names = [*queues] >>> workers_on_q0 = queues[queue_names[0]] >>> workers_not_on_q0 = [worker for worker in workers @@ -133,7 +133,7 @@ def get_queues(app): def get_active_workers(app): """ - This is the inverse of get_queues() defined above. This function + This is the inverse of get_active_celery_queues() defined above. This function builds a dict where the keys are worker names and the values are lists of queues attached to the worker. @@ -221,7 +221,7 @@ def query_celery_workers(spec_worker_names, queues, workers_regex): # --queues flag if queues: # Get a mapping between queues and the workers watching them - queue_worker_map, _ = get_queues(app) + queue_worker_map, _ = get_active_celery_queues(app) # Remove duplicates and prepend the celery queue tag to all queues queues = list(set(queues)) celerize_queues(queues) @@ -285,7 +285,9 @@ def query_celery_queues(queues: List[str]) -> Dict[str, List[str]]: for queue in queues: try: # Count the number of jobs and consumers for each queue - _, queue_info[queue]["jobs"], queue_info[queue]["consumers"] = channel.queue_declare(queue=queue, passive=True) + _, queue_info[queue]["jobs"], queue_info[queue]["consumers"] = channel.queue_declare( + queue=queue, passive=True + ) # Redis likes to throw this error when a queue we're looking for has no jobs except ChannelError: pass @@ -343,7 +345,7 @@ def check_celery_workers_processing(queues_in_spec: List[str], app_name: str) -> active_tasks = any(queue in process.stdout for queue in queues_in_spec) else: LOG.error( - "Error running celery inspect active, setting 'active_tasks' to be False. " \ + "Error running celery inspect active, setting 'active_tasks' to be False. " f"There are likely no workers active. {process.stderr}" ) active_tasks = False @@ -663,7 +665,7 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): from merlin.celery import app # pylint: disable=C0415 LOG.debug(f"Sending stop to queues: {queues}, worker_regex: {worker_regex}, spec_worker_names: {spec_worker_names}") - active_queues, _ = get_queues(app) + active_queues, _ = get_active_celery_queues(app) # If not specified, get all the queues if queues is None: