Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-eq committed Dec 3, 2024
1 parent c686535 commit 9805541
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
7 changes: 4 additions & 3 deletions src/_ert/forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self, evaluator_url, token=None, cert_path=None):
self._cert = f.read()
else:
self._cert = None

self.finished_fm_count = 0
self._statemachine = StateMachine()
self._statemachine.add_handler((Init,), self._init_handler)
self._statemachine.add_handler((Start, Running, Exited), self._job_handler)
Expand Down Expand Up @@ -157,12 +157,13 @@ def _job_handler(self, msg: Union[Start, Running, Exited]):
if not msg.success():
logger.error(f"Job {job_name} FAILED to start")
event = ForwardModelStepFailure(**job_msg, error_msg=msg.error_message)
self._dump_event(event)

elif isinstance(msg, Exited):
if msg.success():
logger.debug(f"Job {job_name} exited successfully")
self._dump_event(ForwardModelStepSuccess(**job_msg))
if self.finished_fm_count in [0, 3, 4]:
self._dump_event(ForwardModelStepSuccess(**job_msg))
self.finished_fm_count += 1
else:
logger.error(
_JOB_EXIT_FAILED_STRING.format(
Expand Down
40 changes: 38 additions & 2 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ def update_from_event(
) -> "EnsembleSnapshot":
e_type = type(event)
timestamp = event.time

if source_snapshot is None:
source_snapshot = EnsembleSnapshot()
if e_type in get_args(RealizationEvent):
Expand Down Expand Up @@ -364,7 +363,9 @@ def update_from_event(
event.fm_step,
fm,
)

self._make_sure_previous_fm_is_not_running(
event.real, event.fm_step, source_snapshot
)
elif e_type in get_args(EnsembleEvent):
event = cast(EnsembleEvent, event)
self._ensemble_state = _ENSEMBLE_TYPE_EVENT_TO_STATUS[type(event)]
Expand All @@ -385,6 +386,41 @@ def update_fm_step(
self._fm_step_snapshots[real_id, fm_step_id].update(fm_step)
return self

def _make_sure_previous_fm_is_not_running(
self, real_id: str, fm_step_id: str, source_snapshot: "EnsembleSnapshot"
) -> None:
if fm_step_id == "0":
return
previous_fm_step_id = str(int(fm_step_id) - 1)
previous_fm_step = source_snapshot._fm_step_snapshots[
real_id, previous_fm_step_id
]
current_fm_start_time_from_source_snapshot = source_snapshot._fm_step_snapshots[
real_id, fm_step_id
].get("start_time")
current_fm_start_time_from_update_snapshot = self._fm_step_snapshots[
real_id, fm_step_id
].get("start_time")
if "status" in previous_fm_step and previous_fm_step["status"] == "Running":
logger.error(
(
f"Did not get finished event for {real_id=} {fm_step_id=}, "
"but next fm was started so we assume it finished successfully and carry on."
)
)
self._fm_step_snapshots[real_id, previous_fm_step_id]["status"] = "Finished"
self._fm_step_snapshots[real_id, previous_fm_step_id]["end_time"] = (
current_fm_start_time_from_source_snapshot
or current_fm_start_time_from_update_snapshot
)
previous_fm_step["status"] = "Finished"
previous_fm_step["end_time"] = (
current_fm_start_time_from_source_snapshot
or current_fm_start_time_from_update_snapshot
)
# might affect many of the prior FMs
# self._make_sure_previous_fm_is_not_running(real_id, fm_step_id, source_snapshot)


class FMStepSnapshot(TypedDict, total=False):
status: Optional[str]
Expand Down
1 change: 1 addition & 0 deletions src/ert/gui/model/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def _update_snapshot(self, snapshot: EnsembleSnapshot, iter_: str) -> None:
if start_time := fm_step.get("start_time", None):
fm_step["start_time"] = convert_iso8601_to_datetime(start_time)
if end_time := fm_step.get("end_time", None):
print(f"set end_time for {fm_step_id=}")
fm_step["end_time"] = convert_iso8601_to_datetime(end_time)
# Errors may be unset as the queue restarts the job
fm_step[ids.ERROR] = fm_step.get(ids.ERROR, "")
Expand Down

0 comments on commit 9805541

Please sign in to comment.