From 29393f418428e88f23b42ece03f3a9d44bb9a3e7 Mon Sep 17 00:00:00 2001 From: Ethan Van Der Heijden Date: Sun, 22 Sep 2024 03:09:30 -0400 Subject: [PATCH 1/4] fix: properly clean up threads when stopping Inotify. Improve Eventlet tests. (#1070) * Improve cleaning up Inotify threads and add eventlet test cases. * Align SkipRepeatsQueue with Eventlet's Queue implementation. * Only run eventlet tests in Linux. --- src/watchdog/observers/inotify_c.py | 42 +++++++++++++++++--- src/watchdog/utils/bricks.py | 13 +++--- tests/isolated/eventlet_observer_stops.py | 30 ++++++++++++++ tests/isolated/eventlet_skip_repeat_queue.py | 33 +++++++++++++++ tests/markers.py | 7 ---- tests/test_inotify_c.py | 11 ++++- tests/test_isolated.py | 24 +++++++++++ tests/test_skip_repeats_queue.py | 17 +------- tests/utils.py | 28 +++++++++++++ 9 files changed, 168 insertions(+), 37 deletions(-) create mode 100644 tests/isolated/eventlet_observer_stops.py create mode 100644 tests/isolated/eventlet_skip_repeat_queue.py delete mode 100644 tests/markers.py create mode 100644 tests/test_isolated.py diff --git a/src/watchdog/observers/inotify_c.py b/src/watchdog/observers/inotify_c.py index d6765e14..74c74a6f 100644 --- a/src/watchdog/observers/inotify_c.py +++ b/src/watchdog/observers/inotify_c.py @@ -5,6 +5,7 @@ import ctypes.util import errno import os +import select import struct import threading from ctypes import c_char_p, c_int, c_uint32 @@ -148,6 +149,9 @@ def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | No Inotify._raise_error() self._inotify_fd = inotify_fd self._lock = threading.Lock() + self._closed = False + self._waiting_to_read = True + self._kill_r, self._kill_w = os.pipe() # Stores the watch descriptor for a given path. self._wd_for_path: dict[bytes, int] = {} @@ -230,13 +234,19 @@ def remove_watch(self, path: bytes) -> None: def close(self) -> None: """Closes the inotify instance and removes all associated watches.""" with self._lock: - if self._path in self._wd_for_path: - wd = self._wd_for_path[self._path] - inotify_rm_watch(self._inotify_fd, wd) + if not self._closed: + self._closed = True - # descriptor may be invalid because file was deleted - with contextlib.suppress(OSError): - os.close(self._inotify_fd) + if self._path in self._wd_for_path: + wd = self._wd_for_path[self._path] + inotify_rm_watch(self._inotify_fd, wd) + + if self._waiting_to_read: + # inotify_rm_watch() should write data to _inotify_fd and wake + # the thread, but writing to the kill channel will gaurentee this + os.write(self._kill_w, b'!') + else: + self._close_resources() def read_events(self, *, event_buffer_size: int = DEFAULT_EVENT_BUFFER_SIZE) -> list[InotifyEvent]: """Reads events from inotify and yields them.""" @@ -276,6 +286,21 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]: event_buffer = None while True: try: + with self._lock: + if self._closed: + return [] + + self._waiting_to_read = True + + select.select([self._inotify_fd, self._kill_r], [], []) + + with self._lock: + self._waiting_to_read = False + + if self._closed: + self._close_resources() + return [] + event_buffer = os.read(self._inotify_fd, event_buffer_size) except OSError as e: if e.errno == errno.EINTR: @@ -340,6 +365,11 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]: return event_list + def _close_resources(self): + os.close(self._inotify_fd) + os.close(self._kill_r) + os.close(self._kill_w) + # Non-synchronized methods. def _add_dir_watch(self, path: bytes, mask: int, *, recursive: bool) -> None: """Adds a watch (optionally recursively) for the given directory path diff --git a/src/watchdog/utils/bricks.py b/src/watchdog/utils/bricks.py index 8dd0afa7..cdf9af23 100644 --- a/src/watchdog/utils/bricks.py +++ b/src/watchdog/utils/bricks.py @@ -72,14 +72,13 @@ def _init(self, maxsize: int) -> None: super()._init(maxsize) self._last_item = None - def _put(self, item: Any) -> None: + def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None: if self._last_item is None or item != self._last_item: - super()._put(item) - self._last_item = item - else: - # `put` increments `unfinished_tasks` even if we did not put - # anything into the queue here - self.unfinished_tasks -= 1 + super().put(item, block, timeout) + + def _put(self, item: Any) -> None: + super()._put(item) + self._last_item = item def _get(self) -> Any: item = super()._get() diff --git a/tests/isolated/eventlet_observer_stops.py b/tests/isolated/eventlet_observer_stops.py new file mode 100644 index 00000000..1cf82bdd --- /dev/null +++ b/tests/isolated/eventlet_observer_stops.py @@ -0,0 +1,30 @@ +if __name__ == '__main__': + import eventlet + + eventlet.monkey_patch() + + import signal + import sys + import tempfile + + from watchdog.observers import Observer + from watchdog.events import LoggingEventHandler + + with tempfile.TemporaryDirectory() as temp_dir: + def run_observer(): + event_handler = LoggingEventHandler() + observer = Observer() + observer.schedule(event_handler, temp_dir) + observer.start() + eventlet.sleep(1) + observer.stop() + + def on_alarm(signum, frame): + print("Observer.stop() never finished!", file=sys.stderr) + sys.exit(1) + + signal.signal(signal.SIGALRM, on_alarm) + signal.alarm(4) + + thread = eventlet.spawn(run_observer) + thread.wait() diff --git a/tests/isolated/eventlet_skip_repeat_queue.py b/tests/isolated/eventlet_skip_repeat_queue.py new file mode 100644 index 00000000..05373934 --- /dev/null +++ b/tests/isolated/eventlet_skip_repeat_queue.py @@ -0,0 +1,33 @@ +if __name__ == '__main__': + import eventlet + + eventlet.monkey_patch() + + from watchdog.utils.bricks import SkipRepeatsQueue + + q = SkipRepeatsQueue(10) + q.put('A') + q.put('A') + q.put('A') + q.put('A') + q.put('B') + q.put('A') + + value = q.get() + assert value == 'A' + q.task_done() + + assert q.unfinished_tasks == 2 + + value = q.get() + assert value == 'B' + q.task_done() + + assert q.unfinished_tasks == 1 + + value = q.get() + assert value == 'A' + q.task_done() + + assert q.empty() + assert q.unfinished_tasks == 0 diff --git a/tests/markers.py b/tests/markers.py deleted file mode 100644 index 66291fca..00000000 --- a/tests/markers.py +++ /dev/null @@ -1,7 +0,0 @@ -from __future__ import annotations - -from platform import python_implementation - -import pytest - -cpython_only = pytest.mark.skipif(python_implementation() != "CPython", reason="CPython only.") diff --git a/tests/test_inotify_c.py b/tests/test_inotify_c.py index 5b34e6c5..8d4b59d4 100644 --- a/tests/test_inotify_c.py +++ b/tests/test_inotify_c.py @@ -11,6 +11,7 @@ import errno import logging import os +import select import struct from typing import TYPE_CHECKING from unittest.mock import patch @@ -56,6 +57,13 @@ def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue, + struct_inotify(wd=3, mask=const.IN_IGNORED) ) + select_bkp = select.select + + def fakeselect(read_list, *args, **kwargs): + if inotify_fd in read_list: + return [inotify_fd], [], [] + return select_bkp(read_list, *args, **kwargs) + os_read_bkp = os.read def fakeread(fd, length): @@ -92,8 +100,9 @@ def inotify_rm_watch(fd, wd): mock3 = patch.object(inotify_c, "inotify_init", new=inotify_init) mock4 = patch.object(inotify_c, "inotify_add_watch", new=inotify_add_watch) mock5 = patch.object(inotify_c, "inotify_rm_watch", new=inotify_rm_watch) + mock6 = patch.object(select, "select", new=fakeselect) - with mock1, mock2, mock3, mock4, mock5: + with mock1, mock2, mock3, mock4, mock5, mock6: start_watching(path=p("")) # Watchdog Events for evt_cls in [DirCreatedEvent, DirDeletedEvent] * 2: diff --git a/tests/test_isolated.py b/tests/test_isolated.py new file mode 100644 index 00000000..2d3ff972 --- /dev/null +++ b/tests/test_isolated.py @@ -0,0 +1,24 @@ +import pytest +import importlib + +from watchdog.utils import platform + +from .utils import run_isolated_test + + +# Kqueue isn't supported by Eventlet, so BSD is out +# Current usage ReadDirectoryChangesW on Windows is blocking, though async may be possible +@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux") +def test_observer_stops_in_eventlet(): + if not importlib.util.find_spec('eventlet'): + pytest.skip("eventlet not installed") + + run_isolated_test('eventlet_observer_stops.py') + + +@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux") +def test_eventlet_skip_repeat_queue(): + if not importlib.util.find_spec('eventlet'): + pytest.skip("eventlet not installed") + + run_isolated_test('eventlet_skip_repeat_queue.py') diff --git a/tests/test_skip_repeats_queue.py b/tests/test_skip_repeats_queue.py index 27b1dfe4..d6f0f411 100644 --- a/tests/test_skip_repeats_queue.py +++ b/tests/test_skip_repeats_queue.py @@ -1,14 +1,10 @@ from __future__ import annotations -import pytest - from watchdog import events from watchdog.utils.bricks import SkipRepeatsQueue -from .markers import cpython_only - -def basic_actions(): +def test_basic_queue(): q = SkipRepeatsQueue() e1 = (2, "fred") @@ -25,10 +21,6 @@ def basic_actions(): assert q.empty() -def test_basic_queue(): - basic_actions() - - def test_allow_nonconsecutive(): q = SkipRepeatsQueue() @@ -86,10 +78,3 @@ def test_consecutives_allowed_across_empties(): q.put(e1) # this repeat is allowed because 'last' added is now gone from queue assert e1 == q.get() assert q.empty() - - -@cpython_only -def test_eventlet_monkey_patching(): - eventlet = pytest.importorskip("eventlet") - eventlet.monkey_patch() - basic_actions() diff --git a/tests/utils.py b/tests/utils.py index 8b82f3a8..d8f05b15 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,6 +2,8 @@ import dataclasses import os +import subprocess +import sys from queue import Queue from typing import Protocol @@ -97,3 +99,29 @@ def close(self) -> None: alive = [emitter.is_alive() for emitter in self.emitters] self.emitters = [] assert alive == [False] * len(alive) + + +def run_isolated_test(path): + ISOALTED_TEST_PREFIX = os.path.join('tests', 'isolated') + path = os.path.abspath(os.path.join(ISOALTED_TEST_PREFIX, path)) + + src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'src') + new_env = os.environ.copy() + new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir]) + + new_argv = [sys.executable, path] + + p = subprocess.Popen( + new_argv, + env=new_env, + ) + + # in case test goes haywire, don't let it run forever + timeout = 10 + try: + p.communicate(timeout=timeout) + except subprocess.TimeoutExpired: + p.kill() + assert False, 'timed out' + + assert p.returncode == 0 From 59650f8fe72ac498374434a10a3dd49f3c3d1429 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Fri, 27 Sep 2024 17:46:54 +0200 Subject: [PATCH 2/4] fix: polish #1070 --- src/watchdog/observers/inotify_c.py | 4 ++-- src/watchdog/utils/bricks.py | 5 ++++- tests/isolated/__init__.py | 0 tests/isolated/eventlet_observer_stops.py | 7 ++++--- tests/isolated/eventlet_skip_repeat_queue.py | 20 ++++++++++---------- tests/test_isolated.py | 11 ++++++----- tests/utils.py | 10 +++++----- tox.ini | 2 +- 8 files changed, 32 insertions(+), 27 deletions(-) create mode 100644 tests/isolated/__init__.py diff --git a/src/watchdog/observers/inotify_c.py b/src/watchdog/observers/inotify_c.py index 74c74a6f..bb2b1aa6 100644 --- a/src/watchdog/observers/inotify_c.py +++ b/src/watchdog/observers/inotify_c.py @@ -244,7 +244,7 @@ def close(self) -> None: if self._waiting_to_read: # inotify_rm_watch() should write data to _inotify_fd and wake # the thread, but writing to the kill channel will gaurentee this - os.write(self._kill_w, b'!') + os.write(self._kill_w, b"!") else: self._close_resources() @@ -365,7 +365,7 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]: return event_list - def _close_resources(self): + def _close_resources(self) -> None: os.close(self._inotify_fd) os.close(self._kill_r) os.close(self._kill_w) diff --git a/src/watchdog/utils/bricks.py b/src/watchdog/utils/bricks.py index cdf9af23..6aca8e42 100644 --- a/src/watchdog/utils/bricks.py +++ b/src/watchdog/utils/bricks.py @@ -72,7 +72,10 @@ def _init(self, maxsize: int) -> None: super()._init(maxsize) self._last_item = None - def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None: + def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None: # noqa: FBT001,FBT002 + """This method will be used by `eventlet`, when enabled, so we cannot use force proper keyword-only + arguments nor touch the signature. Also, the `timeout` argument will be ignored in that case. + """ if self._last_item is None or item != self._last_item: super().put(item, block, timeout) diff --git a/tests/isolated/__init__.py b/tests/isolated/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/isolated/eventlet_observer_stops.py b/tests/isolated/eventlet_observer_stops.py index 1cf82bdd..9586861e 100644 --- a/tests/isolated/eventlet_observer_stops.py +++ b/tests/isolated/eventlet_observer_stops.py @@ -1,4 +1,4 @@ -if __name__ == '__main__': +if __name__ == "__main__": import eventlet eventlet.monkey_patch() @@ -7,10 +7,11 @@ import sys import tempfile - from watchdog.observers import Observer from watchdog.events import LoggingEventHandler + from watchdog.observers import Observer with tempfile.TemporaryDirectory() as temp_dir: + def run_observer(): event_handler = LoggingEventHandler() observer = Observer() @@ -20,7 +21,7 @@ def run_observer(): observer.stop() def on_alarm(signum, frame): - print("Observer.stop() never finished!", file=sys.stderr) + print("Observer.stop() never finished!", file=sys.stderr) # noqa: T201 sys.exit(1) signal.signal(signal.SIGALRM, on_alarm) diff --git a/tests/isolated/eventlet_skip_repeat_queue.py b/tests/isolated/eventlet_skip_repeat_queue.py index 05373934..50cb3c02 100644 --- a/tests/isolated/eventlet_skip_repeat_queue.py +++ b/tests/isolated/eventlet_skip_repeat_queue.py @@ -1,4 +1,4 @@ -if __name__ == '__main__': +if __name__ == "__main__": import eventlet eventlet.monkey_patch() @@ -6,27 +6,27 @@ from watchdog.utils.bricks import SkipRepeatsQueue q = SkipRepeatsQueue(10) - q.put('A') - q.put('A') - q.put('A') - q.put('A') - q.put('B') - q.put('A') + q.put("A") + q.put("A") + q.put("A") + q.put("A") + q.put("B") + q.put("A") value = q.get() - assert value == 'A' + assert value == "A" q.task_done() assert q.unfinished_tasks == 2 value = q.get() - assert value == 'B' + assert value == "B" q.task_done() assert q.unfinished_tasks == 1 value = q.get() - assert value == 'A' + assert value == "A" q.task_done() assert q.empty() diff --git a/tests/test_isolated.py b/tests/test_isolated.py index 2d3ff972..16e326b4 100644 --- a/tests/test_isolated.py +++ b/tests/test_isolated.py @@ -1,6 +1,7 @@ -import pytest import importlib +import pytest + from watchdog.utils import platform from .utils import run_isolated_test @@ -10,15 +11,15 @@ # Current usage ReadDirectoryChangesW on Windows is blocking, though async may be possible @pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux") def test_observer_stops_in_eventlet(): - if not importlib.util.find_spec('eventlet'): + if not importlib.util.find_spec("eventlet"): pytest.skip("eventlet not installed") - run_isolated_test('eventlet_observer_stops.py') + run_isolated_test("eventlet_observer_stops.py") @pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux") def test_eventlet_skip_repeat_queue(): - if not importlib.util.find_spec('eventlet'): + if not importlib.util.find_spec("eventlet"): pytest.skip("eventlet not installed") - run_isolated_test('eventlet_skip_repeat_queue.py') + run_isolated_test("eventlet_skip_repeat_queue.py") diff --git a/tests/utils.py b/tests/utils.py index d8f05b15..759a9b0d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -102,12 +102,12 @@ def close(self) -> None: def run_isolated_test(path): - ISOALTED_TEST_PREFIX = os.path.join('tests', 'isolated') - path = os.path.abspath(os.path.join(ISOALTED_TEST_PREFIX, path)) + isolated_test_prefix = os.path.join("tests", "isolated") + path = os.path.abspath(os.path.join(isolated_test_prefix, path)) - src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'src') + src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "src") new_env = os.environ.copy() - new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir]) + new_env["PYTHONPATH"] = os.pathsep.join([*sys.path, src_dir]) new_argv = [sys.executable, path] @@ -122,6 +122,6 @@ def run_isolated_test(path): p.communicate(timeout=timeout) except subprocess.TimeoutExpired: p.kill() - assert False, 'timed out' + raise assert p.returncode == 0 diff --git a/tox.ini b/tox.ini index 0080220d..5d49cd18 100644 --- a/tox.ini +++ b/tox.ini @@ -33,7 +33,7 @@ extras = watchmedo commands = python -m ruff format docs/source/examples src tests - python -m ruff check --fix src docs/source/examples tests + python -m ruff check --fix --unsafe-fixes src docs/source/examples tests [testenv:types] usedevelop = true From ad6df50908fca071ac8ad6558453bb28b522f794 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Fri, 27 Sep 2024 18:04:06 +0200 Subject: [PATCH 3/4] Version 5.0.3 --- changelog.rst | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/changelog.rst b/changelog.rst index a3724ad5..3573bab1 100644 --- a/changelog.rst +++ b/changelog.rst @@ -3,13 +3,13 @@ Changelog --------- -5.0.3 (dev) -~~~~~~~~~~~ +5.0.3 +~~~~~ -2024-xx-xx • `full history `__ +2024-09-27 • `full history `__ -- -- Thanks to our beloved contributors: @BoboTiG +- [inotify] Improve cleaning up ``Inotify`` threads, and add ``eventlet`` test cases (`#1070 `__) +- Thanks to our beloved contributors: @BoboTiG, @ethan-vanderheijden 5.0.2 ~~~~~ From 6a4f1cf846e03c8701fcc53b9e910077b5824e59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Fri, 27 Sep 2024 18:10:35 +0200 Subject: [PATCH 4/4] Bump the version --- changelog.rst | 8 ++++++++ docs/source/global.rst.inc | 2 +- src/watchdog/version.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/changelog.rst b/changelog.rst index 3573bab1..4546f7df 100644 --- a/changelog.rst +++ b/changelog.rst @@ -3,6 +3,14 @@ Changelog --------- +5.0.4-dev +~~~~~~~~~ + +2024-xx-xx • `full history `__ + +- +- Thanks to our beloved contributors: @BoboTiG, @ + 5.0.3 ~~~~~ diff --git a/docs/source/global.rst.inc b/docs/source/global.rst.inc index ba78f361..47ef98c3 100644 --- a/docs/source/global.rst.inc +++ b/docs/source/global.rst.inc @@ -4,7 +4,7 @@ .. |author_email| replace:: contact@tiger-222.fr .. |copyright| replace:: Copyright 2011-2024 Yesudeep Mangalapilly, Mickaël Schoentgen & contributors. .. |project_name| replace:: ``watchdog`` -.. |project_version| replace:: 5.0.3 +.. |project_version| replace:: 5.0.4 .. _issue tracker: https://github.com/gorakhargosh/watchdog/issues .. _code repository: https://github.com/gorakhargosh/watchdog diff --git a/src/watchdog/version.py b/src/watchdog/version.py index 2da71f1f..63746d3e 100644 --- a/src/watchdog/version.py +++ b/src/watchdog/version.py @@ -4,7 +4,7 @@ # ``docs/source/global.rst.inc`` file as well. VERSION_MAJOR = 5 VERSION_MINOR = 0 -VERSION_BUILD = 3 +VERSION_BUILD = 4 VERSION_INFO = (VERSION_MAJOR, VERSION_MINOR, VERSION_BUILD) VERSION_STRING = f"{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_BUILD}"