diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index 2c55486966..3444f4a0f7 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -3,6 +3,7 @@ import contextvars import importlib import logging +import signal import sys import traceback from datetime import timedelta @@ -20,6 +21,7 @@ TASK_STATES, TASK_DISPATCH_LOCK, ) +from pulpcore.exceptions import TimeoutException from pulpcore.tasking.kafka import send_task_notification _logger = logging.getLogger(__name__) @@ -47,10 +49,28 @@ def wakeup_worker(): def execute_task(task): - # This extra stack is needed to isolate the current_task ContextVar contextvars.copy_context().run(_execute_task, task) +def execute_immediate_task(task): + # set alarm timeout + IMMEDIATE_TASK_TIMEOUT = 60 * 5 + + def timeout_handler(signum, frame): + raise TimeoutException(f"Immediate task time: {IMMEDIATE_TASK_TIMEOUT}") + + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(IMMEDIATE_TASK_TIMEOUT) + + try: + # This extra stack is needed to isolate the current_task ContextVar + contextvars.copy_context().run(_execute_task, task) + except TimeoutException: + raise + finally: + signal.alarm(0) + + def _execute_task(task): # Store the task id in the context for `Task.current()`. current_task.set(task) @@ -226,9 +246,13 @@ def dispatch( ).exists() ): task.unblock() - execute_task(task) - if resources: - notify_workers = True + try: + execute_immediate_task(task) + except TimeoutException: + pass + finally: + if resources: + notify_workers = True elif deferred: notify_workers = True else: