Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suspend execution of tasks #6090

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions avocado/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from avocado.core.output import STD_OUTPUT
from avocado.core.parser import Parser
from avocado.core.settings import settings
from avocado.utils import process


class AvocadoApp:
Expand Down Expand Up @@ -90,13 +89,6 @@ def _run_cli_plugins(self):

@staticmethod
def _setup_signals():
def sigterm_handler(signum, frame): # pylint: disable=W0613
children = process.get_children_pids(os.getpid())
for child in children:
process.kill_process_tree(int(child))
raise SystemExit("Terminated")

signal.signal(signal.SIGTERM, sigterm_handler)
if hasattr(signal, "SIGTSTP"):
signal.signal(signal.SIGTSTP, signal.SIG_IGN) # ignore ctrl+z

Expand Down
102 changes: 102 additions & 0 deletions avocado/core/nrunner/runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import abc
import multiprocessing
import signal
import time
import traceback

from avocado.core.exceptions import TestInterrupt
from avocado.core.nrunner.runnable import RUNNERS_REGISTRY_STANDALONE_EXECUTABLE
from avocado.core.plugin_interfaces import RunnableRunner
from avocado.core.utils import messages
from avocado.utils import process

#: The amount of time (in seconds) between each internal status check
RUNNER_RUN_CHECK_INTERVAL = 0.01
Expand Down Expand Up @@ -86,3 +93,98 @@
most_current_execution_state_time = now
yield self.prepare_status("running")
time.sleep(RUNNER_RUN_CHECK_INTERVAL)


class PythonBaseRunner(BaseRunner, abc.ABC):
"""
Base class for Python runners
"""

def __init__(self):
super().__init__()
self.proc = None
self.sigtstp = multiprocessing.Lock()
self.sigstopped = False
self.timeout = float("inf")

def signal_handler(self, signum, frame): # pylint: disable=W0613
if signum == signal.SIGTERM.value:
raise TestInterrupt("Test interrupted: Timeout reached")
elif signum == signal.SIGTSTP.value:
if self.sigstopped:
self.sigstopped = False
sign = signal.SIGCONT
else:
self.sigstopped = True
sign = signal.SIGSTOP
if not self.proc: # Ignore ctrl+z when proc not yet started
return

Check warning on line 121 in avocado/core/nrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

avocado/core/nrunner/runner.py#L121

Added line #L121 was not covered by tests
with self.sigtstp:
self.timeout = float("inf")
process.kill_process_tree(self.proc.pid, sign, False)

def _monitor(self, time_started, queue):
next_status_time = None
while True:
time.sleep(RUNNER_RUN_CHECK_INTERVAL)
now = time.monotonic()
if queue.empty():
if next_status_time is None or now > next_status_time:
next_status_time = now + RUNNER_RUN_STATUS_INTERVAL
yield messages.RunningMessage.get()
if (now - time_started) > self.timeout:
self.proc.terminate()
else:
message = queue.get()
if message.get("type") == "early_state":
self.timeout = float(message.get("timeout") or float("inf"))
else:
yield message
if message.get("status") == "finished":
break
while self.sigstopped:
time.sleep(RUNNER_RUN_CHECK_INTERVAL)

def run(self, runnable):
signal.signal(signal.SIGTSTP, signal.SIG_IGN)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGTSTP, self.signal_handler)
# pylint: disable=W0201
self.runnable = runnable
yield messages.StartedMessage.get()
try:
queue = multiprocessing.SimpleQueue()
self.proc = multiprocessing.Process(
target=self._run, args=(self.runnable, queue)
)
while self.sigstopped:
pass

Check warning on line 161 in avocado/core/nrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

avocado/core/nrunner/runner.py#L161

Added line #L161 was not covered by tests
with self.sigtstp:
self.proc.start()
time_started = time.monotonic()
for message in self._monitor(time_started, queue):
yield message

except TestInterrupt:
self.proc.terminate()
for message in self._monitor(time_started, queue):
yield message
except Exception as e:
yield messages.StderrMessage.get(traceback.format_exc())
yield messages.FinishedMessage.get(

Check warning on line 174 in avocado/core/nrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

avocado/core/nrunner/runner.py#L172-L174

Added lines #L172 - L174 were not covered by tests
"error",
fail_reason=str(e),
fail_class=e.__class__.__name__,
traceback=traceback.format_exc(),
)

