Skip to content

Commit

Permalink
Testing transitions lib
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Nov 13, 2023
1 parent 47f0cbc commit 9ae6445
Showing 1 changed file with 88 additions and 1 deletion.
89 changes: 88 additions & 1 deletion src/ert/job_queue/job_status.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
from statemachine import StateMachine, states
from enum import Enum, auto

from statemachine import StateMachine, states
from transitions import Machine
from transitions.extensions.asyncio import AsyncMachine


class JobStatus(Enum):
# This value is used in external query routines - for jobs which are
Expand Down Expand Up @@ -46,6 +49,90 @@ def __str__(self):
return super().__str__().replace("JobStatus.", "")


transitions = [
["activate", JobStatus.NOT_ACTIVE, JobStatus.WAITING],
{
"trigger": "submit",
"source": JobStatus.WAITING,
"dest": JobStatus.SUBMITTED,
"before": "on_submit",
},
["allocate", JobStatus.UNKNOWN, JobStatus.NOT_ACTIVE],
["accept", JobStatus.SUBMITTED, JobStatus.PENDING], # from driver
["start", JobStatus.PENDING, JobStatus.RUNNING], # from driver
["runend", JobStatus.RUNNING, JobStatus.DONE], # from driver
["runfail", JobStatus.RUNNING, JobStatus.EXIT], # from driver
["retry", JobStatus.EXIT, JobStatus.SUBMITTED],
[
"dokill",
[JobStatus.SUBMITTED, JobStatus.PENDING, JobStatus.RUNNING],
JobStatus.DO_KILL,
],
["verify_kill", JobStatus.DO_KILL, JobStatus.IS_KILLED],
[
"ack_killfailure",
JobStatus.DO_KILL,
JobStatus.DO_KILL_NODE_FAILURE,
], # do we want to track this?
["validate", JobStatus.DONE, JobStatus.SUCCESS],
["invalidate", JobStatus.DONE, JobStatus.FAILED],
[
"somethingwentwrong",
[
JobStatus.NOT_ACTIVE,
JobStatus.WAITING,
JobStatus.SUBMITTED,
JobStatus.PENDING,
JobStatus.RUNNING,
JobStatus.DONE,
JobStatus.EXIT,
JobStatus.DO_KILL,
],
JobStatus.UNKNOWN,
],
["donotgohere", JobStatus.UNKNOWN, JobStatus.STATUS_FAILURE],
]


class JobStatusModel:
def __init__(self, jobqueue, iens, retries: int = 1):
self.jobqueue = jobqueue
self.iens: int = iens
self.retries_left: int = retries
self.machine = AsyncMachine(
model=self,
states=JobStatus,
transitions=transitions,
initial=JobStatus.NOT_ACTIVE,
)

async def on_submit(self, event, state):
self.jobqueue.driver_submit(self.iens)

async def on_enter_state(self, event, state):
if state in [
self.SUBMITTED,
self.PENDING,
self.RUNNING,
self.SUCCESS,
self.FAILED,
]:
self.jobqueue.publish_change(self.iens, state.id)

async def on_enter_EXIT(self):
if self.retries_left > 0:
self.retry()
self.retries_left -= 1
else:
self.invalidate()

async def on_runend(self):
self.jobqueue.run_done_callback(self.iens)

async def on_enter_DO_KILL(self):
self.jobqueue.driver_kill(self.iens)


class JobStatusMachine(StateMachine):
def __init__(self, jobqueue, iens, retries: int = 1):
self.jobqueue = jobqueue
Expand Down

0 comments on commit 9ae6445

Please sign in to comment.