From 5de2c3bf9f627b5bd2c202ff69d2089ce456f9ef Mon Sep 17 00:00:00 2001 From: Jonathan Karlsen Date: Mon, 4 Nov 2024 13:44:55 +0100 Subject: [PATCH] Have ensemble fail early when problems in monitor receiver task This commit fixes the issue where monitor would be hanging for 120 seconds until timeout even if `wait_for_evaluator` already failed in the receiver task. --- src/ert/ensemble_evaluator/monitor.py | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/ert/ensemble_evaluator/monitor.py b/src/ert/ensemble_evaluator/monitor.py index 93bc2ec5e1e..477aac910ce 100644 --- a/src/ert/ensemble_evaluator/monitor.py +++ b/src/ert/ensemble_evaluator/monitor.py @@ -38,16 +38,14 @@ def __init__(self, ee_con_info: "EvaluatorConnectionInfo") -> None: self._event_queue: asyncio.Queue[Union[Event, EventSentinel]] = asyncio.Queue() self._connection: Optional[WebSocketClientProtocol] = None self._receiver_task: Optional[asyncio.Task[None]] = None - self._connected: asyncio.Event = asyncio.Event() + self._connected: asyncio.Future[None] = asyncio.Future() self._connection_timeout: float = 120.0 self._receiver_timeout: float = 60.0 async def __aenter__(self) -> "Monitor": self._receiver_task = asyncio.create_task(self._receiver()) try: - await asyncio.wait_for( - self._connected.wait(), timeout=self._connection_timeout - ) + await asyncio.wait_for(self._connected, timeout=self._connection_timeout) except asyncio.TimeoutError as exc: msg = "Couldn't establish connection with the ensemble evaluator!" logger.error(msg) @@ -64,7 +62,6 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None self._receiver_task, return_exceptions=True, ) - if self._connection: await self._connection.close() @@ -127,13 +124,16 @@ async def _receiver(self) -> None: headers = Headers() if self._ee_con_info.token: headers["token"] = self._ee_con_info.token - - await wait_for_evaluator( - base_url=self._ee_con_info.url, - token=self._ee_con_info.token, - cert=self._ee_con_info.cert, - timeout=5, - ) + try: + await wait_for_evaluator( + base_url=self._ee_con_info.url, + token=self._ee_con_info.token, + cert=self._ee_con_info.cert, + timeout=5, + ) + except Exception as e: + self._connected.set_exception(e) + raise e async for conn in connect( self._ee_con_info.client_uri, ssl=tls, @@ -147,13 +147,13 @@ async def _receiver(self) -> None: ): try: self._connection = conn - self._connected.set() + self._connected.set_result(None) async for raw_msg in self._connection: event = event_from_json(raw_msg) await self._event_queue.put(event) except (ConnectionRefusedError, ConnectionClosed, ClientError) as exc: self._connection = None - self._connected.clear() + self._connected = asyncio.Future() logger.debug( f"Monitor connection to EnsembleEvaluator went down, reconnecting: {exc}" )