Skip to content

Commit

Permalink
Rename job_queue enums
Browse files Browse the repository at this point in the history
  • Loading branch information
eivindjahren committed Aug 24, 2023
1 parent 059d942 commit cbb4606
Show file tree
Hide file tree
Showing 20 changed files with 208 additions and 216 deletions.
32 changes: 16 additions & 16 deletions src/_ert_com_protocol/status_type_enum.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
from typing import Dict

_queue_state_to_pbuf_type_map: Dict[str, str] = {
"JOB_QUEUE_NOT_ACTIVE": "STEP_WAITING",
"JOB_QUEUE_WAITING": "STEP_WAITING",
"JOB_QUEUE_SUBMITTED": "STEP_WAITING",
"JOB_QUEUE_PENDING": "STEP_PENDING",
"JOB_QUEUE_RUNNING": "STEP_RUNNING",
"JOB_QUEUE_DONE": "STEP_RUNNING",
"JOB_QUEUE_EXIT": "STEP_RUNNING",
"JOB_QUEUE_IS_KILLED": "STEP_FAILED",
"JOB_QUEUE_DO_KILL": "STEP_FAILED",
"JOB_QUEUE_SUCCESS": "STEP_SUCCESS",
"JOB_QUEUE_RUNNING_DONE_CALLBACK": "STEP_RUNNING",
"JOB_QUEUE_RUNNING_EXIT_CALLBACK": "STEP_RUNNING",
"JOB_QUEUE_STATUS_FAILURE": "STEP_UNKNOWN",
"JOB_QUEUE_FAILED": "STEP_FAILED",
"JOB_QUEUE_DO_KILL_NODE_FAILURE": "STEP_FAILED",
"JOB_QUEUE_UNKNOWN": "STEP_UNKNOWN",
"NOT_ACTIVE": "STEP_WAITING",
"WAITING": "STEP_WAITING",
"SUBMITTED": "STEP_WAITING",
"PENDING": "STEP_PENDING",
"RUNNING": "STEP_RUNNING",
"DONE": "STEP_RUNNING",
"EXIT": "STEP_RUNNING",
"IS_KILLED": "STEP_FAILED",
"DO_KILL": "STEP_FAILED",
"SUCCESS": "STEP_SUCCESS",
"RUNNING_DONE_CALLBACK": "STEP_RUNNING",
"RUNNING_EXIT_CALLBACK": "STEP_RUNNING",
"STATUS_FAILURE": "STEP_UNKNOWN",
"FAILED": "STEP_FAILED",
"DO_KILL_NODE_FAILURE": "STEP_FAILED",
"UNKNOWN": "STEP_UNKNOWN",
}


Expand Down
4 changes: 2 additions & 2 deletions src/ert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
from .config import ErtScript
from .data import MeasuredData
from .job_queue import JobStatusType
from .job_queue import JobStatus
from .libres_facade import LibresFacade
from .simulator import BatchSimulator

Expand All @@ -12,5 +12,5 @@
"LibresFacade",
"BatchSimulator",
"ErtScript",
"JobStatusType",
"JobStatus",
]
12 changes: 6 additions & 6 deletions src/ert/job_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ def root() -> str:
from .driver import Driver # noqa
from .job_queue_manager import JobQueueManager # noqa
from .job_queue_node import JobQueueNode # noqa
from .job_status import JobStatusType # noqa
from .job_status import JobStatus # noqa
from .queue import JobQueue # noqa
from .run_status import RunStatusType # noqa
from .submit_status import JobSubmitStatusType # noqa
from .run_status import RunStatus # noqa
from .submit_status import SubmitStatus # noqa
from .thread_status import ThreadStatus # noqa
from .workflow_runner import WorkflowJobRunner, WorkflowRunner # noqa

Expand All @@ -112,9 +112,9 @@ def root() -> str:
"JobQueue",
"JobQueueManager",
"JobQueueNode",
"JobStatusType",
"JobSubmitStatusType",
"RunStatusType",
"JobStatus",
"SubmitStatus",
"RunStatus",
"ThreadStatus",
"WorkflowJobRunner",
"WorkflowRunner",
Expand Down
22 changes: 11 additions & 11 deletions src/ert/job_queue/job_queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from threading import BoundedSemaphore
from typing import TYPE_CHECKING, Any

from .job_status import JobStatusType
from .job_status import JobStatus

if TYPE_CHECKING:
from .queue import JobQueue
Expand All @@ -30,40 +30,40 @@ def stop_queue(self) -> None:
self.queue.kill_all_jobs()

