Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[16.0][FIX] fastapi: Avoid zombie threads #486

Open
wants to merge 5 commits into
base: 16.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions fastapi/fastapi_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from .context import odoo_env_ctx
from .error_handlers import convert_exception_to_status_body
from .pools import fastapi_app_pool


class FastApiDispatcher(Dispatcher):
Expand All @@ -24,18 +25,17 @@ def dispatch(self, endpoint, args):
root_path = "/" + environ["PATH_INFO"].split("/")[1]
# TODO store the env into contextvar to be used by the odoo_env
# depends method
fastapi_endpoint = self.request.env["fastapi.endpoint"].sudo()
app = fastapi_endpoint.get_app(root_path)
uid = fastapi_endpoint.get_uid(root_path)
data = BytesIO()
with self._manage_odoo_env(uid):
for r in app(environ, self._make_response):
data.write(r)
if self.inner_exception:
raise self.inner_exception
return self.request.make_response(
data.getvalue(), headers=self.headers, status=self.status
)
with fastapi_app_pool.get_app(root_path, request.env) as app:
uid = request.env["fastapi.endpoint"].sudo().get_uid(root_path)
data = BytesIO()
with self._manage_odoo_env(uid):
for r in app(environ, self._make_response):
data.write(r)
if self.inner_exception:
raise self.inner_exception
return self.request.make_response(
data.getvalue(), headers=self.headers, status=self.status
)

def handle_error(self, exc):
headers = getattr(exc, "headers", None)
Expand Down
26 changes: 26 additions & 0 deletions fastapi/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2025 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).
"""
ASGI middleware for FastAPI.

This module provides an ASGI middleware for FastAPI applications. The middleware
is designed to ensure managed the lifecycle of the threads used to as event loop
for the ASGI application.

"""

from typing import Iterable

import a2wsgi
from a2wsgi.asgi import ASGIResponder
from a2wsgi.wsgi_typing import Environ, StartResponse

from .pools import event_loop_pool


class ASGIMiddleware(a2wsgi.ASGIMiddleware):
def __call__(
self, environ: Environ, start_response: StartResponse
) -> Iterable[bytes]:
with event_loop_pool.get_event_loop() as loop:
return ASGIResponder(self.app, loop)(environ, start_response)
10 changes: 3 additions & 7 deletions fastapi/models/fastapi_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from itertools import chain
from typing import Any, Callable, Dict, List, Tuple

from a2wsgi import ASGIMiddleware
from starlette.middleware import Middleware
from starlette.routing import Mount

Expand All @@ -15,6 +14,8 @@
from fastapi import APIRouter, Depends, FastAPI

from .. import dependencies
from ..middleware import ASGIMiddleware
from ..pools import fastapi_app_pool

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -198,14 +199,9 @@
return f"{self._name}:{self.id}:{path}"

def _reset_app(self):
self.get_app.clear_cache(self)
fastapi_app_pool.invalidate(self.root_path, self.env)

Check warning on line 202 in fastapi/models/fastapi_endpoint.py

View check run for this annotation

Codecov / codecov/patch

fastapi/models/fastapi_endpoint.py#L202

Added line #L202 was not covered by tests

@api.model
@tools.ormcache("root_path")
# TODO cache on thread local by db to enable to get 1 middelware by
# thread when odoo runs in multi threads mode and to allows invalidate
# specific entries in place og the overall cache as we have to do into
# the _rest_app method
def get_app(self, root_path):
record = self.search([("root_path", "=", root_path)])
if not record:
Expand Down
7 changes: 7 additions & 0 deletions fastapi/pools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .event_loop import EventLoopPool
from .fastapi_app import FastApiAppPool

event_loop_pool = EventLoopPool()
fastapi_app_pool = FastApiAppPool()

__all__ = ["event_loop_pool", "fastapi_app_pool"]
58 changes: 58 additions & 0 deletions fastapi/pools/event_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2025 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).

import asyncio
import queue
import threading
from contextlib import contextmanager
from typing import Generator


class EventLoopPool:
def __init__(self):
self.pool = queue.Queue[tuple[asyncio.AbstractEventLoop, threading.Thread]]()

def __get_event_loop_and_thread(
self,
) -> tuple[asyncio.AbstractEventLoop, threading.Thread]:
"""
Get an event loop from the pool. If no event loop is available, create a new one.
"""
try:
return self.pool.get_nowait()
sbidoul marked this conversation as resolved.
Show resolved Hide resolved
except queue.Empty:
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
return loop, thread
sbidoul marked this conversation as resolved.
Show resolved Hide resolved

def __return_event_loop(
self, loop: asyncio.AbstractEventLoop, thread: threading.Thread
) -> None:
"""
Return an event loop to the pool for reuse.
"""
self.pool.put((loop, thread))

def shutdown(self):
"""
Shutdown all event loop threads in the pool.
"""
while not self.pool.empty():
loop, thread = self.pool.get_nowait()
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()

Check warning on line 45 in fastapi/pools/event_loop.py

View check run for this annotation

Codecov / codecov/patch

fastapi/pools/event_loop.py#L42-L45

Added lines #L42 - L45 were not covered by tests

@contextmanager
def get_event_loop(self) -> Generator[asyncio.AbstractEventLoop, None, None]:
"""
Get an event loop from the pool. If no event loop is available, create a new one.

After the context manager exits, the event loop is returned to the pool for reuse.
"""
loop, thread = self.__get_event_loop_and_thread()
try:
yield loop
finally:
self.__return_event_loop(loop, thread)
52 changes: 52 additions & 0 deletions fastapi/pools/fastapi_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2025 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).

import queue
from collections import defaultdict
from contextlib import contextmanager
from typing import Generator

from odoo.api import Environment

from fastapi import FastAPI


class FastApiAppPool:
def __init__(self):
self._queue_by_db_by_root_path: dict[
str, dict[str, queue.Queue[FastAPI]]
] = defaultdict(lambda: defaultdict(queue.Queue))

def __get_app(self, env: Environment, root_path: str) -> FastAPI:
db_name = env.cr.dbname
try:
return self._queue_by_db_by_root_path[db_name][root_path].get_nowait()
sbidoul marked this conversation as resolved.
Show resolved Hide resolved
except queue.Empty:
env["fastapi.endpoint"].sudo()
return env["fastapi.endpoint"].sudo().get_app(root_path)

def __return_app(self, env: Environment, app: FastAPI, root_path: str) -> None:
db_name = env.cr.dbname
self._queue_by_db_by_root_path[db_name][root_path].put(app)

@contextmanager
def get_app(
self, root_path: str, env: Environment
) -> Generator[FastAPI, None, None]:
"""Return a FastAPI app to be used in a context manager.

The app is retrieved from the pool if available, otherwise a new one is created.
The app is returned to the pool after the context manager exits.

When used into the FastApiDispatcher class this ensures that the app is reused
across multiple requests but only one request at a time uses an app.
"""
app = self.__get_app(env, root_path)
try:
yield app
finally:
self.__return_app(env, app, root_path)

def invalidate(self, root_path: str, env: Environment) -> None:
db_name = env.cr.dbname
self._queue_by_db_by_root_path[db_name][root_path] = queue.Queue()

Check warning on line 52 in fastapi/pools/fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

fastapi/pools/fastapi_app.py#L51-L52

Added lines #L51 - L52 were not covered by tests
Loading