diff --git a/CHANGELOG.md b/CHANGELOG.md index d2ef4eea..5ed0e68d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Merlin manager capability to monitor celery workers. - Added additional tests for the `merlin run` and `merlin purge` commands - Aliased types to represent different types of pytest fixtures - New test condition `StepFinishedFilesCount` to help search for `MERLIN_FINISHED` files in output workspaces diff --git a/merlin/main.py b/merlin/main.py index 6ca79e86..2e033e6d 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -57,6 +57,7 @@ from merlin.server.server_commands import config_server, init_server, restart_server, start_server, status_server, stop_server from merlin.spec.expansion import RESERVED, get_spec_with_expansion from merlin.spec.specification import MerlinSpec +from merlin.study.celerymanageradapter import run_manager, start_manager, stop_manager from merlin.study.status import DetailedStatus, Status from merlin.study.status_constants import VALID_RETURN_CODES, VALID_STATUS_FILTERS from merlin.study.status_renderers import status_renderer_factory @@ -359,7 +360,7 @@ def stop_workers(args): LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?") # Send stop command to router - router.stop_workers(args.task_server, worker_names, args.queues, args.workers) + router.stop_workers(args.task_server, worker_names, args.queues, args.workers, args.level.upper()) def print_info(args): @@ -400,6 +401,35 @@ def process_example(args: Namespace) -> None: setup_example(args.workflow, args.path) +def process_manager(args: Namespace): + """ + Process the command for managing the workers. + + This function interprets the command provided in the `args` namespace and + executes the corresponding manager function. It supports three commands: + "run", "start", and "stop". + + :param args: parsed CLI arguments + """ + if args.command == "run": + run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout) + elif args.command == "start": + try: + start_manager( + query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout + ) + LOG.info("Manager started successfully.") + except Exception as e: + LOG.error(f"Unable to start manager.\n{e}") + elif args.command == "stop": + if stop_manager(): + LOG.info("Manager stopped successfully.") + else: + LOG.error("Unable to stop manager.") + else: + print("Run manager with a command. Try 'merlin manager -h' for more details") + + def process_monitor(args): """ CLI command to monitor merlin workers and queues to keep @@ -905,6 +935,75 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None: help="regex match for specific workers to stop", ) + # merlin manager + manager: ArgumentParser = subparsers.add_parser( + "manager", + help="Watchdog application to manage workers", + description="A daemon process that helps to restart and communicate with workers while running.", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + manager.set_defaults(func=process_manager) + + def add_manager_options(manager_parser: ArgumentParser): + """ + Add shared options for manager subcommands. + + The `manager run` and `manager start` subcommands have the same options. + Rather than writing duplicate code for these we'll use this function + to add the arguments to these subcommands. + + :param manager_parser: The ArgumentParser object to add these options to + """ + manager_parser.add_argument( + "-qf", + "--query_frequency", + action="store", + type=int, + default=60, + help="The frequency at which workers will be queried for response.", + ) + manager_parser.add_argument( + "-qt", + "--query_timeout", + action="store", + type=float, + default=0.5, + help="The timeout for the query response that are sent to workers.", + ) + manager_parser.add_argument( + "-wt", + "--worker_timeout", + action="store", + type=int, + default=180, + help="The sum total (query_frequency*tries) time before an attempt is made to restart worker.", + ) + + manager_commands: ArgumentParser = manager.add_subparsers(dest="command") + manager_run = manager_commands.add_parser( + "run", + help="Run the daemon process", + description="Run manager", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + add_manager_options(manager_run) + manager_run.set_defaults(func=process_manager) + manager_start = manager_commands.add_parser( + "start", + help="Start the daemon process", + description="Start manager", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + add_manager_options(manager_start) + manager_start.set_defaults(func=process_manager) + manager_stop = manager_commands.add_parser( + "stop", + help="Stop the daemon process", + description="Stop manager", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + manager_stop.set_defaults(func=process_manager) + # merlin monitor monitor: ArgumentParser = subparsers.add_parser( "monitor", diff --git a/merlin/managers/__init__.py b/merlin/managers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/merlin/managers/celerymanager.py b/merlin/managers/celerymanager.py new file mode 100644 index 00000000..fe136d1e --- /dev/null +++ b/merlin/managers/celerymanager.py @@ -0,0 +1,215 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.12.1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### +import logging +import os +import subprocess +import time + +import psutil + +from merlin.managers.redis_connection import RedisConnectionManager + + +LOG = logging.getLogger(__name__) + + +class WorkerStatus: + running = "Running" + stalled = "Stalled" + stopped = "Stopped" + rebooting = "Rebooting" + + +WORKER_INFO = { + "status": WorkerStatus.running, + "pid": -1, + "monitored": 1, # This setting is for debug mode + "num_unresponsive": 0, + "processing_work": 1, +} + + +class CeleryManager: + def __init__(self, query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180): + """ + Initializer for Celery Manager + + :param query_frequency: The frequency at which workers will be queried with ping commands + :param query_timeout: The timeout for the query pings that are sent to workers + :param worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. + """ + self.query_frequency = query_frequency + self.query_timeout = query_timeout + self.worker_timeout = worker_timeout + + @staticmethod + def get_worker_status_redis_connection() -> RedisConnectionManager: + """Get the redis connection for info regarding the worker and manager status.""" + return RedisConnectionManager(1) + + @staticmethod + def get_worker_args_redis_connection() -> RedisConnectionManager: + """Get the redis connection for info regarding the args used to generate each worker.""" + return RedisConnectionManager(2) + + def get_celery_workers_status(self, workers: list) -> dict: + """ + Get the worker status of a current worker that is being managed + + :param workers: Workers that are checked. + :return: The result dictionary for each worker and the response. + """ + from merlin.celery import app + + celery_app = app.control + ping_result = celery_app.ping(workers, timeout=self.query_timeout) + worker_results = {worker: status for d in ping_result for worker, status in d.items()} + return worker_results + + def stop_celery_worker(self, worker: str) -> bool: + """ + Stop a celery worker by kill the worker with pid + + :param worker: Worker that is being stopped. + :return: The result of whether a worker was stopped. + """ + + # Get the PID associated with the worker + with self.get_worker_status_redis_connection() as worker_status_connect: + worker_pid = int(worker_status_connect.hget(worker, "pid")) + worker_status = worker_status_connect.hget(worker, "status") + + # TODO be wary of stalled state workers (should not happen since we use psutil.Process.kill()) + # Check to see if the pid exists and worker is set as running + if worker_status == WorkerStatus.running and psutil.pid_exists(worker_pid): + # Check to see if the pid is associated with celery + worker_process = psutil.Process(worker_pid) + if "celery" in worker_process.name(): + # Kill the pid if both conditions are right + worker_process.kill() + return True + return False + + def restart_celery_worker(self, worker: str) -> bool: + """ + Restart a celery worker with the same arguements and parameters during its creation + + :param worker: Worker that is being restarted. + :return: The result of whether a worker was restarted. + """ + + # Stop the worker that is currently running (if possible) + self.stop_celery_worker(worker) + + # Start the worker again with the args saved in redis db + with self.get_worker_args_redis_connection() as worker_args_connect, self.get_worker_status_redis_connection() as worker_status_connect: + + # Get the args and remove the worker_cmd from the hash set + args = worker_args_connect.hgetall(worker) + worker_cmd = args["worker_cmd"] + del args["worker_cmd"] + kwargs = args + for key in args: + if args[key].startswith("link:"): + kwargs[key] = worker_args_connect.hgetall(args[key].split(":", 1)[1]) + elif args[key] == "True": + kwargs[key] = True + elif args[key] == "False": + kwargs[key] = False + + # Run the subprocess for the worker and save the PID + process = subprocess.Popen(worker_cmd, **kwargs) + worker_status_connect.hset(worker, "pid", process.pid) + + return True + + def run(self): + """ + Main manager loop for monitoring and managing Celery workers. + + This method continuously monitors the status of Celery workers by + checking their health and attempting to restart any that are + unresponsive. It updates the Redis database with the current + status of the manager and the workers. + """ + manager_info = { + "status": "Running", + "pid": os.getpid(), + } + + with self.get_worker_status_redis_connection() as redis_connection: + LOG.debug(f"MANAGER: setting manager key in redis to hold the following info {manager_info}") + redis_connection.hset("manager", mapping=manager_info) + + # TODO figure out what to do with "processing_work" entry for the merlin monitor + while True: # TODO Make it so that it will stop after a list of workers is stopped + # Get the list of running workers + workers = redis_connection.keys() + LOG.debug(f"MANAGER: workers: {workers}") + workers.remove("manager") + workers = [worker for worker in workers if int(redis_connection.hget(worker, "monitored"))] + LOG.info(f"MANAGER: Monitoring {workers} workers") + + # Check/ Ping each worker to see if they are still running + if workers: + worker_results = self.get_celery_workers_status(workers) + + # If running set the status on redis that it is running + LOG.info(f"MANAGER: Responsive workers: {worker_results.keys()}") + for worker in list(worker_results.keys()): + redis_connection.hset(worker, "status", WorkerStatus.running) + + # If not running attempt to restart it + for worker in workers: + if worker not in worker_results: + LOG.info(f"MANAGER: Worker '{worker}' is unresponsive.") + # If time where the worker is unresponsive is less than the worker time out then just increment + num_unresponsive = int(redis_connection.hget(worker, "num_unresponsive")) + 1 + if num_unresponsive * self.query_frequency < self.worker_timeout: + # Attempt to restart worker + LOG.info(f"MANAGER: Attempting to restart worker '{worker}'...") + if self.restart_celery_worker(worker): + # If successful set the status to running and reset num_unresponsive + redis_connection.hset(worker, "status", WorkerStatus.running) + redis_connection.hset(worker, "num_unresponsive", 0) + LOG.info(f"MANAGER: Worker '{worker}' restarted.") + else: + # If failed set the status to stalled + redis_connection.hset(worker, "status", WorkerStatus.stalled) + LOG.error(f"MANAGER: Could not restart worker '{worker}'.") + else: + redis_connection.hset(worker, "num_unresponsive", num_unresponsive) + # Sleep for the query_frequency for the next iteration + time.sleep(self.query_frequency) + + +if __name__ == "__main__": + cm = CeleryManager() + cm.run() diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py new file mode 100644 index 00000000..c000cabb --- /dev/null +++ b/merlin/managers/redis_connection.py @@ -0,0 +1,91 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.12.2b1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### +""" +This module stores a manager for redis connections. +""" +import logging + +import redis + + +LOG = logging.getLogger(__name__) + + +class RedisConnectionManager: + """ + A context manager for handling redis connections. + This will ensure safe opening and closing of Redis connections. + """ + + def __init__(self, db_num: int): + self.db_num = db_num + self.connection = None + + def __enter__(self): + self.connection = self.get_redis_connection() + return self.connection + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.connection: + LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}") + self.connection.close() + + def get_redis_connection(self) -> redis.Redis: + """ + Generic redis connection function to get the results backend redis server with a given db number increment. + + :return: Redis connection object that can be used to access values for the manager. + """ + from merlin.config.configfile import CONFIG + from merlin.config.results_backend import get_backend_password + + password_file = CONFIG.results_backend.password + try: + password = get_backend_password(password_file) + except IOError: + password = None + if hasattr(CONFIG.results_backend, "password"): + password = CONFIG.results_backend.password + + has_ssl = hasattr(CONFIG.results_backend, "cert_reqs") + ssl_cert_reqs = "required" + if has_ssl: + ssl_cert_reqs = CONFIG.results_backend.cert_reqs + + return redis.Redis( + host=CONFIG.results_backend.server, + port=CONFIG.results_backend.port, + db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts + username=CONFIG.results_backend.username, + password=password, + decode_responses=True, + ssl=has_ssl, + ssl_cert_reqs=ssl_cert_reqs, + ) diff --git a/merlin/router.py b/merlin/router.py index 55232326..435100a0 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -190,7 +190,7 @@ def get_workers(task_server): return [] -def stop_workers(task_server, spec_worker_names, queues, workers_regex): +def stop_workers(task_server, spec_worker_names, queues, workers_regex, debug_lvl): """ Stops workers. @@ -198,12 +198,13 @@ def stop_workers(task_server, spec_worker_names, queues, workers_regex): :param `spec_worker_names`: Worker names to stop, drawn from a spec. :param `queues` : The queues to stop :param `workers_regex` : Regex for workers to stop + :param debug_lvl: The debug level to use (INFO, DEBUG, ERROR, etc.) """ LOG.info("Stopping workers...") if task_server == "celery": # pylint: disable=R1705 # Stop workers - stop_celery_workers(queues, spec_worker_names, workers_regex) + stop_celery_workers(queues, spec_worker_names, workers_regex, debug_lvl) else: LOG.error("Celery is not specified as the task server!") diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 60bded1b..5f95bf18 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -46,7 +46,9 @@ from merlin.common.dumper import dump_handler from merlin.config import Config +from merlin.managers.celerymanager import CeleryManager, WorkerStatus from merlin.study.batch import batch_check_parallel, batch_worker_launch +from merlin.study.celerymanageradapter import add_monitor_workers, remove_monitor_workers from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running @@ -500,15 +502,22 @@ def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> b """ # Query celery for active tasks active_tasks = app.control.inspect().active() + result = False - # Search for the queues we provided if necessary - if active_tasks is not None: - for tasks in active_tasks.values(): - for task in tasks: - if task["delivery_info"]["routing_key"] in queues_in_spec: - return True + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + # Search for the queues we provided if necessary + if active_tasks is not None: + for worker, tasks in active_tasks.items(): + for task in tasks: + if task["delivery_info"]["routing_key"] in queues_in_spec: + result = True - return False + # Set the entry in the Redis DB for the manager to signify if the worker + # is still doing work + worker_still_processing = 1 if result else 0 + redis_connection.hset(worker, "processing_work", worker_still_processing) + + return result def _get_workers_to_start(spec, steps): @@ -761,8 +770,36 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): :side effect: Launches a celery worker via a subprocess """ try: - _ = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732 + process = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732 + # Get the worker name from worker_cmd and add to be monitored by celery manager + worker_cmd_list = worker_cmd.split() + worker_name = worker_cmd_list[worker_cmd_list.index("-n") + 1].replace("%h", kwargs["env"]["HOSTNAME"]) + worker_name = "celery@" + worker_name worker_list.append(worker_cmd) + + # Adding the worker args to redis db + with CeleryManager.get_worker_args_redis_connection() as redis_connection: + args = kwargs.copy() + # Save worker command with the arguements + args["worker_cmd"] = worker_cmd + # Store the nested dictionaries into a separate key with a link. + # Note: This only support single nested dicts(for simplicity) and + # further nesting can be accomplished by making this recursive. + for key in kwargs: + if type(kwargs[key]) is dict: + key_name = worker_name + "_" + key + redis_connection.hmset(name=key_name, mapping=kwargs[key]) + args[key] = "link:" + key_name + if type(kwargs[key]) is bool: + if kwargs[key]: + args[key] = "True" + else: + args[key] = "False" + redis_connection.hmset(name=worker_name, mapping=args) + + # Adding the worker to redis db to be monitored + add_monitor_workers(workers=((worker_name, process.pid),)) + LOG.info(f"Added {worker_name} to be monitored") except Exception as e: # pylint: disable=C0103 LOG.error(f"Cannot start celery workers, {e}") raise @@ -801,7 +838,7 @@ def purge_celery_tasks(queues, force): return subprocess.run(purge_command, shell=True).returncode -def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): # pylint: disable=R0912 +def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None, debug_lvl="INFO"): # pylint: disable=R0912 """Send a stop command to celery workers. Default behavior is to stop all connected workers. @@ -866,6 +903,8 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): if workers_to_stop: LOG.info(f"Sending stop to these workers: {workers_to_stop}") app.control.broadcast("shutdown", destination=workers_to_stop) + remove_entry = False if debug_lvl == "DEBUG" else True + remove_monitor_workers(workers=workers_to_stop, worker_status=WorkerStatus.stopped, remove_entry=remove_entry) else: LOG.warning("No workers found to stop") diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py new file mode 100644 index 00000000..6dc07bab --- /dev/null +++ b/merlin/study/celerymanageradapter.py @@ -0,0 +1,145 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.12.1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### +import logging +import subprocess + +import psutil + +from merlin.managers.celerymanager import WORKER_INFO, CeleryManager, WorkerStatus + + +LOG = logging.getLogger(__name__) + + +def add_monitor_workers(workers: list): + """ + Adds workers to be monitored by the celery manager. + :param list workers: A list of tuples which includes (worker_name, pid) + """ + if workers is None or len(workers) <= 0: + return + + LOG.info( + f"MANAGER: Attempting to have the manager monitor the following workers {[worker_name for worker_name, _ in workers]}." + ) + monitored_workers = [] + + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + for worker in workers: + LOG.debug(f"MANAGER: Checking if connection for worker '{worker}' exists...") + if redis_connection.exists(worker[0]): + LOG.debug(f"MANAGER: Connection for worker '{worker}' exists. Setting this worker to be monitored") + redis_connection.hset(worker[0], "monitored", 1) + redis_connection.hset(worker[0], "pid", worker[1]) + monitored_workers.append(worker[0]) + else: + LOG.debug(f"MANAGER: Connection for worker '{worker}' does not exist. Not monitoring this worker.") + worker_info = WORKER_INFO + worker_info["pid"] = worker[1] + redis_connection.hmset(name=worker[0], mapping=worker_info) + LOG.info(f"MANAGER: Manager is monitoring the following workers {monitored_workers}.") + + +def remove_monitor_workers(workers: list, worker_status: WorkerStatus = None, remove_entry: bool = True): + """ + Remove workers from being monitored by the celery manager. + :param list workers: A worker names + """ + if workers is None or len(workers) <= 0: + return + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + for worker in workers: + if redis_connection.exists(worker): + redis_connection.hset(worker, "monitored", 0) + if worker_status is not None: + redis_connection.hset(worker, "status", worker_status) + if remove_entry: + redis_connection.delete(worker) + + +def is_manager_runnning() -> bool: + """ + Check to see if the manager is running + + :return: True if manager is running and False if not. + """ + with CeleryManager.get_worker_args_redis_connection() as redis_connection: + manager_status = redis_connection.hgetall("manager") + return manager_status["status"] == WorkerStatus.running and psutil.pid_exists(manager_status["pid"]) + + +def run_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool: + """ + A process locking function that calls the celery manager with proper arguments. + + :param query_frequency: The frequency at which workers will be queried with ping commands + :param query_timeout: The timeout for the query pings that are sent to workers + :param worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. + """ + celerymanager = CeleryManager(query_frequency=query_frequency, query_timeout=query_timeout, worker_timeout=worker_timeout) + celerymanager.run() + + +def start_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool: + """ + A Non-locking function that calls the celery manager with proper arguments. + + :param query_frequency: The frequency at which workers will be queried with ping commands + :param query_timeout: The timeout for the query pings that are sent to workers + :param worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. + :return bool: True if the manager was started successfully. + """ + subprocess.Popen( + f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}", + shell=True, + close_fds=True, + stdout=subprocess.PIPE, + ) + return True + + +def stop_manager() -> bool: + """ + Stop the manager process using it's pid. + + :return bool: True if the manager was stopped successfully and False otherwise. + """ + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + LOG.debug(f"MANAGER: manager keys: {redis_connection.hgetall('manager')}") + manager_pid = int(redis_connection.hget("manager", "pid")) + manager_status = redis_connection.hget("manager", "status") + LOG.debug(f"MANAGER: manager_status: {manager_status}") + LOG.debug(f"MANAGER: pid exists: {psutil.pid_exists(manager_pid)}") + + # Check to make sure that the manager is running and the pid exists + if manager_status == WorkerStatus.running and psutil.pid_exists(manager_pid): + psutil.Process(manager_pid).terminate() + return True + return False