Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds loop delay monitor and logger #138

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion synchronicity/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import contextlib
import functools
import inspect
import logging
import platform
import threading
import time
import typing
import warnings
from contextlib import asynccontextmanager
from typing import ForwardRef, Optional

from synchronicity.annotations import evaluated_annotation
Expand All @@ -18,6 +21,9 @@
from .exceptions import UserCodeException, unwrap_coro_exception, wrap_coro_exception
from .interface import Interface

logger = logging.getLogger("synchronicity")


_BUILTIN_ASYNC_METHODS = {
"__aiter__": "__iter__",
"__aenter__": "__enter__",
Expand Down Expand Up @@ -82,6 +88,24 @@ def _type_requires_aio_usage(annotation, declaration_module):
return False



@asynccontextmanager
async def loop_delay_monitor(monitor_period: float, warning_threshold: float):
async def monitor():
while 1:
t0 = time.monotonic()
await asyncio.sleep(monitor_period)
duration = time.monotonic() - t0
delay = duration - monitor_period
if delay >= warning_threshold:
logger.warning(f"Detected an event loop delay of {delay:.2f}s")

loop_task = asyncio.create_task(monitor())
try:
yield
finally:
loop_task.cancel()

def should_have_aio_interface(func):
# determines if a blocking function gets an .aio attribute with an async interface to the function or not
if inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func):
Expand All @@ -102,7 +126,11 @@ def __init__(
self,
multiwrap_warning=False,
async_leakage_warning=True,
loop_delay_monitor_period = None, # seconds
loop_delay_monitor_threshold = None # seconds
):
self._loop_delay_monitor_period = loop_delay_monitor_period
self._loop_delay_monitor_threshold = loop_delay_monitor_threshold
self._multiwrap_warning = multiwrap_warning
self._async_leakage_warning = async_leakage_warning
self._loop = None
Expand Down Expand Up @@ -156,7 +184,11 @@ async def loop_inner():
self._loop = asyncio.get_running_loop()
self._stopping = asyncio.Event()
is_ready.set()
await self._stopping.wait() # wait until told to stop
if self._loop_delay_monitor_period:
async with loop_delay_monitor(self._loop_delay_monitor_period, self._loop_delay_monitor_threshold):
await self._stopping.wait() # wait until told to stop
else:
await self._stopping.wait()

try:
asyncio.run(loop_inner())
Expand All @@ -168,6 +200,7 @@ async def loop_inner():
raise exc

self._thread = threading.Thread(target=thread_inner, daemon=True)
self._thread.name = f"synchronicity-thread ({id(self)})"
self._thread.start()
is_ready.wait() # TODO: this might block for a very short time
return self._loop
Expand Down
Loading