diff --git a/src/ert/job_queue/job_status.py b/src/ert/job_queue/job_status.py index 3e53cca79a5..d6b0c3397a8 100644 --- a/src/ert/job_queue/job_status.py +++ b/src/ert/job_queue/job_status.py @@ -1,7 +1,10 @@ import asyncio -from statemachine import StateMachine, states from enum import Enum, auto +from statemachine import StateMachine, states +from transitions import Machine +from transitions.extensions.asyncio import AsyncMachine + class JobStatus(Enum): # This value is used in external query routines - for jobs which are @@ -46,6 +49,90 @@ def __str__(self): return super().__str__().replace("JobStatus.", "") +transitions = [ + ["activate", JobStatus.NOT_ACTIVE, JobStatus.WAITING], + { + "trigger": "submit", + "source": JobStatus.WAITING, + "dest": JobStatus.SUBMITTED, + "before": "on_submit", + }, + ["allocate", JobStatus.UNKNOWN, JobStatus.NOT_ACTIVE], + ["accept", JobStatus.SUBMITTED, JobStatus.PENDING], # from driver + ["start", JobStatus.PENDING, JobStatus.RUNNING], # from driver + ["runend", JobStatus.RUNNING, JobStatus.DONE], # from driver + ["runfail", JobStatus.RUNNING, JobStatus.EXIT], # from driver + ["retry", JobStatus.EXIT, JobStatus.SUBMITTED], + [ + "dokill", + [JobStatus.SUBMITTED, JobStatus.PENDING, JobStatus.RUNNING], + JobStatus.DO_KILL, + ], + ["verify_kill", JobStatus.DO_KILL, JobStatus.IS_KILLED], + [ + "ack_killfailure", + JobStatus.DO_KILL, + JobStatus.DO_KILL_NODE_FAILURE, + ], # do we want to track this? + ["validate", JobStatus.DONE, JobStatus.SUCCESS], + ["invalidate", JobStatus.DONE, JobStatus.FAILED], + [ + "somethingwentwrong", + [ + JobStatus.NOT_ACTIVE, + JobStatus.WAITING, + JobStatus.SUBMITTED, + JobStatus.PENDING, + JobStatus.RUNNING, + JobStatus.DONE, + JobStatus.EXIT, + JobStatus.DO_KILL, + ], + JobStatus.UNKNOWN, + ], + ["donotgohere", JobStatus.UNKNOWN, JobStatus.STATUS_FAILURE], +] + + +class JobStatusModel: + def __init__(self, jobqueue, iens, retries: int = 1): + self.jobqueue = jobqueue + self.iens: int = iens + self.retries_left: int = retries + self.machine = AsyncMachine( + model=self, + states=JobStatus, + transitions=transitions, + initial=JobStatus.NOT_ACTIVE, + ) + + async def on_submit(self, event, state): + self.jobqueue.driver_submit(self.iens) + + async def on_enter_state(self, event, state): + if state in [ + self.SUBMITTED, + self.PENDING, + self.RUNNING, + self.SUCCESS, + self.FAILED, + ]: + self.jobqueue.publish_change(self.iens, state.id) + + async def on_enter_EXIT(self): + if self.retries_left > 0: + self.retry() + self.retries_left -= 1 + else: + self.invalidate() + + async def on_runend(self): + self.jobqueue.run_done_callback(self.iens) + + async def on_enter_DO_KILL(self): + self.jobqueue.driver_kill(self.iens) + + class JobStatusMachine(StateMachine): def __init__(self, jobqueue, iens, retries: int = 1): self.jobqueue = jobqueue