Skip to content

Commit

Permalink
first attempt at using pytest fixtures for monitor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed Nov 29, 2023
1 parent 079d3ff commit a6965f2
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 59 deletions.
11 changes: 5 additions & 6 deletions merlin/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,32 +295,31 @@ def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"): # noqa
raise NoWorkersException("Monitor: no workers available to process the non-empty queue")


def check_workers_processing(queues_in_spec: List[str], task_server: str, app_name: str) -> bool:
def check_workers_processing(queues_in_spec: List[str], task_server: str) -> bool:
"""
Check if any workers are still processing tasks by querying the task server.
:param `queues_in_spec`: A list of queues to check if tasks are still active in
:param `task_server`: The task server from which to query
:param `app_name`: The name of the app we're querying
:returns: True if workers are still processing tasks, False otherwise
"""
result = False

if task_server == "celery":
result = check_celery_workers_processing(queues_in_spec, app_name)
from merlin.celery import app
result = check_celery_workers_processing(queues_in_spec, app)
else:
LOG.error("Celery is not specified as the task server!")

return result


def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optional[str] = "merlin") -> bool: # noqa
def check_merlin_status(args: "Namespace", spec: "MerlinSpec") -> bool: # noqa
"""
Function to check merlin workers and queues to keep the allocation alive
:param `args`: parsed CLI arguments
:param `spec`: the parsed spec.yaml as a MerlinSpec object
:param `app_name`: the name of the celery app to monitor
:returns: True if there are still tasks being processed, False otherwise
"""
# Initialize the variable to track if there are still active tasks
Expand Down Expand Up @@ -363,7 +362,7 @@ def check_merlin_status(args: "Namespace", spec: "MerlinSpec", app_name: Optiona
active_tasks = True
# If there are no jobs left, see if any workers are still processing them
elif total_jobs == 0:
active_tasks = check_workers_processing(queues_in_spec, args.task_server, app_name)
active_tasks = check_workers_processing(queues_in_spec, args.task_server)

LOG.debug(f"Monitor: active_tasks: {active_tasks}")
return active_tasks
51 changes: 27 additions & 24 deletions merlin/study/celeryadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@
from typing import Dict, List, Optional

from amqp.exceptions import ChannelError
from celery import Celery

from merlin.config import Config
from merlin.study.batch import batch_check_parallel, batch_worker_launch
from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running


LOG = logging.getLogger(__name__)

# TODO figure out a better way to handle the import of celery app and CONFIG

def run_celery(study, run_mode=None):
"""
Expand Down Expand Up @@ -272,16 +275,20 @@ def query_celery_workers(spec_worker_names, queues, workers_regex):
print()


def query_celery_queues(queues: List[str]) -> Dict[str, List[str]]:
def query_celery_queues(queues: List[str], app: Celery = None, config: Config = None) -> Dict[str, List[str]]:
"""
Build a dict of information about the number of jobs and consumers attached
to specific queues that we want information on.
:param `queues`: A list of the queues we want to know about
:param queues: A list of the queues we want to know about
:param app: The celery application (this will be none unless testing)
:param config: The configuration object that has the broker name (this will be none unless testing)
:returns: A dict of info on the number of jobs and consumers for each queue in `queues`
"""
from merlin.celery import app # pylint: disable=C0415
from merlin.config.configfile import CONFIG # pylint: disable=C0415
if app is None:
from merlin.celery import app # pylint: disable=C0415
if config is None:
from merlin.config.configfile import CONFIG as config # pylint: disable=C0415

# Initialize the dictionary with the info we want about our queues
queue_info = {queue: {"consumers": 0, "jobs": 0} for queue in queues}
Expand All @@ -303,7 +310,7 @@ def query_celery_queues(queues: List[str]) -> Dict[str, List[str]]:

# Redis doesn't keep track of consumers attached to queues like rabbit does
# so we have to count this ourselves here
if CONFIG.broker.name in ("rediss", "redis"):
if config.broker.name in ("rediss", "redis"):
# Get a dict of active queues by querying the celery app
active_queues = app.control.inspect().active_queues()
if active_queues is not None:
Expand Down Expand Up @@ -334,32 +341,28 @@ def get_workers_from_app():
return [*workers]


def check_celery_workers_processing(queues_in_spec: List[str], app_name: str) -> bool:
def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> bool:
"""
Query celery to see if any workers are still processing tasks.
:param `queues_in_spec`: A list of queues to check if tasks are still active in
:param `app_name`: The name of the app we're querying
:param queues_in_spec: A list of queues to check if tasks are still active in
:param app: The celery app that we're querying
:returns: True if workers are still processing tasks, False otherwise
"""
# Inspect active call and store stdout
inspect_active_cmd = ["celery", "-A", app_name, "inspect", "active"]
LOG.debug(f"inspect_active_cmd: {inspect_active_cmd}")
process = subprocess.run(inspect_active_cmd, capture_output=True, text=True)
# Query celery for active tasks
active_tasks = app.control.inspect().active()
print(f"active_tasks: {active_tasks}")

# Parse stdout for each queue to see if there are still tasks being processed
if process.returncode == 0:
LOG.debug(f"celery inspect active stdout: {process.stdout}")
LOG.info("Workers are active")
active_tasks = any(queue in process.stdout for queue in queues_in_spec)
else:
LOG.error(
"Error running celery inspect active, setting 'active_tasks' to be False. "
f"There are likely no workers active. {process.stderr}"
)
active_tasks = False
# Search for the queues we provided if necessary
if active_tasks is not None:
for tasks in active_tasks.values():
for task in tasks:
if task["delivery_info"]["routing_key"] in queues_in_spec:
print("Workers are still active")
return True

return active_tasks
print("Workers are no longer active")
return False


def _get_workers_to_start(spec, steps):
Expand Down
29 changes: 26 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import redis
from _pytest.tmpdir import TempPathFactory
from celery import Celery
from celery.canvas import Signature


class RedisServerError(Exception):
Expand Down Expand Up @@ -154,7 +155,28 @@ def celery_app(redis_server: str) -> Celery: # pylint: disable=redefined-outer-
:param redis_server: The redis server uri we'll use to connect to redis
:returns: The celery app object we'll use for testing
"""
return Celery("test_app", broker=redis_server, backend=redis_server)
return Celery("merlin_test_app", broker=redis_server, backend=redis_server)


@pytest.fixture(scope="session")
def sleep_sig(celery_app: Celery) -> Signature:
"""
Create a task registered to our celery app and return a signature for it.
Once requested by a test, you can set the queue you'd like to send this to
with `sleep_sig.set(queue=<queue name>)`. Here, <queue name> will likely be
one of the queues defined in the `worker_queue_map` fixture.
:param celery_app: The celery app object we'll use for testing
:returns: A celery signature for a task that will sleep for 3 seconds
"""
# Create a celery task that sleeps for 3 sec
@celery_app.task
def sleep_task():
print("running sleep task")
sleep(3)

# Create a signature for this task
return sleep_task.s()


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -267,16 +289,17 @@ def launch_workers(celery_app: Celery, worker_queue_map: Dict[str, str]): # pyl
:param celery_app: The celery app fixture that's connected to our redis server
:param worker_queue_map: A dict where the keys are worker names and the values are queue names
"""
print(f"launch_workers called")
# Create the processes that will start the workers and store them in a list
worker_processes = []
echo_processes = []
for worker, queue in worker_queue_map.items():
worker_launch_cmd = ["worker", "-n", worker, "-Q", queue, "--concurrency", "1", f"--logfile={worker}.log"]
worker_launch_cmd = ["worker", "-n", worker, "-Q", queue, "--concurrency", "1", f"--logfile={worker}.log", "--loglevel=DEBUG"]

# We have to use this dummy echo command to simulate a celery worker command that will show up with 'ps ux'
# We'll sleep for infinity here and then kill this process during shutdown
echo_process = subprocess.Popen( # pylint: disable=consider-using-with
f"echo 'celery test_app {' '.join(worker_launch_cmd)}'; sleep inf", shell=True
f"echo 'celery merlin_test_app {' '.join(worker_launch_cmd)}'; sleep inf", shell=True
)
echo_processes.append(echo_process)

Expand Down
Loading

0 comments on commit a6965f2

Please sign in to comment.