-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Proposal for a new scheduler. Inspired by apscheduler, but with two important changes: it respects a cancellation token both in the main loop and in sleeps, and it allows tasks to be triggered manually while still respecting the no-overlap rule.
- Loading branch information
Showing
6 changed files
with
292 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from ._scheduler import TaskScheduler | ||
|
||
__all__ = ["TaskScheduler"] |
102 changes: 102 additions & 0 deletions
102
cognite/extractorutils/unstable/scheduling/_scheduler.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
from dataclasses import dataclass | ||
from logging import getLogger | ||
from threading import RLock, Thread | ||
from time import time | ||
from typing import Callable | ||
|
||
import arrow | ||
from humps import pascalize | ||
|
||
from cognite.extractorutils.threading import CancellationToken | ||
from cognite.extractorutils.unstable.configuration.models import CronConfig, IntervalConfig, ScheduleConfig | ||
from cognite.extractorutils.unstable.scheduling._schedules import CronSchedule, IntervalSchedule, Schedule | ||
|
||
|
||
@dataclass(eq=False)
class Job:
    """
    A named callable together with the schedule that decides when it should run.

    Jobs are identified by their name only: equality and hashing both use ``name``. This keeps the two
    consistent, so a job that is re-registered under an existing name is still recognised as "the same
    job" when it is looked up in a set of running jobs.
    """

    # Unique name of the job; the sole basis for equality and hashing
    name: str
    # Zero-argument callable executed on every run
    call: Callable[[], None]
    # Schedule object used to compute the next run time
    schedule: Schedule

    def __eq__(self, other: object) -> bool:
        """Return True when ``other`` is a Job with the same name."""
        if not isinstance(other, Job):
            return NotImplemented
        return self.name == other.name

    def __hash__(self) -> int:
        """Hash on the name, matching ``__eq__``."""
        return hash(self.name)
|
||
|
||
class TaskScheduler: | ||
def __init__(self, cancellation_token: CancellationToken) -> None: | ||
self._cancellation_token = cancellation_token | ||
self._jobs: dict[str, Job] = {} | ||
self._jobs_lock = RLock() | ||
self._running: set[Job] = set() | ||
self._running_lock = RLock() | ||
|
||
self._logger = getLogger() | ||
|
||
def schedule_task(self, name: str, schedule: ScheduleConfig, task: Callable[[], None]) -> None: | ||
parsed_schedule: Schedule | ||
match schedule: | ||
case CronConfig() as cron_config: | ||
parsed_schedule = CronSchedule(expression=cron_config.expression) | ||
|
||
case IntervalConfig() as interval_config: | ||
parsed_schedule = IntervalSchedule(interval=interval_config.expression.seconds) | ||
|
||
with self._jobs_lock: | ||
self._jobs[name] = Job(name=name, call=task, schedule=parsed_schedule) | ||
|
||
def _get_next(self) -> list[Job]: | ||
if not self._jobs: | ||
return [] | ||
with self._jobs_lock: | ||
next_runs = sorted([(j.schedule.next(), j) for j in self._jobs.values()], key=lambda tup: tup[0]) | ||
return [job for (next, job) in next_runs if next == next_runs[0][0]] if next_runs else [] | ||
|
||
def _run_job(self, job: Job) -> bool: | ||
with self._running_lock: | ||
if job in self._running: | ||
self._logger.warning(f"Job {job.name} already running") | ||
return False | ||
|
||
def wrap() -> None: | ||
with self._running_lock: | ||
self._running.add(job) | ||
try: | ||
job.call() | ||
|
||
self._logger.info(f"Job {job.name} done. Next run at {arrow.get(job.schedule.next()).isoformat()}") | ||
|
||
finally: | ||
with self._running_lock: | ||
self._running.remove(job) | ||
|
||
Thread(target=wrap, name=f"Run{pascalize(job.name)}").start() | ||
return True | ||
|
||
def trigger(self, name: str) -> bool: | ||
return self._run_job(self._jobs[name]) | ||
|
||
def run(self) -> None: | ||
if not self._jobs: | ||
raise ValueError("Can't run scheduler without any scheduled tasks") | ||
|
||
# Run all interval jobs on startup since the first next() is one interval from now | ||
for job in [j for j in self._jobs.values() if isinstance(j.schedule, IntervalSchedule)]: | ||
self.trigger(job.name) | ||
|
||
while not self._cancellation_token.is_cancelled: | ||
next_runs = self._get_next() | ||
|
||
next_time = next_runs[0].schedule.next() | ||
wait_time = max(next_time - time(), 0) | ||
|
||
if wait_time: | ||
self._logger.info(f"Waiting until {arrow.get(next_time).isoformat()}") | ||
if self._cancellation_token.wait(wait_time): | ||
break | ||
|
||
for job in next_runs: | ||
self._logger.info(f"Starting job {job.name}") | ||
self._run_job(job) | ||
|
||
def stop(self) -> None: | ||
self._cancellation_token.cancel() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from abc import ABC, abstractmethod | ||
from time import time | ||
|
||
from croniter import croniter | ||
|
||
|
||
class Schedule(ABC):
    """Abstract base class for schedules: something that can report when the next run is due."""

    @abstractmethod
    def next(self) -> int:
        """Return the time of the next run as an integer timestamp."""
