Skip to content

Commit

Permalink
Merge pull request #243 from zmedico/fix_issue_95_WorkerPool_Wait_for…
Browse files Browse the repository at this point in the history
…_previous_task_in_try_send_to_primary_thread

Add main_thread_only execmodel
  • Loading branch information
RonnyPfannschmidt authored Feb 24, 2024
2 parents c0f794a + 772da33 commit 678df09
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 34 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
2.1.0 (UNRELEASED)
------------------

* `#243 <https://github.com/pytest-dev/execnet/pull/243>`__: Added ``main_thread_only``
execmodel which is derived from the thread execmodel and only executes ``remote_exec``
calls in the main thread.

Callers of ``remote_exec`` must use the returned channel to wait for a task to complete
before they call remote_exec again, otherwise the ``remote_exec`` call will fail with a
``concurrent remote_exec would cause deadlock`` error. The main_thread_only execmodel
provides solutions for `#96 <https://github.com/pytest-dev/execnet/issues/96>`__ and
`pytest-dev/pytest-xdist#620 <https://github.com/pytest-dev/pytest-xdist/issues/620>`__
(pending a new `pytest-xdist` release).

Also fixed ``init_popen_io`` to use ``closefd=False`` for shared stdin and stdout file
descriptors, preventing ``Bad file descriptor`` errors triggered by test_stdouterrin_setnull.
* Removed support for Python 3.7.
* Added official support for Python 3.12.


2.0.2 (2023-07-09)
------------------

Expand Down
10 changes: 5 additions & 5 deletions doc/basics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ processes then you often want to call ``group.terminate()``
yourself and specify a larger or not timeout.


threading models: gevent, eventlet, thread
===========================================
threading models: gevent, eventlet, thread, main_thread_only
====================================================================

.. versionadded:: 1.2 (status: experimental!)

execnet supports "thread", "eventlet" and "gevent" as thread models
on each of the two sides. You need to decide which model to use
before you create any gateways::
execnet supports "main_thread_only", "thread", "eventlet" and "gevent"
as thread models on each of the two sides. You need to decide which
model to use before you create any gateways::

# content of threadmodel.py
import execnet
Expand Down
79 changes: 66 additions & 13 deletions src/execnet/gateway_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def sleep(self, delay):
raise NotImplementedError()

@abc.abstractmethod
def fdopen(self, fd, mode, bufsize=1):
def fdopen(self, fd, mode, bufsize=1, closefd=True):
raise NotImplementedError()

@abc.abstractmethod
Expand Down Expand Up @@ -113,10 +113,10 @@ def start(self, func, args=()):

return _thread.start_new_thread(func, args)

def fdopen(self, fd, mode, bufsize=1):
def fdopen(self, fd, mode, bufsize=1, closefd=True):
import os

return os.fdopen(fd, mode, bufsize, encoding="utf-8")
return os.fdopen(fd, mode, bufsize, encoding="utf-8", closefd=closefd)

def Lock(self):
import threading
Expand All @@ -134,6 +134,10 @@ def Event(self):
return threading.Event()


class MainThreadOnlyExecModel(ThreadExecModel):
backend = "main_thread_only"


class EventletExecModel(ExecModel):
backend = "eventlet"

Expand Down Expand Up @@ -170,10 +174,10 @@ def start(self, func, args=()):

return eventlet.spawn_n(func, *args)

def fdopen(self, fd, mode, bufsize=1):
def fdopen(self, fd, mode, bufsize=1, closefd=True):
import eventlet.green.os

return eventlet.green.os.fdopen(fd, mode, bufsize)
return eventlet.green.os.fdopen(fd, mode, bufsize, closefd=closefd)

def Lock(self):
import eventlet.green.threading
Expand Down Expand Up @@ -227,11 +231,11 @@ def start(self, func, args=()):

return gevent.spawn(func, *args)

def fdopen(self, fd, mode, bufsize=1):
def fdopen(self, fd, mode, bufsize=1, closefd=True):
# XXX
import gevent.fileobject

return gevent.fileobject.FileObjectThread(fd, mode, bufsize)
return gevent.fileobject.FileObjectThread(fd, mode, bufsize, closefd=closefd)