def getNumRunning(self) -> int:
return self.queue.count_status(JobStatusType.JOB_QUEUE_RUNNING) # type: ignore
return self.queue.count_status(JobStatus.RUNNING) # type: ignore

def getNumWaiting(self) -> int:
return self.queue.count_status(JobStatusType.JOB_QUEUE_WAITING) # type: ignore
return self.queue.count_status(JobStatus.WAITING) # type: ignore

def getNumPending(self) -> int:
return self.queue.count_status(JobStatusType.JOB_QUEUE_PENDING) # type: ignore
return self.queue.count_status(JobStatus.PENDING) # type: ignore

def getNumSuccess(self) -> int:
return self.queue.count_status(JobStatusType.JOB_QUEUE_SUCCESS) # type: ignore
return self.queue.count_status(JobStatus.SUCCESS) # type: ignore

def getNumFailed(self) -> int:
return self.queue.count_status(JobStatusType.JOB_QUEUE_FAILED) # type: ignore
return self.queue.count_status(JobStatus.FAILED) # type: ignore

def isRunning(self) -> bool:
return self.queue.is_active()

def isJobComplete(self, job_index: int) -> bool:
return not (
self.queue.job_list[job_index].is_running()
or self.queue.job_list[job_index].status == JobStatusType.JOB_QUEUE_WAITING
or self.queue.job_list[job_index].status == JobStatus.WAITING
)

def isJobWaiting(self, job_index: int) -> bool:
return self.queue.job_list[job_index].status == JobStatusType.JOB_QUEUE_WAITING
return self.queue.job_list[job_index].status == JobStatus.WAITING

def didJobSucceed(self, job_index: int) -> bool:
return self.queue.job_list[job_index].status == JobStatusType.JOB_QUEUE_SUCCESS
return self.queue.job_list[job_index].status == JobStatus.SUCCESS

def getJobStatus(self, job_index: int) -> JobStatusType:
def getJobStatus(self, job_index: int) -> JobStatus:
# See comment about return type in the prototype section at
# the top of class.
int_status = self.queue.job_list[job_index].status
return JobStatusType(int_status)
return JobStatus(int_status)

