From 3418d8737b18ad7597fc00cf4b26f26e8dd305ad Mon Sep 17 00:00:00 2001 From: Philipp Mandler Date: Tue, 12 Dec 2023 17:50:18 +0100 Subject: [PATCH] Fix signal handling of worker when reload is enabled --- worker/run.py | 45 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/worker/run.py b/worker/run.py index f96e94be..0527ad87 100644 --- a/worker/run.py +++ b/worker/run.py @@ -3,6 +3,7 @@ import argparse import asyncio import logging +import os import signal import traceback import urllib.parse @@ -50,22 +51,44 @@ def main(): args.websocket_base_url = urllib.parse.urlunparse(sync_url) if args.reload: + logging.info("Reload enabled") path = Path(__file__).parent - p = run_sync_in_process(args) - for _ in watch(path): + stop_watching_event = asyncio.Event() + p = run_sync_in_process(stop_watching_event, args) + + for _ in watch(path, stop_event=stop_watching_event): logging.info("Source code change detected, reloading worker") p.terminate() p.join() - p = run_sync_in_process(args) + p = run_sync_in_process(stop_watching_event, args) else: run_sync(args) -def run_sync_in_process(args): +def run_sync_in_process(stop_watching_event: asyncio.Event, args): p = Process(target=run_sync, args=(args,)) p.start() + + def handle_signal(sig, *args): + """ + Passes signals received by the watcher process to the actual worker process and + stops watching if shutdown is requested. + """ + if p.pid is None: + raise Exception("Process has no PID") + + os.kill(p.pid, sig) + + if sig == signal.SIGTERM: + stop_watching_event.set() + elif sig == signal.SIGUSR1: + stop_watching_event.clear() + + signal.signal(signal.SIGTERM, handle_signal) + signal.signal(signal.SIGUSR1, handle_signal) + return p @@ -85,10 +108,20 @@ async def run(args): from transcribee_worker.worker import Worker # noqa finish_event = asyncio.Event() + + def shutdown(): + logging.info("Gracefully shutting down...") + finish_event.set() + + def cancel_shutdown(): + finish_event.clear() + logging.info("Shutdown canceled") + + loop = asyncio.get_running_loop() # stop the worker gracefully on SIGTERM - asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, finish_event.set) + loop.add_signal_handler(signal.SIGTERM, shutdown) # allow to cancel shutdown via SIGUSR1 - asyncio.get_running_loop().add_signal_handler(signal.SIGUSR1, finish_event.clear) + loop.add_signal_handler(signal.SIGUSR1, cancel_shutdown) worker = Worker( base_url=f"{args.coordinator}/api/v1/tasks",