def Lock(self):
import gevent.lock
Expand All @@ -254,6 +258,8 @@ def get_execmodel(backend):
return backend
if backend == "thread":
return ThreadExecModel()
elif backend == "main_thread_only":
return MainThreadOnlyExecModel()
elif backend == "eventlet":
return EventletExecModel()
elif backend == "gevent":
Expand Down Expand Up @@ -322,7 +328,7 @@ def __init__(self, execmodel, hasprimary=False):
self._shuttingdown = False
self._waitall_events = []
if hasprimary:
if self.execmodel.backend != "thread":
if self.execmodel.backend not in ("thread", "main_thread_only"):
raise ValueError("hasprimary=True requires thread model")
self._primary_thread_task_ready = self.execmodel.Event()
else:
Expand All @@ -332,7 +338,7 @@ def integrate_as_primary_thread(self):
"""integrate the thread with which we are called as a primary
thread for executing functions triggered with spawn().
"""
assert self.execmodel.backend == "thread", self.execmodel
assert self.execmodel.backend in ("thread", "main_thread_only"), self.execmodel
primary_thread_task_ready = self._primary_thread_task_ready
# interacts with code at REF1
while 1:
Expand All @@ -345,7 +351,11 @@ def integrate_as_primary_thread(self):
with self._running_lock:
if self._shuttingdown:
break
primary_thread_task_ready.clear()
# Only clear if _try_send_to_primary_thread has not
# yet set the next self._primary_thread_task reply
# after waiting for this one to complete.
if reply is self._primary_thread_task:
primary_thread_task_ready.clear()

def trigger_shutdown(self):
with self._running_lock:
Expand Down Expand Up @@ -376,6 +386,19 @@ def _try_send_to_primary_thread(self, reply):
# wake up primary thread
primary_thread_task_ready.set()
return True
elif (
self.execmodel.backend == "main_thread_only"
and self._primary_thread_task is not None
):
self._primary_thread_task.waitfinish()
self._primary_thread_task = reply
# wake up primary thread (it's okay if this is already set
# because we waited for the previous task to finish above
# and integrate_as_primary_thread will not clear it when
# it enters self._running_lock if it detects that a new
# task is available)
primary_thread_task_ready.set()
return True
return False

def spawn(self, func, *args, **kwargs):
Expand Down Expand Up @@ -857,6 +880,9 @@ def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False):

ENDMARKER = object()
INTERRUPT_TEXT = "keyboard-interrupted"
MAIN_THREAD_ONLY_DEADLOCK_TEXT = (
"concurrent remote_exec would cause deadlock for main_thread_only execmodel"
)


class ChannelFactory:
Expand Down Expand Up @@ -1105,6 +1131,20 @@ def join(self, timeout=None):

class WorkerGateway(BaseGateway):
def _local_schedulexec(self, channel, sourcetask):
if self._execpool.execmodel.backend == "main_thread_only":
# It's necessary to wait for a short time in order to ensure
# that we do not report a false-positive deadlock error, since
# channel close does not elicit a response that would provide
# a guarantee to remote_exec callers that the previous task
# has released the main thread. If the timeout expires then it
# should be practically impossible to report a false-positive.
if not self._executetask_complete.wait(timeout=1):
channel.close(MAIN_THREAD_ONLY_DEADLOCK_TEXT)
return
# It's only safe to clear here because the above wait proves
# that there is not a previous task about to set it again.
self._executetask_complete.clear()

sourcetask = loads_internal(sourcetask)
self._execpool.spawn(self.executetask, (channel, sourcetask))

Expand Down Expand Up @@ -1132,8 +1172,14 @@ def serve(self):
def trace(msg):
self._trace("[serve] " + msg)

hasprimary = self.execmodel.backend == "thread"
hasprimary = self.execmodel.backend in ("thread", "main_thread_only")
self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary)
self._executetask_complete = None
if self.execmodel.backend == "main_thread_only":
self._executetask_complete = self.execmodel.Event()
# Initialize state to indicate that there is no previous task
# executing so that we don't need a separate flag to track this.
self._executetask_complete.set()
trace("spawning receiver thread")
self._initreceive()
try:
Expand Down Expand Up @@ -1176,6 +1222,11 @@ def executetask(self, item):
return
self._trace("ignoring EOFError because receiving finished")
channel.close()
if self._executetask_complete is not None:
# Indicate that this task has finished executing, meaning
# that there is no possibility of it triggering a deadlock
# for the next spawn call.
self._executetask_complete.set()


#
Expand Down Expand Up @@ -1631,8 +1682,10 @@ def init_popen_io(execmodel):
os.dup2(fd, 2)
os.close(fd)
io = Popen2IO(stdout, stdin, execmodel)
sys.stdin = execmodel.fdopen(0, "r", 1)
sys.stdout = execmodel.fdopen(1, "w", 1)
# Use closefd=False since 0 and 1 are shared with
# sys.__stdin__ and sys.__stdout__.
sys.stdin = execmodel.fdopen(0, "r", 1, closefd=False)
sys.stdout = execmodel.fdopen(1, "w", 1, closefd=False)
return io


