Skip to content

Commit

Permalink
Gave explicit names to all background tasks
Browse files Browse the repository at this point in the history
Also removed a redundant task and task group from the job processing loop.
  • Loading branch information
agronholm committed Jul 29, 2024
1 parent 4127459 commit 783129c
Showing 1 changed file with 42 additions and 18 deletions.
60 changes: 42 additions & 18 deletions src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,10 @@ async def wait_until_stopped(self) -> None:

async def start_in_background(self) -> None:
self._check_initialized()
await self._services_task_group.start(self.run_until_stopped)
await self._services_task_group.start(
self.run_until_stopped,
name=f"Scheduler {self.identity!r} main task",
)

async def run_until_stopped(
self, *, task_status: TaskStatus = TASK_STATUS_IGNORED
Expand Down Expand Up @@ -860,19 +863,28 @@ async def run_until_stopped(

# Start periodic cleanups
if self.cleanup_interval:
task_group.start_soon(self._cleanup_loop)
task_group.start_soon(
self._cleanup_loop,
name=f"Scheduler {self.identity!r} clean-up loop",
)
self.logger.debug(
"Started internal cleanup loop with interval: %s",
self.cleanup_interval,
)

# Start processing due schedules, if configured to do so
if self.role in (SchedulerRole.scheduler, SchedulerRole.both):
await task_group.start(self._process_schedules)
await task_group.start(
self._process_schedules,
name=f"Scheduler {self.identity!r} schedule processing loop",
)

# Start processing due jobs, if configured to do so
if self.role in (SchedulerRole.worker, SchedulerRole.both):
await task_group.start(self._process_jobs)
await task_group.start(
self._process_jobs,
name=f"Scheduler {self.identity!r} job processing loop",
)

# Signal that the scheduler has started
self._state = RunState.started
Expand Down Expand Up @@ -937,7 +949,14 @@ async def extend_schedule_leases(schedules: Sequence[Schedule]) -> None:
)
async with AsyncExitStack() as exit_stack:
tg = await exit_stack.enter_async_context(create_task_group())
tg.start_soon(extend_schedule_leases, schedules)
tg.start_soon(
extend_schedule_leases,
schedules,
name=(
f"Scheduler {self.identity!r} schedule lease extension "
f"loop"
),
)
exit_stack.callback(tg.cancel_scope.cancel)

now = datetime.now(timezone.utc)
Expand Down Expand Up @@ -1101,8 +1120,11 @@ async def extend_job_leases() -> None:
for job_executor in self.job_executors.values():
await job_executor.start(exit_stack)

outer_tg = await exit_stack.enter_async_context(create_task_group())
outer_tg.start_soon(extend_job_leases)
task_group = await exit_stack.enter_async_context(create_task_group())
task_group.start_soon(
extend_job_leases,
name=f"Scheduler {self.identity!r} job lease extension loop",
)

# Fetch new jobs every time
exit_stack.enter_context(
Expand All @@ -1121,18 +1143,20 @@ async def extend_job_leases() -> None:
jobs = await self.data_store.acquire_jobs(
self.identity, self.lease_duration, limit
)
async with AsyncExitStack() as inner_exit_stack:
inner_tg = await inner_exit_stack.enter_async_context(
create_task_group()
for job in jobs:
task = await self.data_store.get_task(job.task_id)
func = self._get_task_callable(task)
self._running_jobs.add(job)
task_group.start_soon(
self._run_job,
job,
func,
job.executor,
name=(
f"Scheduler {self.identity!r} job {job.id} "
f"({job.executor!r})"
),
)
inner_exit_stack.callback(inner_tg.cancel_scope.cancel)
inner_tg.start_soon(extend_job_leases)

for job in jobs:
task = await self.data_store.get_task(job.task_id)
func = self._get_task_callable(task)
self._running_jobs.add(job)
outer_tg.start_soon(self._run_job, job, func, job.executor)

await wakeup_event.wait()
wakeup_event = anyio.Event()
Expand Down

0 comments on commit 783129c

Please sign in to comment.