From 35363e8238403136ecc4f0422a51c7ea3909eef7 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 29 Jul 2024 23:00:43 +0000 Subject: [PATCH 1/2] Set job to failed when task raises and job is aborting (regardless of retry) --- procrastinate/worker.py | 17 ++++++++++++----- tests/acceptance/test_async.py | 19 +++++++++++++++++++ tests/unit/test_worker.py | 9 ++++++--- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/procrastinate/worker.py b/procrastinate/worker.py index e5809e61a..1402d8260 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -306,11 +306,18 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None: exc_info = e critical = not isinstance(e, Exception) - retry_exception = task.get_retry_exception(exception=e, job=job) - if retry_exception: - log_title = "Error, to retry" - log_action = "job_error_retry" - log_level = logging.INFO + assert job.id + status = await self.job_manager.get_job_status_async(job_id=job.id) + + if status == jobs.Status.ABORTING: + retry_exception = None + else: + retry_exception = task.get_retry_exception(exception=e, job=job) + if retry_exception: + log_title = "Error, to retry" + log_action = "job_error_retry" + log_level = logging.INFO + raise exceptions.JobError( retry_exception=retry_exception, critical=critical ) from e diff --git a/tests/acceptance/test_async.py b/tests/acceptance/test_async.py index ba6618e10..4056f4f07 100644 --- a/tests/acceptance/test_async.py +++ b/tests/acceptance/test_async.py @@ -145,3 +145,22 @@ def task2(context): status = await async_app.job_manager.get_job_status_async(job2_id) assert status == Status.ABORTED + + +async def test_retry_when_aborting(async_app): + attempts = 0 + + @async_app.task(queue="default", name="task1", pass_context=True, retry=True) + async def example_task(context): + nonlocal attempts + attempts += 1 + await async_app.job_manager.cancel_job_by_id_async(context.job.id, abort=True) + raise ValueError() + + job_id = await example_task.defer_async() + + await async_app.run_worker_async(queues=["default"], wait=False) + + status = await async_app.job_manager.get_job_status_async(job_id) + assert status == Status.FAILED + assert attempts == 1 diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 04c9c945b..790b295bb 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -366,7 +366,7 @@ def job_func(a, b): # pylint: disable=unused-argument ) -async def test_run_job_error(app, caplog): +async def test_run_job_error(app, caplog, mocker): caplog.set_level("INFO") def job_func(a, b): # pylint: disable=unused-argument @@ -385,6 +385,7 @@ def job_func(a, b): # pylint: disable=unused-argument task_name="job", queue="yay", ) + app.job_manager.get_job_status_async = mocker.AsyncMock(return_value="doing") test_worker = worker.Worker(app, queues=["yay"]) with pytest.raises(exceptions.JobError): await test_worker.run_job(job=job, worker_id=3) @@ -401,7 +402,7 @@ def job_func(a, b): # pylint: disable=unused-argument ) -async def test_run_job_critical_error(app, caplog): +async def test_run_job_critical_error(app, caplog, mocker): caplog.set_level("INFO") def job_func(a, b): # pylint: disable=unused-argument @@ -420,6 +421,7 @@ def job_func(a, b): # pylint: disable=unused-argument task_name="job", queue="yay", ) + app.job_manager.get_job_status_async = mocker.AsyncMock(return_value="doing") test_worker = worker.Worker(app, queues=["yay"]) with pytest.raises(exceptions.JobError) as exc_info: await test_worker.run_job(job=job, worker_id=3) @@ -427,7 +429,7 @@ def job_func(a, b): # pylint: disable=unused-argument assert exc_info.value.critical is True -async def test_run_job_retry(app, caplog): +async def test_run_job_retry(app, caplog, mocker): caplog.set_level("INFO") def job_func(a, b): # pylint: disable=unused-argument @@ -446,6 +448,7 @@ def job_func(a, b): # pylint: disable=unused-argument queueing_lock="houba", queue="yay", ) + app.job_manager.get_job_status_async = mocker.AsyncMock(return_value="doing") test_worker = worker.Worker(app, queues=["yay"]) with pytest.raises(exceptions.JobError) as exc_info: await test_worker.run_job(job=job, worker_id=3) From 0e6c2f7ab3c4dc09d362e71e1c88d4684805bc3a Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Wed, 31 Jul 2024 09:43:27 +0000 Subject: [PATCH 2/2] Add note to documentation of task that fails while aborting --- docs/howto/advanced/cancellation.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/howto/advanced/cancellation.md b/docs/howto/advanced/cancellation.md index c8bddfe2c..0743c21e8 100644 --- a/docs/howto/advanced/cancellation.md +++ b/docs/howto/advanced/cancellation.md @@ -70,3 +70,8 @@ async def my_task(context): database and might flood the database. Ensure you do it only sometimes and not from too many parallel tasks. ::: + +:::{note} +When a task of a job that was requested to be aborted raises an error, the job +is marked as failed (regardless of the retry strategy). +:::