Expand Down
2 changes: 1 addition & 1 deletion src/execnet/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def makegateway(self, spec=None):
id=<string> specifies the gateway id
python=<path> specifies which python interpreter to execute
execmodel=model 'thread', 'eventlet', 'gevent' model for execution
execmodel=model 'thread', 'main_thread_only', 'eventlet', 'gevent' model for execution
chdir=<path> specifies to which directory to change
nice=<path> specifies process priority of new process
env:NAME=value specifies a remote environment variable setting.
Expand Down
8 changes: 5 additions & 3 deletions testing/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def anypython(request):
pytest.skip(f"no {name} found")
if "execmodel" in request.fixturenames and name != "sys.executable":
backend = request.getfixturevalue("execmodel").backend
if backend != "thread":
if backend not in ("thread", "main_thread_only"):
pytest.xfail(f"cannot run {backend!r} execmodel with bare {name}")
return executable

Expand Down Expand Up @@ -173,9 +173,11 @@ def gw(request, execmodel, group):
return gw


@pytest.fixture(params=["thread", "eventlet", "gevent"], scope="session")
@pytest.fixture(
params=["thread", "main_thread_only", "eventlet", "gevent"], scope="session"
)
def execmodel(request):
if request.param != "thread":
if request.param not in ("thread", "main_thread_only"):
pytest.importorskip(request.param)
if request.param in ("eventlet", "gevent") and sys.platform == "win32":
pytest.xfail(request.param + " does not work on win32")
Expand Down
25 changes: 19 additions & 6 deletions testing/test_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,25 @@ class Arg:

@pytest.mark.skipif("not hasattr(os, 'dup')")
def test_stdouterrin_setnull(execmodel, capfd):
gateway_base.init_popen_io(execmodel)
os.write(1, b"hello")
os.read(0, 1)
out, err = capfd.readouterr()
assert not out
assert not err
# Backup and restore stdin state, and rely on capfd to handle
# this for stdout and stderr.
orig_stdin = sys.stdin
orig_stdin_fd = os.dup(0)
try:
# The returned Popen2IO instance can be garbage collected
# prematurely since we don't hold a reference here, but we
# tolerate this because it is intended to leave behind a
# sane state afterwards.
gateway_base.init_popen_io(execmodel)
os.write(1, b"hello")
os.read(0, 1)
out, err = capfd.readouterr()
assert not out
assert not err
finally:
sys.stdin = orig_stdin
os.dup2(orig_stdin_fd, 0)
os.close(orig_stdin_fd)


class PseudoChannel:
Expand Down
79 changes: 79 additions & 0 deletions testing/test_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,3 +525,82 @@ def sendback(channel):
if interleave_getstatus:
print(gw.remote_status())
assert ch.receive(timeout=0.5) == 1234


def test_assert_main_thread_only(execmodel, makegateway):
if execmodel.backend != "main_thread_only":
pytest.skip("can only run with main_thread_only")

gw = makegateway(spec=f"execmodel={execmodel.backend}//popen")

try:
# Submit multiple remote_exec requests in quick succession and
# assert that all tasks execute in the main thread. It is
# necessary to call receive on each channel before the next
# remote_exec call, since the channel will raise an error if
# concurrent remote_exec requests are submitted as in
# test_main_thread_only_concurrent_remote_exec_deadlock.
for i in range(10):
ch = gw.remote_exec(
"""
import time, threading
time.sleep(0.02)
channel.send(threading.current_thread() is threading.main_thread())
"""
)

try:
res = ch.receive()
finally:
ch.close()
# This doesn't actually block because we closed
# the channel already, but it does check for remote
# errors and raise them.
ch.waitclose()
if res is not True:
pytest.fail("remote raised\n%s" % res)
finally:
gw.exit()
gw.join()


def test_main_thread_only_concurrent_remote_exec_deadlock(execmodel, makegateway):
if execmodel.backend != "main_thread_only":
pytest.skip("can only run with main_thread_only")

gw = makegateway(spec=f"execmodel={execmodel.backend}//popen")
channels = []
try:
# Submit multiple remote_exec requests in quick succession and
# assert that MAIN_THREAD_ONLY_DEADLOCK_TEXT is raised if
# concurrent remote_exec requests are submitted for the
# main_thread_only execmodel (as compensation for the lack of
# back pressure in remote_exec calls which do not attempt to
# block until the remote main thread is idle).
for i in range(2):
channels.append(
gw.remote_exec(
"""
import threading
channel.send(threading.current_thread() is threading.main_thread())
# Wait forever, ensuring that the deadlock case triggers.
channel.gateway.execmodel.Event().wait()
"""
)
)

expected_results = (
True,
execnet.gateway_base.MAIN_THREAD_ONLY_DEADLOCK_TEXT,
)
for expected, ch in zip(expected_results, channels):
try:
res = ch.receive()
except execnet.RemoteError as e:
res = e.formatted
assert res == expected
finally:
for ch in channels:
ch.close()
gw.exit()
gw.join()
Loading

0 comments on commit 678df09

Please sign in to comment.