|
||
|
||
class CronSchedule(Schedule):
    """Schedule whose run times are derived from a cron expression via ``croniter``."""

    def __init__(self, expression: str) -> None:
        """Build the underlying ``croniter`` iterator from ``expression``."""
        self._cron = croniter(expression)

    def next(self) -> int:
        """Return the next matching time after the current time, truncated to an integer."""
        now = time()
        upcoming = self._cron.get_next(start_time=now)
        return int(upcoming)
|
||
|
||
class IntervalSchedule(Schedule):
    """
    Schedule that fires at a fixed interval, counted from the time the schedule was created.

    The creation time is truncated to a whole number before being used as the base, so run times fall
    on ``base + k * interval``.
    """

    def __init__(self, interval: int) -> None:
        """
        Args:
            interval: Distance between two consecutive run times. Must be positive.

        Raises:
            ValueError: If ``interval`` is zero or negative.
        """
        # A non-positive interval would make the loop in next() spin forever
        if interval <= 0:
            raise ValueError(f"Interval must be positive, got {interval}")

        self._interval = interval
        self._next = int(time())

    def next(self) -> int:
        """Return the first run time that is not earlier than the current time."""
        now = time()

        # Advance a local copy and store the result in one assignment. next() is reachable from more
        # than one thread, and incrementing self._next in place could then be applied twice and skip
        # a run.
        upcoming = self._next
        while now > upcoming:
            upcoming += self._interval
        self._next = upcoming

        return upcoming
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
from threading import Thread | ||
from time import sleep, time | ||
|
||
from pytest import approx | ||
|
||
from cognite.extractorutils.threading import CancellationToken | ||
from cognite.extractorutils.unstable.configuration.models import IntervalConfig, TimeIntervalConfig | ||
from cognite.extractorutils.unstable.scheduling._scheduler import TaskScheduler | ||
|
||
|
||
class MockFunction:
    """Callable test double that records the time of each call and then sleeps for a fixed duration."""

    def __init__(self, sleep_time: int) -> None:
        """
        Args:
            sleep_time: Number of seconds each call sleeps after recording its timestamp.
        """
        self.sleep_time = sleep_time
        self.called_times: list[float] = []

    def __call__(self) -> None:
        """Record the current time, then sleep for ``sleep_time`` seconds."""
        invoked_at = time()
        self.called_times.append(invoked_at)
        sleep(self.sleep_time)
|
||
|
||
def test_interval_schedules() -> None:
    """A 3 second interval job that takes 1 second should run at roughly 0, 3 and 6 seconds."""
    # approx() defaults to a relative tolerance of 1e-6, which for epoch timestamps is roughly half an
    # hour and makes the comparisons meaningless. Use an absolute tolerance instead. The interval
    # schedule truncates its base time to whole seconds, so a scheduled run can land up to one second
    # earlier than a full interval after the startup run; the tolerance leaves room for that.
    tolerance = 1.5

    ct = CancellationToken()

    mock = MockFunction(sleep_time=1)

    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
    scheduler.schedule_task(
        name="test",
        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("3s")),
        task=mock,
    )
    start = time()
    Thread(target=scheduler.run).start()
    sleep(7)
    scheduler.stop()

    assert len(mock.called_times) == 3
    assert mock.called_times[0] == approx(start, abs=tolerance)
    assert mock.called_times[1] == approx(mock.called_times[0] + 3, abs=tolerance)
    assert mock.called_times[2] == approx(mock.called_times[1] + 3, abs=tolerance)
