fix consumer count bug and run fix-style
bgunnar5 committed Oct 19, 2023
1 parent 5b7d1e8 commit be1806e
Showing 4 changed files with 60 additions and 23 deletions.
2 changes: 1 addition & 1 deletion merlin/exceptions/__init__.py
@@ -112,4 +112,4 @@ class NoJobsQueuedException(Exception):
"""

def __init__(self, message):
super().__init__(message)
super().__init__(message)
7 changes: 6 additions & 1 deletion merlin/main.py
@@ -354,8 +354,13 @@ def process_monitor(args):
"""
LOG.info("Monitor: checking queues ...")
spec, _ = get_merlin_spec_with_override(args)

# Give the user time to queue up jobs in case they haven't already
time.sleep(args.sleep)

# Check if we still need our allocation
while router.check_merlin_status(args, spec):
LOG.info("Monitor: found tasks in queues")
LOG.info("Monitor: found tasks in queues and/or tasks being processed")
time.sleep(args.sleep)
LOG.info("Monitor: ... stop condition met")

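For context, the reworked monitor now sleeps once up front so tasks have a chance to hit the queues, then polls until the status check reports nothing queued and nothing in flight. A minimal sketch of that control flow (standalone; monitor, check_status, and SLEEP_INTERVAL are hypothetical stand-ins for process_monitor, router.check_merlin_status, and args.sleep):

    import time

    SLEEP_INTERVAL = 60  # hypothetical stand-in for args.sleep


    def monitor(check_status):
        """Hold the allocation open while tasks are queued or being processed."""
        # Give the user time to queue up jobs in case they haven't already
        time.sleep(SLEEP_INTERVAL)
        # Keep polling until the queues are drained and no worker is mid-task
        while check_status():
            time.sleep(SLEEP_INTERVAL)
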
54 changes: 42 additions & 12 deletions merlin/router.py
@@ -40,12 +40,13 @@
 import subprocess
 import time
 from datetime import datetime
-from typing import List, Optional
+from typing import Dict, List, Optional
 
 from merlin.exceptions import NoWorkersException
 from merlin.study.celeryadapter import (
     check_celery_workers_processing,
     create_celery_config,
+    get_active_celery_queues,
     get_workers_from_app,
     purge_celery_tasks,
     query_celery_queues,
@@ -240,6 +241,25 @@ def create_config(task_server: str, config_dir: str, broker: str, test: str) ->
LOG.error("Only celery can be configured currently.")


def get_active_queues(task_server: str) -> Dict[str, List[str]]:
"""
Get a dictionary of active queues and the workers attached to these queues.
:param `task_server`: The task server to query for active queues
:returns: A dict where keys are queue names and values are a list of workers watching them
"""
active_queues = {}

if task_server == "celery":
from merlin.celery import app # pylint: disable=C0415

active_queues, _ = get_active_celery_queues(app)
else:
LOG.error("Only celery can be configured currently.")

return active_queues


def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"):
"""
Wait on workers to start up. Check on worker start 10 times with `sleep` seconds between
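
The new helper only dispatches to the Celery-specific query, but the shape of what it returns drives the fix below. A hand-written illustration of the mapping (queue and worker names are invented):

    # Possible return value of get_active_queues("celery"):
    # each key is a queue name, each value is the list of workers watching it.
    active_queues = {
        "[merlin]_step1_queue": ["celery@worker1", "celery@worker2"],
        "[merlin]_step2_queue": ["celery@worker2"],  # worker2 watches both queues
    }
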
@@ -306,17 +326,32 @@ def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optiona

     # Get info about jobs and workers in our spec from celery
     queue_status = query_status(args.task_server, spec, args.steps, verbose=False)
-    LOG.info(f"Monitor: queue_status: {queue_status}")
+    LOG.debug(f"Monitor: queue_status: {queue_status}")
 
-    # Count the number of jobs and workers that are active
-    total_consumers = 0
+    # Count the number of jobs that are active
+    # (Adding up the number of consumers in the same way is inaccurate so we won't do that)
     total_jobs = 0
     for queue_info in queue_status.values():
         total_jobs += queue_info["jobs"]
-        total_consumers += queue_info["consumers"]
+
+    # Get the queues defined in the spec
+    queues_in_spec = spec.get_queue_list(["all"])
+    LOG.debug(f"Monitor: queues_in_spec: {queues_in_spec}")
+
+    # Get the active queues and the workers that are watching them
+    active_queues = get_active_queues(args.task_server)
+    LOG.debug(f"Monitor: active_queues: {active_queues}")
+
+    # Count the number of workers that are active
+    consumers = set()
+    for active_queue, workers_on_queue in active_queues.items():
+        if active_queue in queues_in_spec:
+            consumers |= set(workers_on_queue)
+    LOG.debug(f"Monitor: consumers found: {consumers}")
+    total_consumers = len(consumers)
+
     LOG.info(f"Monitor: found {total_jobs} jobs in queues and {total_consumers} workers alive")
 
-    # If we're here, jobs should be queued
     # If there are no workers, wait for the workers to start
     if total_consumers == 0:
         wait_for_workers(args.sleep, args.task_server, spec)
@@ -326,12 +361,7 @@ def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optiona
         active_tasks = True
     # If there are no jobs left, see if any workers are still processing them
     elif total_jobs == 0:
-        # Get the queues defined in the spec
-        queues_in_spec = spec.get_queue_list(["all"])
-        LOG.debug(f"queues_in_spec: {queues_in_spec}")
-
         # Check for any tasks that are still being processed
         active_tasks = check_workers_processing(queues_in_spec, args.task_server, app_name)
-
-    LOG.debug(f"active_tasks: {active_tasks}")
+    LOG.debug(f"Monitor: active_tasks: {active_tasks}")
     return active_tasks
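
The counting change above is the consumer count bug named in the commit title: summing the per-queue consumer counts double-counts any worker that watches more than one queue, while unioning worker names into a set counts each worker exactly once. A small self-contained illustration (all names invented):

    queues_in_spec = ["q_a", "q_b"]
    active_queues = {
        "q_a": ["celery@w1", "celery@w2"],
        "q_b": ["celery@w2"],  # w2 also watches q_a
        "q_other": ["celery@w3"],  # not part of this spec
    }

    # Old approach: summing per-queue consumers counts w2 twice -> 3
    buggy_total = sum(len(workers) for queue, workers in active_queues.items() if queue in queues_in_spec)

    # Fixed approach: a set union counts each distinct worker once -> 2
    consumers = set()
    for queue, workers in active_queues.items():
        if queue in queues_in_spec:
            consumers |= set(workers)

    assert buggy_total == 3
    assert len(consumers) == 2
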
20 changes: 11 additions & 9 deletions merlin/study/celeryadapter.py
@@ -39,11 +39,11 @@
 from contextlib import suppress
 from typing import Dict, List
 
+from amqp.exceptions import ChannelError
+
 from merlin.study.batch import batch_check_parallel, batch_worker_launch
 from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running
 
-from amqp.exceptions import ChannelError
-
 
 LOG = logging.getLogger(__name__)
 
@@ -96,7 +96,7 @@ def get_running_queues():
     return running_queues
 
 
-def get_queues(app):
+def get_active_celery_queues(app):
     """Get all active queues and workers for a celery application.
 
     Unlike get_running_queues, this goes through the application's server.
@@ -111,7 +111,7 @@ def get_queues(app):
     :example:
 
     >>> from merlin.celery import app
-    >>> queues, workers = get_queues(app)
+    >>> queues, workers = get_active_celery_queues(app)
     >>> queue_names = [*queues]
     >>> workers_on_q0 = queues[queue_names[0]]
     >>> workers_not_on_q0 = [worker for worker in workers
@@ -133,7 +133,7 @@ def get_queues(app):

 def get_active_workers(app):
     """
-    This is the inverse of get_queues() defined above. This function
+    This is the inverse of get_active_celery_queues() defined above. This function
     builds a dict where the keys are worker names and the values are lists
     of queues attached to the worker.
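
As a quick sketch of what "inverse" means here: get_active_celery_queues maps queue -> workers, and get_active_workers flips that into worker -> queues. The inversion can be done in a few lines (names invented):

    from collections import defaultdict

    queues_to_workers = {"q_a": ["w1", "w2"], "q_b": ["w2"]}

    workers_to_queues = defaultdict(list)
    for queue, workers in queues_to_workers.items():
        for worker in workers:
            workers_to_queues[worker].append(queue)

    # workers_to_queues == {"w1": ["q_a"], "w2": ["q_a", "q_b"]}
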
@@ -221,7 +221,7 @@ def query_celery_workers(spec_worker_names, queues, workers_regex):
     # --queues flag
     if queues:
         # Get a mapping between queues and the workers watching them
-        queue_worker_map, _ = get_queues(app)
+        queue_worker_map, _ = get_active_celery_queues(app)
         # Remove duplicates and prepend the celery queue tag to all queues
         queues = list(set(queues))
         celerize_queues(queues)
@@ -285,7 +285,9 @@ def query_celery_queues(queues: List[str]) -> Dict[str, List[str]]:
             for queue in queues:
                 try:
                     # Count the number of jobs and consumers for each queue
-                    _, queue_info[queue]["jobs"], queue_info[queue]["consumers"] = channel.queue_declare(queue=queue, passive=True)
+                    _, queue_info[queue]["jobs"], queue_info[queue]["consumers"] = channel.queue_declare(
+                        queue=queue, passive=True
+                    )
                 # Redis likes to throw this error when a queue we're looking for has no jobs
                 except ChannelError:
                     pass
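
The rewrap above is just fix-style formatting; the interesting part is the passive declare it preserves. With passive=True the broker reports on an existing queue, returning its name, message count, and consumer count, and raises instead of creating the queue when it is missing. A hedged standalone sketch using kombu (the broker URL and queue name are placeholders):

    from amqp.exceptions import ChannelError
    from kombu import Connection

    with Connection("amqp://guest:guest@localhost//") as conn:  # placeholder broker URL
        with conn.channel() as channel:
            try:
                # passive=True: inspect the queue without creating it
                _, jobs, consumers = channel.queue_declare(queue="some_queue", passive=True)
                print(f"some_queue: {jobs} jobs, {consumers} consumers")
            except ChannelError:
                # Raised when the queue doesn't exist (Redis also raises this for empty queues)
                print("queue not found")
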
Expand Down Expand Up @@ -343,7 +345,7 @@ def check_celery_workers_processing(queues_in_spec: List[str], app_name: str) ->
         active_tasks = any(queue in process.stdout for queue in queues_in_spec)
     else:
         LOG.error(
-            "Error running celery inspect active, setting 'active_tasks' to be False. " \
+            "Error running celery inspect active, setting 'active_tasks' to be False. "
             f"There are likely no workers active. {process.stderr}"
         )
         active_tasks = False
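
For reference, the surrounding function decides whether workers are still mid-task by shelling out to celery inspect active and scanning the output for the spec's queue names. A rough standalone sketch of that pattern (the -A app module name is a placeholder):

    import subprocess
    from typing import List


    def workers_still_processing(queues_in_spec: List[str], app_name: str = "merlin") -> bool:
        # Ask Celery which tasks each worker is currently executing
        process = subprocess.run(
            f"celery -A {app_name} inspect active", shell=True, capture_output=True, text=True
        )
        if process.returncode == 0:
            # If any of our queues show up in the output, a task is still in flight
            return any(queue in process.stdout for queue in queues_in_spec)
        # The inspect call failed; most likely no workers are alive
        return False
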
Expand Down Expand Up @@ -663,7 +665,7 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None):
     from merlin.celery import app  # pylint: disable=C0415
 
     LOG.debug(f"Sending stop to queues: {queues}, worker_regex: {worker_regex}, spec_worker_names: {spec_worker_names}")
-    active_queues, _ = get_queues(app)
+    active_queues, _ = get_active_celery_queues(app)
 
     # If not specified, get all the queues
     if queues is None:
