diff --git a/.github/workflows/push-pr_workflow.yml b/.github/workflows/push-pr_workflow.yml index e2d9e164e..bef6f8608 100644 --- a/.github/workflows/push-pr_workflow.yml +++ b/.github/workflows/push-pr_workflow.yml @@ -129,7 +129,7 @@ jobs: - name: Run pytest over unit test suite run: | - python3 -m pytest tests/unit/ + python3 -m pytest -v --order-scope=module tests/unit/ - name: Run integration test suite for local tests run: | diff --git a/Makefile b/Makefile index 2f9db031b..95355c297 100644 --- a/Makefile +++ b/Makefile @@ -87,7 +87,7 @@ install-dev: virtualenv install-merlin install-workflow-deps # tests require a valid dev install of merlin unit-tests: . $(VENV)/bin/activate; \ - $(PYTHON) -m pytest $(UNIT); \ + $(PYTHON) -m pytest -v --order-scope=module $(UNIT); \ # run CLI tests - these require an active install of merlin in a venv diff --git a/requirements/dev.txt b/requirements/dev.txt index 895a89249..6e8722b4b 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -11,3 +11,4 @@ sphinx>=2.0.0 alabaster johnnydep deepdiff +pytest-order diff --git a/tests/celery_test_workers.py b/tests/celery_test_workers.py new file mode 100644 index 000000000..ca08f569e --- /dev/null +++ b/tests/celery_test_workers.py @@ -0,0 +1,192 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.11.1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### +""" +Module to define functionality for test workers and how to start/stop +them in their own processes. +""" +import multiprocessing +import os +import signal +import subprocess +from time import sleep +from typing import Dict, List + +from celery import Celery + + +class CeleryTestWorkersManager: + """ + A class to handle the setup and teardown of celery workers. + """ + + def __init__(self, app: Celery): + self.app = app + self.running_workers = [] + self.worker_processes = {} + self.echo_processes = {} + + def _is_worker_ready(self, worker_name: str, verbose: bool = False) -> bool: + """ + Check to see if the worker is up and running yet. + + :param worker_name: The name of the worker we're checking on + :param verbose: If true, enable print statements to show where we're at in execution + :returns: True if the worker is running. False otherwise. + """ + ping = self.app.control.inspect().ping(destination=[f"celery@{worker_name}"]) + if verbose: + print(f"ping: {ping}") + return ping is not None and f"celery@{worker_name}" in ping + + def _wait_for_worker_launch(self, worker_name: str, verbose: bool = False): + """ + Poll the worker over a fixed interval of time. If the worker doesn't show up + within the time limit then we'll raise a timeout error. Otherwise, the worker + is up and running and we can continue with our tests. + + :param worker_name: The name of the worker we're checking on + :param verbose: If true, enable print statements to show where we're at in execution + """ + max_wait_time = 2 # Maximum wait time in seconds + wait_interval = 0.5 # Interval between checks in seconds + waited_time = 0 + worker_ready = False + + if verbose: + print(f"waiting for {worker_name} to launch...") + + # Wait until the worker is ready + while waited_time < max_wait_time: + if self._is_worker_ready(worker_name, verbose=verbose): + worker_ready = True + break + + sleep(wait_interval) + waited_time += wait_interval + + if not worker_ready: + raise TimeoutError("Celery workers did not start within the expected time.") + + if verbose: + print(f"{worker_name} launched") + + def start_worker(self, worker_launch_cmd: List[str]): + """ + This is where a worker is actually started. Each worker maintains control of a process until + we tell it to stop, that's why we have to use the multiprocessing library for this. We have to use + app.worker_main instead of the normal "celery -A worker" command to launch the workers + since our celery app is created in a pytest fixture and is unrecognizable by the celery command. + For each worker, the output of it's logs are sent to + /tmp/`whoami`/pytest-of-`whoami`/pytest-current/integration_outfiles_current/ under a file with a name + similar to: test_worker_*.log. + NOTE: pytest-current/ will have the results of the most recent test run. If you want to see a previous run + check under pytest-/. HOWEVER, only the 3 most recent test runs will be saved. + + :param worker_launch_cmd: The command to launch a worker + """ + self.app.worker_main(worker_launch_cmd) + + def launch_worker(self, worker_name: str, queues: List[str], concurrency: int = 1): + """ + Launch a single worker. We'll add the process that the worker is running in to the list of worker processes. + We'll also create an echo process to simulate a celery worker command that will show up with 'ps ux'. + + :param worker_name: The name to give to the worker + :param queues: A list of queues that the worker will be watching + :param concurrency: The concurrency value of the worker (how many child processes to have the worker spin up) + """ + # Check to make sure we have a unique worker name so we can track all processes + if worker_name in self.worker_processes: + self.stop_all_workers() + raise ValueError(f"The worker {worker_name} is already running. Choose a different name.") + + # Create the launch command for this worker + worker_launch_cmd = ["worker", "-n", worker_name, "-Q", ",".join(queues), "--concurrency", str(concurrency), f"--logfile={worker_name}.log", "--loglevel=DEBUG"] + + # Create an echo command to simulate a running celery worker since our celery worker will be spun up in + # a different process and we won't be able to see it with 'ps ux' like we normally would + echo_process = subprocess.Popen( # pylint: disable=consider-using-with + f"echo 'celery merlin_test_app {' '.join(worker_launch_cmd)}'; sleep inf", + shell=True, + preexec_fn=os.setpgrp, # Make this the parent of the group so we can kill the 'sleep inf' that's spun up + ) + self.echo_processes[worker_name] = echo_process + + # Start the worker in a separate process since it'll take control of the entire process until we kill it + worker_process = multiprocessing.Process(target=self.start_worker, args=(worker_launch_cmd,)) + worker_process.start() + self.worker_processes[worker_name] = worker_process + self.running_workers.append(worker_name) + + # Wait for the worker to launch properly + try: + self._wait_for_worker_launch(worker_name, verbose=False) + except TimeoutError as exc: + self.stop_all_workers() + raise exc + + def launch_workers(self, worker_info: Dict[str, Dict]): + """ + Launch multiple workers. This will call `launch_worker` to launch each worker + individually. + + :param worker_info: A dict of worker info with the form + {"worker_name": {"concurrency": , "queues": }} + """ + for worker_name, worker_settings in worker_info.items(): + self.launch_worker(worker_name, worker_settings["queues"], worker_settings["concurrency"]) + + def stop_worker(self, worker_name: str): + """ + Stop a single running worker and its associated processes. + + :param worker_name: The name of the worker to shutdown + """ + # Send a shutdown signal to the worker + self.app.control.broadcast("shutdown", destination=[f"celery@{worker_name}"]) + + # Try to terminate the process gracefully + if self.worker_processes[worker_name] is not None: + self.worker_processes[worker_name].terminate() + process_exit_code = self.worker_processes[worker_name].join(timeout=3) + + # If it won't terminate then force kill it + if process_exit_code is None: + self.worker_processes[worker_name].kill() + + # Terminate the echo process and its sleep inf subprocess + os.killpg(os.getpgid(self.echo_processes[worker_name].pid), signal.SIGTERM) + + def stop_all_workers(self): + """ + Stop all of the running workers and the processes associated with them. + """ + for worker_name in self.running_workers: + self.stop_worker(worker_name) diff --git a/tests/conftest.py b/tests/conftest.py index 167c4ee53..ecafbf0c1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,17 +31,17 @@ This module contains pytest fixtures to be used throughout the entire integration test suite. """ -import multiprocessing import os import subprocess from time import sleep -from typing import Dict, List +from typing import Dict import pytest import redis from _pytest.tmpdir import TempPathFactory from celery import Celery from celery.canvas import Signature +from tests.celery_test_workers import CeleryTestWorkersManager class RedisServerError(Exception): @@ -125,7 +125,7 @@ def redis_server(merlin_server_dir: str, redis_pass: str) -> str: # pylint: dis # Start the local redis server try: # Need to set LC_ALL='C' before starting the server or else redis causes a failure - subprocess.run("export LC_ALL='C'; merlin server start", shell=True, capture_output=True, text=True, timeout=5) + subprocess.run("export LC_ALL='C'; merlin server start", shell=True, timeout=5) except subprocess.TimeoutExpired: pass @@ -189,99 +189,8 @@ def worker_queue_map() -> Dict[str, str]: return {f"test_worker_{i}": f"test_queue_{i}" for i in range(3)} -def are_workers_ready(app: Celery, num_workers: int, verbose: bool = False) -> bool: - """ - Check to see if the workers are up and running yet. - - :param app: The celery app fixture that's connected to our redis server - :param num_workers: An int representing the number of workers we're looking to have started - :param verbose: If true, enable print statements to show where we're at in execution - :returns: True if all workers are running. False otherwise. - """ - app_stats = app.control.inspect().stats() - if verbose: - print(f"app_stats: {app_stats}") - return app_stats is not None and len(app_stats) == num_workers - - -def wait_for_worker_launch(app: Celery, num_workers: int, verbose: bool = False): - """ - Poll the workers over a fixed interval of time. If the workers don't show up - within the time limit then we'll raise a timeout error. Otherwise, the workers - are up and running and we can continue with our tests. - - :param app: The celery app fixture that's connected to our redis server - :param num_workers: An int representing the number of workers we're looking to have started - :param verbose: If true, enable print statements to show where we're at in execution - """ - max_wait_time = 2 # Maximum wait time in seconds - wait_interval = 0.5 # Interval between checks in seconds - waited_time = 0 - - if verbose: - print("waiting for workers to launch...") - - # Wait until all workers are ready - while not are_workers_ready(app, num_workers, verbose=verbose) and waited_time < max_wait_time: - sleep(wait_interval) - waited_time += wait_interval - - # If all workers are not ready after the maximum wait time, raise an error - if not are_workers_ready(app, num_workers, verbose=verbose): - raise TimeoutError("Celery workers did not start within the expected time.") - - if verbose: - print("workers launched") - - -def shutdown_processes(worker_processes: List[multiprocessing.Process], echo_processes: List[subprocess.Popen]): - """ - Given lists of processes, shut them all down. Worker processes were created with the - multiprocessing library and echo processes were created with the subprocess library, - so we have to shut them down slightly differently. - - :param worker_processes: A list of worker processes to terminate - :param echo_processes: A list of echo processes to terminate - """ - # Worker processes were created with the multiprocessing library - for worker_process in worker_processes: - # Try to terminate the process gracefully - worker_process.terminate() - process_exit_code = worker_process.join(timeout=3) - - # If it won't terminate then force kill it - if process_exit_code is None: - worker_process.kill() - - # Gracefully terminate the echo processes - for echo_process in echo_processes: - echo_process.terminate() - echo_process.wait() - - # The echo processes will spawn 3 sleep inf processes that we also need to kill - subprocess.run("ps ux | grep 'sleep inf' | grep -v grep | awk '{print $2}' | xargs kill", shell=True) - - -def start_worker(app: Celery, worker_launch_cmd: List[str]): - """ - This is where a worker is actually started. Each worker maintains control of a process until - we tell it to stop, that's why we have to use the multiprocessing library for this. We have to use - app.worker_main instead of the normal "celery -A worker" command to launch the workers - since our celery app is created in a pytest fixture and is unrecognizable by the celery command. - For each worker, the output of it's logs are sent to - /tmp/`whoami`/pytest-of-`whoami`/pytest-current/integration_outfiles_current/ under a file with a name - similar to: test_worker_*.log. - NOTE: pytest-current/ will have the results of the most recent test run. If you want to see a previous run - check under pytest-/. HOWEVER, only the 3 most recent test runs will be saved. - - :param app: The celery app fixture that's connected to our redis server - :param worker_launch_cmd: The command to launch a worker - """ - app.worker_main(worker_launch_cmd) - - @pytest.fixture(scope="class") -def launch_workers(celery_app: Celery, worker_queue_map: Dict[str, str]): # pylint: disable=redefined-outer-name +def launch_workers(celery_app: Celery, worker_queue_map: Dict[str, str]): """ Launch the workers on the celery app fixture using the worker and queue names defined in the worker_queue_map fixture. @@ -289,37 +198,16 @@ def launch_workers(celery_app: Celery, worker_queue_map: Dict[str, str]): # pyl :param celery_app: The celery app fixture that's connected to our redis server :param worker_queue_map: A dict where the keys are worker names and the values are queue names """ - print(f"launch_workers called") - # Create the processes that will start the workers and store them in a list - worker_processes = [] - echo_processes = [] - for worker, queue in worker_queue_map.items(): - worker_launch_cmd = ["worker", "-n", worker, "-Q", queue, "--concurrency", "1", f"--logfile={worker}.log", "--loglevel=DEBUG"] - - # We have to use this dummy echo command to simulate a celery worker command that will show up with 'ps ux' - # We'll sleep for infinity here and then kill this process during shutdown - echo_process = subprocess.Popen( # pylint: disable=consider-using-with - f"echo 'celery merlin_test_app {' '.join(worker_launch_cmd)}'; sleep inf", shell=True - ) - echo_processes.append(echo_process) - - # We launch workers in their own process since they maintain control of a process until we stop them - worker_process = multiprocessing.Process(target=start_worker, args=(celery_app, worker_launch_cmd)) - worker_process.start() - worker_processes.append(worker_process) - - # Ensure that the workers start properly before letting tests use them - try: - num_workers = len(worker_queue_map) - wait_for_worker_launch(celery_app, num_workers, verbose=False) - except TimeoutError as exc: - # If workers don't launch in time, we need to make sure these processes stop - shutdown_processes(worker_processes, echo_processes) - raise exc - - # Give control to the tests that need to use workers + # Format worker info in a format the our workers manager will be able to read + # (basically just add in concurrency value to worker_queue_map) + worker_info = {worker_name: {"concurrency": 1, "queues": [queue]} for worker_name, queue in worker_queue_map.items()} + + # Create our workers manager and launch our workers + workers_manager = CeleryTestWorkersManager(celery_app) + workers_manager.launch_workers(worker_info) + + # Yield control to the tests that need workers launched yield - # Shut down the workers and terminate the processes - celery_app.control.broadcast("shutdown", destination=list(worker_queue_map.keys())) - shutdown_processes(worker_processes, echo_processes) + # Tests are done so shut down all of our workers + workers_manager.stop_all_workers() diff --git a/tests/unit/study/test_celeryadapter.py b/tests/unit/study/test_celeryadapter.py index d15f5263c..cd81c543c 100644 --- a/tests/unit/study/test_celeryadapter.py +++ b/tests/unit/study/test_celeryadapter.py @@ -33,6 +33,7 @@ from time import sleep from typing import Dict +import pytest from celery import Celery from celery.canvas import Signature from deepdiff import DeepDiff @@ -41,10 +42,16 @@ from merlin.study import celeryadapter +@pytest.fixture(before="TestInactive") class TestActive: """ This class will test functions in the celeryadapter.py module. It will run tests where we need active queues/workers to interact with. + + NOTE: The tests in this class must be ran before the TestInactive class or else the + Celery workers needed for this class don't start + + TODO: fix the bug noted above and then check if we still need pytest-order """ def test_query_celery_queues( @@ -124,7 +131,8 @@ def test_get_active_celery_queues( # Ensure there was no extra output that we weren't expecting assert queue_result == {} assert worker_result == [] - + + @pytest.mark.order(index=1) def test_check_celery_workers_processing_tasks( self, celery_app: Celery, @@ -137,12 +145,14 @@ def test_check_celery_workers_processing_tasks( a task that sleeps for 3 seconds to our workers before we run this test so that there should be a task for this function to find. + NOTE: the celery app fixture shows strange behavior when using app.control.inspect() calls (which + check_celery_workers_processing uses) so we have to run this test first in this class in order to + have it run properly. + :param celery_app: A pytest fixture for the test Celery app :param sleep_sig: A pytest fixture for a celery signature of a task that sleeps for 3 sec :param launch_workers: A pytest fixture that launches celery workers for us to interact with """ - # TODO figure out why this test fails when ran with the other tests - # Our active workers/queues are test_worker_[0-2]/test_queue_[0-2] so we're # sending this to test_queue_0 for test_worker_0 to process queue_for_signature = "test_queue_0" diff --git a/tests/unit/test_debug.py b/tests/unit/test_debug.py new file mode 100644 index 000000000..6a0a97016 --- /dev/null +++ b/tests/unit/test_debug.py @@ -0,0 +1,25 @@ +from typing import Dict +from time import sleep + +from celery import Celery +from celery.canvas import Signature + + +class TestOne: + + def test_one(self, celery_app: Celery, launch_workers: "Fixture", sleep_sig: Signature): # noqa: F821 + # queue_for_signature = "test_queue_0" + # sleep_sig.set(queue=queue_for_signature) + # result = sleep_sig.delay() + + # sleep(1) + + # print(f"active: {celery_app.control.inspect().active()}") + # result.get() + pass + + +class TestTwo: + + def test_one(self, launch_workers: "Fixture", worker_queue_map: Dict[str, str]): # noqa: F821 + pass \ No newline at end of file