From ffc3528f40e3b05f4f3b36454044f6939afcbd71 Mon Sep 17 00:00:00 2001 From: phala Date: Fri, 25 Mar 2022 10:04:10 +0100 Subject: [PATCH 1/2] Add ability to transfer data from nodes to a controller --- src/xdist/dsession.py | 15 +++++++++++---- src/xdist/plugin.py | 26 +++++++++++++++++++++++++- src/xdist/remote.py | 8 ++++++-- 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 2ae3db6b..3a02bb59 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -1,7 +1,9 @@ +from queue import Empty, Queue + import py import pytest -from xdist.workermanage import NodeManager +from xdist.remote import shared_key from xdist.scheduler import ( EachScheduling, LoadScheduling, @@ -9,9 +11,7 @@ LoadFileScheduling, LoadGroupScheduling, ) - - -from queue import Empty, Queue +from xdist.workermanage import NodeManager class Interrupted(KeyboardInterrupt): @@ -174,6 +174,13 @@ def worker_workerfinished(self, node): self.shouldstop = "{} received keyboard-interrupt".format(node) self.worker_errordown(node, "keyboard-interrupt") return + if "shared" in node.workeroutput: + shared = self.config.stash.setdefault(shared_key, {}) + for key, value in node.workeroutput["shared"].items(): + if key in shared: + shared[key].append(value) + else: + shared[key] = [value] if node in self.sched.nodes: crashitem = self.sched.remove_node(node) assert not crashitem, (crashitem, node) diff --git a/src/xdist/plugin.py b/src/xdist/plugin.py index d0448fa7..a81b7cf3 100644 --- a/src/xdist/plugin.py +++ b/src/xdist/plugin.py @@ -5,12 +5,14 @@ import py import pytest - +from _pytest.stash import StashKey PYTEST_GTE_7 = hasattr(pytest, "version_tuple") and pytest.version_tuple >= (7, 0) # type: ignore[attr-defined] _sys_path = list(sys.path) # freeze a copy of sys.path at interpreter startup +shared_key = StashKey["str"]() + @pytest.hookimpl def pytest_xdist_auto_num_workers(config): @@ -302,3 +304,25 @@ def testrun_uid(request): return request.config.workerinput["testrunuid"] else: return uuid.uuid4().hex + + +def get_shared_data(request_or_session): + """Return shared data and True, if it is ran from xdist_controller""" + if is_xdist_controller(request_or_session): + return request_or_session.config.stash.setdefault(shared_key, {}), True + return request_or_session.config.stash.setdefault(shared_key, {}), False + + +@pytest.fixture(scope="session") +def add_shared_data(request, worker_id): + """Adds data that will be collected from all workers and be accessible from master node in sessionfinish hook""" + + def _add(key, value): + shared = request.config.stash.setdefault(shared_key, {}) + if worker_id == "master": + # Worker shared_data are grouped together, master data aren't + shared[key] = [value] + else: + shared[key] = value + + return _add diff --git a/src/xdist/remote.py b/src/xdist/remote.py index 160b042a..8974b283 100644 --- a/src/xdist/remote.py +++ b/src/xdist/remote.py @@ -6,15 +6,16 @@ needs not to be installed in remote environments. """ -import sys import os +import sys import time import py import pytest +from _pytest.config import _prepareconfig, Config from execnet.gateway_base import dumps, DumpError -from _pytest.config import _prepareconfig, Config +from xdist.plugin import shared_key try: from setproctitle import setproctitle @@ -64,6 +65,9 @@ def pytest_sessionstart(self, session): def pytest_sessionfinish(self, exitstatus): # in pytest 5.0+, exitstatus is an IntEnum object self.config.workeroutput["exitstatus"] = int(exitstatus) + shared = self.config.stash.get(shared_key, None) + if shared: + self.config.workeroutput["shared"] = shared yield self.sendevent("workerfinished", workeroutput=self.config.workeroutput) From cf784db84a101004580b828be5a3dce6e1b1a3c7 Mon Sep 17 00:00:00 2001 From: phala Date: Fri, 25 Mar 2022 10:04:23 +0100 Subject: [PATCH 2/2] Add basic test for shared data --- testing/acceptance_test.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/testing/acceptance_test.py b/testing/acceptance_test.py index 3407272e..c548147c 100644 --- a/testing/acceptance_test.py +++ b/testing/acceptance_test.py @@ -1543,3 +1543,37 @@ def test_get_xdist_worker_id(self, fake_request) -> None: assert xdist.get_xdist_worker_id(fake_request) == "gw5" del fake_request.config.workerinput assert xdist.get_xdist_worker_id(fake_request) == "master" + + +class TestSharedData: + def test_shared_data_two_nodes(self, pytester: pytest.Pytester) -> None: + pytester.makepyfile( + """ + def test_shared1(add_shared_data): + add_shared_data("test","value2") + assert 1 + def test_shared2(add_shared_data): + add_shared_data("test","value1") + assert 1 + + """ + ) + pytester.makeconftest( + """ + from xdist.plugin import get_shared_data + def pytest_sessionfinish(session): + data, master = get_shared_data(session) + if master: + with open('shared_data', 'w') as f: + for key, values in data.items(): + for value in values: + f.write('data[%s] = %s\\n' % (key,value)) + """ + ) + result = pytester.inline_run("-x", "-v", "-n2") + assert result.ret == 0 + collected_file = pytester.path / "shared_data" + assert collected_file.is_file() + collected_text = collected_file.read_text() + assert "data[test] = value2" in collected_text + assert "data[test] = value1" in collected_text