Skip to content

Commit

Permalink
fix: fix restapi/worker race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
chisholm committed Oct 19, 2023
1 parent 1b6d159 commit 2062481
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
15 changes: 8 additions & 7 deletions src/dioptra/restapi/job/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ def submit_task_engine(
if not is_valid(experiment_description):
raise InvalidExperimentDescriptionError

+ job_id = str(uuid.uuid4())
timestamp = datetime.datetime.now()

new_job = Job(
+ job_id=job_id,
experiment_id=experiment.experiment_id,
queue_id=queue.queue_id,
created_on=timestamp,
Expand All @@ -194,7 +196,11 @@ def submit_task_engine(
if global_parameters is not None:
new_job.entry_point_kwargs = json.dumps(global_parameters)

- rq_job: RQJob = self._rq_service.submit_task_engine_job(
+ db.session.add(new_job)
+ db.session.commit()
+
+ self._rq_service.submit_task_engine_job(
+ job_id=job_id,
queue=queue_name,
experiment_id=experiment.experiment_id,
experiment_description=experiment_description,
Expand All @@ -203,12 +209,7 @@ def submit_task_engine(
timeout=timeout,
)

- new_job.job_id = rq_job.get_id()
-
- db.session.add(new_job)
- db.session.commit()
-
- log.info("Job submission successful", job_id=new_job.job_id)
+ log.info("Job submission successful", job_id=job_id)

return new_job

Expand Down
9 changes: 5 additions & 4 deletions src/dioptra/restapi/shared/rq/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ def submit_mlflow_job(

def submit_task_engine_job(
self,
+ job_id: str,
queue: str,
experiment_id: int,
experiment_description: Mapping[str, Any],
global_parameters: Optional[Mapping[str, Any]] = None,
depends_on: Optional[str] = None,
timeout: Optional[str] = None,
- ) -> RQJob:
+ ):
log: BoundLogger = LOGGER.new()

job_dependency: Optional[RQJob] = None
Expand All @@ -134,17 +135,17 @@ def submit_task_engine_job(
log.info(
"Enqueuing job",
function=self._run_task_engine,
+ job_id=job_id,
cmd_kwargs=cmd_kwargs,
timeout=timeout,
depends_on=job_dependency,
)

q: RQQueue = RQQueue(queue, default_timeout=24 * 3600, connection=self._redis)
- result: RQJob = q.enqueue(
+ q.enqueue(
self._run_task_engine,
+ job_id=job_id,
kwargs=cmd_kwargs,
timeout=timeout,
depends_on=job_dependency,
)

- return result

0 comments on commit 2062481

Please sign in to comment.