From 71f53be102e8ce86bb1982385399d757bdf69b90 Mon Sep 17 00:00:00 2001 From: James Addison Date: Thu, 20 Jun 2024 09:25:18 -0700 Subject: [PATCH 1/4] Add pre and post task hooks. --- README.md | 23 +++++++++++ django_dbq/__init__.py | 2 +- django_dbq/management/commands/worker.py | 4 ++ django_dbq/models.py | 22 ++++++++++ django_dbq/tasks.py | 12 ++++++ django_dbq/tests.py | 52 ++++++++++++++++++++++++ 6 files changed, 114 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5562651..cb77aef 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,29 @@ JOBS = { } ``` +#### Pre & Post Task Hooks +You can also run pre task or post task hooks, which happen in the normal processing of your `Job` instances and are executed in the worker process. + +Both pre and post task hooks receive your `Job` instance as their only argument. Here's an example: + +```python +def my_pre_task_hook(job): + ... # configure something before running your task +``` + +To ensure these hooks gets run, simply add a `pre_task_hook` or `post_task_hook` key (or both, if needed) to your job config like so: + +```python +JOBS = { + "my_job": { + "tasks": ["project.common.jobs.my_task"], + "pre_task_hook": "project.common.jobs.my_pre_task_hook", + "post_task_hook": "project.common.jobs.my_post_task_hook", + }, +} +``` + + ### Start the worker In another terminal: diff --git a/django_dbq/__init__.py b/django_dbq/__init__.py index f5f41e5..1173108 100644 --- a/django_dbq/__init__.py +++ b/django_dbq/__init__.py @@ -1 +1 @@ -__version__ = "3.1.0" +__version__ = "3.2.0" diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index 9215aad..7434981 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -61,6 +61,8 @@ def _process_job(self): if not job: return + job.run_pre_task_hook() + logger.info( 'Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, @@ -109,6 +111,8 @@ def 
_process_job(self): logger.exception("Failed to save job: id=%s", job.pk) raise + job.run_post_task_hook() + self.current_job = None diff --git a/django_dbq/models.py b/django_dbq/models.py index d93a05b..31c4aef 100644 --- a/django_dbq/models.py +++ b/django_dbq/models.py @@ -3,6 +3,8 @@ from django.utils.module_loading import import_string from django_dbq.tasks import ( get_next_task_name, + get_pre_task_hook_name, + get_post_task_hook_name, get_failure_hook_name, get_creation_hook_name, ) @@ -126,12 +128,32 @@ def save(self, *args, **kwargs): def update_next_task(self): self.next_task = get_next_task_name(self.name, self.next_task) or "" + def get_pre_task_hook_name(self): + return get_pre_task_hook_name(self.name) + + def get_post_task_hook_name(self): + return get_post_task_hook_name(self.name) + def get_failure_hook_name(self): return get_failure_hook_name(self.name) def get_creation_hook_name(self): return get_creation_hook_name(self.name) + def run_pre_task_hook(self): + pre_task_hook_name = self.get_pre_task_hook_name() + if pre_task_hook_name: + logger.info("Running pre_task hook %s for new job", pre_task_hook_name) + pre_task_hook_function = import_string(pre_task_hook_name) + pre_task_hook_function(self) + + def run_post_task_hook(self): + post_task_hook_name = self.get_post_task_hook_name() + if post_task_hook_name: + logger.info("Running post_task hook %s for new job", post_task_hook_name) + post_task_hook_function = import_string(post_task_hook_name) + post_task_hook_function(self) + def run_creation_hook(self): creation_hook_name = self.get_creation_hook_name() if creation_hook_name: diff --git a/django_dbq/tasks.py b/django_dbq/tasks.py index 3e43da3..a95b4a5 100644 --- a/django_dbq/tasks.py +++ b/django_dbq/tasks.py @@ -2,6 +2,8 @@ TASK_LIST_KEY = "tasks" +PRE_TASK_HOOK_KEY = "pre_task_hook" +POST_TASK_HOOK_KEY = "post_task_hook" FAILURE_HOOK_KEY = "failure_hook" CREATION_HOOK_KEY = "creation_hook" @@ -24,6 +26,16 @@ def 
get_next_task_name(job_name, current_task=None): return None +def get_pre_task_hook_name(job_name): + """Return the name of the pre task hook for the given job (as a string) or None""" + return settings.JOBS[job_name].get(PRE_TASK_HOOK_KEY) + + +def get_post_task_hook_name(job_name): + """Return the name of the post_task hook for the given job (as a string) or None""" + return settings.JOBS[job_name].get(POST_TASK_HOOK_KEY) + + def get_failure_hook_name(job_name): """Return the name of the failure hook for the given job (as a string) or None""" return settings.JOBS[job_name].get(FAILURE_HOOK_KEY) diff --git a/django_dbq/tests.py b/django_dbq/tests.py index ad376cd..39c0ba5 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -34,12 +34,25 @@ def failing_task(job): raise Exception("uh oh") +def pre_task_hook(job): + job.workspace["output"] = "pre task hook ran" + job.workspace["job_id"] = str(job.id) + + +def post_task_hook(job): + job.workspace["output"] = "post task hook ran" + job.workspace["job_id"] = str(job.id) + + def failure_hook(job, exception): job.workspace["output"] = "failure hook ran" + job.workspace["exception"] = str(exception) + job.workspace["job_id"] = str(job.id) def creation_hook(job): job.workspace["output"] = "creation hook ran" + job.workspace["job_id"] = str(job.id) @override_settings(JOBS={"testjob": {"tasks": ["a"]}}) @@ -316,6 +329,7 @@ def test_creation_hook(self): job = Job.objects.create(name="testjob") job = Job.objects.get() self.assertEqual(job.workspace["output"], "creation hook ran") + self.assertEqual(job.workspace["job_id"], str(job.id)) def test_creation_hook_only_runs_on_create(self): job = Job.objects.create(name="testjob") @@ -326,6 +340,42 @@ def test_creation_hook_only_runs_on_create(self): self.assertEqual(job.workspace["output"], "creation hook output removed") +@override_settings( + JOBS={ + "testjob": { + "tasks": ["django_dbq.tests.failing_task"], + "pre_task_hook": "django_dbq.tests.pre_task_hook", + } + } 
+) +class JobPreTaskHookTestCase(TestCase): + def test_pre_task_hook(self): + job = Job.objects.create(name="testjob") + Worker("default", 1)._process_job() + job = Job.objects.get() + self.assertEqual(job.state, Job.STATES.FAILED) + self.assertEqual(job.workspace["output"], "failure hook ran") + self.assertEqual(job.workspace["job_id"], str(job.id)) + + +@override_settings( + JOBS={ + "testjob": { + "tasks": ["django_dbq.tests.failing_task"], + "post_task_hook": "django_dbq.tests.post_task_hook", + } + } +) +class JobPostTaskHookTestCase(TestCase): + def test_post_task_hook(self): + job = Job.objects.create(name="testjob") + Worker("default", 1)._process_job() + job = Job.objects.get() + self.assertEqual(job.state, Job.STATES.FAILED) + self.assertEqual(job.workspace["output"], "post task hook ran") + self.assertEqual(job.workspace["job_id"], str(job.id)) + + @override_settings( JOBS={ "testjob": { @@ -341,6 +391,8 @@ def test_failure_hook(self): job = Job.objects.get() self.assertEqual(job.state, Job.STATES.FAILED) self.assertEqual(job.workspace["output"], "failure hook ran") + self.assertIn("uh oh", job.workspace["exception"]) + self.assertEqual(job.workspace["job_id"], str(job.id)) @override_settings(JOBS={"testjob": {"tasks": ["a"]}}) From dba1e9159354869adf5b6f214c73698aa354cc82 Mon Sep 17 00:00:00 2001 From: Jamie Matthews Date: Fri, 21 Jun 2024 14:54:57 +0100 Subject: [PATCH 2/4] Rework where pre and post task hooks are run, and fix tests --- django_dbq/management/commands/worker.py | 12 ++++++++---- django_dbq/tests.py | 10 +++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index 7434981..fcafd14 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -61,8 +61,6 @@ def _process_job(self): if not job: return - job.run_pre_task_hook() - logger.info( 'Processing job: name="%s" queue="%s" id=%s state=%s 
next_task=%s', job.name, @@ -77,7 +75,10 @@ def _process_job(self): try: task_function = import_string(job.next_task) + + job.run_pre_task_hook() task_function(job) + job.update_next_task() if not job.next_task: job.state = Job.STATES.COMPLETE @@ -96,6 +97,11 @@ def _process_job(self): failure_hook_function(job, exception) else: logger.info("No failure hook for job id=%s", job.pk) + finally: + try: + job.run_post_task_hook() + except: + logger.exception("Job id=%s post_task_hook failed", job.pk) logger.info( 'Updating job: name="%s" id=%s state=%s next_task=%s', @@ -111,8 +117,6 @@ def _process_job(self): logger.exception("Failed to save job: id=%s", job.pk) raise - job.run_post_task_hook() - self.current_job = None diff --git a/django_dbq/tests.py b/django_dbq/tests.py index 39c0ba5..dd83540 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -343,7 +343,7 @@ def test_creation_hook_only_runs_on_create(self): @override_settings( JOBS={ "testjob": { - "tasks": ["django_dbq.tests.failing_task"], + "tasks": ["django_dbq.tests.test_task"], "pre_task_hook": "django_dbq.tests.pre_task_hook", } } @@ -353,15 +353,15 @@ def test_pre_task_hook(self): job = Job.objects.create(name="testjob") Worker("default", 1)._process_job() job = Job.objects.get() - self.assertEqual(job.state, Job.STATES.FAILED) - self.assertEqual(job.workspace["output"], "failure hook ran") + self.assertEqual(job.state, Job.STATES.COMPLETE) + self.assertEqual(job.workspace["output"], "pre task hook ran") self.assertEqual(job.workspace["job_id"], str(job.id)) @override_settings( JOBS={ "testjob": { - "tasks": ["django_dbq.tests.failing_task"], + "tasks": ["django_dbq.tests.test_task"], "post_task_hook": "django_dbq.tests.post_task_hook", } } @@ -371,7 +371,7 @@ def test_post_task_hook(self): job = Job.objects.create(name="testjob") Worker("default", 1)._process_job() job = Job.objects.get() - self.assertEqual(job.state, Job.STATES.FAILED) + self.assertEqual(job.state, Job.STATES.COMPLETE) 
self.assertEqual(job.workspace["output"], "post task hook ran") self.assertEqual(job.workspace["job_id"], str(job.id)) From c52941f6fe2aa41e21c1df96bb294e2db9cb9be0 Mon Sep 17 00:00:00 2001 From: Jamie Matthews Date: Fri, 21 Jun 2024 16:14:18 +0100 Subject: [PATCH 3/4] Refactor to pull task and hook running into Job model --- django_dbq/management/commands/worker.py | 17 +++-------------- django_dbq/models.py | 17 ++++++++++++++--- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index fcafd14..d166b8d 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -74,12 +74,10 @@ def _process_job(self): self.current_job = job try: - task_function = import_string(job.next_task) - job.run_pre_task_hook() - task_function(job) - + job.run_next_task() job.update_next_task() + if not job.next_task: job.state = Job.STATES.COMPLETE else: @@ -87,16 +85,7 @@ def _process_job(self): except Exception as exception: logger.exception("Job id=%s failed", job.pk) job.state = Job.STATES.FAILED - - failure_hook_name = job.get_failure_hook_name() - if failure_hook_name: - logger.info( - "Running failure hook %s for job id=%s", failure_hook_name, job.pk - ) - failure_hook_function = import_string(failure_hook_name) - failure_hook_function(job, exception) - else: - logger.info("No failure hook for job id=%s", job.pk) + job.run_failure_hook(exception) finally: try: job.run_post_task_hook() diff --git a/django_dbq/models.py b/django_dbq/models.py index 31c4aef..b58eef4 100644 --- a/django_dbq/models.py +++ b/django_dbq/models.py @@ -128,6 +128,10 @@ def save(self, *args, **kwargs): def update_next_task(self): self.next_task = get_next_task_name(self.name, self.next_task) or "" + def run_next_task(self): + next_task_function = import_string(self.next_task) + next_task_function(self) + def get_pre_task_hook_name(self): return 
get_pre_task_hook_name(self.name) @@ -143,21 +147,28 @@ def get_creation_hook_name(self): def run_pre_task_hook(self): pre_task_hook_name = self.get_pre_task_hook_name() if pre_task_hook_name: - logger.info("Running pre_task hook %s for new job", pre_task_hook_name) + logger.info("Running pre_task hook %s for job", pre_task_hook_name) pre_task_hook_function = import_string(pre_task_hook_name) pre_task_hook_function(self) def run_post_task_hook(self): post_task_hook_name = self.get_post_task_hook_name() if post_task_hook_name: - logger.info("Running post_task hook %s for new job", post_task_hook_name) + logger.info("Running post_task hook %s for job", post_task_hook_name) post_task_hook_function = import_string(post_task_hook_name) post_task_hook_function(self) + def run_failure_hook(self, exception): + failure_hook_name = self.get_failure_hook_name() + if failure_hook_name: + logger.info("Running failure hook %s for job", failure_hook_name) + failure_hook_function = import_string(failure_hook_name) + failure_hook_function(self, exception) + def run_creation_hook(self): creation_hook_name = self.get_creation_hook_name() if creation_hook_name: - logger.info("Running creation hook %s for new job", creation_hook_name) + logger.info("Running creation hook %s for job", creation_hook_name) creation_hook_function = import_string(creation_hook_name) creation_hook_function(self) From 32b1ac9a353cc1277c5d29d87d6f6fd192008c95 Mon Sep 17 00:00:00 2001 From: Jamie Matthews Date: Tue, 25 Jun 2024 10:53:31 +0100 Subject: [PATCH 4/4] Add notes on pre/post hook behaviour --- README.md | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index cb77aef..c4e698a 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ JOBS = { ``` #### Pre & Post Task Hooks -You can also run pre task or post task hooks, which happen in the normal processing of your `Job` instances and are executed in the worker process. 
+You can also run pre task or post task hooks, which happen in the normal processing of your `Job` instances and are executed inside the worker process. Both pre and post task hooks receive your `Job` instance as their only argument. Here's an example: @@ -122,7 +122,7 @@ def my_pre_task_hook(job): ... # configure something before running your task ``` -To ensure these hooks gets run, simply add a `pre_task_hook` or `post_task_hook` key (or both, if needed) to your job config like so: +To ensure these hooks are run, simply add a `pre_task_hook` or `post_task_hook` key (or both, if needed) to your job config like so: ```python JOBS = { @@ -134,6 +134,12 @@ JOBS = { } ``` +Notes: + +* If the `pre_task_hook` fails (raises an exception), the task function is not run, and django-db-queue behaves as if the task function itself had failed: the failure hook is called, and the job goes into the `FAILED` state. +* The `post_task_hook` is always run, even if the job fails. In this case, it runs after the `failure_hook`. +* If the `post_task_hook` raises an exception, this is logged but the job is **not marked as failed** and the failure hook does not run. This is because the `post_task_hook` might need to perform cleanup that always happens after the task, no matter whether it succeeds or fails. + ### Start the worker