Skip to content

Commit

Permalink
Fix simulation_context
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Nov 17, 2023
1 parent 91e645e commit d64d365
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 25 deletions.
18 changes: 7 additions & 11 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ def is_active(self) -> bool:
for real in self._realizations
)

def count_status(self, state: RealizationState) -> int:
def real_state(self, iens: int) -> RealizationState:
return self._realizations[iens].current_state

def count_real_state(self, state: RealizationState) -> int:
return len([real for real in self._realizations if real.current_state == state])

async def run_done_callback(self, state: RealizationState):
Expand Down Expand Up @@ -150,12 +153,6 @@ def queue_size(self) -> int:
def _add_realization(self, realization: QueueableRealization) -> None:
self._realizations.append(RealizationState(self, realization, retries=1))

def count_running(self) -> int:
return sum(
real.current_state == RealizationState.RUNNING
for real in self._realizations
)

def max_running(self) -> int:
max_running = 0
if self.driver.has_option("MAX_RUNNING"):
Expand All @@ -167,9 +164,8 @@ def max_running(self) -> int:
def available_capacity(self) -> bool:
return self.count_running() < self.max_running()

def all_success(self) -> bool:
return all(
real.current_state == RealizationState.SUCCESS for real in self._realizations)
def is_reals_state(self, state: RealizationState) -> bool:
return all(real.current_state == state for real in self._realizations)

async def launch_jobs(self) -> None:
while self.available_capacity():
Expand Down Expand Up @@ -331,7 +327,7 @@ async def execute(

await self._changes_to_publish.put(CLOSE_PUBLISHER_SENTINEL)

if not self.all_success():
if not self.is_reals_state(RealizationState.SUCCESS):
return EVTYPE_ENSEMBLE_FAILED

return EVTYPE_ENSEMBLE_STOPPED
Expand Down
26 changes: 12 additions & 14 deletions src/ert/simulator/simulation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from ert.config import HookRuntime
from ert.enkf_main import create_run_path
from ert.job_queue import JobQueue, JobStatus
from ert.job_queue import JobQueue
from ert.realization_state import RealizationState
from ert.run_context import RunContext
from ert.runpaths import Runpaths
Expand Down Expand Up @@ -153,25 +153,25 @@ def isRunning(self) -> bool:
return self._sim_thread.is_alive() or self._job_queue.is_active()

def getNumPending(self) -> int:
return self._job_queue.count_status(JobStatus.PENDING) # type: ignore
return self._job_queue.count_status(RealizationState.PENDING) # type: ignore

def getNumRunning(self) -> int:
return self._job_queue.count_status(JobStatus.RUNNING) # type: ignore
return self._job_queue.count_status(RealizationState.RUNNING) # type: ignore

def getNumSuccess(self) -> int:
return self._job_queue.count_status(JobStatus.SUCCESS) # type: ignore
return self._job_queue.count_status(RealizationState.SUCCESS) # type: ignore

def getNumFailed(self) -> int:
return self._job_queue.count_status(JobStatus.FAILED) # type: ignore
return self._job_queue.count_status(RealizationState.FAILED) # type: ignore

def getNumWaiting(self) -> int:
return self._job_queue.count_status(JobStatus.WAITING) # type: ignore
return self._job_queue.count_status(RealizationState.WAITING) # type: ignore

def didRealizationSucceed(self, iens: int) -> bool:
queue_index = self.get_run_args(iens).queue_index
if queue_index is None:
raise ValueError("Queue index not set")
return self._job_queue.job_list[queue_index].queue_status == JobStatus.SUCCESS
return self._job_queue.real_state(queue_index) == RealizationState.SUCCESS

def didRealizationFail(self, iens: int) -> bool:
# For the purposes of this class, a failure should be anything (killed
Expand All @@ -184,9 +184,8 @@ def isRealizationFinished(self, iens: int) -> bool:
queue_index = run_arg.queue_index
if queue_index is not None:
return not (
self._job_queue.job_list[queue_index].is_running()
or self._job_queue.job_list[queue_index].queue_status
== JobStatus.WAITING
self._job_queue.real_state(queue_index)
in [RealizationState.SUCCESS, RealizationState.WAITING]
)
else:
# job was not submitted
Expand Down Expand Up @@ -238,7 +237,7 @@ def job_progress(self, iens: int) -> Optional[ForwardModelStatus]:
if queue_index is None:
# job was not submitted
return None
if self._job_queue.job_list[queue_index].queue_status == JobStatus.WAITING:
if self._job_queue.real_state(queue_index) == RealizationState.WAITING:
return None

return ForwardModelStatus.load(run_arg.runpath)
Expand All @@ -249,12 +248,11 @@ def run_path(self, iens: int) -> str:
"""
return self.get_run_args(iens).runpath

def job_status(self, iens: int) -> Optional["JobStatus"]:
def job_status(self, iens: int) -> Optional[RealizationState]:
"""Will query the queue system for the status of the job."""
run_arg = self.get_run_args(iens)
queue_index = run_arg.queue_index
if queue_index is None:
# job was not submitted
return None
int_status = self._job_queue.job_list[queue_index].queue_status
return JobStatus(int_status)
return self._job_queue.real_state(queue_index)

0 comments on commit d64d365

Please sign in to comment.