def __repr__(self) -> str:
return (
Expand Down
66 changes: 30 additions & 36 deletions src/ert/job_queue/job_queue_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from ert.realization_state import RealizationState

from . import ResPrototype
from .job_status import JobStatusType
from .submit_status import JobSubmitStatusType
from .job_status import JobStatus
from .submit_status import SubmitStatus
from .thread_status import ThreadStatus

if TYPE_CHECKING:
Expand Down Expand Up @@ -163,22 +163,22 @@ def timed_out(self) -> bool:
def submit_attempt(self) -> int:
return _get_submit_attempt(self) # type: ignore

def _poll_queue_status(self, driver: "Driver") -> JobStatusType:
def _poll_queue_status(self, driver: "Driver") -> JobStatus:
result, msg = _refresh_status(self, driver)
if msg is not None:
self._status_msg = msg
return JobStatusType(result)
return JobStatus(result)

@property
def status(self) -> JobStatusType:
def status(self) -> JobStatus:
return self._get_status() # type: ignore

@property
def thread_status(self) -> ThreadStatus:
return self._thread_status

def submit(self, driver: "Driver") -> JobSubmitStatusType:
return JobSubmitStatusType(_submit(self, driver))
def submit(self, driver: "Driver") -> SubmitStatus:
return SubmitStatus(_submit(self, driver))

def run_done_callback(self) -> Optional[LoadStatus]:
if sys.platform == "linux":
Expand All @@ -201,11 +201,11 @@ def run_done_callback(self) -> Optional[LoadStatus]:
callback_status = LoadStatus.LOAD_FAILURE
status_msg = error_message
if callback_status == LoadStatus.LOAD_SUCCESSFUL:
self._set_queue_status(JobStatusType.JOB_QUEUE_SUCCESS)
self._set_queue_status(JobStatus.SUCCESS)
elif callback_status == LoadStatus.TIME_MAP_FAILURE:
self._set_queue_status(JobStatusType.JOB_QUEUE_FAILED)
self._set_queue_status(JobStatus.FAILED)
else:
self._set_queue_status(JobStatusType.JOB_QUEUE_EXIT)
self._set_queue_status(JobStatus.EXIT)
if self._status_msg != "":
self._status_msg = status_msg
else:
Expand Down Expand Up @@ -275,13 +275,13 @@ def done_callback_wrapper(
def run_exit_callback(self) -> None:
self.exit_callback_function(*self.callback_arguments)

def is_running(self, given_status: Optional[JobStatusType] = None) -> bool:
def is_running(self, given_status: Optional[JobStatus] = None) -> bool:
status = given_status or self.status
return status in (
JobStatusType.JOB_QUEUE_PENDING,
JobStatusType.JOB_QUEUE_SUBMITTED,
JobStatusType.JOB_QUEUE_RUNNING,
JobStatusType.JOB_QUEUE_UNKNOWN,
JobStatus.PENDING,
JobStatus.SUBMITTED,
JobStatus.RUNNING,
JobStatus.UNKNOWN,
) # dont stop monitoring if LSF commands are unavailable

@property
Expand All @@ -298,23 +298,20 @@ def _job_monitor(
self, driver: "Driver", pool_sema: Semaphore, max_submit: int
) -> None:
submit_status = self.submit(driver)
if submit_status is not JobSubmitStatusType.SUBMIT_OK:
self._set_queue_status(JobStatusType.JOB_QUEUE_DONE)
if submit_status is not SubmitStatus.OK:
self._set_queue_status(JobStatus.DONE)

end_status = self._poll_until_done(driver)
self._handle_end_status(driver, pool_sema, end_status, max_submit)

def _poll_until_done(self, driver: Driver) -> JobStatusType:
def _poll_until_done(self, driver: Driver) -> JobStatus:
current_status = self._poll_queue_status(driver)
backoff = _BackoffFunction()
# in the following loop, we increase the sleep time between loop iterations as
# long running realizations do not change state often, and too frequent querying
# with many realizations starves other threads for resources.
while self.is_running(current_status):
if (
self._start_time is None
and current_status == JobStatusType.JOB_QUEUE_RUNNING
):
if self._start_time is None and current_status == JobStatus.RUNNING:
self._start_time = time.time()
if self._start_time is not None:
elapsed_time = time.time() - self._start_time
Expand Down Expand Up @@ -357,23 +354,23 @@ def _log_kill_thread_stopping_status(self) -> None:
if self._tried_killing == 1:
logger.error(f"Killing job in {self.run_path} ({self.thread_status}).")

RESUBMIT_STATES = [JobStatusType.JOB_QUEUE_EXIT]
RESUBMIT_STATES = [JobStatus.EXIT]
DONE_STATES = [
JobStatusType.JOB_QUEUE_SUCCESS,
JobStatusType.JOB_QUEUE_IS_KILLED,
JobStatusType.JOB_QUEUE_DO_KILL_NODE_FAILURE,
JobStatus.SUCCESS,
JobStatus.IS_KILLED,
JobStatus.DO_KILL_NODE_FAILURE,
]
FAILURE_STATES = [JobStatusType.JOB_QUEUE_FAILED]
FAILURE_STATES = [JobStatus.FAILED]

def _handle_end_status(
self,
driver: Driver,
pool_sema: Semaphore,
end_status: JobStatusType,
end_status: JobStatus,
max_submit: int,
) -> None:
with self._mutex:
if end_status == JobStatusType.JOB_QUEUE_DONE:
if end_status == JobStatus.DONE:
with pool_sema:
logger.info(
f"Realization: {self.callback_arguments[0].iens} complete, "
Expand Down Expand Up @@ -414,19 +411,16 @@ def _transition_to_failure(self, message: str) -> None:
logger.error(message)
self._transition_status(
thread_status=ThreadStatus.DONE,
queue_status=JobStatusType.JOB_QUEUE_FAILED, # type: ignore
queue_status=JobStatus.FAILED, # type: ignore
)

def _transition_status(
self,
thread_status: ThreadStatus,
queue_status: JobStatusType,
queue_status: JobStatus,
) -> None:
self._set_queue_status(queue_status)
if (
thread_status == ThreadStatus.DONE
and queue_status != JobStatusType.JOB_QUEUE_SUCCESS
):
if thread_status == ThreadStatus.DONE and queue_status != JobStatus.SUCCESS:
self.run_exit_callback()
self._set_thread_status(thread_status)

Expand Down Expand Up @@ -456,7 +450,7 @@ def stop(self) -> None:
elif self.thread_status == ThreadStatus.READY:
# clean-up to get the correct status after being stopped by user
self._set_thread_status(ThreadStatus.DONE)
self._set_queue_status(JobStatusType.JOB_QUEUE_FAILED)
self._set_queue_status(JobStatus.FAILED)

assert self.thread_status in [
ThreadStatus.DONE,
Expand Down
70 changes: 35 additions & 35 deletions src/ert/job_queue/job_status.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,55 @@
from cwrap import BaseCEnum


class JobStatusType(BaseCEnum): # type: ignore
class JobStatus(BaseCEnum): # type: ignore
TYPE_NAME = "job_status_type_enum"
# This value is used in external query routines - for jobs which are
# (currently) not active.
JOB_QUEUE_NOT_ACTIVE = None
JOB_QUEUE_WAITING = None # A node which is waiting in the internal queue.
NOT_ACTIVE = None
WAITING = None # A node which is waiting in the internal queue.
# Internal status: It has has been submitted - the next status update will
# (should) place it as pending or running.
JOB_QUEUE_SUBMITTED = None
SUBMITTED = None
# A node which is pending - a status returned by the external system. I.e LSF
JOB_QUEUE_PENDING = None
JOB_QUEUE_RUNNING = None # The job is running
PENDING = None
RUNNING = None # The job is running
# The job is done - but we have not yet checked if the target file is
# produced
JOB_QUEUE_DONE = None
DONE = None
# The job has exited - check attempts to determine if we retry or go to
# complete_fail
JOB_QUEUE_EXIT = None
# The job has been killed, following a JOB_QUEUE_DO_KILL - can restart.
JOB_QUEUE_IS_KILLED = None
EXIT = None
# The job has been killed, following a DO_KILL - can restart.
IS_KILLED = None
# The the job should be killed, either due to user request, or automated
# measures - the job can NOT be restarted..
JOB_QUEUE_DO_KILL = None
JOB_QUEUE_SUCCESS = None
JOB_QUEUE_RUNNING_DONE_CALLBACK = None
JOB_QUEUE_RUNNING_EXIT_CALLBACK = None
JOB_QUEUE_STATUS_FAILURE = None
JOB_QUEUE_FAILED = None
JOB_QUEUE_DO_KILL_NODE_FAILURE = None
JOB_QUEUE_UNKNOWN = None
DO_KILL = None
SUCCESS = None
RUNNING_DONE_CALLBACK = None
RUNNING_EXIT_CALLBACK = None
STATUS_FAILURE = None
FAILED = None
DO_KILL_NODE_FAILURE = None
UNKNOWN = None

@classmethod
def from_string(cls, name: str) -> "JobStatusType":
def from_string(cls, name: str) -> "JobStatus":
return super().from_string(name) # type: ignore


JobStatusType.addEnum("JOB_QUEUE_NOT_ACTIVE", 1)
JobStatusType.addEnum("JOB_QUEUE_WAITING", 2)
JobStatusType.addEnum("JOB_QUEUE_SUBMITTED", 4)
JobStatusType.addEnum("JOB_QUEUE_PENDING", 8)
JobStatusType.addEnum("JOB_QUEUE_RUNNING", 16)
JobStatusType.addEnum("JOB_QUEUE_DONE", 32)
JobStatusType.addEnum("JOB_QUEUE_EXIT", 64)
JobStatusType.addEnum("JOB_QUEUE_IS_KILLED", 128)
JobStatusType.addEnum("JOB_QUEUE_DO_KILL", 256)
JobStatusType.addEnum("JOB_QUEUE_SUCCESS", 512)
JobStatusType.addEnum("JOB_QUEUE_RUNNING_DONE_CALLBACK", 1024)
JobStatusType.addEnum("JOB_QUEUE_RUNNING_EXIT_CALLBACK", 2048)
JobStatusType.addEnum("JOB_QUEUE_STATUS_FAILURE", 4096)
JobStatusType.addEnum("JOB_QUEUE_FAILED", 8192)
JobStatusType.addEnum("JOB_QUEUE_DO_KILL_NODE_FAILURE", 16384)
JobStatusType.addEnum("JOB_QUEUE_UNKNOWN", 32768)
JobStatus.addEnum("NOT_ACTIVE", 1)
JobStatus.addEnum("WAITING", 2)
JobStatus.addEnum("SUBMITTED", 4)
JobStatus.addEnum("PENDING", 8)
JobStatus.addEnum("RUNNING", 16)
JobStatus.addEnum("DONE", 32)
JobStatus.addEnum("EXIT", 64)
JobStatus.addEnum("IS_KILLED", 128)
JobStatus.addEnum("DO_KILL", 256)
JobStatus.addEnum("SUCCESS", 512)
JobStatus.addEnum("RUNNING_DONE_CALLBACK", 1024)
JobStatus.addEnum("RUNNING_EXIT_CALLBACK", 2048)
JobStatus.addEnum("STATUS_FAILURE", 4096)
JobStatus.addEnum("FAILED", 8192)
JobStatus.addEnum("DO_KILL_NODE_FAILURE", 16384)
JobStatus.addEnum("UNKNOWN", 32768)
Loading

0 comments on commit cbb4606

Please sign in to comment.