Skip to content

Commit

Permalink
Make sure job_queue will not timeout when sending event
Browse files Browse the repository at this point in the history
This removes wait_for and thus potenital timeouterror and let async loop iterator handle the connection exceptions.
Additionally this make publish changes in job queue more resilient where the changes are not applied
until we published all changes.

Co-authored-by: Valentin Krasontovitsch <[email protected]>
  • Loading branch information
xjules and valentin-krasontovitsch committed Aug 25, 2023
1 parent a0e1259 commit 7bba975
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
21 changes: 18 additions & 3 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,17 @@
import time
from collections import deque
from threading import Semaphore
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)

from cloudevents.conversion import to_json
from cloudevents.http import CloudEvent
Expand Down Expand Up @@ -289,7 +299,7 @@ async def _publish_changes(
]
)
while events:
await asyncio.wait_for(ee_connection.send(to_json(events[0])), 60)
await ee_connection.send(to_json(events[0]))
events.popleft()

async def _execution_loop_queue_via_websockets(
Expand All @@ -307,7 +317,7 @@ async def _execution_loop_queue_via_websockets(
for func in evaluators:
func()

changes = self.changes_after_transition()
changes, new_state = self.changes_without_transition()
# logically not necessary the way publish changes is implemented at the
# moment, but highly relevant before, and might be relevant in the
# future in case publish changes becomes expensive again
Expand All @@ -317,6 +327,7 @@ async def _execution_loop_queue_via_websockets(
changes,
ee_connection,
)
self._differ.transition_to_new_state(new_state)

if self.stopped:
raise asyncio.CancelledError
Expand Down Expand Up @@ -555,6 +566,10 @@ def changes_after_transition(self) -> Dict[int, str]:
old_state, new_state = self._differ.transition(self.job_list)
return self._differ.diff_states(old_state, new_state)

def changes_without_transition(self) -> Tuple[Dict[int, str], List[JobStatus]]:
old_state, new_state = self._differ.get_old_and_new_state(self.job_list)
return self._differ.diff_states(old_state, new_state), new_state

def add_dispatch_information_to_jobs_file(
self,
ens_id: str,
Expand Down
12 changes: 12 additions & 0 deletions src/ert/job_queue/queue_differ.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ def add_state(self, queue_index: int, iens: int, state: JobStatus) -> None:
self._qindex_to_iens[queue_index] = iens
self._state.append(state)

def get_old_and_new_state(
self,
job_list: List[JobQueueNode],
) -> Tuple[List[JobStatus], List[JobStatus]]:
"""Calculate a new state, do not transition, return both old and new state."""
new_state = [job.status.value for job in job_list]
old_state = copy.copy(self._state)
return old_state, new_state

def transition_to_new_state(self, new_state: List[JobStatus]) -> None:
self._state = new_state

def transition(
self,
job_list: List[JobQueueNode],
Expand Down

0 comments on commit 7bba975

Please sign in to comment.