Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set job to failed when task raises and job is aborting (regardless retry) #1133

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions procrastinate/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,18 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None:
exc_info = e
critical = not isinstance(e, Exception)

retry_exception = task.get_retry_exception(exception=e, job=job)
if retry_exception:
log_title = "Error, to retry"
log_action = "job_error_retry"
log_level = logging.INFO
assert job.id
status = await self.job_manager.get_job_status_async(job_id=job.id)

if status == jobs.Status.ABORTING:
retry_exception = None
else:
retry_exception = task.get_retry_exception(exception=e, job=job)
if retry_exception:
log_title = "Error, to retry"
log_action = "job_error_retry"
log_level = logging.INFO

raise exceptions.JobError(
retry_exception=retry_exception, critical=critical
) from e
Expand Down
19 changes: 19 additions & 0 deletions tests/acceptance/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,22 @@ def task2(context):

status = await async_app.job_manager.get_job_status_async(job2_id)
assert status == Status.ABORTED


async def test_retry_when_aborting(async_app):
attempts = 0

@async_app.task(queue="default", name="task1", pass_context=True, retry=True)
async def example_task(context):
nonlocal attempts
attempts += 1
await async_app.job_manager.cancel_job_by_id_async(context.job.id, abort=True)
raise ValueError()

job_id = await example_task.defer_async()

await async_app.run_worker_async(queues=["default"], wait=False)

status = await async_app.job_manager.get_job_status_async(job_id)
assert status == Status.FAILED
assert attempts == 1
9 changes: 6 additions & 3 deletions tests/unit/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ def job_func(a, b): # pylint: disable=unused-argument
)


async def test_run_job_error(app, caplog):
async def test_run_job_error(app, caplog, mocker):
caplog.set_level("INFO")

def job_func(a, b): # pylint: disable=unused-argument
Expand All @@ -385,6 +385,7 @@ def job_func(a, b): # pylint: disable=unused-argument
task_name="job",
queue="yay",
)
app.job_manager.get_job_status_async = mocker.AsyncMock(return_value="doing")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: what happens if we don't mock? The InMemory implementation doesn't answer correctly ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll throw. The job is not put into the worker queue. The run_job method is tested in isolation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, makes sense!

test_worker = worker.Worker(app, queues=["yay"])
with pytest.raises(exceptions.JobError):
await test_worker.run_job(job=job, worker_id=3)
Expand All @@ -401,7 +402,7 @@ def job_func(a, b): # pylint: disable=unused-argument
)


async def test_run_job_critical_error(app, caplog):
async def test_run_job_critical_error(app, caplog, mocker):
caplog.set_level("INFO")

def job_func(a, b): # pylint: disable=unused-argument
Expand All @@ -420,14 +421,15 @@ def job_func(a, b): # pylint: disable=unused-argument
task_name="job",
queue="yay",
)
app.job_manager.get_job_status_async = mocker.AsyncMock(return_value="doing")
test_worker = worker.Worker(app, queues=["yay"])
with pytest.raises(exceptions.JobError) as exc_info:
await test_worker.run_job(job=job, worker_id=3)

assert exc_info.value.critical is True


async def test_run_job_retry(app, caplog):
async def test_run_job_retry(app, caplog, mocker):
caplog.set_level("INFO")

def job_func(a, b): # pylint: disable=unused-argument
Expand All @@ -446,6 +448,7 @@ def job_func(a, b): # pylint: disable=unused-argument
queueing_lock="houba",
queue="yay",
)
app.job_manager.get_job_status_async = mocker.AsyncMock(return_value="doing")
test_worker = worker.Worker(app, queues=["yay"])
with pytest.raises(exceptions.JobError) as exc_info:
await test_worker.run_job(job=job, worker_id=3)
Expand Down
Loading