diff --git a/avocado/core/app.py b/avocado/core/app.py index 7973e759e7..a494f584c2 100644 --- a/avocado/core/app.py +++ b/avocado/core/app.py @@ -25,7 +25,6 @@ from avocado.core.output import STD_OUTPUT from avocado.core.parser import Parser from avocado.core.settings import settings -from avocado.utils import process class AvocadoApp: @@ -90,13 +89,6 @@ def _run_cli_plugins(self): @staticmethod def _setup_signals(): - def sigterm_handler(signum, frame): # pylint: disable=W0613 - children = process.get_children_pids(os.getpid()) - for child in children: - process.kill_process_tree(int(child)) - raise SystemExit("Terminated") - - signal.signal(signal.SIGTERM, sigterm_handler) if hasattr(signal, "SIGTSTP"): signal.signal(signal.SIGTSTP, signal.SIG_IGN) # ignore ctrl+z diff --git a/avocado/core/nrunner/runner.py b/avocado/core/nrunner/runner.py index 8d92bd84ed..98533856ec 100644 --- a/avocado/core/nrunner/runner.py +++ b/avocado/core/nrunner/runner.py @@ -1,7 +1,14 @@ +import abc +import multiprocessing +import signal import time +import traceback +from avocado.core.exceptions import TestInterrupt from avocado.core.nrunner.runnable import RUNNERS_REGISTRY_STANDALONE_EXECUTABLE from avocado.core.plugin_interfaces import RunnableRunner +from avocado.core.utils import messages +from avocado.utils import process #: The amount of time (in seconds) between each internal status check RUNNER_RUN_CHECK_INTERVAL = 0.01 @@ -86,3 +93,98 @@ def running_loop(self, condition): most_current_execution_state_time = now yield self.prepare_status("running") time.sleep(RUNNER_RUN_CHECK_INTERVAL) + + +class PythonBaseRunner(BaseRunner, abc.ABC): + """ + Base class for Python runners + """ + + def __init__(self): + super().__init__() + self.proc = None + self.sigtstp = multiprocessing.Lock() + self.sigstopped = False + self.timeout = float("inf") + + def signal_handler(self, signum, frame): # pylint: disable=W0613 + if signum == signal.SIGTERM.value: + raise TestInterrupt("Test interrupted: Timeout reached") + elif signum == signal.SIGTSTP.value: + if self.sigstopped: + self.sigstopped = False + sign = signal.SIGCONT + else: + self.sigstopped = True + sign = signal.SIGSTOP + if not self.proc: # Ignore ctrl+z when proc not yet started + return + with self.sigtstp: + self.timeout = float("inf") + process.kill_process_tree(self.proc.pid, sign, False) + + def _monitor(self, time_started, queue): + next_status_time = None + while True: + time.sleep(RUNNER_RUN_CHECK_INTERVAL) + now = time.monotonic() + if queue.empty(): + if next_status_time is None or now > next_status_time: + next_status_time = now + RUNNER_RUN_STATUS_INTERVAL + yield messages.RunningMessage.get() + if (now - time_started) > self.timeout: + self.proc.terminate() + else: + message = queue.get() + if message.get("type") == "early_state": + self.timeout = float(message.get("timeout") or float("inf")) + else: + yield message + if message.get("status") == "finished": + break + while self.sigstopped: + time.sleep(RUNNER_RUN_CHECK_INTERVAL) + + def run(self, runnable): + signal.signal(signal.SIGTSTP, signal.SIG_IGN) + signal.signal(signal.SIGTERM, self.signal_handler) + signal.signal(signal.SIGTSTP, self.signal_handler) + # pylint: disable=W0201 + self.runnable = runnable + yield messages.StartedMessage.get() + try: + queue = multiprocessing.SimpleQueue() + self.proc = multiprocessing.Process( + target=self._run, args=(self.runnable, queue) + ) + while self.sigstopped: + pass + with self.sigtstp: + self.proc.start() + time_started = time.monotonic() + for message in self._monitor(time_started, queue): + yield message + + except TestInterrupt: + self.proc.terminate() + for message in self._monitor(time_started, queue): + yield message + except Exception as e: + yield messages.StderrMessage.get(traceback.format_exc()) + yield messages.FinishedMessage.get( + "error", + fail_reason=str(e), + fail_class=e.__class__.__name__, + traceback=traceback.format_exc(), + ) + + @abc.abstractmethod + def _run(self, runnable, queue): + """ + Run the test + + :param runnable: the runnable object + :type runnable: :class:`Runnable` + :param queue: the queue to put messages + :type queue: :class:`multiprocessing.SimpleQueue` + """ diff --git a/avocado/core/plugin_interfaces.py b/avocado/core/plugin_interfaces.py index 2ea0958394..a2e190ba8e 100644 --- a/avocado/core/plugin_interfaces.py +++ b/avocado/core/plugin_interfaces.py @@ -376,6 +376,28 @@ async def terminate_task(self, runtime_task): :rtype: bool """ + async def stop_task(self, runtime_task): + """Stop already spawned task. + + :param runtime_task: wrapper for a Task with additional runtime + information. + :type runtime_task: :class:`avocado.core.task.runtime.RuntimeTask` + :returns: whether the task has been stopped or not. + :rtype: bool + """ + raise NotImplementedError() + + async def resume_task(self, runtime_task): + """Resume already stopped task. + + :param runtime_task: wrapper for a Task with additional runtime + information. + :type runtime_task: :class:`avocado.core.task.runtime.RuntimeTask` + :returns: whether the task has been resumed or not. + :rtype: bool + """ + raise NotImplementedError() + @staticmethod @abc.abstractmethod async def check_task_requirements(runtime_task): diff --git a/avocado/core/task/runtime.py b/avocado/core/task/runtime.py index 25a86d7601..c150cca849 100644 --- a/avocado/core/task/runtime.py +++ b/avocado/core/task/runtime.py @@ -18,6 +18,7 @@ class RuntimeTaskStatus(Enum): FAIL_TRIAGE = "FINISHED WITH FAILURE ON TRIAGE" FAIL_START = "FINISHED FAILING TO START" STARTED = "STARTED" + PAUSED = "PAUSED" @staticmethod def finished_statuses(): diff --git a/avocado/core/task/statemachine.py b/avocado/core/task/statemachine.py index 3a5b1a089a..8fae8db6f9 100644 --- a/avocado/core/task/statemachine.py +++ b/avocado/core/task/statemachine.py @@ -5,6 +5,7 @@ import time from avocado.core.exceptions import JobFailFast +from avocado.core.output import LOG_UI from avocado.core.task.runtime import RuntimeTaskStatus from avocado.core.teststatus import STATUSES_NOT_OK from avocado.core.utils import messages @@ -493,6 +494,31 @@ async def terminate_tasks_interrupted(self): terminated = await self._terminate_tasks(task_status) await self._send_finished_tasks_message(terminated, "Interrupted by user") + @staticmethod + async def stop_resume_tasks(state_machine, spawner): + async with state_machine.lock: + try: + for runtime_task in state_machine.monitored: + if runtime_task.status == RuntimeTaskStatus.STARTED: + await spawner.stop_task(runtime_task) + runtime_task.status = RuntimeTaskStatus.PAUSED + LOG_UI.warning( + f"{runtime_task.task.identifier}: {runtime_task.status.value}" + ) + elif runtime_task.status == RuntimeTaskStatus.PAUSED: + await spawner.resume_task(runtime_task) + runtime_task.status = RuntimeTaskStatus.STARTED + LOG_UI.warning( + f"{runtime_task.task.identifier}: {runtime_task.status.value}" + ) + except NotImplementedError: + LOG.warning( + f"Sending signals to tasks is not implemented for spawner: {spawner}" + ) + LOG_UI.warning( + f"Sending signals to tasks is not implemented for spawner: {spawner}" + ) + async def run(self): """Pushes Tasks forward and makes them do something with their lives.""" while True: diff --git a/avocado/plugins/runner_nrunner.py b/avocado/plugins/runner_nrunner.py index 0d06483dea..fbfed3fe5d 100644 --- a/avocado/plugins/runner_nrunner.py +++ b/avocado/plugins/runner_nrunner.py @@ -21,6 +21,7 @@ import os import platform import random +import signal import tempfile from avocado.core.dispatcher import SpawnerDispatcher @@ -269,6 +270,10 @@ def _abort_if_missing_runners(runnables): ) raise JobError(msg) + @staticmethod + def signal_handler(spawner, state_machine): + asyncio.create_task(Worker.stop_resume_tasks(state_machine, spawner)) + def run_suite(self, job, test_suite): summary = set() @@ -335,6 +340,11 @@ def run_suite(self, job, test_suite): ] asyncio.ensure_future(self._update_status(job)) loop = asyncio.get_event_loop() + if hasattr(signal, "SIGTSTP"): + loop.add_signal_handler( + signal.SIGTSTP, + lambda: self.signal_handler(spawner, self.tsm), + ) try: try: loop.run_until_complete( diff --git a/avocado/plugins/runners/asset.py b/avocado/plugins/runners/asset.py index 8cb70afdc0..37478a259d 100644 --- a/avocado/plugins/runners/asset.py +++ b/avocado/plugins/runners/asset.py @@ -1,15 +1,14 @@ import sys -import time -from multiprocessing import Process, SimpleQueue, set_start_method +from multiprocessing import set_start_method from avocado.core.nrunner.app import BaseRunnerApp -from avocado.core.nrunner.runner import RUNNER_RUN_STATUS_INTERVAL, BaseRunner +from avocado.core.nrunner.runner import PythonBaseRunner from avocado.core.settings import settings from avocado.utils import data_structures from avocado.utils.asset import Asset -class AssetRunner(BaseRunner): +class AssetRunner(PythonBaseRunner): """Runner for dependencies of type package This runner handles the fetch of files using the Avocado Assets utility. @@ -36,7 +35,7 @@ class AssetRunner(BaseRunner): CONFIGURATION_USED = ["datadir.paths.cache_dirs"] @staticmethod - def _fetch_asset(name, asset_hash, algorithm, locations, cache_dirs, expire, queue): + def _fetch_asset(name, asset_hash, algorithm, locations, cache_dirs, expire): asset_manager = Asset( name, asset_hash, algorithm, locations, cache_dirs, expire @@ -52,50 +51,25 @@ def _fetch_asset(name, asset_hash, algorithm, locations, cache_dirs, expire, que result = "error" stderr = str(exc) - output = {"result": result, "stdout": stdout, "stderr": stderr} - queue.put(output) + return {"result": result, "stdout": stdout, "stderr": stderr} - def run(self, runnable): - # pylint: disable=W0201 - self.runnable = runnable - yield self.prepare_status("started") - - name = self.runnable.kwargs.get("name") + def _run(self, runnable, queue): + name = runnable.kwargs.get("name") # if name was passed correctly, run the Avocado Asset utility if name is not None: - asset_hash = self.runnable.kwargs.get("asset_hash") - algorithm = self.runnable.kwargs.get("algorithm") - locations = self.runnable.kwargs.get("locations") - expire = self.runnable.kwargs.get("expire") + asset_hash = runnable.kwargs.get("asset_hash") + algorithm = runnable.kwargs.get("algorithm") + locations = runnable.kwargs.get("locations") + expire = runnable.kwargs.get("expire") if expire is not None: expire = data_structures.time_to_seconds(str(expire)) - cache_dirs = self.runnable.config.get("datadir.paths.cache_dirs") + cache_dirs = runnable.config.get("datadir.paths.cache_dirs") if cache_dirs is None: cache_dirs = settings.as_dict().get("datadir.paths.cache_dirs") - - # let's spawn it to another process to be able to update the - # status messages and avoid the Asset to lock this process - queue = SimpleQueue() - process = Process( - target=self._fetch_asset, - args=( - name, - asset_hash, - algorithm, - locations, - cache_dirs, - expire, - queue, - ), + output = self._fetch_asset( + name, asset_hash, algorithm, locations, cache_dirs, expire ) - process.start() - - while queue.empty(): - time.sleep(RUNNER_RUN_STATUS_INTERVAL) - yield self.prepare_status("running") - - output = queue.get() result = output["result"] stdout = output["stdout"] stderr = output["stderr"] @@ -104,10 +78,13 @@ def run(self, runnable): result = "error" stdout = "" stderr = 'At least name should be passed as kwargs using name="uri".' - - yield self.prepare_status("running", {"type": "stdout", "log": stdout.encode()}) - yield self.prepare_status("running", {"type": "stderr", "log": stderr.encode()}) - yield self.prepare_status("finished", {"result": result}) + queue.put( + self.prepare_status("running", {"type": "stdout", "log": stdout.encode()}) + ) + queue.put( + self.prepare_status("running", {"type": "stderr", "log": stderr.encode()}) + ) + queue.put(self.prepare_status("finished", {"result": result})) class RunnerApp(BaseRunnerApp): diff --git a/avocado/plugins/runners/avocado_instrumented.py b/avocado/plugins/runners/avocado_instrumented.py index a08ecceaa2..69f0183752 100644 --- a/avocado/plugins/runners/avocado_instrumented.py +++ b/avocado/plugins/runners/avocado_instrumented.py @@ -1,25 +1,18 @@ import multiprocessing import os -import signal import sys import tempfile -import time import traceback -from avocado.core.exceptions import TestInterrupt from avocado.core.nrunner.app import BaseRunnerApp -from avocado.core.nrunner.runner import ( - RUNNER_RUN_CHECK_INTERVAL, - RUNNER_RUN_STATUS_INTERVAL, - BaseRunner, -) +from avocado.core.nrunner.runner import PythonBaseRunner from avocado.core.test import TestID from avocado.core.tree import TreeNodeEnvOnly from avocado.core.utils import loader, messages from avocado.core.varianter import is_empty_variant -class AvocadoInstrumentedTestRunner(BaseRunner): +class AvocadoInstrumentedTestRunner(PythonBaseRunner): """ Runner for avocado-instrumented tests @@ -44,11 +37,6 @@ class and method names should be separated by a ":". One "job.run.store_logging_stream", ] - @staticmethod - def signal_handler(signum, frame): # pylint: disable=W0613 - if signum == signal.SIGTERM.value: - raise TestInterrupt("Test interrupted: Timeout reached") - @staticmethod def _create_params(runnable): """Create params for the test""" @@ -65,8 +53,7 @@ def _create_params(runnable): paths = runnable.variant["paths"] return tree_nodes, paths - @staticmethod - def _run_avocado(runnable, queue): + def _run(self, runnable, queue): try: # This assumes that a proper resolution (see resolver module) # was performed, and that a URI contains: @@ -76,7 +63,6 @@ def _run_avocado(runnable, queue): # # To be defined: if the resolution uri should be composed like # this, or broken down and stored into other data fields - signal.signal(signal.SIGTERM, AvocadoInstrumentedTestRunner.signal_handler) module_path, klass_method = runnable.uri.split(":", 1) klass, method = klass_method.split(".", 1) @@ -137,58 +123,6 @@ def _run_avocado(runnable, queue): ) ) - @staticmethod - def _monitor(proc, time_started, queue): - timeout = float("inf") - next_status_time = None - while True: - time.sleep(RUNNER_RUN_CHECK_INTERVAL) - now = time.monotonic() - if queue.empty(): - if next_status_time is None or now > next_status_time: - next_status_time = now + RUNNER_RUN_STATUS_INTERVAL - yield messages.RunningMessage.get() - if (now - time_started) > timeout: - proc.terminate() - else: - message = queue.get() - if message.get("type") == "early_state": - timeout = float(message.get("timeout") or float("inf")) - else: - yield message - if message.get("status") == "finished": - break - - def run(self, runnable): - # pylint: disable=W0201 - signal.signal(signal.SIGTERM, AvocadoInstrumentedTestRunner.signal_handler) - self.runnable = runnable - yield messages.StartedMessage.get() - try: - queue = multiprocessing.SimpleQueue() - process = multiprocessing.Process( - target=self._run_avocado, args=(self.runnable, queue) - ) - - process.start() - - time_started = time.monotonic() - for message in self._monitor(process, time_started, queue): - yield message - - except TestInterrupt: - process.terminate() - for message in self._monitor(process, time_started, queue): - yield message - except Exception as e: - yield messages.StderrMessage.get(traceback.format_exc()) - yield messages.FinishedMessage.get( - "error", - fail_reason=str(e), - fail_class=e.__class__.__name__, - traceback=traceback.format_exc(), - ) - class RunnerApp(BaseRunnerApp): PROG_NAME = "avocado-runner-avocado-instrumented" diff --git a/avocado/plugins/runners/podman_image.py b/avocado/plugins/runners/podman_image.py index 06299d6b21..5fcfc0fddf 100644 --- a/avocado/plugins/runners/podman_image.py +++ b/avocado/plugins/runners/podman_image.py @@ -1,16 +1,15 @@ import asyncio import logging import sys -import time -from multiprocessing import Process, SimpleQueue, set_start_method +from multiprocessing import set_start_method from avocado.core.nrunner.app import BaseRunnerApp -from avocado.core.nrunner.runner import RUNNER_RUN_STATUS_INTERVAL, BaseRunner +from avocado.core.nrunner.runner import PythonBaseRunner from avocado.core.utils import messages from avocado.utils.podman import AsyncPodman, PodmanException -class PodmanImageRunner(BaseRunner): +class PodmanImageRunner(PythonBaseRunner): """Runner for dependencies of type podman-image This runner handles download and verification. @@ -27,39 +26,27 @@ class PodmanImageRunner(BaseRunner): name = "podman-image" description = f"Runner for dependencies of type {name}" - def _run_podman_pull(self, uri, queue): + def _run(self, runnable, queue): # Silence the podman utility from outputting messages into # the regular handler, which will go to stdout. The # exceptions caught here still contain all the needed # information for debugging in case of errors. - logging.getLogger("avocado.utils.podman").addHandler(logging.NullHandler()) - try: - podman = AsyncPodman() - loop = asyncio.get_event_loop() - loop.run_until_complete(podman.execute("pull", uri)) - queue.put({"result": "pass"}) - except PodmanException as ex: - queue.put( - {"result": "fail", "fail_reason": f"Could not pull podman image: {ex}"} - ) - - def run(self, runnable): - yield messages.StartedMessage.get() - if not runnable.uri: reason = "uri identifying the podman image is required" - yield messages.FinishedMessage.get("error", reason) + queue.put(messages.FinishedMessage.get("error", reason)) else: - queue = SimpleQueue() - process = Process(target=self._run_podman_pull, args=(runnable.uri, queue)) - process.start() - while queue.empty(): - time.sleep(RUNNER_RUN_STATUS_INTERVAL) - yield messages.RunningMessage.get() - - output = queue.get() - result = output.pop("result") - yield messages.FinishedMessage.get(result, **output) + logging.getLogger("avocado.utils.podman").addHandler(logging.NullHandler()) + try: + podman = AsyncPodman() + loop = asyncio.get_event_loop() + loop.run_until_complete(podman.execute("pull", runnable.uri)) + queue.put(messages.FinishedMessage.get(result="pass")) + except PodmanException as ex: + queue.put( + messages.FinishedMessage.get( + result="fail", fail_reason=f"Could not pull podman image: {ex}" + ) + ) class RunnerApp(BaseRunnerApp): diff --git a/avocado/plugins/runners/python_unittest.py b/avocado/plugins/runners/python_unittest.py index de26d273d1..d09b9c8c85 100644 --- a/avocado/plugins/runners/python_unittest.py +++ b/avocado/plugins/runners/python_unittest.py @@ -2,20 +2,15 @@ import multiprocessing import os import sys -import time import traceback from unittest import TestLoader, TextTestRunner from avocado.core.nrunner.app import BaseRunnerApp -from avocado.core.nrunner.runner import ( - RUNNER_RUN_CHECK_INTERVAL, - RUNNER_RUN_STATUS_INTERVAL, - BaseRunner, -) +from avocado.core.nrunner.runner import PythonBaseRunner from avocado.core.utils import messages -class PythonUnittestRunner(BaseRunner): +class PythonUnittestRunner(PythonBaseRunner): """ Runner for Python unittests @@ -87,21 +82,38 @@ def module_class_method(self): return ".".join(unittest) - @classmethod - def _run_unittest(cls, module_path, module_class_method, queue): - sys.path.insert(0, module_path) + def _run(self, runnable, queue): + # pylint: disable=W0201 + self.runnable = runnable + + if not self.module_class_method: + queue.put( + messages.StderrMessage.get( + "Invalid URI: could not be converted to an unittest dotted name." + ) + ) + queue.put( + messages.FinishedMessage.get( + "error", + fail_reason="Invalid URI: could not be converted to an unittest dotted name.", + fail_class=ValueError.__class__.__name__, + traceback=traceback.format_exc(), + ) + ) + + sys.path.insert(0, self.module_path) stream = io.StringIO() try: loader = TestLoader() - suite = loader.loadTestsFromName(module_class_method) + suite = loader.loadTestsFromName(self.module_class_method) except ValueError as ex: msg = "loadTestsFromName error finding unittest {}: {}" queue.put(messages.StderrMessage.get(traceback.format_exc())) queue.put( messages.FinishedMessage.get( "error", - fail_reason=msg.format(module_class_method, str(ex)), + fail_reason=msg.format(self.module_class_method, str(ex)), fail_class=ex.__class__.__name__, traceback=traceback.format_exc(), ) @@ -144,55 +156,6 @@ def _run_unittest(cls, module_path, module_class_method, queue): stream.close() queue.put(messages.FinishedMessage.get(result, fail_reason=fail_reason)) - def run(self, runnable): - # pylint: disable=W0201 - self.runnable = runnable - try: - yield messages.StartedMessage.get() - queue = multiprocessing.SimpleQueue() - process = multiprocessing.Process( - target=self._run_unittest, - args=(self.module_path, self.module_class_method, queue), - ) - - if not self.module_class_method: - raise ValueError( - "Invalid URI: could not be converted to an unittest dotted name." - ) - process.start() - - most_current_execution_state_time = None - while True: - if queue.empty(): - time.sleep(RUNNER_RUN_CHECK_INTERVAL) - now = time.monotonic() - if most_current_execution_state_time is not None: - next_execution_state_mark = ( - most_current_execution_state_time - + RUNNER_RUN_STATUS_INTERVAL - ) - if ( - most_current_execution_state_time is None - or now > next_execution_state_mark - ): - most_current_execution_state_time = now - yield messages.RunningMessage.get() - - else: - message = queue.get() - yield message - if message.get("status") == "finished": - break - - except Exception as e: - yield messages.StderrMessage.get(traceback.format_exc()) - yield messages.FinishedMessage.get( - "error", - fail_reason=str(e), - fail_class=e.__class__.__name__, - traceback=traceback.format_exc(), - ) - class RunnerApp(BaseRunnerApp): PROG_NAME = "avocado-runner-python-unittest" diff --git a/avocado/plugins/runners/sysinfo.py b/avocado/plugins/runners/sysinfo.py index 3096fc98f4..e8171477e3 100644 --- a/avocado/plugins/runners/sysinfo.py +++ b/avocado/plugins/runners/sysinfo.py @@ -1,15 +1,10 @@ import multiprocessing import os import sys -import time import traceback from avocado.core.nrunner.app import BaseRunnerApp -from avocado.core.nrunner.runner import ( - RUNNER_RUN_CHECK_INTERVAL, - RUNNER_RUN_STATUS_INTERVAL, - BaseRunner, -) +from avocado.core.nrunner.runner import PythonBaseRunner from avocado.core.utils import messages from avocado.utils import sysinfo as sysinfo_collectible from avocado.utils.software_manager import manager @@ -124,7 +119,7 @@ def _set_collectibles(self): self.collectibles.add(sysinfo_collectible.Logfile(fail_filename)) -class SysinfoRunner(BaseRunner): +class SysinfoRunner(PythonBaseRunner): """ Runner for gathering sysinfo @@ -146,56 +141,31 @@ class SysinfoRunner(BaseRunner): "sysinfo.collect.locale", ] - def run(self, runnable): + def _run(self, runnable, queue): # pylint: disable=W0201 - self.runnable = runnable - yield self.prepare_status("started") - sysinfo_config = self.runnable.kwargs.get("sysinfo", {}) - test_fail = self.runnable.kwargs.get("test_fail", False) - if self.runnable.uri not in ["pre", "post"]: - yield messages.StderrMessage.get( - f"Unsupported uri " - f"{self.runnable.uri}. " - f"Possible values, 'pre', 'post'" + sysinfo_config = runnable.kwargs.get("sysinfo", {}) + test_fail = runnable.kwargs.get("test_fail", False) + if runnable.uri not in ["pre", "post"]: + queue.put( + messages.StderrMessage.get( + f"Unsupported uri " + f"{self.runnable.uri}. " + f"Possible values, 'pre', 'post'" + ) ) - yield messages.FinishedMessage.get("error") + queue.put(messages.FinishedMessage.get("error")) + if self.runnable.uri == "pre": + sysinfo = PreSysInfo(self.runnable.config, sysinfo_config, queue) + else: + sysinfo = PostSysInfo( + self.runnable.config, sysinfo_config, queue, test_fail + ) try: - queue = multiprocessing.SimpleQueue() - if self.runnable.uri == "pre": - sysinfo = PreSysInfo(self.runnable.config, sysinfo_config, queue) - else: - sysinfo = PostSysInfo( - self.runnable.config, sysinfo_config, queue, test_fail - ) - sysinfo_process = multiprocessing.Process(target=sysinfo.collect) - - sysinfo_process.start() - - most_current_execution_state_time = None - while True: - time.sleep(RUNNER_RUN_CHECK_INTERVAL) - now = time.monotonic() - if queue.empty(): - if most_current_execution_state_time is not None: - next_execution_state_mark = ( - most_current_execution_state_time - + RUNNER_RUN_STATUS_INTERVAL - ) - if ( - most_current_execution_state_time is None - or now > next_execution_state_mark - ): - most_current_execution_state_time = now - yield messages.RunningMessage.get() - else: - message = queue.get() - yield message - if message.get("status") == "finished": - break + sysinfo.collect() except Exception: # pylint: disable=W0703 - yield messages.StderrMessage.get(traceback.format_exc()) - yield messages.FinishedMessage.get("error") + queue.put(messages.StderrMessage.get(traceback.format_exc())) + queue.put(messages.FinishedMessage.get("error")) class RunnerApp(BaseRunnerApp): diff --git a/avocado/plugins/spawners/process.py b/avocado/plugins/spawners/process.py index 0f3294c6b1..8cf7839c0d 100644 --- a/avocado/plugins/spawners/process.py +++ b/avocado/plugins/spawners/process.py @@ -1,5 +1,6 @@ import asyncio import os +import signal import socket from avocado.core.dependencies.requirements import cache @@ -53,12 +54,16 @@ async def spawn_task(self, runtime_task): # pylint: disable=E1133 try: + preexec_fn = None + if "setpgrp" in dir(os): + preexec_fn = os.setpgrp proc = await asyncio.create_subprocess_exec( runner, *args, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, env=get_python_path_env_if_egg(), + preexec_fn=preexec_fn ) except (FileNotFoundError, PermissionError): return False @@ -105,6 +110,16 @@ async def terminate_task(self, runtime_task): pass return returncode is not None + async def stop_task(self, runtime_task): + try: + runtime_task.spawner_handle.process.send_signal(signal.SIGTSTP) + except ProcessLookupError: + return False + return + + async def resume_task(self, runtime_task): + await self.stop_task(runtime_task) + @staticmethod async def check_task_requirements(runtime_task): """Check the runtime task requirements needed to be able to run""" diff --git a/docs/source/guides/contributor/chapters/tips.rst b/docs/source/guides/contributor/chapters/tips.rst index bdf4abd29d..27cc097c86 100644 --- a/docs/source/guides/contributor/chapters/tips.rst +++ b/docs/source/guides/contributor/chapters/tips.rst @@ -33,6 +33,24 @@ During the execution look for:: avocado --show avocado.utils.debug run examples/tests/assets.py +Interrupting test +----------------- + +In case you want to "pause" the running test, you can use SIGTSTP (ctrl+z) +signal sent to the main avocado process. This signal is forwarded to test +and it's children processes. To resume testing you repeat the same signal. + +.. note:: + The job timeouts are still enabled on stopped processes. + +.. note:: + It is supported on on process spawner only. + +.. warning:: + This feature is meant only for debugging purposes and it can + cause unreliable behavior especially if the signal is sent during the + test initialization. Therefore use it with caution. + Line-profiler ------------- diff --git a/selftests/check.py b/selftests/check.py index c3d7a433d8..0871932d08 100755 --- a/selftests/check.py +++ b/selftests/check.py @@ -30,7 +30,7 @@ "unit": 682, "jobs": 11, "functional-parallel": 317, - "functional-serial": 7, + "functional-serial": 8, "optional-plugins": 0, "optional-plugins-golang": 2, "optional-plugins-html": 3, diff --git a/selftests/functional/serial/basic.py b/selftests/functional/serial/basic.py new file mode 100644 index 0000000000..74d9af9e96 --- /dev/null +++ b/selftests/functional/serial/basic.py @@ -0,0 +1,55 @@ +import os +import re +import signal +import time + +from avocado.utils import process, script +from selftests.utils import AVOCADO, TestCaseTmpDir + +SLEEP_TEST = """import time + +from avocado import Test + + +class SleepTest(Test): + + timeout = 10 + + def test(self): + self.log.debug("Sleeping starts: %s", time.time()) + time.sleep(5) + self.log.debug("Sleeping ends: %s", time.time()) +""" + + +class RunnerOperationTest(TestCaseTmpDir): + def test_pause(self): + with script.TemporaryScript( + "sleep.py", + SLEEP_TEST, + ) as tst: + cmd_line = f"{AVOCADO} run --disable-sysinfo --job-results-dir {self.tmpdir.name} -- {tst}" + proc = process.SubProcess(cmd_line) + proc.start() + init = True + while init: + output = proc.get_stdout() + if b"STARTED" in output: + init = False + time.sleep(1) + proc.send_signal(signal.SIGTSTP) + time.sleep(10) + proc.send_signal(signal.SIGTSTP) + proc.wait() + full_log_path = os.path.join(self.tmpdir.name, "latest", "full.log") + with open(full_log_path, encoding="utf-8") as full_log_file: + full_log = full_log_file.read() + self.assertIn("SleepTest.test: PAUSED", full_log) + self.assertIn("SleepTest.test: STARTED", full_log) + self.assertIn("Sleeping starts:", full_log) + self.assertIn("Sleeping ends:", full_log) + regex_start = re.search("Sleeping starts: ([0-9]*)", full_log) + regex_end = re.search("Sleeping ends: ([0-9]*)", full_log) + start_time = int(regex_start.group(1)) + end_time = int(regex_end.group(1)) + self.assertGreaterEqual(end_time - start_time, 10)