Skip to content

Commit

Permalink
Have ensemble fail early when problems in monitor receiver task
Browse files Browse the repository at this point in the history
This commit fixes the issue where monitor would be hanging for 120 seconds until timeout even if `wait_for_evaluator` already failed in the receiver task.
  • Loading branch information
jonathan-eq committed Nov 4, 2024
1 parent 90d11ec commit 5de2c3b
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions src/ert/ensemble_evaluator/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,14 @@ def __init__(self, ee_con_info: "EvaluatorConnectionInfo") -> None:
self._event_queue: asyncio.Queue[Union[Event, EventSentinel]] = asyncio.Queue()
self._connection: Optional[WebSocketClientProtocol] = None
self._receiver_task: Optional[asyncio.Task[None]] = None
self._connected: asyncio.Event = asyncio.Event()
self._connected: asyncio.Future[None] = asyncio.Future()
self._connection_timeout: float = 120.0
self._receiver_timeout: float = 60.0

async def __aenter__(self) -> "Monitor":
self._receiver_task = asyncio.create_task(self._receiver())
try:
await asyncio.wait_for(
self._connected.wait(), timeout=self._connection_timeout
)
await asyncio.wait_for(self._connected, timeout=self._connection_timeout)
except asyncio.TimeoutError as exc:
msg = "Couldn't establish connection with the ensemble evaluator!"
logger.error(msg)
Expand All @@ -64,7 +62,6 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None
self._receiver_task,
return_exceptions=True,
)

if self._connection:
await self._connection.close()

Expand Down Expand Up @@ -127,13 +124,16 @@ async def _receiver(self) -> None:
headers = Headers()
if self._ee_con_info.token:
headers["token"] = self._ee_con_info.token

await wait_for_evaluator(
base_url=self._ee_con_info.url,
token=self._ee_con_info.token,
cert=self._ee_con_info.cert,
timeout=5,
)
try:
await wait_for_evaluator(
base_url=self._ee_con_info.url,
token=self._ee_con_info.token,
cert=self._ee_con_info.cert,
timeout=5,
)
except Exception as e:
self._connected.set_exception(e)
raise e
async for conn in connect(
self._ee_con_info.client_uri,
ssl=tls,
Expand All @@ -147,13 +147,13 @@ async def _receiver(self) -> None:
):
try:
self._connection = conn
self._connected.set()
self._connected.set_result(None)
async for raw_msg in self._connection:
event = event_from_json(raw_msg)
await self._event_queue.put(event)
except (ConnectionRefusedError, ConnectionClosed, ClientError) as exc:
self._connection = None
self._connected.clear()
self._connected = asyncio.Future()
logger.debug(
f"Monitor connection to EnsembleEvaluator went down, reconnecting: {exc}"
)

0 comments on commit 5de2c3b

Please sign in to comment.