Skip to content

Commit

Permalink
make publish changes in job queue more resilient
Browse files Browse the repository at this point in the history
  • Loading branch information
valentin-krasontovitsch committed Aug 25, 2023
1 parent 98d3465 commit 8191f0b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
19 changes: 17 additions & 2 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,17 @@
import time
from collections import deque
from threading import Semaphore
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)

from cloudevents.conversion import to_json
from cloudevents.http import CloudEvent
Expand Down Expand Up @@ -312,7 +322,7 @@ async def _execution_loop_queue_via_websockets(
for func in evaluators:
func()

changes = self.changes_after_transition()
changes, new_state = self.changes_without_transition()
# logically not necessary the way publish changes is implemented at the
# moment, but highly relevant before, and might be relevant in the
# future in case publish changes becomes expensive again
Expand All @@ -322,6 +332,7 @@ async def _execution_loop_queue_via_websockets(
changes,
ee_connection,
)
self._differ.transition_to_new_state(new_state)

if self.stopped:
raise asyncio.CancelledError
Expand Down Expand Up @@ -560,6 +571,10 @@ def changes_after_transition(self) -> Dict[int, str]:
old_state, new_state = self._differ.transition(self.job_list)
return self._differ.diff_states(old_state, new_state)

def changes_without_transition(self) -> Tuple[Dict[int, str], List[JobStatus]]:
old_state, new_state = self._differ.get_old_and_new_state(self.job_list)
return self._differ.diff_states(old_state, new_state), new_state

def add_dispatch_information_to_jobs_file(
self,
ens_id: str,
Expand Down
12 changes: 12 additions & 0 deletions src/ert/job_queue/queue_differ.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ def add_state(self, queue_index: int, iens: int, state: JobStatus) -> None:
self._qindex_to_iens[queue_index] = iens
self._state.append(state)

def get_old_and_new_state(
self,
job_list: List[JobQueueNode],
) -> Tuple[List[JobStatus], List[JobStatus]]:
"""Calculate a new state, do not transition, return both old and new state."""
new_state = [job.status.value for job in job_list]
old_state = copy.copy(self._state)
return old_state, new_state

def transition_to_new_state(self, new_state: List[JobStatus]) -> None:
self._state = new_state

def transition(
self,
job_list: List[JobQueueNode],
Expand Down

0 comments on commit 8191f0b

Please sign in to comment.