Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make synchronizer a context manager for managing event loop life cycle explicitly #158

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion synchronicity/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import contextlib
import functools
import inspect
import logging
import platform
import threading
import typing
Expand All @@ -20,6 +21,9 @@
from .exceptions import UserCodeException, unwrap_coro_exception, wrap_coro_exception
from .interface import Interface

logger = logging.getLogger("synchronicity")


_BUILTIN_ASYNC_METHODS = {
"__aiter__": "__iter__",
"__aenter__": "__enter__",
Expand Down Expand Up @@ -136,13 +140,19 @@ def __init__(

self.create_blocking(typing.Generic, "WrappedGeneric", __name__)

atexit.register(self._close_loop)
atexit.register(self._close_loop) # shut down at container exit if we haven't already

_PICKLE_ATTRS = [
"_multiwrap_warning",
"_async_leakage_warning",
]

def __enter__(self):
self._start_loop()

def __exit__(self, *args, **kwargs):
self._close_loop()

def __getstate__(self):
return dict([(attr, getattr(self, attr)) for attr in self._PICKLE_ATTRS])

Expand All @@ -155,6 +165,7 @@ def _start_loop(self):
if self._loop and self._loop.is_running():
return self._loop

logger.debug("starting up synchronicity event loop")
is_ready = threading.Event()

def thread_inner():
Expand Down Expand Up @@ -182,9 +193,11 @@ def _close_loop(self):
if self._thread is not None:
if not self._loop.is_closed():
# This also serves the purpose of waking up an idle loop
logger.debug("start shutting down synchronicity event loop")
self._loop.call_soon_threadsafe(self._stopping.set)
self._thread.join()
self._thread = None
logger.debug("finished shutting down synchronicity event loop")

def __del__(self):
self._close_loop()
Expand Down
8 changes: 8 additions & 0 deletions test/_shutdown.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import asyncio
import logging
import sys

from synchronicity import Synchronizer
from synchronicity.synchronizer import logger

logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)


async def run():
Expand All @@ -19,7 +25,9 @@ async def run():

s = Synchronizer()

print("calling wrapped func")
try:
s.create_blocking(run)()
except KeyboardInterrupt:
pass
print("eof")
36 changes: 36 additions & 0 deletions test/_shutdown_ctxmgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio
import logging
import sys

from synchronicity import Synchronizer
from synchronicity.synchronizer import logger

logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)


async def run():
try:
while True:
print("running")
await asyncio.sleep(0.3)
except asyncio.CancelledError:
print("cancelled")
raise
finally:
print("stopping")
await asyncio.sleep(0.1)
print("exiting")


s = Synchronizer()
wrapped_func = s.create_blocking(run)

try:
with s:
print("calling wrapped func")
wrapped_func()
except KeyboardInterrupt:
pass

print("eof")
70 changes: 63 additions & 7 deletions test/shutdown_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
import sys


def assert_prints(p: subprocess.Popen, *messages: str):
for msg in messages:
line = p.stdout.readline()
if not line:
print("STDERR")
print(p.stderr.read())
raise Exception("Unexpected empty line in output, see stderr:")
assert line == msg + "\n"


def test_shutdown():
# We run it in a separate process so we can simulate interrupting it
fn = os.path.join(os.path.dirname(__file__), "_shutdown.py")
Expand All @@ -12,12 +22,58 @@ def test_shutdown():
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env={"PYTHONUNBUFFERED": "1"},
encoding="utf8",
)
assert_prints(
p,
"calling wrapped func",
"starting up synchronicity event loop",
*(["running"] * 3), # wait for 3 "running" messages before siginting the process
)
for i in range(3): # this number doesn't matter, it's a while loop
assert p.stdout.readline() == b"running\n"
p.send_signal(signal.SIGINT)
assert p.stdout.readline() == b"cancelled\n"
assert p.stdout.readline() == b"stopping\n"
assert p.stdout.readline() == b"exiting\n"
stderr_content = p.stderr.read()
assert b"Traceback" not in stderr_content
assert_prints(
p,
"eof",
"start shutting down synchronicity event loop",
"cancelled",
"stopping",
"exiting",
"finished shutting down synchronicity event loop",
)
out, err = p.communicate()
assert out == ""
assert err == ""
assert p.returncode == 0


def test_shutdown_ctx_mgr():
# We run it in a separate process so we can simulate interrupting it
fn = os.path.join(os.path.dirname(__file__), "_shutdown_ctxmgr.py")
p = subprocess.Popen(
[sys.executable, fn],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env={"PYTHONUNBUFFERED": "1"},
encoding="utf8",
)
assert_prints(
p,
"starting up synchronicity event loop", # start up loop explicitly
"calling wrapped func",
*(["running"] * 3),
)
p.send_signal(signal.SIGINT)

assert_prints(
p,
"start shutting down synchronicity event loop",
"cancelled",
"stopping",
"exiting",
"finished shutting down synchronicity event loop", # shut down loop explicitly,
"eof",
)
out, err = p.communicate()
assert out == ""
assert err == ""
assert p.returncode == 0
Loading