diff --git a/django_celery_beat/migrations/0007_periodictask_origin_key.py b/django_celery_beat/migrations/0007_periodictask_origin_key.py new file mode 100644 index 00000000..c02f2ef1 --- /dev/null +++ b/django_celery_beat/migrations/0007_periodictask_origin_key.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_beat', '0006_auto_20180210_1226'), + ] + + operations = [ + migrations.AddField( + model_name='periodictask', + name='origin_key', + field=models.CharField( + blank=True, + default=None, + max_length=200, + null=True, + verbose_name='origin key' + ), + ), + ] diff --git a/django_celery_beat/models.py b/django_celery_beat/models.py index d5fe2ddf..2e1a9be3 100644 --- a/django_celery_beat/models.py +++ b/django_celery_beat/models.py @@ -267,6 +267,9 @@ class PeriodicTask(models.Model): routing_key = models.CharField( _('routing key'), max_length=200, blank=True, null=True, default=None, ) + origin_key = models.CharField( + _('origin key'), max_length=200, blank=True, null=True, default=None, + ) expires = models.DateTimeField( _('expires'), blank=True, null=True, ) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 14885018..18ab11d4 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -142,12 +142,19 @@ def _unpack_fields(cls, schedule, args=None, kwargs=None, relative=None, options=None, **entry): model_schedule, model_field = cls.to_model_schedule(schedule) + schedules = { + 'interval': None, + 'crontab': None, + 'solar': None + } + schedules[model_field] = model_schedule + + entry.update(**schedules) entry.update( - {model_field: model_schedule}, args=dumps(args or []), kwargs=dumps(kwargs or {}), - **cls._unpack_options(**options or {}) ) + entry.update(**cls._unpack_options(**options or {})) return entry @classmethod @@ -189,7 +196,8 @@ def __init__(self, *args, **kwargs): def setup_schedule(self): self.install_default_entries(self.schedule) - self.update_from_dict(self.app.conf.beat_schedule) + self.update_from_dict(self.app.conf.beat_schedule, + origin_key='beat_schedule') def all_as_schedule(self): debug('DatabaseScheduler: Fetching database schedule') @@ -248,18 +256,36 @@ def sync(self): self._dirty |= _tried logger.exception('Database error while sync: %r', exc) - def update_from_dict(self, mapping): + def update_from_dict(self, mapping, origin_key=None): s = {} for name, entry_fields in items(mapping): try: entry = self.Entry.from_entry(name, app=self.app, + origin_key=origin_key, **entry_fields) if entry.model.enabled: s[name] = entry except Exception as exc: logger.error(ADD_ENTRY_ERROR, name, exc, entry_fields) + if origin_key: + # delete database-persisted periodic tasks that + # are no longer declared in source code + existing_task_instances = PeriodicTask.objects.filter( + origin_key=origin_key + ) + existing_tasks = set( + map(lambda x: x.name, existing_task_instances) + ) + tasks_to_purge = existing_tasks - set(mapping.keys()) + if tasks_to_purge: + PeriodicTask.objects.filter( + origin_key=origin_key, + name__in=list(tasks_to_purge) + ).delete() + logger.warn("Purged periodic tasks [origin_key=%s]: %s", + origin_key, ', '.join(list(tasks_to_purge))) self.schedule.update(s) def install_default_entries(self, data): @@ -272,7 +298,7 @@ def install_default_entries(self, data): 'options': {'expires': 12 * 3600}, }, ) - self.update_from_dict(entries) + self.update_from_dict(entries, origin_key='default_entries') @property def schedule(self):