(partially) fix launch_workers fixture so it can be used in multiple classes
bgunnar5 committed Nov 30, 2023
1 parent a6965f2 commit 26c5a60
Showing 7 changed files with 248 additions and 132 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/push-pr_workflow.yml
@@ -129,7 +129,7 @@ jobs:
- name: Run pytest over unit test suite
run: |
python3 -m pytest tests/unit/
python3 -m pytest -v --order-scope=module tests/unit/
- name: Run integration test suite for local tests
run: |
2 changes: 1 addition & 1 deletion Makefile
@@ -87,7 +87,7 @@ install-dev: virtualenv install-merlin install-workflow-deps
# tests require a valid dev install of merlin
unit-tests:
. $(VENV)/bin/activate; \
$(PYTHON) -m pytest $(UNIT); \
$(PYTHON) -m pytest -v --order-scope=module $(UNIT); \


# run CLI tests - these require an active install of merlin in a venv
1 change: 1 addition & 0 deletions requirements/dev.txt
@@ -11,3 +11,4 @@ sphinx>=2.0.0
alabaster
johnnydep
deepdiff
pytest-order
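
The pytest-order plugin added here is what provides the --order-scope=module option that the workflow and Makefile now pass to pytest, so ordering constraints only apply to tests within the same module. As a rough illustration (not part of this commit; the test names below are hypothetical), ordered tests look like this:

import pytest


@pytest.mark.order(1)
def test_runs_first():
    assert True


@pytest.mark.order(2)
def test_runs_second():
    assert True
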
192 changes: 192 additions & 0 deletions tests/celery_test_workers.py
@@ -0,0 +1,192 @@
###############################################################################
# Copyright (c) 2023, Lawrence Livermore National Security, LLC.
# Produced at the Lawrence Livermore National Laboratory
# Written by the Merlin dev team, listed in the CONTRIBUTORS file.
# <[email protected]>
#
# LLNL-CODE-797170
# All rights reserved.
# This file is part of Merlin, Version: 1.11.1.
#
# For details, see https://github.com/LLNL/merlin.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
###############################################################################
"""
Module to define functionality for test workers and how to start/stop
them in their own processes.
"""
import multiprocessing
import os
import signal
import subprocess
from time import sleep
from typing import Dict, List

from celery import Celery


class CeleryTestWorkersManager:
    """
    A class to handle the setup and teardown of celery workers.
    """

    def __init__(self, app: Celery):
        self.app = app
        self.running_workers = []
        self.worker_processes = {}
        self.echo_processes = {}

    def _is_worker_ready(self, worker_name: str, verbose: bool = False) -> bool:
        """
        Check to see if the worker is up and running yet.
        :param worker_name: The name of the worker we're checking on
        :param verbose: If true, enable print statements to show where we're at in execution
        :returns: True if the worker is running. False otherwise.
        """
        ping = self.app.control.inspect().ping(destination=[f"celery@{worker_name}"])
        if verbose:
            print(f"ping: {ping}")
        return ping is not None and f"celery@{worker_name}" in ping

    def _wait_for_worker_launch(self, worker_name: str, verbose: bool = False):
        """
        Poll the worker at a fixed interval. If the worker doesn't show up
        within the time limit then we'll raise a timeout error. Otherwise, the worker
        is up and running and we can continue with our tests.
        :param worker_name: The name of the worker we're checking on
        :param verbose: If true, enable print statements to show where we're at in execution
        """
        max_wait_time = 2  # Maximum wait time in seconds
        wait_interval = 0.5  # Interval between checks in seconds
        waited_time = 0
        worker_ready = False

        if verbose:
            print(f"waiting for {worker_name} to launch...")

        # Wait until the worker is ready
        while waited_time < max_wait_time:
            if self._is_worker_ready(worker_name, verbose=verbose):
                worker_ready = True
                break

            sleep(wait_interval)
            waited_time += wait_interval

        if not worker_ready:
            raise TimeoutError("Celery workers did not start within the expected time.")

        if verbose:
            print(f"{worker_name} launched")

    def start_worker(self, worker_launch_cmd: List[str]):
        """
        This is where a worker is actually started. Each worker maintains control of a process until
        we tell it to stop, which is why we have to use the multiprocessing library for this. We have to use
        app.worker_main instead of the normal "celery -A <app name> worker" command to launch the workers
        since our celery app is created in a pytest fixture and is unrecognizable by the celery command.
        For each worker, its log output is sent to
        /tmp/`whoami`/pytest-of-`whoami`/pytest-current/integration_outfiles_current/ under a file with a name
        similar to: test_worker_*.log.
        NOTE: pytest-current/ will have the results of the most recent test run. If you want to see a previous run,
        check under pytest-<integer value>/. HOWEVER, only the 3 most recent test runs will be saved.
        :param worker_launch_cmd: The command to launch a worker
        """
        self.app.worker_main(worker_launch_cmd)

    def launch_worker(self, worker_name: str, queues: List[str], concurrency: int = 1):
        """
        Launch a single worker. We'll add the process that the worker is running in to the list of worker processes.
        We'll also create an echo process to simulate a celery worker command that will show up with 'ps ux'.
        :param worker_name: The name to give to the worker
        :param queues: A list of queues that the worker will be watching
        :param concurrency: The concurrency value of the worker (how many child processes to have the worker spin up)
        """
        # Check to make sure we have a unique worker name so we can track all processes
        if worker_name in self.worker_processes:
            self.stop_all_workers()
            raise ValueError(f"The worker {worker_name} is already running. Choose a different name.")

        # Create the launch command for this worker
        worker_launch_cmd = ["worker", "-n", worker_name, "-Q", ",".join(queues), "--concurrency", str(concurrency), f"--logfile={worker_name}.log", "--loglevel=DEBUG"]

        # Create an echo command to simulate a running celery worker since our celery worker will be spun up in
        # a different process and we won't be able to see it with 'ps ux' like we normally would
        echo_process = subprocess.Popen(  # pylint: disable=consider-using-with
            f"echo 'celery merlin_test_app {' '.join(worker_launch_cmd)}'; sleep inf",
            shell=True,
            preexec_fn=os.setpgrp,  # Make this the parent of the group so we can kill the 'sleep inf' that's spun up
        )
        self.echo_processes[worker_name] = echo_process

        # Start the worker in a separate process since it'll take control of the entire process until we kill it
        worker_process = multiprocessing.Process(target=self.start_worker, args=(worker_launch_cmd,))
        worker_process.start()
        self.worker_processes[worker_name] = worker_process
        self.running_workers.append(worker_name)

        # Wait for the worker to launch properly
        try:
            self._wait_for_worker_launch(worker_name, verbose=False)
        except TimeoutError as exc:
            self.stop_all_workers()
            raise exc

    def launch_workers(self, worker_info: Dict[str, Dict]):
        """
        Launch multiple workers. This will call `launch_worker` to launch each worker
        individually.
        :param worker_info: A dict of worker info with the form
            {"worker_name": {"concurrency": <int>, "queues": <list of queue names>}}
        """
        for worker_name, worker_settings in worker_info.items():
            self.launch_worker(worker_name, worker_settings["queues"], worker_settings["concurrency"])

    def stop_worker(self, worker_name: str):
        """
        Stop a single running worker and its associated processes.
        :param worker_name: The name of the worker to shutdown
        """
        # Send a shutdown signal to the worker
        self.app.control.broadcast("shutdown", destination=[f"celery@{worker_name}"])

        # Try to terminate the process gracefully
        if self.worker_processes[worker_name] is not None:
            self.worker_processes[worker_name].terminate()
            process_exit_code = self.worker_processes[worker_name].join(timeout=3)

            # If it won't terminate then force kill it
            if process_exit_code is None:
                self.worker_processes[worker_name].kill()

        # Terminate the echo process and its sleep inf subprocess
        os.killpg(os.getpgid(self.echo_processes[worker_name].pid), signal.SIGTERM)

    def stop_all_workers(self):
        """
        Stop all of the running workers and the processes associated with them.
        """
        for worker_name in self.running_workers:
            self.stop_worker(worker_name)
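
To make the manager's lifecycle concrete, here is a minimal standalone sketch of how CeleryTestWorkersManager could be driven. The broker/backend URL, worker name, and queue name are assumptions for illustration only; in the test suite the Celery app comes from the celery_app fixture in conftest.py and points at the server started by the redis_server fixture.

from celery import Celery

from tests.celery_test_workers import CeleryTestWorkersManager

# Assumed broker/backend for illustration; the real tests use the app built by the celery_app fixture
app = Celery("merlin_test_app", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

workers_manager = CeleryTestWorkersManager(app)
workers_manager.launch_workers({"example_worker": {"concurrency": 1, "queues": ["example_queue"]}})
try:
    pass  # run whatever needs a live worker here
finally:
    # Always tear down the workers and their echo/sleep processes
    workers_manager.stop_all_workers()
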
142 changes: 15 additions & 127 deletions tests/conftest.py
@@ -31,17 +31,17 @@
This module contains pytest fixtures to be used throughout the entire
integration test suite.
"""
import multiprocessing
import os
import subprocess
from time import sleep
from typing import Dict, List
from typing import Dict

import pytest
import redis
from _pytest.tmpdir import TempPathFactory
from celery import Celery
from celery.canvas import Signature
from tests.celery_test_workers import CeleryTestWorkersManager


class RedisServerError(Exception):
@@ -125,7 +125,7 @@ def redis_server(merlin_server_dir: str, redis_pass: str) -> str: # pylint: dis
    # Start the local redis server
    try:
        # Need to set LC_ALL='C' before starting the server or else redis causes a failure
        subprocess.run("export LC_ALL='C'; merlin server start", shell=True, capture_output=True, text=True, timeout=5)
        subprocess.run("export LC_ALL='C'; merlin server start", shell=True, timeout=5)
    except subprocess.TimeoutExpired:
        pass

@@ -189,137 +189,25 @@ def worker_queue_map() -> Dict[str, str]:
    return {f"test_worker_{i}": f"test_queue_{i}" for i in range(3)}


def are_workers_ready(app: Celery, num_workers: int, verbose: bool = False) -> bool:
    """
    Check to see if the workers are up and running yet.
    :param app: The celery app fixture that's connected to our redis server
    :param num_workers: An int representing the number of workers we're looking to have started
    :param verbose: If true, enable print statements to show where we're at in execution
    :returns: True if all workers are running. False otherwise.
    """
    app_stats = app.control.inspect().stats()
    if verbose:
        print(f"app_stats: {app_stats}")
    return app_stats is not None and len(app_stats) == num_workers


def wait_for_worker_launch(app: Celery, num_workers: int, verbose: bool = False):
    """
    Poll the workers over a fixed interval of time. If the workers don't show up
    within the time limit then we'll raise a timeout error. Otherwise, the workers
    are up and running and we can continue with our tests.
    :param app: The celery app fixture that's connected to our redis server
    :param num_workers: An int representing the number of workers we're looking to have started
    :param verbose: If true, enable print statements to show where we're at in execution
    """
    max_wait_time = 2  # Maximum wait time in seconds
    wait_interval = 0.5  # Interval between checks in seconds
    waited_time = 0

    if verbose:
        print("waiting for workers to launch...")

    # Wait until all workers are ready
    while not are_workers_ready(app, num_workers, verbose=verbose) and waited_time < max_wait_time:
        sleep(wait_interval)
        waited_time += wait_interval

    # If all workers are not ready after the maximum wait time, raise an error
    if not are_workers_ready(app, num_workers, verbose=verbose):
        raise TimeoutError("Celery workers did not start within the expected time.")

    if verbose:
        print("workers launched")


def shutdown_processes(worker_processes: List[multiprocessing.Process], echo_processes: List[subprocess.Popen]):
    """
    Given lists of processes, shut them all down. Worker processes were created with the
    multiprocessing library and echo processes were created with the subprocess library,
    so we have to shut them down slightly differently.
    :param worker_processes: A list of worker processes to terminate
    :param echo_processes: A list of echo processes to terminate
    """
    # Worker processes were created with the multiprocessing library
    for worker_process in worker_processes:
        # Try to terminate the process gracefully
        worker_process.terminate()
        process_exit_code = worker_process.join(timeout=3)

        # If it won't terminate then force kill it
        if process_exit_code is None:
            worker_process.kill()

    # Gracefully terminate the echo processes
    for echo_process in echo_processes:
        echo_process.terminate()
        echo_process.wait()

    # The echo processes will spawn 3 sleep inf processes that we also need to kill
    subprocess.run("ps ux | grep 'sleep inf' | grep -v grep | awk '{print $2}' | xargs kill", shell=True)


def start_worker(app: Celery, worker_launch_cmd: List[str]):
    """
    This is where a worker is actually started. Each worker maintains control of a process until
    we tell it to stop, which is why we have to use the multiprocessing library for this. We have to use
    app.worker_main instead of the normal "celery -A <app name> worker" command to launch the workers
    since our celery app is created in a pytest fixture and is unrecognizable by the celery command.
    For each worker, its log output is sent to
    /tmp/`whoami`/pytest-of-`whoami`/pytest-current/integration_outfiles_current/ under a file with a name
    similar to: test_worker_*.log.
    NOTE: pytest-current/ will have the results of the most recent test run. If you want to see a previous run,
    check under pytest-<integer value>/. HOWEVER, only the 3 most recent test runs will be saved.
    :param app: The celery app fixture that's connected to our redis server
    :param worker_launch_cmd: The command to launch a worker
    """
    app.worker_main(worker_launch_cmd)


@pytest.fixture(scope="class")
def launch_workers(celery_app: Celery, worker_queue_map: Dict[str, str]): # pylint: disable=redefined-outer-name
def launch_workers(celery_app: Celery, worker_queue_map: Dict[str, str]):
    """
    Launch the workers on the celery app fixture using the worker and queue names
    defined in the worker_queue_map fixture.
    :param celery_app: The celery app fixture that's connected to our redis server
    :param worker_queue_map: A dict where the keys are worker names and the values are queue names
    """
    print(f"launch_workers called")
    # Create the processes that will start the workers and store them in a list
    worker_processes = []
    echo_processes = []
    for worker, queue in worker_queue_map.items():
        worker_launch_cmd = ["worker", "-n", worker, "-Q", queue, "--concurrency", "1", f"--logfile={worker}.log", "--loglevel=DEBUG"]

        # We have to use this dummy echo command to simulate a celery worker command that will show up with 'ps ux'
        # We'll sleep for infinity here and then kill this process during shutdown
        echo_process = subprocess.Popen(  # pylint: disable=consider-using-with
            f"echo 'celery merlin_test_app {' '.join(worker_launch_cmd)}'; sleep inf", shell=True
        )
        echo_processes.append(echo_process)

        # We launch workers in their own process since they maintain control of a process until we stop them
        worker_process = multiprocessing.Process(target=start_worker, args=(celery_app, worker_launch_cmd))
        worker_process.start()
        worker_processes.append(worker_process)

    # Ensure that the workers start properly before letting tests use them
    try:
        num_workers = len(worker_queue_map)
        wait_for_worker_launch(celery_app, num_workers, verbose=False)
    except TimeoutError as exc:
        # If workers don't launch in time, we need to make sure these processes stop
        shutdown_processes(worker_processes, echo_processes)
        raise exc

    # Give control to the tests that need to use workers
    # Format worker info in a format that our workers manager will be able to read
    # (basically just add in the concurrency value to worker_queue_map)
    worker_info = {worker_name: {"concurrency": 1, "queues": [queue]} for worker_name, queue in worker_queue_map.items()}

    # Create our workers manager and launch our workers
    workers_manager = CeleryTestWorkersManager(celery_app)
    workers_manager.launch_workers(worker_info)

    # Yield control to the tests that need workers launched
    yield

    # Shut down the workers and terminate the processes
    celery_app.control.broadcast("shutdown", destination=list(worker_queue_map.keys()))
    shutdown_processes(worker_processes, echo_processes)
    # Tests are done so shut down all of our workers
    workers_manager.stop_all_workers()
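
With the fixture now class-scoped and backed by the manager, a consuming test class can simply request it. A minimal sketch of such a class (the class name, test name, and ordering index are hypothetical, not part of this commit):

import pytest
from celery import Celery


@pytest.mark.order(2)
@pytest.mark.usefixtures("launch_workers")
class TestWithLiveWorkers:
    """Example test class relying on the class-scoped launch_workers fixture."""

    def test_workers_answer_ping(self, celery_app: Celery, worker_queue_map):
        # Every worker launched by the fixture should answer a ping
        pings = celery_app.control.inspect().ping()
        assert pings is not None
        for worker_name in worker_queue_map:
            assert f"celery@{worker_name}" in pings
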