@abc.abstractmethod
def _run(self, runnable, queue):
"""
Run the test

:param runnable: the runnable object
:type runnable: :class:`Runnable`
:param queue: the queue to put messages
:type queue: :class:`multiprocessing.SimpleQueue`
"""
22 changes: 22 additions & 0 deletions avocado/core/plugin_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,28 @@
:rtype: bool
"""

async def stop_task(self, runtime_task):
"""Stop already spawned task.

:param runtime_task: wrapper for a Task with additional runtime
information.
:type runtime_task: :class:`avocado.core.task.runtime.RuntimeTask`
:returns: whether the task has been stopped or not.
:rtype: bool
"""
raise NotImplementedError()

Check warning on line 388 in avocado/core/plugin_interfaces.py

View check run for this annotation

Codecov / codecov/patch

avocado/core/plugin_interfaces.py#L388

Added line #L388 was not covered by tests

async def resume_task(self, runtime_task):
"""Resume already stopped task.

:param runtime_task: wrapper for a Task with additional runtime
information.
:type runtime_task: :class:`avocado.core.task.runtime.RuntimeTask`
:returns: whether the task has been resumed or not.
:rtype: bool
"""
raise NotImplementedError()

Check warning on line 399 in avocado/core/plugin_interfaces.py

View check run for this annotation

Codecov / codecov/patch

avocado/core/plugin_interfaces.py#L399

Added line #L399 was not covered by tests

@staticmethod
@abc.abstractmethod
async def check_task_requirements(runtime_task):
Expand Down
1 change: 1 addition & 0 deletions avocado/core/task/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class RuntimeTaskStatus(Enum):
FAIL_TRIAGE = "FINISHED WITH FAILURE ON TRIAGE"
FAIL_START = "FINISHED FAILING TO START"
STARTED = "STARTED"
PAUSED = "PAUSED"

@staticmethod
def finished_statuses():
Expand Down
26 changes: 26 additions & 0 deletions avocado/core/task/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time

from avocado.core.exceptions import JobFailFast
from avocado.core.output import LOG_UI
from avocado.core.task.runtime import RuntimeTaskStatus
from avocado.core.teststatus import STATUSES_NOT_OK
from avocado.core.utils import messages
Expand Down Expand Up @@ -493,6 +494,31 @@
terminated = await self._terminate_tasks(task_status)
await self._send_finished_tasks_message(terminated, "Interrupted by user")

@staticmethod
async def stop_resume_tasks(state_machine, spawner):
async with state_machine.lock:
try:
for runtime_task in state_machine.monitored:
if runtime_task.status == RuntimeTaskStatus.STARTED:
await spawner.stop_task(runtime_task)
runtime_task.status = RuntimeTaskStatus.PAUSED
LOG_UI.warning(
f"{runtime_task.task.identifier}: {runtime_task.status.value}"
)
elif runtime_task.status == RuntimeTaskStatus.PAUSED:
await spawner.resume_task(runtime_task)
runtime_task.status = RuntimeTaskStatus.STARTED
LOG_UI.warning(
f"{runtime_task.task.identifier}: {runtime_task.status.value}"
)
except NotImplementedError:
LOG.warning(

Check warning on line 515 in avocado/core/task/statemachine.py

View check run for this annotation

Codecov / codecov/patch

avocado/core/task/statemachine.py#L514-L515

Added lines #L514 - L515 were not covered by tests
f"Sending signals to tasks is not implemented for spawner: {spawner}"
)
LOG_UI.warning(

Check warning on line 518 in avocado/core/task/statemachine.py

View check run for this annotation

Codecov / codecov/patch

avocado/core/task/statemachine.py#L518

Added line #L518 was not covered by tests
f"Sending signals to tasks is not implemented for spawner: {spawner}"
)

async def run(self):
"""Pushes Tasks forward and makes them do something with their lives."""
while True:
Expand Down
10 changes: 10 additions & 0 deletions avocado/plugins/runner_nrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import platform
import random
import signal
import tempfile

from avocado.core.dispatcher import SpawnerDispatcher
Expand Down Expand Up @@ -269,6 +270,10 @@ def _abort_if_missing_runners(runnables):
)
raise JobError(msg)

@staticmethod
def signal_handler(spawner, state_machine):
asyncio.create_task(Worker.stop_resume_tasks(state_machine, spawner))

def run_suite(self, job, test_suite):
summary = set()

Expand Down Expand Up @@ -335,6 +340,11 @@ def run_suite(self, job, test_suite):
]
asyncio.ensure_future(self._update_status(job))
loop = asyncio.get_event_loop()
if hasattr(signal, "SIGTSTP"):
loop.add_signal_handler(
signal.SIGTSTP,
lambda: self.signal_handler(spawner, self.tsm),
)
try:
try:
loop.run_until_complete(
Expand Down
65 changes: 21 additions & 44 deletions avocado/plugins/runners/asset.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import sys
import time
from multiprocessing import Process, SimpleQueue, set_start_method
from multiprocessing import set_start_method

from avocado.core.nrunner.app import BaseRunnerApp
from avocado.core.nrunner.runner import RUNNER_RUN_STATUS_INTERVAL, BaseRunner
from avocado.core.nrunner.runner import PythonBaseRunner
from avocado.core.settings import settings
from avocado.utils import data_structures
from avocado.utils.asset import Asset


class AssetRunner(BaseRunner):
class AssetRunner(PythonBaseRunner):
"""Runner for dependencies of type package

