Skip to content

Commit

Permalink
Added the metadata field to tasks, schedules and jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Jul 28, 2024
1 parent 7566881 commit f36a398
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ APScheduler, see the :doc:`migration section <migration>`.
limit maximum concurrent job execution
- **BREAKING** Changed the ``timezone`` argument to ``CronTrigger.from_crontab()`` into
a keyword-only argument
- **BREAKING** Added the ``metadata`` field to tasks, schedules and jobs
- Added the ``start_time`` and ``end_time`` arguments to ``CronTrigger.from_crontab()``
(`#676 <https://github.com/agronholm/apscheduler/issues/676>`_)
- Added the ``psycopg`` event broker
Expand Down
23 changes: 21 additions & 2 deletions src/apscheduler/_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
from typing import Any, TypeVar

import attrs
from attr.validators import instance_of, optional

from ._structures import TaskDefaults
from ._converters import as_timedelta
from ._structures import MetadataType, TaskDefaults
from ._utils import UnsetValue, unset
from ._validators import if_not_unset, valid_metadata

T = TypeVar("T", bound="Callable[..., Any]")

Expand All @@ -17,6 +20,20 @@
@attrs.define(kw_only=True)
class TaskParameters(TaskDefaults):
id: str | UnsetValue = attrs.field(default=unset)
job_executor: str | UnsetValue = attrs.field(
validator=if_not_unset(instance_of(str)), default=unset
)
max_running_jobs: int | None | UnsetValue = attrs.field(
validator=if_not_unset(optional(instance_of(int))), default=unset
)
misfire_grace_time: timedelta | None | UnsetValue = attrs.field(
converter=as_timedelta,
validator=if_not_unset(optional(instance_of(timedelta))),
default=unset,
)
metadata: MetadataType | UnsetValue = attrs.field(
validator=if_not_unset(valid_metadata), default=unset
)