|
||
|
||
def test_overlapping_schedules() -> None:
    """
    Test with a trigger that fires when the job is still running

    Timeline:

    time    | 0  1  2  3  4  5  6  7  8  9 STOP
    --------|-------------------------
    trigger | x     x     x     x     x
    job     | |-----|     |-----|     |-----|
    """
    # approx() defaults to a relative tolerance of 1e-6, which for epoch timestamps is roughly half an
    # hour and makes the comparisons meaningless. Use an absolute tolerance instead. The interval
    # schedule truncates its base time to whole seconds, so a scheduled run can land up to one second
    # earlier than a full number of intervals after the startup run.
    tolerance = 1.5

    ct = CancellationToken()

    mock = MockFunction(sleep_time=3)

    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
    scheduler.schedule_task(
        name="test",
        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("2s")),
        task=mock,
    )
    start = time()
    Thread(target=scheduler.run).start()
    sleep(9)
    scheduler.stop()

    assert len(mock.called_times) == 3
    assert mock.called_times[0] == approx(start, abs=tolerance)
    assert mock.called_times[1] == approx(mock.called_times[0] + 4, abs=tolerance)
    # The third run is compared with the second one (the original compared the second run with itself)
    assert mock.called_times[2] == approx(mock.called_times[1] + 4, abs=tolerance)
|
||
|
||
def test_manual() -> None:
    """Manual triggers of a fast job should all run, in addition to the run made at scheduler startup."""
    ct = CancellationToken()
    mock = MockFunction(sleep_time=0)

    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
    scheduler.schedule_task(
        name="test",
        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("1h")),
        task=mock,
    )

    Thread(target=scheduler.run).start()
    # Give the scheduler time to finish its startup run. Triggering immediately would race with that
    # run, and the manual trigger could be rejected as overlapping, making the call count flaky.
    sleep(0.1)

    first = scheduler.trigger("test")
    sleep(0.1)
    second = scheduler.trigger("test")
    sleep(0.1)
    third = scheduler.trigger("test")

    sleep(1)

    scheduler.stop()

    assert first
    assert second
    assert third

    # One run at startup plus the three manual triggers
    assert len(mock.called_times) == 4
|
||
|
||
def test_manual_interval_mix() -> None:
    """
    Test with a scheduled trigger mixed with manual trigger, make sure there's no overlap

    Timeline:

    time     | 0  1  2  3  4  5  6  7  8  9 STOP
    ---------|-------------------------
    schedule | x           x           x
    manual   |    x     x
    job      | |---|    |---|          |---|
    """
    # approx() defaults to a relative tolerance of 1e-6, which for epoch timestamps is roughly half an
    # hour and makes the comparisons meaningless. Use an absolute tolerance instead. The interval
    # schedule truncates its base time to whole seconds, so a scheduled run can land up to one second
    # earlier than a full number of intervals after `start`.
    tolerance = 1.5

    ct = CancellationToken()
    mock = MockFunction(sleep_time=2)

    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
    scheduler.schedule_task(
        name="test",
        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("4s")),
        task=mock,
    )

    start = time()
    Thread(target=scheduler.run).start()
    sleep(1)
    first_trigger = scheduler.trigger("test")
    sleep(2)
    second_trigger = scheduler.trigger("test")

    sleep(6)
    scheduler.stop()

    # The first manual trigger overlaps with the startup run and must be rejected
    assert not first_trigger
    assert second_trigger

    assert len(mock.called_times) == 3
    assert mock.called_times[0] == approx(start, abs=tolerance)
    assert mock.called_times[1] == approx(start + 3, abs=tolerance)
    assert mock.called_times[2] == approx(start + 8, abs=tolerance)