This runner handles the fetch of files using the Avocado Assets utility.
Expand All @@ -36,7 +35,7 @@ class AssetRunner(BaseRunner):
CONFIGURATION_USED = ["datadir.paths.cache_dirs"]

@staticmethod
def _fetch_asset(name, asset_hash, algorithm, locations, cache_dirs, expire, queue):
def _fetch_asset(name, asset_hash, algorithm, locations, cache_dirs, expire):

asset_manager = Asset(
name, asset_hash, algorithm, locations, cache_dirs, expire
Expand All @@ -52,50 +51,25 @@ def _fetch_asset(name, asset_hash, algorithm, locations, cache_dirs, expire, que
result = "error"
stderr = str(exc)

output = {"result": result, "stdout": stdout, "stderr": stderr}
queue.put(output)
return {"result": result, "stdout": stdout, "stderr": stderr}

def run(self, runnable):
# pylint: disable=W0201
self.runnable = runnable
yield self.prepare_status("started")

name = self.runnable.kwargs.get("name")
def _run(self, runnable, queue):
name = runnable.kwargs.get("name")
# if name was passed correctly, run the Avocado Asset utility
if name is not None:
asset_hash = self.runnable.kwargs.get("asset_hash")
algorithm = self.runnable.kwargs.get("algorithm")
locations = self.runnable.kwargs.get("locations")
expire = self.runnable.kwargs.get("expire")
asset_hash = runnable.kwargs.get("asset_hash")
algorithm = runnable.kwargs.get("algorithm")
locations = runnable.kwargs.get("locations")
expire = runnable.kwargs.get("expire")
if expire is not None:
expire = data_structures.time_to_seconds(str(expire))

cache_dirs = self.runnable.config.get("datadir.paths.cache_dirs")
cache_dirs = runnable.config.get("datadir.paths.cache_dirs")
if cache_dirs is None:
cache_dirs = settings.as_dict().get("datadir.paths.cache_dirs")

# let's spawn it to another process to be able to update the
# status messages and avoid the Asset to lock this process
queue = SimpleQueue()
process = Process(
target=self._fetch_asset,
args=(
name,
asset_hash,
algorithm,
locations,
cache_dirs,
expire,
queue,
),
output = self._fetch_asset(
name, asset_hash, algorithm, locations, cache_dirs, expire
)
process.start()

while queue.empty():
time.sleep(RUNNER_RUN_STATUS_INTERVAL)
yield self.prepare_status("running")

output = queue.get()
result = output["result"]
stdout = output["stdout"]
stderr = output["stderr"]
Expand All @@ -104,10 +78,13 @@ def run(self, runnable):
result = "error"
stdout = ""
stderr = 'At least name should be passed as kwargs using name="uri".'

yield self.prepare_status("running", {"type": "stdout", "log": stdout.encode()})
yield self.prepare_status("running", {"type": "stderr", "log": stderr.encode()})
yield self.prepare_status("finished", {"result": result})
queue.put(
self.prepare_status("running", {"type": "stdout", "log": stdout.encode()})
)
queue.put(
self.prepare_status("running", {"type": "stderr", "log": stderr.encode()})
)
queue.put(self.prepare_status("finished", {"result": result}))


class RunnerApp(BaseRunnerApp):
Expand Down
Loading
Loading