def task(
Expand All @@ -25,6 +42,7 @@ def task(
job_executor: str | UnsetValue = unset,
max_running_jobs: int | None | UnsetValue = unset,
misfire_grace_time: int | timedelta | None | UnsetValue = unset,
metadata: MetadataType | UnsetValue = unset,
) -> Callable[[T], T]:
"""
Decorate a function to have implied defaults as an APScheduler task.
Expand All @@ -36,7 +54,7 @@ def task(
:param ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the
run time of jobs created for the task are allowed to be late, compared to the
scheduled run time
:param metadata: key-value pairs for storing JSON compatible custom information
"""

def wrapper(func: T) -> T:
Expand All @@ -56,6 +74,7 @@ def wrapper(func: T) -> T:
job_executor=job_executor,
max_running_jobs=max_running_jobs,
misfire_grace_time=misfire_grace_time,
metadata=metadata,
),
)
return func
Expand Down
38 changes: 36 additions & 2 deletions src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,16 @@
ScheduleLookupError,
)
from .._marshalling import callable_from_ref, callable_to_ref
from .._structures import Job, JobResult, Schedule, ScheduleResult, Task, TaskDefaults
from .._utils import UnsetValue, unset
from .._structures import (
Job,
JobResult,
MetadataType,
Schedule,
ScheduleResult,
Task,
TaskDefaults,
)
from .._utils import UnsetValue, merge_metadata, unset
from .._validators import non_negative_number
from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger
from ..datastores.memory import MemoryDataStore
Expand Down Expand Up @@ -305,6 +313,7 @@ async def configure_task(
job_executor: str | UnsetValue = unset,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
max_running_jobs: int | None | UnsetValue = unset,
metadata: MetadataType | UnsetValue = unset,
) -> Task:
"""
Add or update a :ref:`task <task>` definition.
Expand All @@ -329,6 +338,7 @@ async def configure_task(
run time is allowed to be late, compared to the scheduled run time
:param max_running_jobs: maximum number of instances of the task that are
allowed to run concurrently
:param metadata: key-value pairs for storing JSON compatible custom information
:raises TypeError: if ``func_or_task_id`` is neither a task, task ID or a
callable
:return: the created or updated task definition
Expand All @@ -348,6 +358,7 @@ async def configure_task(
job_executor=func_or_task_id.job_executor,
max_running_jobs=func_or_task_id.max_running_jobs,
misfire_grace_time=func_or_task_id.misfire_grace_time,
metadata=func_or_task_id.metadata,
)
elif isinstance(func_or_task_id, str) and func_or_task_id:
task_params = get_task_params(func) if callable(func) else TaskParameters()
Expand All @@ -373,6 +384,12 @@ async def configure_task(
if task_params.misfire_grace_time is unset:
task_params.misfire_grace_time = self.task_defaults.misfire_grace_time

# Merge the metadata from the defaults, task definition and explicitly passed
# metadata
task_params.metadata = merge_metadata(
self.task_defaults.metadata, task_params.metadata, metadata
)

assert task_params.id
if callable(func):
self._task_callables[task_params.id] = func
Expand All @@ -391,6 +408,7 @@ async def configure_task(
job_executor=task_params.job_executor,
max_running_jobs=task_params.max_running_jobs,
misfire_grace_time=task_params.misfire_grace_time,
metadata=task_params.metadata,
)
modified = True
else:
Expand All @@ -411,6 +429,10 @@ async def configure_task(
changes["misfire_grace_time"] = task_params.misfire_grace_time
modified = True

if task_params.metadata != task.metadata:
changes["metadata"] = task_params.metadata
modified = True

task = attrs.evolve(task, **changes)

if modified:
Expand Down Expand Up @@ -440,6 +462,7 @@ async def add_schedule(
coalesce: CoalescePolicy = CoalescePolicy.latest,
job_executor: str | UnsetValue = unset,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
metadata: MetadataType | UnsetValue = unset,
max_jitter: float | timedelta | None = None,
job_result_expiration_time: float | timedelta = 0,
conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing,
Expand All @@ -461,6 +484,7 @@ async def add_schedule(
fire times have become due for this schedule since the last processing
:param misfire_grace_time: maximum number of seconds the scheduled job's actual
run time is allowed to be late, compared to the scheduled run time
:param metadata: key-value pairs for storing JSON compatible custom information
:param max_jitter: maximum time (in seconds, or as a timedelta) to randomly add
to the scheduled time for each job created from this schedule
:param job_result_expiration_time: minimum time (in seconds, or as a timedelta)
Expand Down Expand Up @@ -507,6 +531,9 @@ async def add_schedule(
misfire_grace_time=task.misfire_grace_time
if misfire_grace_time is unset
else misfire_grace_time,
metadata=task.metadata.copy()
if metadata is unset
else merge_metadata(task.metadata, metadata),
max_jitter=max_jitter,
job_executor=task.job_executor if job_executor is unset else job_executor,
job_result_expiration_time=job_result_expiration_time,
Expand Down Expand Up @@ -615,6 +642,7 @@ async def add_job(
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
job_executor: str | UnsetValue = unset,
metadata: MetadataType | UnsetValue = unset,
result_expiration_time: timedelta | float = 0,
) -> UUID:
"""
Expand All @@ -628,6 +656,7 @@ async def add_job(
:param kwargs: keyword arguments to call the target callable with
:param job_executor: name of the job executor to run the task with
(overrides the executor in the task definition, if any)
:param metadata: key-value pairs for storing JSON compatible custom information
:param result_expiration_time: the minimum time (as seconds, or timedelta) to
keep the result of the job available for fetching (the result won't be
saved at all if that time is 0)
Expand Down Expand Up @@ -665,6 +694,7 @@ async def add_job(
kwargs=kwargs or {},
executor=task.job_executor if job_executor is unset else job_executor,
result_expiration_time=result_expiration_time,
metadata=merge_metadata(task.metadata, metadata),
)
await self.data_store.add_job(job)
return job.id
Expand Down Expand Up @@ -717,6 +747,7 @@ async def run_job(
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
job_executor: str | UnsetValue = unset,
metadata: MetadataType | UnsetValue = unset,
) -> Any:
"""
Convenience method to add a job and then return its result.
Expand All @@ -729,6 +760,7 @@ async def run_job(
:param kwargs: keyword arguments to be passed to the task function
:param job_executor: name of the job executor to run the task with
(overrides the executor in the task definition, if any)
:param metadata: key-value pairs for storing JSON compatible custom information
:returns: the return value of the task function
"""
Expand All @@ -746,6 +778,7 @@ def listener(event: JobReleased) -> None:
args=args,
kwargs=kwargs,
job_executor=job_executor,
metadata=metadata,
result_expiration_time=timedelta(minutes=15),
)
await job_complete_event.wait()
Expand Down Expand Up @@ -982,6 +1015,7 @@ async def extend_schedule_leases(schedules: Sequence[Schedule]) -> None:
start_deadline=start_deadline,
executor=schedule.job_executor,
result_expiration_time=schedule.job_result_expiration_time,
metadata=schedule.metadata.copy(),
)
await self.data_store.add_job(job)

