diff --git a/src/ert/ensemble_evaluator/evaluator.py b/src/ert/ensemble_evaluator/evaluator.py index cfb498afc46..bdd22697d59 100644 --- a/src/ert/ensemble_evaluator/evaluator.py +++ b/src/ert/ensemble_evaluator/evaluator.py @@ -25,6 +25,7 @@ from websockets.exceptions import ConnectionClosedError from websockets.legacy.server import WebSocketServerProtocol +from ert.async_utils import new_event_loop from ert.serialization import evaluator_marshaller, evaluator_unmarshaller from ._builder import Ensemble @@ -65,7 +66,7 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig, iter_: int self._config: EvaluatorServerConfig = config self._ensemble: Ensemble = ensemble - self._loop = asyncio.new_event_loop() + self._loop = new_event_loop() self._done = self._loop.create_future() self._clients: Set[WebSocketServerProtocol] = set() diff --git a/src/ert/ensemble_evaluator/evaluator_tracker.py b/src/ert/ensemble_evaluator/evaluator_tracker.py index 37267b2f0af..5dbd239420d 100644 --- a/src/ert/ensemble_evaluator/evaluator_tracker.py +++ b/src/ert/ensemble_evaluator/evaluator_tracker.py @@ -10,7 +10,7 @@ from aiohttp import ClientError from websockets.exceptions import ConnectionClosedError -from ert.async_utils import get_event_loop +from ert.async_utils import get_event_loop, new_event_loop from ert.ensemble_evaluator.identifiers import ( EVTYPE_EE_SNAPSHOT, EVTYPE_EE_SNAPSHOT_UPDATE, @@ -62,7 +62,7 @@ def __init__( self._iter_snapshot: Dict[int, Snapshot] = {} def _drain_monitor(self) -> None: - asyncio.set_event_loop(asyncio.new_event_loop()) + asyncio.set_event_loop(new_event_loop()) drainer_logger = logging.getLogger("ert.ensemble_evaluator.drainer") while not self._model.isFinished(): try: diff --git a/src/ert/ensemble_evaluator/sync_ws_duplexer.py b/src/ert/ensemble_evaluator/sync_ws_duplexer.py index 32d227cbb84..a33e7f8ae10 100644 --- a/src/ert/ensemble_evaluator/sync_ws_duplexer.py +++ b/src/ert/ensemble_evaluator/sync_ws_duplexer.py @@ -14,6 +14,8 @@ from websockets.datastructures import Headers from websockets.typing import Data +from ert.async_utils import new_event_loop + from ._wait_for_evaluator import wait_for_evaluator @@ -50,7 +52,7 @@ def __init__( ssl_context = True if self._uri.startswith("wss") else None self._ssl_context: Optional[Union[bool, ssl.SSLContext]] = ssl_context - self._loop = asyncio.new_event_loop() + self._loop = new_event_loop() self._connection: asyncio.Task[None] = self._loop.create_task(self._connect()) self._ws: Optional[WebSocketClientProtocol] = None self._loop_thread = threading.Thread(target=self._loop.run_forever) diff --git a/tests/conftest.py b/tests/conftest.py index b253dbaa69b..14ddd5e4983 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,6 +16,7 @@ from qtpy.QtCore import QDir from ert.__main__ import ert_parser +from ert.async_utils import get_event_loop, new_event_loop from ert.cli import ENSEMBLE_EXPERIMENT_MODE from ert.cli.main import run_cli from ert.config import ErtConfig @@ -261,10 +262,7 @@ def try_queue_and_scheduler(request, monkeypatch): if should_enable_scheduler: # Flaky - the new scheduler needs an event loop, which might not be initialized yet. # This might be a bug in python 3.8, but it does not occur locally. - try: - asyncio.get_running_loop() - except RuntimeError: - asyncio.set_event_loop(asyncio.new_event_loop()) + _ = get_event_loop() monkeypatch.setattr( FeatureToggling._conf["scheduler"], "is_enabled", should_enable_scheduler diff --git a/tests/unit_tests/ensemble_evaluator/conftest.py b/tests/unit_tests/ensemble_evaluator/conftest.py index d84de207bdf..3d5b9053e66 100644 --- a/tests/unit_tests/ensemble_evaluator/conftest.py +++ b/tests/unit_tests/ensemble_evaluator/conftest.py @@ -69,6 +69,11 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0 "forward_model_ok", lambda _: (LoadStatus.LOAD_SUCCESSFUL, ""), ) + monkeypatch.setattr( + ert.scheduler.job, + "forward_model_ok", + lambda _: (LoadStatus.LOAD_SUCCESSFUL, ""), + ) builder = ert.ensemble_evaluator.EnsembleBuilder() with tmpdir.as_cwd(): forward_model_list = [] diff --git a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py index 8952959d174..6f976c75d4b 100644 --- a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py +++ b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py @@ -6,12 +6,13 @@ from cloudevents.http import CloudEvent from _ert_job_runner.client import Client +from ert.async_utils import new_event_loop from ert.ensemble_evaluator import Ensemble, identifiers from ert.ensemble_evaluator._builder._realization import ForwardModel, Realization def _mock_ws(host, port, messages, delay_startup=0): - loop = asyncio.new_event_loop() + loop = new_event_loop() done = loop.create_future() async def _handler(websocket, path):