From e897bfe75a2552d897a5b397c3f3c4f492adccad Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 5 Oct 2024 15:45:11 +0200 Subject: [PATCH 01/17] Rework migrations --- .../0031_pre_cancel_notification.py | 20 ++ .../migrations/0032_cancel_notification.py | 15 - ...bs.py => 0032_post_cancel_notification.py} | 11 +- .../03.00.00_01_cancel_notification.sql | 87 ------ .../03.00.00_01_pre_cancel_notification.sql | 250 +++++++++++++++ ... 03.00.00_50_post_cancel_notification.sql} | 240 +++++++++------ procrastinate/sql/queries.sql | 20 +- procrastinate/sql/schema.sql | 285 +++++------------- tests/integration/test_manager.py | 2 +- 9 files changed, 496 insertions(+), 434 deletions(-) create mode 100644 procrastinate/contrib/django/migrations/0031_pre_cancel_notification.py delete mode 100644 procrastinate/contrib/django/migrations/0032_cancel_notification.py rename procrastinate/contrib/django/migrations/{0031_add_abort_on_procrastinate_jobs.py => 0032_post_cancel_notification.py} (67%) delete mode 100644 procrastinate/sql/migrations/03.00.00_01_cancel_notification.sql create mode 100644 procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql rename procrastinate/sql/migrations/{02.09.02_01_add_abort_on_procrastinate_jobs.sql => 03.00.00_50_post_cancel_notification.sql} (59%) diff --git a/procrastinate/contrib/django/migrations/0031_pre_cancel_notification.py b/procrastinate/contrib/django/migrations/0031_pre_cancel_notification.py new file mode 100644 index 000000000..6ec17db85 --- /dev/null +++ b/procrastinate/contrib/django/migrations/0031_pre_cancel_notification.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from django.db import migrations, models + +from .. import migrations_utils + + +class Migration(migrations.Migration): + operations = [ + migrations_utils.RunProcrastinateSQL( + name="03.00.00_01_pre_cancel_notification.sql" + ), + migrations.AddField( + "procrastinatejob", + "abort_requested", + models.BooleanField(), + ), + ] + name = "0031_pre_cancel_notification" + dependencies = [("procrastinate", "0030_alter_procrastinateevent_options")] diff --git a/procrastinate/contrib/django/migrations/0032_cancel_notification.py b/procrastinate/contrib/django/migrations/0032_cancel_notification.py deleted file mode 100644 index 617265857..000000000 --- a/procrastinate/contrib/django/migrations/0032_cancel_notification.py +++ /dev/null @@ -1,15 +0,0 @@ -from __future__ import annotations - -from django.db import migrations - -from .. import migrations_utils - - -class Migration(migrations.Migration): - operations = [ - migrations_utils.RunProcrastinateSQL( - name="03.00.00_01_cancel_notification.sql" - ), - ] - name = "0032_cancel_notification" - dependencies = [("procrastinate", "0031_add_abort_on_procrastinate_jobs")] diff --git a/procrastinate/contrib/django/migrations/0031_add_abort_on_procrastinate_jobs.py b/procrastinate/contrib/django/migrations/0032_post_cancel_notification.py similarity index 67% rename from procrastinate/contrib/django/migrations/0031_add_abort_on_procrastinate_jobs.py rename to procrastinate/contrib/django/migrations/0032_post_cancel_notification.py index f968078d5..3be9f78b0 100644 --- a/procrastinate/contrib/django/migrations/0031_add_abort_on_procrastinate_jobs.py +++ b/procrastinate/contrib/django/migrations/0032_post_cancel_notification.py @@ -8,7 +8,7 @@ class Migration(migrations.Migration): operations = [ migrations_utils.RunProcrastinateSQL( - name="02.09.02_01_add_abort_on_procrastinate_jobs.sql" + name="03.00.00_50_post_cancel_notification.sql" ), migrations.AlterField( "procrastinatejob", @@ -25,11 +25,6 @@ class Migration(migrations.Migration): max_length=32, ), ), - migrations.AddField( - "procrastinatejob", - "abort_requested", - models.BooleanField(), - ), ] - name = "0031_add_abort_on_procrastinate_jobs" - dependencies = [("procrastinate", "0030_alter_procrastinateevent_options")] + name = "0032_post_cancel_notification" + dependencies = [("procrastinate", "0031_pre_cancel_notification")] diff --git a/procrastinate/sql/migrations/03.00.00_01_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_cancel_notification.sql deleted file mode 100644 index c2925b1f7..000000000 --- a/procrastinate/sql/migrations/03.00.00_01_cancel_notification.sql +++ /dev/null @@ -1,87 +0,0 @@ -CREATE OR REPLACE FUNCTION procrastinate_notify_queue_job_inserted() -RETURNS trigger - LANGUAGE plpgsql -AS $$ -DECLARE - payload TEXT; -BEGIN - SELECT json_object('type': 'job_inserted', 'job_id': NEW.id)::text INTO payload; - PERFORM pg_notify('procrastinate_queue#' || NEW.queue_name, payload); - PERFORM pg_notify('procrastinate_any_queue', payload); - RETURN NEW; -END; -$$; - -DROP TRIGGER IF EXISTS procrastinate_jobs_notify_queue ON procrastinate_jobs; - -CREATE TRIGGER procrastinate_jobs_notify_queue_job_inserted - AFTER INSERT ON procrastinate_jobs - FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted(); - -DROP FUNCTION IF EXISTS procrastinate_notify_queue; - -CREATE OR REPLACE FUNCTION procrastinate_notify_queue_abort_job() -RETURNS trigger - LANGUAGE plpgsql -AS $$ -DECLARE - payload TEXT; -BEGIN - SELECT json_object('type': 'abort_job_requested', 'job_id': NEW.id)::text INTO payload; - PERFORM pg_notify('procrastinate_queue#' || NEW.queue_name, payload); - PERFORM pg_notify('procrastinate_any_queue', payload); - RETURN NEW; -END; -$$; - -CREATE TRIGGER procrastinate_jobs_notify_queue_abort_job - AFTER UPDATE OF abort_requested ON procrastinate_jobs - FOR EACH ROW WHEN ((old.abort_requested = false AND new.abort_requested = true AND new.status = 'doing'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_notify_queue_abort_job(); - -CREATE OR REPLACE FUNCTION procrastinate_retry_job( - job_id bigint, - retry_at timestamp with time zone, - new_priority integer, - new_queue_name character varying, - new_lock character varying -) - RETURNS void - LANGUAGE plpgsql -AS $$ -DECLARE - _job_id bigint; -BEGIN - UPDATE procrastinate_jobs - SET status = CASE - WHEN NOT abort_requested THEN 'todo'::procrastinate_job_status - ELSE 'failed'::procrastinate_job_status - END, - attempts = CASE - WHEN NOT abort_requested THEN attempts + 1 - ELSE attempts - END, - scheduled_at = CASE - WHEN NOT abort_requested THEN retry_at - ELSE scheduled_at - END, - priority = CASE - WHEN NOT abort_requested THEN COALESCE(new_priority, priority) - ELSE priority - END, - queue_name = CASE - WHEN NOT abort_requested THEN COALESCE(new_queue_name, queue_name) - ELSE queue_name - END, - lock = CASE - WHEN NOT abort_requested THEN COALESCE(new_lock, lock) - ELSE lock - END - WHERE id = job_id AND status = 'doing' - RETURNING id INTO _job_id; - IF _job_id IS NULL THEN - RAISE 'Job was not found or not in "doing" status (job id: %)', job_id; - END IF; -END; -$$; diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql new file mode 100644 index 000000000..d5e25c8cb --- /dev/null +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -0,0 +1,250 @@ +-- Note: starting with v3, there are 2 changes in the migration system: +-- - We now have pre- and post-migration scripts. pre-migrations are safe to +-- apply before upgrading the code. post-migrations are safe to apply after +-- upgrading he code. +-- This is a pre-migration script. +-- - Whenever we recreate an immutable object (function, trigger, indexes), we +-- will suffix its name with a version number. +-- +-- Add an 'abort_requested' column to the procrastinate_jobs table +-- Add an 'abort_requested' column to the procrastinate_jobs table +ALTER TABLE procrastinate_jobs ADD COLUMN abort_requested boolean DEFAULT false NOT NULL; + +-- Set abort requested flag on all jobs with 'aborting' status +UPDATE procrastinate_jobs SET abort_requested = true WHERE status = 'aborting'; + +-- Add temporary triggers to sync the abort_requested flag with the status +-- so that blue-green deployments can work +CREATE OR REPLACE FUNCTION procrastinate_sync_abort_requested_with_status_v1() + RETURNS trigger + LANGUAGE plpgsql +AS $$ +BEGIN + IF NEW.status = 'aborting' THEN + NEW.abort_requested = true; + ELSE + NEW.abort_requested = false; + END IF; + RETURN NEW; +END; +$$; + +CREATE TRIGGER procrastinate_trigger_sync_abort_requested_with_status_v1 + BEFORE UPDATE OF status ON procrastinate_jobs + FOR EACH ROW + EXECUTE FUNCTION procrastinate_sync_abort_requested_with_status_v1(); + +DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update ON procrastinate_jobs; +DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update; + +CREATE FUNCTION procrastinate_trigger_status_events_procedure_update_v1() + RETURNS trigger + LANGUAGE plpgsql +AS $$ +BEGIN + WITH t AS ( + SELECT CASE + WHEN OLD.status = 'todo'::procrastinate_job_status + AND NEW.status = 'doing'::procrastinate_job_status + THEN 'started'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'todo'::procrastinate_job_status + THEN 'deferred_for_retry'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'failed'::procrastinate_job_status + THEN 'failed'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'succeeded'::procrastinate_job_status + THEN 'succeeded'::procrastinate_job_event_type + WHEN OLD.status = 'todo'::procrastinate_job_status + AND ( + NEW.status = 'cancelled'::procrastinate_job_status + OR NEW.status = 'failed'::procrastinate_job_status + OR NEW.status = 'succeeded'::procrastinate_job_status + ) + THEN 'cancelled'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'aborted'::procrastinate_job_status + THEN 'aborted'::procrastinate_job_event_type + ELSE NULL + END as event_type + ) + INSERT INTO procrastinate_events(job_id, type) + SELECT NEW.id, t.event_type + FROM t + WHERE t.event_type IS NOT NULL; + RETURN NEW; +END; +$$; + +CREATE TRIGGER procrastinate_trigger_status_events_update_v1 + AFTER UPDATE OF status ON procrastinate_jobs + FOR EACH ROW + EXECUTE PROCEDURE procrastinate_trigger_status_events_procedure_update_v1(); + +DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update ON procrastinate_jobs; +DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update; + +DROP TRIGGER IF EXISTS procrastinate_jobs_notify_queue ON procrastinate_jobs; +DROP FUNCTION IF EXISTS procrastinate_notify_queue; + +CREATE OR REPLACE FUNCTION procrastinate_notify_queue_job_inserted_v1() +RETURNS trigger + LANGUAGE plpgsql +AS $$ +DECLARE + payload TEXT; +BEGIN + SELECT json_object('type': 'job_inserted', 'job_id': NEW.id)::text INTO payload; + PERFORM pg_notify('procrastinate_queue#' || NEW.queue_name, payload); + PERFORM pg_notify('procrastinate_any_queue', payload); + RETURN NEW; +END; +$$; + +CREATE OR REPLACE TRIGGER procrastinate_jobs_notify_queue_job_inserted_v1 + AFTER INSERT ON procrastinate_jobs + FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status)) + EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_v1(); + + +-- Create the new versions for the functions +CREATE FUNCTION procrastinate_fetch_job_v1( + target_queue_names character varying[] +) + RETURNS procrastinate_jobs + LANGUAGE plpgsql +AS $$ +DECLARE + found_jobs procrastinate_jobs; +BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + -- reject the job if its lock has earlier jobs + NOT EXISTS ( + SELECT 1 + FROM procrastinate_jobs AS earlier_jobs + WHERE + jobs.lock IS NOT NULL + AND earlier_jobs.lock = jobs.lock + AND earlier_jobs.status IN ('todo', 'doing') + AND earlier_jobs.id < jobs.id) + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + + RETURN found_jobs; +END; +$$; + +CREATE FUNCTION procrastinate_finish_job_v1(job_id bigint, end_status procrastinate_job_status_v1, delete_job boolean) + RETURNS void + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; +BEGIN + IF end_status NOT IN ('succeeded', 'failed', 'aborted') THEN + RAISE 'End status should be either "succeeded", "failed" or "aborted" (job id: %)', job_id; + END IF; + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = end_status, + abort_requested = false, + attempts = CASE status + WHEN 'doing' THEN attempts + 1 ELSE attempts + END + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "doing" or "todo" status (job id: %)', job_id; + END IF; +END; +$$; + +CREATE FUNCTION procrastinate_cancel_job_v1(job_id bigint, abort boolean, delete_job boolean) + RETURNS bigint + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; +BEGIN + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + IF abort THEN + UPDATE procrastinate_jobs + SET abort_requested = true, + status = CASE status + WHEN 'todo' THEN 'cancelled'::procrastinate_job_status_v1 ELSE status + END + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = 'cancelled'::procrastinate_job_status_v1 + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + END IF; + RETURN _job_id; +END; +$$; + +-- The retry_job function now has specific behaviour when a job is set to be +-- retried while it's aborting: in that case it's marked as failed. +CREATE FUNCTION procrastinate_retry_job_v1( + job_id bigint, + retry_at timestamp with time zone, + new_priority integer, + new_queue_name character varying, + new_lock character varying +) RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + _job_id bigint; + _abort_requested boolean; +BEGIN + SELECT abort_requested FROM procrastinate_jobs + WHERE id = job_id AND status = 'doing' + FOR UPDATE + INTO _abort_requested; + IF _abort_requested THEN + UPDATE procrastinate_jobs + SET status = 'failed'::procrastinate_job_status_v1 + WHERE id = job_id AND status = 'doing' + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = 'todo'::procrastinate_job_status_v1, + attempts = attempts + 1, + scheduled_at = retry_at, + priority = COALESCE(new_priority, priority), + queue_name = COALESCE(new_queue_name, queue_name), + lock = COALESCE(new_lock, lock) + WHERE id = job_id AND status = 'doing' + RETURNING id INTO _job_id; + END IF; + + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "doing" status (job id: %)', job_id; + END IF; +END; +$$; diff --git a/procrastinate/sql/migrations/02.09.02_01_add_abort_on_procrastinate_jobs.sql b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql similarity index 59% rename from procrastinate/sql/migrations/02.09.02_01_add_abort_on_procrastinate_jobs.sql rename to procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql index 1a0b7dc58..215410bc1 100644 --- a/procrastinate/sql/migrations/02.09.02_01_add_abort_on_procrastinate_jobs.sql +++ b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql @@ -1,66 +1,103 @@ --- Add an 'abort_requested' column to the procrastinate_jobs table -ALTER TABLE procrastinate_jobs ADD COLUMN abort_requested boolean DEFAULT false NOT NULL; +-- These are old versions of function, that we needed +-- to keep around for backwards compatibility. We can now safely drop them. +DROP FUNCTION IF EXISTS procrastinate_finish_job( + integer, + procrastinate_job_status, + timestamp with time zone, + boolean +); +DROP FUNCTION IF EXISTS procrastinate_defer_job( + character varying, + character varying, + text, + text, + jsonb, + timestamp with time zone +); +DROP FUNCTION IF EXISTS procrastinate_defer_periodic_job( + character varying, + character varying, + character varying, + character varying, + character varying, + bigint, + jsonb +); +DROP FUNCTION IF EXISTS procrastinate_retry_job( + bigint, + timestamp with time zone +); +DROP FUNCTION IF EXISTS procrastinate_retry_job( + bigint, + timestamp with time zone, + integer, + character varying, + character varying +); --- Set abort requested flag on all jobs with 'aborting' status +-- Remove all traces of the "aborting" status +-- Last sanity update in case the trigger didn't work 100% of the time UPDATE procrastinate_jobs SET abort_requested = true WHERE status = 'aborting'; +DROP TRIGGER IF EXISTS procrastinate_trigger_sync_abort_requested_with_status_v1 ON procrastinate_jobs; +DROP FUNCTION IF EXISTS procrastinate_sync_abort_requested_with_status_v1; + +-- Create a new enum type without 'aborting' +CREATE TYPE procrastinate_job_status_v1 AS ENUM ( + 'todo', + 'doing', + 'succeeded', + 'failed', + 'cancelled', + 'aborted' +); + +-- We need to drop the default temporarily as otherwise DatatypeMismatch would occur +ALTER TABLE procrastinate_jobs ALTER COLUMN status DROP DEFAULT; + -- Delete the indexes that depends on the old status and enum type DROP INDEX IF EXISTS procrastinate_jobs_queueing_lock_idx; DROP INDEX IF EXISTS procrastinate_jobs_lock_idx; DROP INDEX IF EXISTS procrastinate_jobs_id_lock_idx; -- Delete the triggers that depends on the old status type (to recreate them later) -DROP TRIGGER IF EXISTS procrastinate_jobs_notify_queue ON procrastinate_jobs; DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update ON procrastinate_jobs; DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_insert ON procrastinate_jobs; DROP TRIGGER IF EXISTS procrastinate_trigger_scheduled_events ON procrastinate_jobs; +DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update_v1 ON procrastinate_jobs; +DROP TRIGGER IF EXISTS procrastinate_jobs_notify_queue_job_inserted_v1 ON procrastinate_jobs; -- Delete the functions that depends on the old status type DROP FUNCTION IF EXISTS procrastinate_fetch_job; DROP FUNCTION IF EXISTS procrastinate_finish_job(bigint, procrastinate_job_status, boolean); DROP FUNCTION IF EXISTS procrastinate_cancel_job; -DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update; +DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update_v1; DROP FUNCTION IF EXISTS procrastinate_finish_job(integer, procrastinate_job_status, timestamp with time zone, boolean); - --- Create a new enum type without 'aborting' -CREATE TYPE procrastinate_job_status_new AS ENUM ( - 'todo', - 'doing', - 'succeeded', - 'failed', - 'cancelled', - 'aborted' -); - --- We need to drop the default temporarily as otherwise DatatypeMismatch would occur -ALTER TABLE procrastinate_jobs ALTER COLUMN status DROP DEFAULT; +DROP FUNCTION IF EXISTS procrastinate_notify_queue_job_inserted_v1; -- Alter the table to use the new type ALTER TABLE procrastinate_jobs -ALTER COLUMN status TYPE procrastinate_job_status_new +ALTER COLUMN status TYPE procrastinate_job_status_v1 USING ( CASE status::text - WHEN 'aborting' THEN 'doing'::procrastinate_job_status_new - ELSE status::text::procrastinate_job_status_new + WHEN 'aborting' THEN 'doing'::procrastinate_job_status_v1 + ELSE status::text::procrastinate_job_status_v1 END ); -- Recreate the default -ALTER TABLE procrastinate_jobs ALTER COLUMN status SET DEFAULT 'todo'::procrastinate_job_status_new; - --- Drop the old type -DROP TYPE procrastinate_job_status; - --- Rename the new type to the original name -ALTER TYPE procrastinate_job_status_new RENAME TO procrastinate_job_status; +ALTER TABLE procrastinate_jobs ALTER COLUMN status SET DEFAULT 'todo'::procrastinate_job_status_v1; -- Recreate the indexes CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (queueing_lock) WHERE status = 'todo'; CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing'; -CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status]); +CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status_v1, 'doing'::procrastinate_job_status_v1]); + +-- Drop the old type +DROP TYPE procrastinate_job_status; --- Recreate and update the functions -CREATE OR REPLACE FUNCTION procrastinate_fetch_job( +-- Recreate and update the functions to use the new column +CREATE FUNCTION procrastinate_fetch_job_v1( target_queue_names character varying[] ) RETURNS procrastinate_jobs @@ -98,7 +135,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_finish_job(job_id bigint, end_status procrastinate_job_status, delete_job boolean) +CREATE FUNCTION procrastinate_finish_job_v1(job_id bigint, end_status procrastinate_job_status_v1, delete_job boolean) RETURNS void LANGUAGE plpgsql AS $$ @@ -128,7 +165,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_cancel_job(job_id bigint, abort boolean, delete_job boolean) +CREATE FUNCTION procrastinate_cancel_job_v1(job_id bigint, abort boolean, delete_job boolean) RETURNS bigint LANGUAGE plpgsql AS $$ @@ -145,13 +182,13 @@ BEGIN UPDATE procrastinate_jobs SET abort_requested = true, status = CASE status - WHEN 'todo' THEN 'cancelled'::procrastinate_job_status ELSE status + WHEN 'todo' THEN 'cancelled'::procrastinate_job_status_v1 ELSE status END WHERE id = job_id AND status IN ('todo', 'doing') RETURNING id INTO _job_id; ELSE UPDATE procrastinate_jobs - SET status = 'cancelled'::procrastinate_job_status + SET status = 'cancelled'::procrastinate_job_status_v1 WHERE id = job_id AND status = 'todo' RETURNING id INTO _job_id; END IF; @@ -160,34 +197,44 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_trigger_status_events_procedure_update() +-- Recreate or rename the triggers & their associated functions + +ALTER FUNCTION procrastinate_trigger_status_events_procedure_insert + RENAME TO procrastinate_trigger_function_status_events_insert_v1; + +CREATE TRIGGER procrastinate_trigger_status_events_insert_v1 + AFTER INSERT ON procrastinate_jobs + FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status_v1)) + EXECUTE PROCEDURE procrastinate_trigger_function_status_events_insert_v1(); + +CREATE FUNCTION procrastinate_trigger_function_status_events_update_v2() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN WITH t AS ( SELECT CASE - WHEN OLD.status = 'todo'::procrastinate_job_status - AND NEW.status = 'doing'::procrastinate_job_status + WHEN OLD.status = 'todo'::procrastinate_job_status_v1 + AND NEW.status = 'doing'::procrastinate_job_status_v1 THEN 'started'::procrastinate_job_event_type - WHEN OLD.status = 'doing'::procrastinate_job_status - AND NEW.status = 'todo'::procrastinate_job_status + WHEN OLD.status = 'doing'::procrastinate_job_status_v1 + AND NEW.status = 'todo'::procrastinate_job_status_v1 THEN 'deferred_for_retry'::procrastinate_job_event_type - WHEN OLD.status = 'doing'::procrastinate_job_status - AND NEW.status = 'failed'::procrastinate_job_status + WHEN OLD.status = 'doing'::procrastinate_job_status_v1 + AND NEW.status = 'failed'::procrastinate_job_status_v1 THEN 'failed'::procrastinate_job_event_type - WHEN OLD.status = 'doing'::procrastinate_job_status - AND NEW.status = 'succeeded'::procrastinate_job_status + WHEN OLD.status = 'doing'::procrastinate_job_status_v1 + AND NEW.status = 'succeeded'::procrastinate_job_status_v1 THEN 'succeeded'::procrastinate_job_event_type - WHEN OLD.status = 'todo'::procrastinate_job_status + WHEN OLD.status = 'todo'::procrastinate_job_status_v1 AND ( - NEW.status = 'cancelled'::procrastinate_job_status - OR NEW.status = 'failed'::procrastinate_job_status - OR NEW.status = 'succeeded'::procrastinate_job_status + NEW.status = 'cancelled'::procrastinate_job_status_v1 + OR NEW.status = 'failed'::procrastinate_job_status_v1 + OR NEW.status = 'succeeded'::procrastinate_job_status_v1 ) THEN 'cancelled'::procrastinate_job_event_type - WHEN OLD.status = 'doing'::procrastinate_job_status - AND NEW.status = 'aborted'::procrastinate_job_status + WHEN OLD.status = 'doing'::procrastinate_job_status_v1 + AND NEW.status = 'aborted'::procrastinate_job_status_v1 THEN 'aborted'::procrastinate_job_event_type ELSE NULL END as event_type @@ -200,61 +247,41 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, next_scheduled_at timestamp with time zone, delete_job boolean) - RETURNS void +CREATE TRIGGER procrastinate_trigger_status_events_update_v2 + AFTER UPDATE OF status ON procrastinate_jobs + FOR EACH ROW + EXECUTE PROCEDURE procrastinate_trigger_function_status_events_update_v2(); + +ALTER FUNCTION procrastinate_trigger_scheduled_events_procedure + RENAME TO procrastinate_trigger_function_scheduled_events_v1; + +CREATE TRIGGER procrastinate_trigger_scheduled_events_v1 + AFTER UPDATE OR INSERT ON procrastinate_jobs + FOR EACH ROW WHEN ((new.scheduled_at IS NOT NULL AND new.status = 'todo'::procrastinate_job_status_v1)) + EXECUTE PROCEDURE procrastinate_trigger_function_scheduled_events_v1(); + +CREATE FUNCTION procrastinate_notify_queue_job_inserted_v2() +RETURNS trigger LANGUAGE plpgsql AS $$ DECLARE - _job_id bigint; + payload TEXT; BEGIN - IF end_status NOT IN ('succeeded', 'failed') THEN - RAISE 'End status should be either "succeeded" or "failed" (job id: %)', job_id; - END IF; - IF delete_job THEN - DELETE FROM procrastinate_jobs - WHERE id = job_id AND status IN ('todo', 'doing') - RETURNING id INTO _job_id; - ELSE - UPDATE procrastinate_jobs - SET status = end_status, - abort_requested = false, - attempts = - CASE - WHEN status = 'doing' THEN attempts + 1 - ELSE attempts - END - WHERE id = job_id AND status IN ('todo', 'doing') - RETURNING id INTO _job_id; - END IF; - IF _job_id IS NULL THEN - RAISE 'Job was not found or not in "doing" or "todo" status (job id: %)', job_id; - END IF; + SELECT json_object('type': 'job_inserted', 'job_id': NEW.id)::text INTO payload; + PERFORM pg_notify('procrastinate_queue#' || NEW.queue_name, payload); + PERFORM pg_notify('procrastinate_any_queue', payload); + RETURN NEW; END; $$; --- Recreate the deleted triggers -CREATE TRIGGER procrastinate_jobs_notify_queue - AFTER INSERT ON procrastinate_jobs - FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_notify_queue(); - -CREATE TRIGGER procrastinate_trigger_status_events_update - AFTER UPDATE OF status ON procrastinate_jobs - FOR EACH ROW - EXECUTE PROCEDURE procrastinate_trigger_status_events_procedure_update(); - -CREATE TRIGGER procrastinate_trigger_status_events_insert +CREATE TRIGGER procrastinate_jobs_notify_queue_job_inserted_v2 AFTER INSERT ON procrastinate_jobs - FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_trigger_status_events_procedure_insert(); + FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status_v1)) + EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_v2(); -CREATE TRIGGER procrastinate_trigger_scheduled_events - AFTER UPDATE OR INSERT ON procrastinate_jobs - FOR EACH ROW WHEN ((new.scheduled_at IS NOT NULL AND new.status = 'todo'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_trigger_scheduled_events_procedure(); -- Create additional function and trigger for abortion requests -CREATE FUNCTION procrastinate_trigger_abort_requested_events_procedure() +CREATE FUNCTION procrastinate_trigger_abort_requested_events_procedure_v1() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -265,7 +292,28 @@ BEGIN END; $$; -CREATE TRIGGER procrastinate_trigger_abort_requested_events +CREATE TRIGGER procrastinate_trigger_abort_requested_events_v1 AFTER UPDATE OF abort_requested ON procrastinate_jobs FOR EACH ROW WHEN ((new.abort_requested = true)) - EXECUTE PROCEDURE procrastinate_trigger_abort_requested_events_procedure(); + EXECUTE PROCEDURE procrastinate_trigger_abort_requested_events_procedure_v1(); + + +CREATE FUNCTION procrastinate_notify_queue_abort_job_v1() +RETURNS trigger + LANGUAGE plpgsql +AS $$ +DECLARE + payload TEXT; +BEGIN + SELECT json_object('type': 'abort_job_requested', 'job_id': NEW.id)::text INTO payload; + PERFORM pg_notify('procrastinate_queue#' || NEW.queue_name, payload); + PERFORM pg_notify('procrastinate_any_queue', payload); + RETURN NEW; +END; +$$; + +-- Create a new trigger that pushes a notification when a job is aborted +CREATE TRIGGER procrastinate_jobs_notify_queue_job_aborted_v1 + AFTER UPDATE OF abort_requested ON procrastinate_jobs + FOR EACH ROW WHEN ((old.abort_requested = false AND new.abort_requested = true AND new.status = 'doing'::procrastinate_job_status_v1)) + EXECUTE PROCEDURE procrastinate_notify_queue_abort_job_v1(); diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index fd851f152..40e865676 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -10,12 +10,12 @@ SELECT procrastinate_defer_job(%(queue)s, %(task_name)s, %(priority)s, %(lock)s, -- defer_periodic_job -- -- Create a periodic job if it doesn't already exist, and delete periodic metadata -- for previous jobs in the same task. -SELECT procrastinate_defer_periodic_job(%(queue)s, %(lock)s, %(queueing_lock)s, %(task_name)s, %(periodic_id)s, %(defer_timestamp)s, %(args)s) AS id; +SELECT procrastinate_defer_periodic_job(%(queue)s, %(lock)s, %(queueing_lock)s, %(task_name)s, %(priority)s, %(periodic_id)s, %(defer_timestamp)s, %(args)s) AS id; -- fetch_job -- -- Get the first awaiting job SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts - FROM procrastinate_fetch_job(%(queues)s); + FROM procrastinate_fetch_job_v1(%(queues)s::varchar[]); -- select_stalled_jobs -- -- Get running jobs that started more than a given time ago @@ -41,18 +41,18 @@ WHERE id IN ( ON job.id = event.job_id ORDER BY job.id, event.at DESC ) AS job - WHERE job.status = ANY(%(statuses)s::procrastinate_job_status[]) + WHERE job.status = ANY(%(statuses)s::procrastinate_job_status_v1[]) AND (%(queue)s::varchar IS NULL OR job.queue_name = %(queue)s) AND latest_at < NOW() - (%(nb_hours)s || 'HOUR')::INTERVAL ) -- finish_job -- -- Finish a job, changing it from "doing" to "succeeded" or "failed" -SELECT procrastinate_finish_job(%(job_id)s, %(status)s, %(delete_job)s); +SELECT procrastinate_finish_job_v1(%(job_id)s, %(status)s, %(delete_job)s); -- cancel_job -- -- Cancel a job, changing it from "todo" to "cancelled" or mark for abortion -SELECT procrastinate_cancel_job(%(job_id)s, %(abort)s, %(delete_job)s) AS id; +SELECT procrastinate_cancel_job_v1(%(job_id)s, %(abort)s, %(delete_job)s) AS id; -- get_job_status -- -- Get the status of a job @@ -60,7 +60,7 @@ SELECT status FROM procrastinate_jobs WHERE id = %(job_id)s; -- retry_job -- -- Retry a job, changing it from "doing" to "todo" -SELECT procrastinate_retry_job(%(job_id)s, %(retry_at)s, %(new_priority)s, %(new_queue_name)s, %(new_lock)s); +SELECT procrastinate_retry_job_v1(%(job_id)s, %(retry_at)s, %(new_priority)s, %(new_queue_name)s, %(new_lock)s); -- listen_queue -- -- In this one, the argument is an identifier, shoud not be escaped the same way @@ -91,7 +91,7 @@ SELECT id, WHERE (%(id)s::bigint IS NULL OR id = %(id)s) AND (%(queue_name)s::varchar IS NULL OR queue_name = %(queue_name)s) AND (%(task_name)s::varchar IS NULL OR task_name = %(task_name)s) - AND (%(status)s::procrastinate_job_status IS NULL OR status = %(status)s) + AND (%(status)s::procrastinate_job_status_v1 IS NULL OR status = %(status)s) AND (%(lock)s::varchar IS NULL OR lock = %(lock)s) AND (%(queueing_lock)s::varchar IS NULL OR queueing_lock = %(queueing_lock)s) ORDER BY id ASC; @@ -110,7 +110,7 @@ WITH jobs AS ( FROM procrastinate_jobs WHERE (%(queue_name)s::varchar IS NULL OR queue_name = %(queue_name)s) AND (%(task_name)s::varchar IS NULL OR task_name = %(task_name)s) - AND (%(status)s::procrastinate_job_status IS NULL OR status = %(status)s) + AND (%(status)s::procrastinate_job_status_v1 IS NULL OR status = %(status)s) AND (%(lock)s::varchar IS NULL OR lock = %(lock)s) ) SELECT queue_name AS name, @@ -142,7 +142,7 @@ WITH jobs AS ( FROM procrastinate_jobs WHERE (%(queue_name)s::varchar IS NULL OR queue_name = %(queue_name)s) AND (%(task_name)s::varchar IS NULL OR task_name = %(task_name)s) - AND (%(status)s::procrastinate_job_status IS NULL OR status = %(status)s) + AND (%(status)s::procrastinate_job_status_v1 IS NULL OR status = %(status)s) AND (%(lock)s::varchar IS NULL OR lock = %(lock)s) ) SELECT task_name AS name, @@ -170,7 +170,7 @@ WITH jobs AS ( FROM procrastinate_jobs WHERE (%(queue_name)s::varchar IS NULL OR queue_name = %(queue_name)s) AND (%(task_name)s::varchar IS NULL OR task_name = %(task_name)s) - AND (%(status)s::procrastinate_job_status IS NULL OR status = %(status)s) + AND (%(status)s::procrastinate_job_status_v1 IS NULL OR status = %(status)s) AND (%(lock)s::varchar IS NULL OR lock = %(lock)s) AND lock IS NOT NULL ), locks AS ( diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index d7ee49cd0..a60342894 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -4,7 +4,7 @@ CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog; -- Enums -CREATE TYPE procrastinate_job_status AS ENUM ( +CREATE TYPE procrastinate_job_status_v1 AS ENUM ( 'todo', -- The job is queued 'doing', -- The job has been fetched by a worker 'succeeded', -- The job ended successfully @@ -35,7 +35,7 @@ CREATE TABLE procrastinate_jobs ( lock text, queueing_lock text, args jsonb DEFAULT '{}' NOT NULL, - status procrastinate_job_status DEFAULT 'todo'::procrastinate_job_status NOT NULL, + status procrastinate_job_status_v1 DEFAULT 'todo'::procrastinate_job_status_v1 NOT NULL, scheduled_at timestamp with time zone NULL, attempts integer DEFAULT 0 NOT NULL, abort_requested boolean DEFAULT false NOT NULL @@ -65,7 +65,7 @@ CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs ( CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing'; CREATE INDEX procrastinate_jobs_queue_name_idx ON procrastinate_jobs(queue_name); -CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status]); +CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status_v1, 'doing'::procrastinate_job_status_v1]); CREATE INDEX procrastinate_events_job_id_fkey ON procrastinate_events(job_id); @@ -73,7 +73,6 @@ CREATE INDEX procrastinate_periodic_defers_job_id_fkey ON procrastinate_periodic -- Functions - CREATE FUNCTION procrastinate_defer_job( queue_name character varying, task_name character varying, @@ -155,7 +154,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_fetch_job( +CREATE FUNCTION procrastinate_fetch_job_v1( target_queue_names character varying[] ) RETURNS procrastinate_jobs @@ -193,7 +192,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_finish_job(job_id bigint, end_status procrastinate_job_status, delete_job boolean) +CREATE FUNCTION procrastinate_finish_job_v1(job_id bigint, end_status procrastinate_job_status_v1, delete_job boolean) RETURNS void LANGUAGE plpgsql AS $$ @@ -223,7 +222,8 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_cancel_job(job_id bigint, abort boolean, delete_job boolean) + +CREATE FUNCTION procrastinate_cancel_job_v1(job_id bigint, abort boolean, delete_job boolean) RETURNS bigint LANGUAGE plpgsql AS $$ @@ -240,13 +240,13 @@ BEGIN UPDATE procrastinate_jobs SET abort_requested = true, status = CASE status - WHEN 'todo' THEN 'cancelled'::procrastinate_job_status ELSE status + WHEN 'todo' THEN 'cancelled'::procrastinate_job_status_v1 ELSE status END WHERE id = job_id AND status IN ('todo', 'doing') RETURNING id INTO _job_id; ELSE UPDATE procrastinate_jobs - SET status = 'cancelled'::procrastinate_job_status + SET status = 'cancelled'::procrastinate_job_status_v1 WHERE id = job_id AND status = 'todo' RETURNING id INTO _job_id; END IF; @@ -255,53 +255,45 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_retry_job( +CREATE FUNCTION procrastinate_retry_job_v1( job_id bigint, retry_at timestamp with time zone, new_priority integer, new_queue_name character varying, new_lock character varying -) - RETURNS void - LANGUAGE plpgsql -AS $$ +) RETURNS void LANGUAGE plpgsql AS $$ DECLARE _job_id bigint; + _abort_requested boolean; BEGIN - UPDATE procrastinate_jobs - SET status = CASE - WHEN NOT abort_requested THEN 'todo'::procrastinate_job_status - ELSE 'failed'::procrastinate_job_status - END, - attempts = CASE - WHEN NOT abort_requested THEN attempts + 1 - ELSE attempts - END, - scheduled_at = CASE - WHEN NOT abort_requested THEN retry_at - ELSE scheduled_at - END, - priority = CASE - WHEN NOT abort_requested THEN COALESCE(new_priority, priority) - ELSE priority - END, - queue_name = CASE - WHEN NOT abort_requested THEN COALESCE(new_queue_name, queue_name) - ELSE queue_name - END, - lock = CASE - WHEN NOT abort_requested THEN COALESCE(new_lock, lock) - ELSE lock - END + SELECT abort_requested FROM procrastinate_jobs WHERE id = job_id AND status = 'doing' - RETURNING id INTO _job_id; + FOR UPDATE + INTO _abort_requested; + IF _abort_requested THEN + UPDATE procrastinate_jobs + SET status = 'failed'::procrastinate_job_status_v1 + WHERE id = job_id AND status = 'doing' + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = 'todo'::procrastinate_job_status_v1, + attempts = attempts + 1, + scheduled_at = retry_at, + priority = COALESCE(new_priority, priority), + queue_name = COALESCE(new_queue_name, queue_name), + lock = COALESCE(new_lock, lock) + WHERE id = job_id AND status = 'doing' + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN RAISE 'Job was not found or not in "doing" status (job id: %)', job_id; END IF; END; $$; -CREATE FUNCTION procrastinate_notify_queue_job_inserted() +CREATE FUNCTION procrastinate_notify_queue_job_inserted_v2() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -315,7 +307,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_notify_queue_abort_job() +CREATE FUNCTION procrastinate_notify_queue_abort_job_v1() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -329,7 +321,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_trigger_status_events_procedure_insert() +CREATE FUNCTION procrastinate_trigger_function_status_events_insert_v1() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -340,34 +332,34 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_trigger_status_events_procedure_update() +CREATE FUNCTION procrastinate_trigger_function_status_events_update_v2() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN WITH t AS ( SELECT CASE - WHEN OLD.status = 'todo'::procrastinate_job_status - AND NEW.status = 'doing'::procrastinate_job_status + WHEN OLD.status = 'todo'::procrastinate_job_status_v1 + AND NEW.status = 'doing'::procrastinate_job_status_v1 THEN 'started'::procrastinate_job_event_type - WHEN OLD.status = 'doing'::procrastinate_job_status - AND NEW.status = 'todo'::procrastinate_job_status + WHEN OLD.status = 'doing'::procrastinate_job_status_v1 + AND NEW.status = 'todo'::procrastinate_job_status_v1 THEN 'deferred_for_retry'::procrastinate_job_event_type - WHEN OLD.status = 'doing'::procrastinate_job_status - AND NEW.status = 'failed'::procrastinate_job_status + WHEN OLD.status = 'doing'::procrastinate_job_status_v1 + AND NEW.status = 'failed'::procrastinate_job_status_v1 THEN 'failed'::procrastinate_job_event_type - WHEN OLD.status = 'doing'::procrastinate_job_status - AND NEW.status = 'succeeded'::procrastinate_job_status + WHEN OLD.status = 'doing'::procrastinate_job_status_v1 + AND NEW.status = 'succeeded'::procrastinate_job_status_v1 THEN 'succeeded'::procrastinate_job_event_type - WHEN OLD.status = 'todo'::procrastinate_job_status + WHEN OLD.status = 'todo'::procrastinate_job_status_v1 AND ( - NEW.status = 'cancelled'::procrastinate_job_status - OR NEW.status = 'failed'::procrastinate_job_status - OR NEW.status = 'succeeded'::procrastinate_job_status + NEW.status = 'cancelled'::procrastinate_job_status_v1 + OR NEW.status = 'failed'::procrastinate_job_status_v1 + OR NEW.status = 'succeeded'::procrastinate_job_status_v1 ) THEN 'cancelled'::procrastinate_job_event_type - WHEN OLD.status = 'doing'::procrastinate_job_status - AND NEW.status = 'aborted'::procrastinate_job_status + WHEN OLD.status = 'doing'::procrastinate_job_status_v1 + AND NEW.status = 'aborted'::procrastinate_job_status_v1 THEN 'aborted'::procrastinate_job_event_type ELSE NULL END as event_type @@ -380,7 +372,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_trigger_scheduled_events_procedure() +CREATE FUNCTION procrastinate_trigger_function_scheduled_events_v1() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -392,7 +384,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_trigger_abort_requested_events_procedure() +CREATE FUNCTION procrastinate_trigger_abort_requested_events_procedure_v1() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -417,177 +409,36 @@ $$; -- Triggers -CREATE TRIGGER procrastinate_jobs_notify_queue_job_inserted +CREATE TRIGGER procrastinate_jobs_notify_queue_job_inserted_v2 AFTER INSERT ON procrastinate_jobs - FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted(); + FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status_v1)) + EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_v2(); -CREATE TRIGGER procrastinate_jobs_notify_queue_abort_job +CREATE TRIGGER procrastinate_jobs_notify_queue_job_aborted_v1 AFTER UPDATE OF abort_requested ON procrastinate_jobs - FOR EACH ROW WHEN ((old.abort_requested = false AND new.abort_requested = true AND new.status = 'doing'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_notify_queue_abort_job(); + FOR EACH ROW WHEN ((old.abort_requested = false AND new.abort_requested = true AND new.status = 'doing'::procrastinate_job_status_v1)) + EXECUTE PROCEDURE procrastinate_notify_queue_abort_job_v1(); -CREATE TRIGGER procrastinate_trigger_status_events_update +CREATE TRIGGER procrastinate_trigger_status_events_update_v2 AFTER UPDATE OF status ON procrastinate_jobs FOR EACH ROW - EXECUTE PROCEDURE procrastinate_trigger_status_events_procedure_update(); + EXECUTE PROCEDURE procrastinate_trigger_function_status_events_update_v2(); -CREATE TRIGGER procrastinate_trigger_status_events_insert +CREATE TRIGGER procrastinate_trigger_status_events_insert_v1 AFTER INSERT ON procrastinate_jobs - FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_trigger_status_events_procedure_insert(); + FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status_v1)) + EXECUTE PROCEDURE procrastinate_trigger_function_status_events_insert_v1(); -CREATE TRIGGER procrastinate_trigger_scheduled_events +CREATE TRIGGER procrastinate_trigger_scheduled_events_v1 AFTER UPDATE OR INSERT ON procrastinate_jobs - FOR EACH ROW WHEN ((new.scheduled_at IS NOT NULL AND new.status = 'todo'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_trigger_scheduled_events_procedure(); + FOR EACH ROW WHEN ((new.scheduled_at IS NOT NULL AND new.status = 'todo'::procrastinate_job_status_v1)) + EXECUTE PROCEDURE procrastinate_trigger_function_scheduled_events_v1(); -CREATE TRIGGER procrastinate_trigger_abort_requested_events +CREATE TRIGGER procrastinate_trigger_abort_requested_events_v1 AFTER UPDATE OF abort_requested ON procrastinate_jobs FOR EACH ROW WHEN ((new.abort_requested = true)) - EXECUTE PROCEDURE procrastinate_trigger_abort_requested_events_procedure(); + EXECUTE PROCEDURE procrastinate_trigger_abort_requested_events_procedure_v1(); CREATE TRIGGER procrastinate_trigger_delete_jobs BEFORE DELETE ON procrastinate_jobs FOR EACH ROW EXECUTE PROCEDURE procrastinate_unlink_periodic_defers(); - - --- Old versions of functions, for backwards compatibility (to be removed in a future release) - --- procrastinate_defer_job --- the function without the priority argument is kept for compatibility reasons -CREATE FUNCTION procrastinate_defer_job( - queue_name character varying, - task_name character varying, - lock text, - queueing_lock text, - args jsonb, - scheduled_at timestamp with time zone -) - RETURNS bigint - LANGUAGE plpgsql -AS $$ -DECLARE - job_id bigint; -BEGIN - INSERT INTO procrastinate_jobs (queue_name, task_name, lock, queueing_lock, args, scheduled_at) - VALUES (queue_name, task_name, lock, queueing_lock, args, scheduled_at) - RETURNING id INTO job_id; - - RETURN job_id; -END; -$$; - --- procrastinate_finish_job --- the next_scheduled_at argument is kept for compatibility reasons -CREATE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, next_scheduled_at timestamp with time zone, delete_job boolean) - RETURNS void - LANGUAGE plpgsql -AS $$ -DECLARE - _job_id bigint; -BEGIN - IF end_status NOT IN ('succeeded', 'failed') THEN - RAISE 'End status should be either "succeeded" or "failed" (job id: %)', job_id; - END IF; - IF delete_job THEN - DELETE FROM procrastinate_jobs - WHERE id = job_id AND status IN ('todo', 'doing') - RETURNING id INTO _job_id; - ELSE - UPDATE procrastinate_jobs - SET status = end_status, - abort_requested = false, - attempts = - CASE - WHEN status = 'doing' THEN attempts + 1 - ELSE attempts - END - WHERE id = job_id AND status IN ('todo', 'doing') - RETURNING id INTO _job_id; - END IF; - IF _job_id IS NULL THEN - RAISE 'Job was not found or not in "doing" or "todo" status (job id: %)', job_id; - END IF; -END; -$$; - --- procrastinate_defer_periodic_job --- the function without the priority argument is kept for compatibility reasons -CREATE FUNCTION procrastinate_defer_periodic_job( - _queue_name character varying, - _lock character varying, - _queueing_lock character varying, - _task_name character varying, - _periodic_id character varying, - _defer_timestamp bigint, - _args jsonb -) - RETURNS bigint - LANGUAGE plpgsql -AS $$ -DECLARE - _job_id bigint; - _defer_id bigint; -BEGIN - - INSERT - INTO procrastinate_periodic_defers (task_name, periodic_id, defer_timestamp) - VALUES (_task_name, _periodic_id, _defer_timestamp) - ON CONFLICT DO NOTHING - RETURNING id into _defer_id; - - IF _defer_id IS NULL THEN - RETURN NULL; - END IF; - - UPDATE procrastinate_periodic_defers - SET job_id = procrastinate_defer_job( - _queue_name, - _task_name, - 0, - _lock, - _queueing_lock, - _args, - NULL - ) - WHERE id = _defer_id - RETURNING job_id INTO _job_id; - - DELETE - FROM procrastinate_periodic_defers - USING ( - SELECT id - FROM procrastinate_periodic_defers - WHERE procrastinate_periodic_defers.task_name = _task_name - AND procrastinate_periodic_defers.periodic_id = _periodic_id - AND procrastinate_periodic_defers.defer_timestamp < _defer_timestamp - ORDER BY id - FOR UPDATE - ) to_delete - WHERE procrastinate_periodic_defers.id = to_delete.id; - - RETURN _job_id; -END; -$$; - --- procrastinate_retry_job --- the function without the new_* arguments is kept for compatibility reasons -CREATE FUNCTION procrastinate_retry_job(job_id bigint, retry_at timestamp with time zone) - RETURNS void - LANGUAGE plpgsql -AS $$ -DECLARE - _job_id bigint; -BEGIN - UPDATE procrastinate_jobs - SET status = 'todo', - attempts = attempts + 1, - scheduled_at = retry_at - WHERE id = job_id AND status = 'doing' - RETURNING id INTO _job_id; - IF _job_id IS NULL THEN - RAISE 'Job was not found or not in "doing" status (job id: %)', job_id; - END IF; -END; -$$; diff --git a/tests/integration/test_manager.py b/tests/integration/test_manager.py index 19b07d8db..ce37bbf3d 100644 --- a/tests/integration/test_manager.py +++ b/tests/integration/test_manager.py @@ -331,7 +331,7 @@ async def test_enum_synced(psycopg_connector): pg_enum_rows = await psycopg_connector.execute_query_all_async( """SELECT e.enumlabel FROM pg_enum e JOIN pg_type t ON e.enumtypid = t.oid WHERE t.typname = %(type_name)s""", - type_name="procrastinate_job_status", + type_name="procrastinate_job_status_v1", ) pg_values = {row["enumlabel"] for row in pg_enum_rows} From 63492c5e985fe4761bbdc3aab110981d315d6f12 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 5 Oct 2024 20:34:42 +0200 Subject: [PATCH 02/17] Migration doc --- docs/howto/django/migrations.md | 14 +++-- docs/howto/production/migrations.md | 83 +++++++++++++++-------------- 2 files changed, 53 insertions(+), 44 deletions(-) diff --git a/docs/howto/django/migrations.md b/docs/howto/django/migrations.md index ea80166ec..ca32e5d86 100644 --- a/docs/howto/django/migrations.md +++ b/docs/howto/django/migrations.md @@ -4,14 +4,18 @@ Procrastinate comes with its own migrations so don't forget to run `./manage.py migrate`. Procrastinate provides 2 kinds of migrations: -- The Django equivalent of the `procrastinate` normal migrations, which are - used to create all of the PostgreSQL DDL objects used by Procrastinate. -- Specific noop migrations used for Django to understand the Procrastinate - Models (see {doc}`models`). + +- The Django equivalent of the `procrastinate` normal migrations, which are + used to create all of the PostgreSQL DDL objects used by Procrastinate. +- Specific noop migrations used for Django to understand the Procrastinate + Models (see {doc}`models`). Procrastinate's Django migrations are always kept in sync with your current version of Procrastinate, it's always a good idea to check the release notes and read the migrations when upgrading so that you know what will be happening to the database. -See {doc}`../production/migrations` for more information on migrations. +See {doc}`../production/migrations` for more information on migrations, especially +around `pre` and `post` migrations: if you deploy while the code is running, you'll +want to ensure you run the `pre-` migrations before you deploy the code and the +`post-` migrations after. diff --git a/docs/howto/production/migrations.md b/docs/howto/production/migrations.md index 3543a7225..04a1e69eb 100644 --- a/docs/howto/production/migrations.md +++ b/docs/howto/production/migrations.md @@ -1,5 +1,10 @@ # Migrate the Procrastinate schema +:::{warning} +v3 introduces a new way to handle migrations. Hopefully, easier both for users +and maintainers. Read about pre- and post-migrations below. +::: + When the Procrastinate database schema evolves in new Procrastinate releases, new migrations are released alongside. Look at the [Release notes](https://github.com/procrastinate-org/procrastinate/releases) @@ -31,17 +36,22 @@ on PyPI. A simple way to list all the migrations is to use the command: $ procrastinate schema --migrations-path /home/me/my_venv/lib/python3.x/lib/site-packages/procrastinate/sql/migrations ``` + It's your responsibility to keep track of which migrations have been applied yet or not. Thankfully, the names of procrastinate migrations should help you: they follow a specific pattern: ``` -xx.yy.zz_ab_very_short_description_of_the_migration.sql +{xx.yy.zz}_{ab}_{pre|post}_very_short_description_of_the_migration.sql ``` -- `xx.yy.zz` is the version of Procrastinate the migration script can be applied to. -- `ab` is the migration script's serial number, `01` being the first number in the - series. +- `xx.yy.zz` is the version of Procrastinate the migration script can be applied to. +- `ab` is the migration script's serial number, `01` being the first number in the + series. +- `pre` / `post`: indicates wether the migration should be applied before + upgrading the code (`pre`) or after upgrading the code (`post`) in the context + of a blue-green deployment. On old migrations, if `pre` or `post` is not + specified, it's a `post` migration. :::{note} There is a [debate](https://github.com/procrastinate-org/procrastinate/issues/1040) @@ -50,51 +60,46 @@ directions for how to use classic ones (apart from Django), please feel free to and/or contribute code or documentation if you have an opinion on this. ::: -Let's say you are currently using Procrastinate 1.9.0, and you want to update to -Procrastinate 1.15.0. In that case, before upgrading the Procrastinate Python package -(from 1.9.0 to 1.15.0), you will need to apply all the migration scripts whose versions -are greater than or equal to 1.9.0, and lower than 1.15.0 (1.9.0 ≤ version \< 1.15.0). -And you will apply them in version order, and, for a version, in serial number order. -For example, you will apply the following migration scripts, in that order: - -1. `01.09.00_01_xxxxx.sql` -2. `01.10.00_01_xxxxx.sql` -3. `01.11.00_01_xxxxx.sql` -4. `01.11.00_02_xxxxx.sql` -5. `01.12.00_01_xxxxx.sql` -6. `01.14.00_01_xxxxx.sql` -7. `01.14.00_02_xxxxx.sql` +## How to apply migrations -If you want to upgrade from one Procrastinate major version to another, say from -Procrastinate 1.6.0 to 3.2.0, there are two options, depending on whether you can -interrupt the service to do the migration or not. - -## The easier way, with service interruption +##1 The easier way, with service interruption 1. Shut down the services that use Procrastinate: both the services that defer tasks and the workers. -2. Apply all the migration scripts (1.6.0 ≤ version \< 3.2.0). -3. Upgrade your code to the new Procrastinate version (3.2.0). +2. Apply all the migration scripts (`pre` & `post`), e.g. with: + +```console +$ MIGRATION_TO_APPLY="02.00.00_01_pre_some_migration.sql" +$ cat $(procrastinate schema --migrations-path)/${MIGRATION_TO_APPLY} | psql +$ MIGRATION_TO_APPLY="02.00.00_01_post_some_migration.sql" +$ cat $(procrastinate schema --migrations-path)/${MIGRATION_TO_APPLY} | psql +$ ... +``` + +3. Upgrade your code to the new Procrastinate version. 4. Start all the services. This, as you've noticed, only works if you're able to stop the services. ## The safer way, without service interruption -:::{note} -This only applies starting at Procrastinate 0.17.0. For previous versions, -you will have to interrupt the service or write custom migrations. -::: +If you need to ensure service continuity, you'll need to make intermediate upgrades. +Basically, you'll need to stop at every version that provides migrations. + +```console +$ MIGRATION_TO_APPLY="02.01.00_01_pre_some_migration.sql" +$ cat $(procrastinate schema --migrations-path)/${MIGRATION_TO_APPLY} | psql + +$ yoursystem/deploy procrastinate 2.1.0 -If you care about service continuity, you'll need to make intermediate upgrades. For -example, to upgrade from Procrastinate 1.6.0 to 3.2.0, here are the steps you will need -to follow: +$ MIGRATION_TO_APPLY="02.01.00_01_post_some_migration.sql" +$ cat $(procrastinate schema --migrations-path)/${MIGRATION_TO_APPLY} | psql -1. Apply all the migration scripts between 1.6.0 and 2.0.0 (1.6.0 ≤ version \< 2.0.0). -2. Live-upgrade the Procrastinate version used in your services, from 1.6.0 to 2.0.0. -3. Apply all the migration scripts between 2.0.0 and 3.0.0 (2.0.0 ≤ version \< 3.0.0). -4. Live-upgrade the Procrastinate version used in your services, from 2.0.0 to 3.0.0. -5. Apply all the migration scripts between 3.0.0 and 3.2.0 (3.0.0 ≤ version \< 3.2.0). -6. Live-upgrade the Procrastinate version used in your services, from 3.0.0 and 3.2.0. +$ MIGRATION_TO_APPLY="02.02.00_01_pre_some_migration.sql" +$ cat $(procrastinate schema --migrations-path)/${MIGRATION_TO_APPLY} | psql -Following this process you can go from 1.6.0 to 3.2.0 with no service discontinuity. +$ yoursystem/deploy procrastinate 2.2.0 + +$ MIGRATION_TO_APPLY="02.02.00_01_post_some_migration.sql" +$ cat $(procrastinate schema --migrations-path)/${MIGRATION_TO_APPLY} | psql +``` From a9954a10e86492211fc70d1f2667fa6e72e97487 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sat, 19 Oct 2024 14:02:51 +0000 Subject: [PATCH 03/17] Fix pre/post migrations --- .../03.00.00_01_pre_cancel_notification.sql | 15 ++- .../03.00.00_50_post_cancel_notification.sql | 115 +----------------- 2 files changed, 14 insertions(+), 116 deletions(-) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index d5e25c8cb..53dab46ab 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -5,11 +5,20 @@ -- This is a pre-migration script. -- - Whenever we recreate an immutable object (function, trigger, indexes), we -- will suffix its name with a version number. --- --- Add an 'abort_requested' column to the procrastinate_jobs table + -- Add an 'abort_requested' column to the procrastinate_jobs table ALTER TABLE procrastinate_jobs ADD COLUMN abort_requested boolean DEFAULT false NOT NULL; +-- Create a new enum type without 'aborting' +CREATE TYPE procrastinate_job_status_v1 AS ENUM ( + 'todo', + 'doing', + 'succeeded', + 'failed', + 'cancelled', + 'aborted' +); + -- Set abort requested flag on all jobs with 'aborting' status UPDATE procrastinate_jobs SET abort_requested = true WHERE status = 'aborting'; @@ -108,7 +117,7 @@ CREATE OR REPLACE TRIGGER procrastinate_jobs_notify_queue_job_inserted_v1 EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_v1(); --- Create the new versions for the functions +-- Create the new versioned functions (without aborting state) CREATE FUNCTION procrastinate_fetch_job_v1( target_queue_names character varying[] ) diff --git a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql index 215410bc1..a227ac2bd 100644 --- a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql @@ -1,5 +1,5 @@ --- These are old versions of function, that we needed --- to keep around for backwards compatibility. We can now safely drop them. +-- These are old versions of functions, that we needed to keep around for +-- backwards compatibility. We can now safely drop them. DROP FUNCTION IF EXISTS procrastinate_finish_job( integer, procrastinate_job_status, @@ -42,16 +42,6 @@ UPDATE procrastinate_jobs SET abort_requested = true WHERE status = 'aborting'; DROP TRIGGER IF EXISTS procrastinate_trigger_sync_abort_requested_with_status_v1 ON procrastinate_jobs; DROP FUNCTION IF EXISTS procrastinate_sync_abort_requested_with_status_v1; --- Create a new enum type without 'aborting' -CREATE TYPE procrastinate_job_status_v1 AS ENUM ( - 'todo', - 'doing', - 'succeeded', - 'failed', - 'cancelled', - 'aborted' -); - -- We need to drop the default temporarily as otherwise DatatypeMismatch would occur ALTER TABLE procrastinate_jobs ALTER COLUMN status DROP DEFAULT; @@ -96,107 +86,6 @@ CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHE -- Drop the old type DROP TYPE procrastinate_job_status; --- Recreate and update the functions to use the new column -CREATE FUNCTION procrastinate_fetch_job_v1( - target_queue_names character varying[] -) - RETURNS procrastinate_jobs - LANGUAGE plpgsql -AS $$ -DECLARE - found_jobs procrastinate_jobs; -BEGIN - WITH candidate AS ( - SELECT jobs.* - FROM procrastinate_jobs AS jobs - WHERE - -- reject the job if its lock has earlier jobs - NOT EXISTS ( - SELECT 1 - FROM procrastinate_jobs AS earlier_jobs - WHERE - jobs.lock IS NOT NULL - AND earlier_jobs.lock = jobs.lock - AND earlier_jobs.status IN ('todo', 'doing') - AND earlier_jobs.id < jobs.id) - AND jobs.status = 'todo' - AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) - AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) - ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 - FOR UPDATE OF jobs SKIP LOCKED - ) - UPDATE procrastinate_jobs - SET status = 'doing' - FROM candidate - WHERE procrastinate_jobs.id = candidate.id - RETURNING procrastinate_jobs.* INTO found_jobs; - - RETURN found_jobs; -END; -$$; - -CREATE FUNCTION procrastinate_finish_job_v1(job_id bigint, end_status procrastinate_job_status_v1, delete_job boolean) - RETURNS void - LANGUAGE plpgsql -AS $$ -DECLARE - _job_id bigint; -BEGIN - IF end_status NOT IN ('succeeded', 'failed', 'aborted') THEN - RAISE 'End status should be either "succeeded", "failed" or "aborted" (job id: %)', job_id; - END IF; - IF delete_job THEN - DELETE FROM procrastinate_jobs - WHERE id = job_id AND status IN ('todo', 'doing') - RETURNING id INTO _job_id; - ELSE - UPDATE procrastinate_jobs - SET status = end_status, - abort_requested = false, - attempts = CASE status - WHEN 'doing' THEN attempts + 1 ELSE attempts - END - WHERE id = job_id AND status IN ('todo', 'doing') - RETURNING id INTO _job_id; - END IF; - IF _job_id IS NULL THEN - RAISE 'Job was not found or not in "doing" or "todo" status (job id: %)', job_id; - END IF; -END; -$$; - -CREATE FUNCTION procrastinate_cancel_job_v1(job_id bigint, abort boolean, delete_job boolean) - RETURNS bigint - LANGUAGE plpgsql -AS $$ -DECLARE - _job_id bigint; -BEGIN - IF delete_job THEN - DELETE FROM procrastinate_jobs - WHERE id = job_id AND status = 'todo' - RETURNING id INTO _job_id; - END IF; - IF _job_id IS NULL THEN - IF abort THEN - UPDATE procrastinate_jobs - SET abort_requested = true, - status = CASE status - WHEN 'todo' THEN 'cancelled'::procrastinate_job_status_v1 ELSE status - END - WHERE id = job_id AND status IN ('todo', 'doing') - RETURNING id INTO _job_id; - ELSE - UPDATE procrastinate_jobs - SET status = 'cancelled'::procrastinate_job_status_v1 - WHERE id = job_id AND status = 'todo' - RETURNING id INTO _job_id; - END IF; - END IF; - RETURN _job_id; -END; -$$; - -- Recreate or rename the triggers & their associated functions ALTER FUNCTION procrastinate_trigger_status_events_procedure_insert From a166eb88d0998a9eaa226346191173aa4beadf36 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sat, 19 Oct 2024 16:19:05 +0000 Subject: [PATCH 04/17] Don't increase version of temporary functions and triggers --- .../03.00.00_01_pre_cancel_notification.sql | 12 +++++------ .../03.00.00_50_post_cancel_notification.sql | 20 +++++++++---------- procrastinate/sql/schema.sql | 12 +++++------ 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index 53dab46ab..a81c28b50 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -46,7 +46,7 @@ CREATE TRIGGER procrastinate_trigger_sync_abort_requested_with_status_v1 DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update ON procrastinate_jobs; DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update; -CREATE FUNCTION procrastinate_trigger_status_events_procedure_update_v1() +CREATE FUNCTION procrastinate_trigger_status_events_procedure_update_temp() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -86,10 +86,10 @@ BEGIN END; $$; -CREATE TRIGGER procrastinate_trigger_status_events_update_v1 +CREATE TRIGGER procrastinate_trigger_status_events_update_temp AFTER UPDATE OF status ON procrastinate_jobs FOR EACH ROW - EXECUTE PROCEDURE procrastinate_trigger_status_events_procedure_update_v1(); + EXECUTE PROCEDURE procrastinate_trigger_status_events_procedure_update_temp(); DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update ON procrastinate_jobs; DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update; @@ -97,7 +97,7 @@ DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update; DROP TRIGGER IF EXISTS procrastinate_jobs_notify_queue ON procrastinate_jobs; DROP FUNCTION IF EXISTS procrastinate_notify_queue; -CREATE OR REPLACE FUNCTION procrastinate_notify_queue_job_inserted_v1() +CREATE OR REPLACE FUNCTION procrastinate_notify_queue_job_inserted_temp() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -111,10 +111,10 @@ BEGIN END; $$; -CREATE OR REPLACE TRIGGER procrastinate_jobs_notify_queue_job_inserted_v1 +CREATE OR REPLACE TRIGGER procrastinate_jobs_notify_queue_job_inserted_temp AFTER INSERT ON procrastinate_jobs FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status)) - EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_v1(); + EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_temp(); -- Create the new versioned functions (without aborting state) diff --git a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql index a227ac2bd..0cbc14828 100644 --- a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql @@ -54,16 +54,16 @@ DROP INDEX IF EXISTS procrastinate_jobs_id_lock_idx; DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update ON procrastinate_jobs; DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_insert ON procrastinate_jobs; DROP TRIGGER IF EXISTS procrastinate_trigger_scheduled_events ON procrastinate_jobs; -DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update_v1 ON procrastinate_jobs; -DROP TRIGGER IF EXISTS procrastinate_jobs_notify_queue_job_inserted_v1 ON procrastinate_jobs; +DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update_temp ON procrastinate_jobs; +DROP TRIGGER IF EXISTS procrastinate_jobs_notify_queue_job_inserted_temp ON procrastinate_jobs; -- Delete the functions that depends on the old status type DROP FUNCTION IF EXISTS procrastinate_fetch_job; DROP FUNCTION IF EXISTS procrastinate_finish_job(bigint, procrastinate_job_status, boolean); DROP FUNCTION IF EXISTS procrastinate_cancel_job; -DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update_v1; +DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update_temp; DROP FUNCTION IF EXISTS procrastinate_finish_job(integer, procrastinate_job_status, timestamp with time zone, boolean); -DROP FUNCTION IF EXISTS procrastinate_notify_queue_job_inserted_v1; +DROP FUNCTION IF EXISTS procrastinate_notify_queue_job_inserted_temp; -- Alter the table to use the new type ALTER TABLE procrastinate_jobs @@ -96,7 +96,7 @@ CREATE TRIGGER procrastinate_trigger_status_events_insert_v1 FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status_v1)) EXECUTE PROCEDURE procrastinate_trigger_function_status_events_insert_v1(); -CREATE FUNCTION procrastinate_trigger_function_status_events_update_v2() +CREATE FUNCTION procrastinate_trigger_function_status_events_update_v1() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -136,10 +136,10 @@ BEGIN END; $$; -CREATE TRIGGER procrastinate_trigger_status_events_update_v2 +CREATE TRIGGER procrastinate_trigger_status_events_update_v1 AFTER UPDATE OF status ON procrastinate_jobs FOR EACH ROW - EXECUTE PROCEDURE procrastinate_trigger_function_status_events_update_v2(); + EXECUTE PROCEDURE procrastinate_trigger_function_status_events_update_v1(); ALTER FUNCTION procrastinate_trigger_scheduled_events_procedure RENAME TO procrastinate_trigger_function_scheduled_events_v1; @@ -149,7 +149,7 @@ CREATE TRIGGER procrastinate_trigger_scheduled_events_v1 FOR EACH ROW WHEN ((new.scheduled_at IS NOT NULL AND new.status = 'todo'::procrastinate_job_status_v1)) EXECUTE PROCEDURE procrastinate_trigger_function_scheduled_events_v1(); -CREATE FUNCTION procrastinate_notify_queue_job_inserted_v2() +CREATE FUNCTION procrastinate_notify_queue_job_inserted_v1() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -163,10 +163,10 @@ BEGIN END; $$; -CREATE TRIGGER procrastinate_jobs_notify_queue_job_inserted_v2 +CREATE TRIGGER procrastinate_jobs_notify_queue_job_inserted_v1 AFTER INSERT ON procrastinate_jobs FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status_v1)) - EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_v2(); + EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_v1(); -- Create additional function and trigger for abortion requests diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index a60342894..6fd0c10f5 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -293,7 +293,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_notify_queue_job_inserted_v2() +CREATE FUNCTION procrastinate_notify_queue_job_inserted_v1() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -332,7 +332,7 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_trigger_function_status_events_update_v2() +CREATE FUNCTION procrastinate_trigger_function_status_events_update_v1() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -409,20 +409,20 @@ $$; -- Triggers -CREATE TRIGGER procrastinate_jobs_notify_queue_job_inserted_v2 +CREATE TRIGGER procrastinate_jobs_notify_queue_job_inserted_v1 AFTER INSERT ON procrastinate_jobs FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status_v1)) - EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_v2(); + EXECUTE PROCEDURE procrastinate_notify_queue_job_inserted_v1(); CREATE TRIGGER procrastinate_jobs_notify_queue_job_aborted_v1 AFTER UPDATE OF abort_requested ON procrastinate_jobs FOR EACH ROW WHEN ((old.abort_requested = false AND new.abort_requested = true AND new.status = 'doing'::procrastinate_job_status_v1)) EXECUTE PROCEDURE procrastinate_notify_queue_abort_job_v1(); -CREATE TRIGGER procrastinate_trigger_status_events_update_v2 +CREATE TRIGGER procrastinate_trigger_status_events_update_v1 AFTER UPDATE OF status ON procrastinate_jobs FOR EACH ROW - EXECUTE PROCEDURE procrastinate_trigger_function_status_events_update_v2(); + EXECUTE PROCEDURE procrastinate_trigger_function_status_events_update_v1(); CREATE TRIGGER procrastinate_trigger_status_events_insert_v1 AFTER INSERT ON procrastinate_jobs From 2963d4ea55a7b013049457ade451e1e3759d6a07 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sat, 19 Oct 2024 16:27:56 +0000 Subject: [PATCH 05/17] Delete redundant drops --- .../sql/migrations/03.00.00_01_pre_cancel_notification.sql | 3 --- 1 file changed, 3 deletions(-) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index a81c28b50..968789e21 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -91,9 +91,6 @@ CREATE TRIGGER procrastinate_trigger_status_events_update_temp FOR EACH ROW EXECUTE PROCEDURE procrastinate_trigger_status_events_procedure_update_temp(); -DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update ON procrastinate_jobs; -DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update; - DROP TRIGGER IF EXISTS procrastinate_jobs_notify_queue ON procrastinate_jobs; DROP FUNCTION IF EXISTS procrastinate_notify_queue; From 5294cbf080fb441c35f567a79012d76b6606ed88 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 15:45:57 +0200 Subject: [PATCH 06/17] Move update --- .../sql/migrations/03.00.00_01_pre_cancel_notification.sql | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index 968789e21..a5d0d4400 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -19,9 +19,6 @@ CREATE TYPE procrastinate_job_status_v1 AS ENUM ( 'aborted' ); --- Set abort requested flag on all jobs with 'aborting' status -UPDATE procrastinate_jobs SET abort_requested = true WHERE status = 'aborting'; - -- Add temporary triggers to sync the abort_requested flag with the status -- so that blue-green deployments can work CREATE OR REPLACE FUNCTION procrastinate_sync_abort_requested_with_status_v1() @@ -43,6 +40,10 @@ CREATE TRIGGER procrastinate_trigger_sync_abort_requested_with_status_v1 FOR EACH ROW EXECUTE FUNCTION procrastinate_sync_abort_requested_with_status_v1(); +-- Set abort requested flag on all jobs with 'aborting' status +UPDATE procrastinate_jobs SET abort_requested = true WHERE status = 'aborting'; + + DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update ON procrastinate_jobs; DROP FUNCTION IF EXISTS procrastinate_trigger_status_events_procedure_update; From a90765581ddd7be8c31bbed5169bcad0fedb8263 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 16:00:50 +0200 Subject: [PATCH 07/17] Move the "DROP FUNCTION" for old function in pre- procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql --- .../03.00.00_01_pre_cancel_notification.sql | 37 +++++++++++++++++++ .../03.00.00_50_post_cancel_notification.sql | 37 ------------------- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index a5d0d4400..be48dde04 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -255,3 +255,40 @@ BEGIN END IF; END; $$; + +-- These are old versions of functions, that we needed to keep around for +-- backwards compatibility. We can now safely drop them. +DROP FUNCTION IF EXISTS procrastinate_finish_job( + integer, + procrastinate_job_status, + timestamp with time zone, + boolean +); +DROP FUNCTION IF EXISTS procrastinate_defer_job( + character varying, + character varying, + text, + text, + jsonb, + timestamp with time zone +); +DROP FUNCTION IF EXISTS procrastinate_defer_periodic_job( + character varying, + character varying, + character varying, + character varying, + character varying, + bigint, + jsonb +); +DROP FUNCTION IF EXISTS procrastinate_retry_job( + bigint, + timestamp with time zone +); +DROP FUNCTION IF EXISTS procrastinate_retry_job( + bigint, + timestamp with time zone, + integer, + character varying, + character varying +); diff --git a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql index 0cbc14828..91edcdbd0 100644 --- a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql @@ -1,40 +1,3 @@ --- These are old versions of functions, that we needed to keep around for --- backwards compatibility. We can now safely drop them. -DROP FUNCTION IF EXISTS procrastinate_finish_job( - integer, - procrastinate_job_status, - timestamp with time zone, - boolean -); -DROP FUNCTION IF EXISTS procrastinate_defer_job( - character varying, - character varying, - text, - text, - jsonb, - timestamp with time zone -); -DROP FUNCTION IF EXISTS procrastinate_defer_periodic_job( - character varying, - character varying, - character varying, - character varying, - character varying, - bigint, - jsonb -); -DROP FUNCTION IF EXISTS procrastinate_retry_job( - bigint, - timestamp with time zone -); -DROP FUNCTION IF EXISTS procrastinate_retry_job( - bigint, - timestamp with time zone, - integer, - character varying, - character varying -); - -- Remove all traces of the "aborting" status -- Last sanity update in case the trigger didn't work 100% of the time UPDATE procrastinate_jobs SET abort_requested = true WHERE status = 'aborting'; From 4b882a4608511206d0d0da6b49e51df42c1af2ef Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 15:47:28 +0200 Subject: [PATCH 08/17] Use procrastinate_job_event_type_v1 in post-functions definition --- .../03.00.00_50_post_cancel_notification.sql | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql index 91edcdbd0..94f49c498 100644 --- a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql @@ -68,26 +68,26 @@ BEGIN SELECT CASE WHEN OLD.status = 'todo'::procrastinate_job_status_v1 AND NEW.status = 'doing'::procrastinate_job_status_v1 - THEN 'started'::procrastinate_job_event_type + THEN 'started'::procrastinate_job_event_type_v1 WHEN OLD.status = 'doing'::procrastinate_job_status_v1 AND NEW.status = 'todo'::procrastinate_job_status_v1 - THEN 'deferred_for_retry'::procrastinate_job_event_type + THEN 'deferred_for_retry'::procrastinate_job_event_type_v1 WHEN OLD.status = 'doing'::procrastinate_job_status_v1 AND NEW.status = 'failed'::procrastinate_job_status_v1 - THEN 'failed'::procrastinate_job_event_type + THEN 'failed'::procrastinate_job_event_type_v1 WHEN OLD.status = 'doing'::procrastinate_job_status_v1 AND NEW.status = 'succeeded'::procrastinate_job_status_v1 - THEN 'succeeded'::procrastinate_job_event_type + THEN 'succeeded'::procrastinate_job_event_type_v1 WHEN OLD.status = 'todo'::procrastinate_job_status_v1 AND ( NEW.status = 'cancelled'::procrastinate_job_status_v1 OR NEW.status = 'failed'::procrastinate_job_status_v1 OR NEW.status = 'succeeded'::procrastinate_job_status_v1 ) - THEN 'cancelled'::procrastinate_job_event_type + THEN 'cancelled'::procrastinate_job_event_type_v1 WHEN OLD.status = 'doing'::procrastinate_job_status_v1 AND NEW.status = 'aborted'::procrastinate_job_status_v1 - THEN 'aborted'::procrastinate_job_event_type + THEN 'aborted'::procrastinate_job_event_type_v1 ELSE NULL END as event_type ) @@ -139,7 +139,7 @@ CREATE FUNCTION procrastinate_trigger_abort_requested_events_procedure_v1() AS $$ BEGIN INSERT INTO procrastinate_events(job_id, type) - VALUES (NEW.id, 'abort_requested'::procrastinate_job_event_type); + VALUES (NEW.id, 'abort_requested'::procrastinate_job_event_type_v1); RETURN NEW; END; $$; From cb6d08fe2081aa99bb7fde84a1f7a946300255ea Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 15:47:43 +0200 Subject: [PATCH 09/17] procrastinate_defer_job --- .../03.00.00_01_pre_cancel_notification.sql | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index be48dde04..9582f4599 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -292,3 +292,14 @@ DROP FUNCTION IF EXISTS procrastinate_retry_job( character varying, character varying ); + +-- Rename the old functions to their new versions with _v1 suffix +ALTER FUNCTION procrastinate_defer_job( + character varying, + character varying, + integer, + text, + text, + jsonb, + timestamp with time zone +) RENAME TO procrastinate_defer_job_v1; From 4ffbcdde3862ef117cfdc0cfbd8defbaac53838e Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 15:47:59 +0200 Subject: [PATCH 10/17] rename procrastinate_defer_periodic_job --- .../03.00.00_01_pre_cancel_notification.sql | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index 9582f4599..7e6bdac5b 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -303,3 +303,14 @@ ALTER FUNCTION procrastinate_defer_job( jsonb, timestamp with time zone ) RENAME TO procrastinate_defer_job_v1; + +ALTER FUNCTION procrastinate_defer_periodic_job( + character varying, + character varying, + character varying, + character varying, + integer, + character varying, + bigint, + jsonb +) RENAME TO procrastinate_defer_periodic_job_v1; From b6cf15056d4c78c49a79f9bbeed1470e091baf01 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 15:48:14 +0200 Subject: [PATCH 11/17] rename procrastinate_unlink_periodic_defers --- .../sql/migrations/03.00.00_01_pre_cancel_notification.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index 7e6bdac5b..8c879f52b 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -314,3 +314,5 @@ ALTER FUNCTION procrastinate_defer_periodic_job( bigint, jsonb ) RENAME TO procrastinate_defer_periodic_job_v1; + +ALTER FUNCTION procrastinate_unlink_periodic_defers() RENAME TO procrastinate_unlink_periodic_defers_v1; From ba56bc7e4031f33d774143cb7fe5c9748103a7b0 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 15:48:24 +0200 Subject: [PATCH 12/17] Rename trigger --- .../sql/migrations/03.00.00_01_pre_cancel_notification.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index 8c879f52b..88851f35b 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -316,3 +316,6 @@ ALTER FUNCTION procrastinate_defer_periodic_job( ) RENAME TO procrastinate_defer_periodic_job_v1; ALTER FUNCTION procrastinate_unlink_periodic_defers() RENAME TO procrastinate_unlink_periodic_defers_v1; + +-- Rename the triggers & types too +ALTER TRIGGER procrastinate_trigger_delete_jobs ON procrastinate_jobs RENAME TO procrastinate_trigger_delete_jobs_v1; From b32091272989c7eb72de86ec8ab77ddbf44d7de5 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 15:48:33 +0200 Subject: [PATCH 13/17] Rename type --- .../sql/migrations/03.00.00_01_pre_cancel_notification.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql index 88851f35b..9d2d0b793 100644 --- a/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_01_pre_cancel_notification.sql @@ -319,3 +319,4 @@ ALTER FUNCTION procrastinate_unlink_periodic_defers() RENAME TO procrastinate_un -- Rename the triggers & types too ALTER TRIGGER procrastinate_trigger_delete_jobs ON procrastinate_jobs RENAME TO procrastinate_trigger_delete_jobs_v1; +ALTER TYPE procrastinate_job_event_type RENAME TO procrastinate_job_event_type_v1; From 2b50c6a23945897a5246a0434db985b29fc38b16 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 15:48:38 +0200 Subject: [PATCH 14/17] Update schema --- procrastinate/sql/schema.sql | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index 6fd0c10f5..bd653a6f0 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -13,7 +13,7 @@ CREATE TYPE procrastinate_job_status_v1 AS ENUM ( 'aborted' -- The job was aborted ); -CREATE TYPE procrastinate_job_event_type AS ENUM ( +CREATE TYPE procrastinate_job_event_type_v1 AS ENUM ( 'deferred', -- Job created, in todo 'started', -- todo -> doing 'deferred_for_retry', -- doing -> todo @@ -53,23 +53,23 @@ CREATE TABLE procrastinate_periodic_defers ( CREATE TABLE procrastinate_events ( id bigserial PRIMARY KEY, job_id bigint NOT NULL REFERENCES procrastinate_jobs ON DELETE CASCADE, - type procrastinate_job_event_type, + type procrastinate_job_event_type_v1, at timestamp with time zone DEFAULT NOW() NULL ); -- Constraints & Indices -- this prevents from having several jobs with the same queueing lock in the "todo" state -CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (queueing_lock) WHERE status = 'todo'; +CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx_v1 ON procrastinate_jobs (queueing_lock) WHERE status = 'todo'; -- this prevents from having several jobs with the same lock in the "doing" state -CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing'; +CREATE UNIQUE INDEX procrastinate_jobs_lock_idx_v1 ON procrastinate_jobs (lock) WHERE status = 'doing'; -CREATE INDEX procrastinate_jobs_queue_name_idx ON procrastinate_jobs(queue_name); -CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status_v1, 'doing'::procrastinate_job_status_v1]); +CREATE INDEX procrastinate_jobs_queue_name_idx_v1 ON procrastinate_jobs(queue_name); +CREATE INDEX procrastinate_jobs_id_lock_idx_v1 ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status_v1, 'doing'::procrastinate_job_status_v1]); -CREATE INDEX procrastinate_events_job_id_fkey ON procrastinate_events(job_id); +CREATE INDEX procrastinate_events_job_id_fkey_v1 ON procrastinate_events(job_id); -CREATE INDEX procrastinate_periodic_defers_job_id_fkey ON procrastinate_periodic_defers(job_id); +CREATE INDEX procrastinate_periodic_defers_job_id_fkey_v1 ON procrastinate_periodic_defers(job_id); -- Functions @@ -327,7 +327,7 @@ CREATE FUNCTION procrastinate_trigger_function_status_events_insert_v1() AS $$ BEGIN INSERT INTO procrastinate_events(job_id, type) - VALUES (NEW.id, 'deferred'::procrastinate_job_event_type); + VALUES (NEW.id, 'deferred'::procrastinate_job_event_type_v1); RETURN NEW; END; $$; @@ -341,26 +341,26 @@ BEGIN SELECT CASE WHEN OLD.status = 'todo'::procrastinate_job_status_v1 AND NEW.status = 'doing'::procrastinate_job_status_v1 - THEN 'started'::procrastinate_job_event_type + THEN 'started'::procrastinate_job_event_type_v1 WHEN OLD.status = 'doing'::procrastinate_job_status_v1 AND NEW.status = 'todo'::procrastinate_job_status_v1 - THEN 'deferred_for_retry'::procrastinate_job_event_type + THEN 'deferred_for_retry'::procrastinate_job_event_type_v1 WHEN OLD.status = 'doing'::procrastinate_job_status_v1 AND NEW.status = 'failed'::procrastinate_job_status_v1 - THEN 'failed'::procrastinate_job_event_type + THEN 'failed'::procrastinate_job_event_type_v1 WHEN OLD.status = 'doing'::procrastinate_job_status_v1 AND NEW.status = 'succeeded'::procrastinate_job_status_v1 - THEN 'succeeded'::procrastinate_job_event_type + THEN 'succeeded'::procrastinate_job_event_type_v1 WHEN OLD.status = 'todo'::procrastinate_job_status_v1 AND ( NEW.status = 'cancelled'::procrastinate_job_status_v1 OR NEW.status = 'failed'::procrastinate_job_status_v1 OR NEW.status = 'succeeded'::procrastinate_job_status_v1 ) - THEN 'cancelled'::procrastinate_job_event_type + THEN 'cancelled'::procrastinate_job_event_type_v1 WHEN OLD.status = 'doing'::procrastinate_job_status_v1 AND NEW.status = 'aborted'::procrastinate_job_status_v1 - THEN 'aborted'::procrastinate_job_event_type + THEN 'aborted'::procrastinate_job_event_type_v1 ELSE NULL END as event_type ) @@ -378,7 +378,7 @@ CREATE FUNCTION procrastinate_trigger_function_scheduled_events_v1() AS $$ BEGIN INSERT INTO procrastinate_events(job_id, type, at) - VALUES (NEW.id, 'scheduled'::procrastinate_job_event_type, NEW.scheduled_at); + VALUES (NEW.id, 'scheduled'::procrastinate_job_event_type_v1, NEW.scheduled_at); RETURN NEW; END; @@ -390,7 +390,7 @@ CREATE FUNCTION procrastinate_trigger_abort_requested_events_procedure_v1() AS $$ BEGIN INSERT INTO procrastinate_events(job_id, type) - VALUES (NEW.id, 'abort_requested'::procrastinate_job_event_type); + VALUES (NEW.id, 'abort_requested'::procrastinate_job_event_type_v1); RETURN NEW; END; $$; @@ -439,6 +439,6 @@ CREATE TRIGGER procrastinate_trigger_abort_requested_events_v1 FOR EACH ROW WHEN ((new.abort_requested = true)) EXECUTE PROCEDURE procrastinate_trigger_abort_requested_events_procedure_v1(); -CREATE TRIGGER procrastinate_trigger_delete_jobs +CREATE TRIGGER procrastinate_trigger_delete_jobs_v1 BEFORE DELETE ON procrastinate_jobs FOR EACH ROW EXECUTE PROCEDURE procrastinate_unlink_periodic_defers(); From a175f986ad91841946101775024dab13f38cf270 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 15:48:53 +0200 Subject: [PATCH 15/17] WIP DON'T MERGE --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 63a014496..d3a9c75fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,7 +114,6 @@ addopts = [ "-vv", "--strict-markers", "-rfE", - "--reuse-db", ] testpaths = [ "tests/unit", From 730e88b55e63c6f2ff3d32bef973b25956434a87 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 16:16:09 +0200 Subject: [PATCH 16/17] Recreate procrastinate_trigger_status_events_procedure_insert_v1 --- .../03.00.00_50_post_cancel_notification.sql | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql index 94f49c498..edc078363 100644 --- a/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql +++ b/procrastinate/sql/migrations/03.00.00_50_post_cancel_notification.sql @@ -50,14 +50,22 @@ CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHE DROP TYPE procrastinate_job_status; -- Recreate or rename the triggers & their associated functions +CREATE FUNCTION procrastinate_trigger_status_events_procedure_insert_v1() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO procrastinate_events(job_id, type) + VALUES (NEW.id, 'deferred'::procrastinate_job_event_type_v1); + RETURN NEW; +END; +$$; -ALTER FUNCTION procrastinate_trigger_status_events_procedure_insert - RENAME TO procrastinate_trigger_function_status_events_insert_v1; +DROP FUNCTION procrastinate_trigger_status_events_procedure_insert; CREATE TRIGGER procrastinate_trigger_status_events_insert_v1 AFTER INSERT ON procrastinate_jobs FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status_v1)) - EXECUTE PROCEDURE procrastinate_trigger_function_status_events_insert_v1(); + EXECUTE PROCEDURE procrastinate_trigger_status_events_procedure_insert_v1(); CREATE FUNCTION procrastinate_trigger_function_status_events_update_v1() RETURNS trigger From b22458e367c118dac1bde821e2c960f42df12c4d Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sun, 20 Oct 2024 16:16:43 +0200 Subject: [PATCH 17/17] Fix queries: use v1 functions --- procrastinate/sql/queries.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index 40e865676..763ff9f5f 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -5,12 +5,12 @@ -- defer_job -- -- Create and enqueue a job -SELECT procrastinate_defer_job(%(queue)s, %(task_name)s, %(priority)s, %(lock)s, %(queueing_lock)s, %(args)s, %(scheduled_at)s) AS id; +SELECT procrastinate_defer_job_v1(%(queue)s, %(task_name)s, %(priority)s, %(lock)s, %(queueing_lock)s, %(args)s, %(scheduled_at)s) AS id; -- defer_periodic_job -- -- Create a periodic job if it doesn't already exist, and delete periodic metadata -- for previous jobs in the same task. -SELECT procrastinate_defer_periodic_job(%(queue)s, %(lock)s, %(queueing_lock)s, %(task_name)s, %(priority)s, %(periodic_id)s, %(defer_timestamp)s, %(args)s) AS id; +SELECT procrastinate_defer_periodic_job_v1(%(queue)s, %(lock)s, %(queueing_lock)s, %(task_name)s, %(priority)s, %(periodic_id)s, %(defer_timestamp)s, %(args)s) AS id; -- fetch_job -- -- Get the first awaiting job