Expand Down
10 changes: 9 additions & 1 deletion src/apscheduler/_schedulers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from .. import current_scheduler
from .._enums import CoalescePolicy, ConflictPolicy, RunState, SchedulerRole
from .._events import Event, T_Event
from .._structures import Job, JobResult, Schedule, Task, TaskDefaults
from .._structures import Job, JobResult, MetadataType, Schedule, Task, TaskDefaults
from .._utils import UnsetValue, unset
from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger
from .async_ import AsyncScheduler, TaskType
Expand Down Expand Up @@ -236,6 +236,7 @@ def configure_task(
job_executor: str | UnsetValue = unset,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
max_running_jobs: int | None | UnsetValue = unset,
metadata: MetadataType | UnsetValue = unset,
) -> Task:
portal = self._ensure_services_ready()
return portal.call(
Expand All @@ -246,6 +247,7 @@ def configure_task(
job_executor=job_executor,
misfire_grace_time=misfire_grace_time,
max_running_jobs=max_running_jobs,
metadata=metadata,
)
)

Expand All @@ -265,6 +267,7 @@ def add_schedule(
coalesce: CoalescePolicy = CoalescePolicy.latest,
job_executor: str | UnsetValue = unset,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
metadata: MetadataType | UnsetValue = unset,
max_jitter: float | timedelta | None = None,
job_result_expiration_time: float | timedelta = 0,
conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing,
Expand All @@ -284,6 +287,7 @@ def add_schedule(
misfire_grace_time=misfire_grace_time,
max_jitter=max_jitter,
job_result_expiration_time=job_result_expiration_time,
metadata=metadata,
conflict_policy=conflict_policy,
)
)
Expand Down Expand Up @@ -326,6 +330,7 @@ def add_job(
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
job_executor: str | UnsetValue = unset,
metadata: MetadataType | UnsetValue = unset,
result_expiration_time: timedelta | float = 0,
) -> UUID:
portal = self._ensure_services_ready()
Expand All @@ -336,6 +341,7 @@ def add_job(
args=args,
kwargs=kwargs,
job_executor=job_executor,
metadata=metadata,
result_expiration_time=result_expiration_time,
)
)
Expand All @@ -357,6 +363,7 @@ def run_job(
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
job_executor: str | UnsetValue = unset,
metadata: MetadataType | UnsetValue = unset,
) -> Any:
portal = self._ensure_services_ready()
return portal.call(
Expand All @@ -366,6 +373,7 @@ def run_job(
args=args,
kwargs=kwargs,
job_executor=job_executor,
metadata=metadata,
)
)

Expand Down
29 changes: 24 additions & 5 deletions src/apscheduler/_structures.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import sys
from datetime import datetime, timedelta, timezone
from functools import partial
from typing import Any
Expand All @@ -12,8 +13,18 @@
from ._converters import as_aware_datetime, as_enum, as_timedelta
from ._enums import CoalescePolicy, JobOutcome
from ._utils import UnsetValue, unset
from ._validators import if_not_unset, valid_metadata
from .abc import Serializer, Trigger

