implementing graceful shutdown on inactivity #1291

Open
saro2-a opened this issue Jan 18, 2025 · 9 comments

@saro2-a

saro2-a commented Jan 18, 2025

In an elastic environment, we might spin up more workers and later scale down. How can a worker shut down gracefully after 60 seconds with no tasks?

It would be nice if the "wait" parameter were a scalar (or, for backward compatibility, if a new parameter were added).

https://github.com/procrastinate-org/procrastinate/blob/68debead597724633f7ff1946788e9586b33b045/procrastinate/worker.py#L34C1-L48C7

    def __init__(
        self,
        app: app.App,
        queues: Iterable[str] | None = None,
        name: str | None = WORKER_NAME,
        concurrency: int = WORKER_CONCURRENCY,
        wait: bool = True,
        fetch_job_polling_interval: float = FETCH_JOB_POLLING_INTERVAL,
        abort_job_polling_interval: float = ABORT_JOB_POLLING_INTERVAL,
        shutdown_graceful_timeout: float | None = None,
        listen_notify: bool = True,
        delete_jobs: str | jobs.DeleteJobCondition | None = None,
        additional_context: dict[str, Any] | None = None,
        install_signal_handlers: bool = True,
    ):

For example, a new parameter such as wait_before_shutdown_seconds=None.

@saro2-a
Author

saro2-a commented Jan 18, 2025

It seems that version 2.15 had this via the timeout parameter: https://github.com/procrastinate-org/procrastinate/blob/2.15.1/procrastinate/worker.py

Could that be added back with something like "timeout_pull_tasks"?

Waiting on "listen_notify" until SIGTERM, with no other way to stop, restricts flexibility in my opinion.

@medihack
Member

Hi @saro2-a. I think we only renamed timeout to fetch_job_polling_interval (see https://github.com/procrastinate-org/procrastinate/releases/tag/3.0.0b1), and wait was also a boolean in v2. But I wonder if it would make more sense to implement something like this on the user side. With app.run_worker_async, you have full control over the worker. Once, @ewjoachim wrote this for me for scheduling tasks (#1105 (comment)). Maybe something like this can be helpful in your scenario, too.

@ewjoachim
Member

Yep, timeout was something else entirely.

I think this might actually be something else that could be helped by the introduction of middleware (as we may be able to track when a worker starts or stops a job).

@ewjoachim changed the title from "implementing graceful shutdown" to "implementing graceful shutdown on inactivity" on Jan 18, 2025
@medihack
Member

Yep, timeout was something else entirely.

So, is our release note wrong here? ("The timeout worker option was renamed to fetch_job_polling_interval to better reflect its purpose.")

I think this might actually be something else that could be helped by the introduction of middleware (as we may be able to track when a worker starts or stops a job).

I don't see where middleware helps here, as it is only called when a new job is fetched for processing. If I understand the issue correctly, the worker should be stopped when there is no job to process after a specified time period.

@ewjoachim
Member

So, is our release note wrong here? ("The timeout worker option was renamed to fetch_job_polling_interval to better reflect its purpose.")

No, sorry, I meant: timeout was not related to the ability to shut down the worker after a period of inactivity. I think what you said and the release notes are right.

I don't see where middleware helps here

Middleware would help an outside system know how many tasks are currently running and when the last task finished. With this info, a coroutine that holds the worker's asyncio task could call .cancel() on it.
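
A minimal sketch of the bookkeeping described above, independent of procrastinate. The `ActivityTracker` class and `cancel_when_idle` function are hypothetical names introduced for illustration, not part of any library: the tracker counts running jobs and records the last activity, and a separate coroutine cancels the worker task once it has been idle long enough. How `track()` would be hooked into a worker depends on the middleware API, which is not finalized.

```python
import asyncio
import contextlib
import time


class ActivityTracker:
    """Hypothetical helper: counts running jobs and remembers the last activity."""

    def __init__(self) -> None:
        self.running = 0
        self.last_activity = time.monotonic()

    @contextlib.contextmanager
    def track(self):
        # Wrap the execution of a single job.
        self.running += 1
        self.last_activity = time.monotonic()
        try:
            yield
        finally:
            self.running -= 1
            self.last_activity = time.monotonic()

    def idle_for(self) -> float:
        # Seconds since the last job started or finished; 0 while a job runs.
        if self.running:
            return 0.0
        return time.monotonic() - self.last_activity


async def cancel_when_idle(
    task: asyncio.Task,
    tracker: ActivityTracker,
    max_idle: float,
    check_every: float = 1.0,
) -> None:
    # Poll the tracker and cancel the worker task once it has been idle too long.
    while not task.done():
        if tracker.idle_for() >= max_idle:
            task.cancel()
            break
        await asyncio.sleep(check_every)
    with contextlib.suppress(asyncio.CancelledError):
        await task
```

Polling keeps the sketch short; an event-based wait, as in the example later in this thread, avoids waking up when nothing has changed.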

@saro2-a
Author

saro2-a commented Jan 21, 2025

The reason I thought it was related:

  • if wait=False,
  • then once fetch_job_polling_interval has passed, the worker shuts down accordingly.

So do you think we can't have this configuration within the release? I think it is a fairly common feature in job processors, as we want to avoid spinning workers up and destroying them all the time.

@ewjoachim
Member

ewjoachim commented Jan 25, 2025

What we're heading towards, I think, if that's ok with you:

  • The middleware feature
  • In the middleware, you can notify an asyncio.Event() every time a job starts and stops
  • You launch your own worker, alongside a coroutine that loops on the event
  • When there's no activity on the event for a given period, and the worker is currently handling no task, you can call cancel() on the worker and stop the process.

Here's a very rough idea of what it may look like, bearing in mind that the middleware API isn't finalized:

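from __future__ import annotations

import asyncio
import contextlib
import functools

from procrastinate import job_context

import my_project


async def middleware(
    process_job,
    context: job_context.JobContext,
    *,
    current_tasks: list,
    event: asyncio.Event,
):
    # Track the job while it runs and signal activity on both start and stop.
    current_tasks.append(context)
    event.set()
    try:
        return await process_job()
    finally:
        current_tasks.remove(context)
        event.set()


async def run_worker():
    current_tasks = []
    event = asyncio.Event()
    # The `middleware` argument belongs to the proposed API, which isn't finalized.
    worker = asyncio.create_task(
        my_project.app.run_worker_async(
            middleware=[
                functools.partial(middleware, current_tasks=current_tasks, event=event)
            ],
        )
    )
    while True:
        waiter = asyncio.create_task(event.wait())
        # asyncio.wait() doesn't raise on timeout: it returns an empty `done` set.
        done, _ = await asyncio.wait(
            [worker, waiter],
            timeout=10 * 60,
            return_when=asyncio.FIRST_COMPLETED,
        )
        waiter.cancel()  # don't leak one pending waiter per iteration
        if not done and not current_tasks:
            print("No task for 10 minutes, exiting")
            break
        event.clear()
        if worker in done:
            break

    worker.cancel()
    # Cancelling the worker makes awaiting it raise CancelledError.
    with contextlib.suppress(asyncio.CancelledError):
        await worker


if __name__ == "__main__":
    asyncio.run(run_worker())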

@onlyann
Copy link
Contributor

onlyann commented Jan 26, 2025

@ewjoachim I think asyncio.wait confusingly doesn't raise TimeoutError.
One tweak instead is to check the absence of returned done tasks.

Alternatively, another option is to use asyncio.timeout and reschedule it every time a job completes.

By the way, until a middleware feature is available, it is already possible to implement a middleware.

@ewjoachim
Copy link
Member

(Ha strange, I was re-reading my comment exactly when you commented)
Yes, I first wrote it using asyncio.wait_for which confusingly does raise, and had to change it to wait because we actually need to listen both to the worker task and the event. But yeah you're right. I'll edit.
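
For reference, the difference between the two calls can be seen in a small standalone sketch (the function names are made up for illustration): `asyncio.wait` returns normally with an empty `done` set when the timeout elapses, while `asyncio.wait_for` cancels the awaitable and raises.

```python
import asyncio


async def compare_timeouts() -> tuple[bool, bool]:
    async def never_finishes() -> None:
        await asyncio.sleep(3600)

    # asyncio.wait(): on timeout it simply returns, with nothing in `done`.
    task = asyncio.create_task(never_finishes())
    done, pending = await asyncio.wait([task], timeout=0.05)
    wait_timed_out = not done and task in pending
    task.cancel()  # wait() leaves the task running; clean it up ourselves

    # asyncio.wait_for(): on timeout it cancels the awaitable and raises.
    try:
        await asyncio.wait_for(never_finishes(), timeout=0.05)
        wait_for_raised = False
    except asyncio.TimeoutError:
        wait_for_raised = True

    return wait_timed_out, wait_for_raised


print(asyncio.run(compare_timeouts()))  # prints (True, True)
```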
