Skip to content

Commit

Permalink
Merge pull request #1133 from openradx/fix-retry-when-aborting-2
Browse files Browse the repository at this point in the history
Set job to failed when task raises and job is aborting (regardless of retry strategy)
  • Loading branch information
medihack authored Jul 31, 2024
2 parents 84ba1a5 + 0e6c2f7 commit 73ba60a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 8 deletions.
5 changes: 5 additions & 0 deletions docs/howto/advanced/cancellation.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ async def my_task(context):
database and might flood the database. Ensure you do it only sometimes and
not from too many parallel tasks.
:::

:::{note}
When a task of a job that was requested to be aborted raises an error, the job
is marked as failed (regardless of the retry strategy).
:::
17 changes: 12 additions & 5 deletions procrastinate/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,18 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None:
exc_info = e
critical = not isinstance(e, Exception)

retry_exception = task.get_retry_exception(exception=e, job=job)
if retry_exception:
log_title = "Error, to retry"
log_action = "job_error_retry"
log_level = logging.INFO
assert job.id
status = await self.job_manager.get_job_status_async(job_id=job.id)

if status == jobs.Status.ABORTING:
retry_exception = None
else:
retry_exception = task.get_retry_exception(exception=e, job=job)
if retry_exception:
log_title = "Error, to retry"
log_action = "job_error_retry"
log_level = logging.INFO

raise exceptions.JobError(
retry_exception=retry_exception, critical=critical
) from e
Expand Down
19 changes: 19 additions & 0 deletions tests/acceptance/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,22 @@ def task2(context):

status = await async_app.job_manager.get_job_status_async(job2_id)
assert status == Status.ABORTED


async def test_retry_when_aborting(async_app):
attempts = 0

@async_app.task(queue="default", name="task1", pass_context=True, retry=True)
async def example_task(context):
nonlocal attempts
attempts += 1
await async_app.job_manager.cancel_job_by_id_async(context.job.id, abort=True)
raise ValueError()

job_id = await example_task.defer_async()

await async_app.run_worker_async(queues=["default"], wait=False)

status = await async_app.job_manager.get_job_status_async(job_id)
assert status == Status.FAILED
assert attempts == 1
9 changes: 6 additions & 3 deletions tests/unit/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ def job_func(a, b): # pylint: disable=unused-argument
)


async def test_run_job_error(app, caplog):
async def test_run_job_error(app, caplog, mocker):
caplog.set_level("INFO")

def job_func(a, b): # pylint: disable=unused-argument
Expand All @@ -385,6 +385,7 @@ def job_func(a, b): # pylint: disable=unused-argument
task_name="job",
queue="yay",
)
app.job_manager.get_job_status_async = mocker.AsyncMock(return_value="doing")
test_worker = worker.Worker(app, queues=["yay"])
with pytest.raises(exceptions.JobError):
await test_worker.run_job(job=job, worker_id=3)
Expand All @@ -401,7 +402,7 @@ def job_func(a, b): # pylint: disable=unused-argument
)


async def test_run_job_critical_error(app, caplog):
async def test_run_job_critical_error(app, caplog, mocker):
caplog.set_level("INFO")

def job_func(a, b): # pylint: disable=unused-argument
Expand All @@ -420,14 +421,15 @@ def job_func(a, b): # pylint: disable=unused-argument
task_name="job",
queue="yay",
)
app.job_manager.get_job_status_async = mocker.AsyncMock(return_value="doing")
test_worker = worker.Worker(app, queues=["yay"])
with pytest.raises(exceptions.JobError) as exc_info:
await test_worker.run_job(job=job, worker_id=3)

assert exc_info.value.critical is True


async def test_run_job_retry(app, caplog):
async def test_run_job_retry(app, caplog, mocker):
caplog.set_level("INFO")

def job_func(a, b): # pylint: disable=unused-argument
Expand All @@ -446,6 +448,7 @@ def job_func(a, b): # pylint: disable=unused-argument
queueing_lock="houba",
queue="yay",
)
app.job_manager.get_job_status_async = mocker.AsyncMock(return_value="doing")
test_worker = worker.Worker(app, queues=["yay"])
with pytest.raises(exceptions.JobError) as exc_info:
await test_worker.run_job(job=job, worker_id=3)
Expand Down

0 comments on commit 73ba60a

Please sign in to comment.