if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias

MetadataType: TypeAlias = (
"dict[str, str | int | bool | None | list[MetadataType] | dict[str, MetadataType]]"
)


def serialize(inst: Any, field: attrs.Attribute, value: Any) -> Any:
if isinstance(value, frozenset):
Expand All @@ -36,6 +47,7 @@ class Task:
:var ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the
run time of jobs created for this task are allowed to be late, compared to the
scheduled run time
:var metadata: key-value pairs for storing JSON compatible custom information
"""

id: str = attrs.field(validator=[instance_of(str), min_len(1)], on_setattr=frozen)
Expand All @@ -55,6 +67,7 @@ class Task:
validator=optional(instance_of(timedelta)),
on_setattr=frozen,
)
metadata: MetadataType = attrs.field(validator=valid_metadata, factory=dict)
running_jobs: int = attrs.field(default=0)

def marshal(self, serializer: Serializer) -> dict[str, Any]:
Expand Down Expand Up @@ -92,19 +105,21 @@ class TaskDefaults:
:param ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the
run time of jobs created for this task are allowed to be late, compared to the
scheduled run time
:var metadata: key-value pairs for storing JSON compatible custom information
"""

job_executor: str | UnsetValue = attrs.field(
validator=instance_of((str, UnsetValue)), default=unset
validator=if_not_unset(instance_of(str)), default=unset
)
max_running_jobs: int | None | UnsetValue = attrs.field(
validator=optional(instance_of((int, UnsetValue))), default=unset
validator=optional(instance_of(int)), default=1
)
misfire_grace_time: timedelta | None | UnsetValue = attrs.field(
misfire_grace_time: timedelta | None = attrs.field(
converter=as_timedelta,
validator=optional(instance_of((timedelta, UnsetValue))),
default=unset,
validator=optional(instance_of(timedelta)),
default=None,
)
metadata: MetadataType = attrs.field(validator=valid_metadata, factory=dict)


@attrs.define(kw_only=True, order=False)
Expand All @@ -127,6 +142,7 @@ class Schedule:
add to the scheduled time for each job created from this schedule
:var ~datetime.timedelta job_result_expiration_time: minimum time to keep the job
results in storage from the jobs created by this schedule
:var metadata: key-value pairs for storing JSON compatible custom information
:var ~datetime.datetime next_fire_time: the next time the task will be run
:var ~datetime.datetime | None last_fire_time: the last time the task was scheduled
to run
Expand Down Expand Up @@ -172,6 +188,7 @@ class Schedule:
validator=optional(instance_of(timedelta)),
on_setattr=frozen,
)
metadata: MetadataType = attrs.field(validator=valid_metadata, factory=dict)
next_fire_time: datetime | None = attrs.field(
converter=as_aware_datetime,
default=None,
Expand Down Expand Up @@ -266,6 +283,7 @@ class Job:
scheduler after this time, it is considered to be misfired and will be aborted
:var ~datetime.timedelta result_expiration_time: minimum amount of time to keep the
result available for fetching in the data store
:var metadata: key-value pairs for storing JSON compatible custom information
:var ~datetime.datetime created_at: the time at which the job was created
:var str | None acquired_by: the unique identifier of the scheduler that has
acquired the job for execution
Expand Down Expand Up @@ -295,6 +313,7 @@ class Job:
result_expiration_time: timedelta = attrs.field(
converter=as_timedelta, default=timedelta(), repr=False, on_setattr=frozen
)
metadata: MetadataType = attrs.field(validator=valid_metadata, factory=dict)
created_at: datetime = attrs.field(
converter=as_aware_datetime,
factory=partial(datetime.now, timezone.utc),
Expand Down
Loading

0 comments on commit f36a398

Please sign in to comment.