Skip to content

Commit

Permalink
[IMP] fastapi: add event loop lifecycle management
Browse files Browse the repository at this point in the history
This commit adds event loop lifecycle management to the FastAPI dispatcher.

Before this commit, an event loop and the thread to run it were created
each time a FastAPI app was created. The drawback of this approach is that
when the app was destroyed (for example, when the cache of app was cleared),
the event loop and the thread were not properly stopped, which could lead
to memory leaks and zombie threads. This commit fixes this issue by creating
a pool of event loops and threads that are shared among all FastAPI apps.
On each call to a FastAPI app, a event loop is requested from the pool and
is returned to the pool when the app is destroyed. At request time of
an event loop, the pool try to reuse an existing event loop and if no event
loop is available, a new event loop is created.

The cache of the FastAPI app is also refactored to use it's own mechanism.
It's now based on a dictionary of queues by root path by database,
where each queue is a pool of FastAPI app. This allows a better management
of the invalidation of the cache. It's now possible to invalidate
the cache of FastAPI app by root path without affecting the cache of others
root paths.
  • Loading branch information
lmignon committed Jan 10, 2025
1 parent 696a7c0 commit 8c090cd
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 43 deletions.
24 changes: 12 additions & 12 deletions fastapi/fastapi_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from .context import odoo_env_ctx
from .error_handlers import convert_exception_to_status_body
from .pools import fastapi_app_pool


class FastApiDispatcher(Dispatcher):
Expand All @@ -24,18 +25,17 @@ def dispatch(self, endpoint, args):
root_path = "/" + environ["PATH_INFO"].split("/")[1]
# TODO store the env into contextvar to be used by the odoo_env
# depends method
fastapi_endpoint = self.request.env["fastapi.endpoint"].sudo()
app = fastapi_endpoint.get_app(root_path)
uid = fastapi_endpoint.get_uid(root_path)
data = BytesIO()
with self._manage_odoo_env(uid):
for r in app(environ, self._make_response):
data.write(r)
if self.inner_exception:
raise self.inner_exception
return self.request.make_response(
data.getvalue(), headers=self.headers, status=self.status
)
with fastapi_app_pool.get_app(root_path, request.env) as app:
uid = request.env["fastapi.endpoint"].sudo().get_uid(root_path)
data = BytesIO()
with self._manage_odoo_env(uid):
for r in app(environ, self._make_response):
data.write(r)
if self.inner_exception:
raise self.inner_exception
return self.request.make_response(
data.getvalue(), headers=self.headers, status=self.status
)

def handle_error(self, exc):
headers = getattr(exc, "headers", None)
Expand Down
26 changes: 26 additions & 0 deletions fastapi/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2025 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).
"""
ASGI middleware for FastAPI.
This module provides an ASGI middleware for FastAPI applications. The middleware
is designed to ensure managed the lifecycle of the threads used to as event loop
for the ASGI application.
"""

from typing import Iterable

import a2wsgi
from a2wsgi.asgi import ASGIResponder
from a2wsgi.wsgi_typing import Environ, StartResponse

from .pools import event_loop_pool


class ASGIMiddleware(a2wsgi.ASGIMiddleware):
def __call__(
self, environ: Environ, start_response: StartResponse
) -> Iterable[bytes]:
with event_loop_pool.get_event_loop() as loop:
return ASGIResponder(self.app, loop)(environ, start_response)
35 changes: 4 additions & 31 deletions fastapi/models/fastapi_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
# Copyright 2022 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).

import asyncio
import logging
import threading
from functools import partial
from itertools import chain
from typing import Any, Callable, Dict, List, Tuple

from a2wsgi import ASGIMiddleware
from starlette.middleware import Middleware
from starlette.routing import Mount

Expand All @@ -17,30 +14,12 @@
from fastapi import APIRouter, Depends, FastAPI

from .. import dependencies
from ..middleware import ASGIMiddleware
from ..pools import fastapi_app_pool

_logger = logging.getLogger(__name__)


# Thread-local storage for event loops
# Using a thread-local storage allows to have a dedicated event loop per thread
# and avoid the need to create a new event loop for each request. It's also
# compatible with the multi-worker mode of Odoo.
_event_loop_storage = threading.local()


def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
"""
Get or create a reusable event loop for the current thread.
"""
if not hasattr(_event_loop_storage, "loop"):
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()
_event_loop_storage.loop = loop
_event_loop_storage.thread = loop_thread
return _event_loop_storage.loop


class FastapiEndpoint(models.Model):

_name = "fastapi.endpoint"
Expand Down Expand Up @@ -220,23 +199,17 @@ def _endpoint_registry_route_unique_key(self, routing: Dict[str, Any]):
return f"{self._name}:{self.id}:{path}"

def _reset_app(self):
self.get_app.clear_cache(self)
fastapi_app_pool.invalidate(self.root_path, self.env)

Check warning on line 202 in fastapi/models/fastapi_endpoint.py

View check run for this annotation

Codecov / codecov/patch

fastapi/models/fastapi_endpoint.py#L202

Added line #L202 was not covered by tests

@api.model
@tools.ormcache("root_path")
# TODO cache on thread local by db to enable to get 1 middelware by
# thread when odoo runs in multi threads mode and to allows invalidate
# specific entries in place og the overall cache as we have to do into
# the _rest_app method
def get_app(self, root_path):
record = self.search([("root_path", "=", root_path)])
if not record:
return None
app = FastAPI()
app.mount(record.root_path, record._get_app())
self._clear_fastapi_exception_handlers(app)
event_loop = get_or_create_event_loop()
return ASGIMiddleware(app, loop=event_loop)
return ASGIMiddleware(app)

def _clear_fastapi_exception_handlers(self, app: FastAPI) -> None:
"""
Expand Down
7 changes: 7 additions & 0 deletions fastapi/pools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .event_loop import EventLoopPool
from .fastapi_app import FastApiAppPool

event_loop_pool = EventLoopPool()
fastapi_app_pool = FastApiAppPool()

__all__ = ["event_loop_pool", "fastapi_app_pool"]
58 changes: 58 additions & 0 deletions fastapi/pools/event_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2025 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).

import asyncio
import queue
import threading
from contextlib import contextmanager
from typing import Generator


class EventLoopPool:
def __init__(self):
self.pool = queue.Queue[tuple[asyncio.AbstractEventLoop, threading.Thread]]()

def __get_event_loop_and_thread(
self,
) -> tuple[asyncio.AbstractEventLoop, threading.Thread]:
"""
Get an event loop from the pool. If no event loop is available, create a new one.
"""
try:
return self.pool.get_nowait()
except queue.Empty:
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
return loop, thread

def __return_event_loop(
self, loop: asyncio.AbstractEventLoop, thread: threading.Thread
) -> None:
"""
Return an event loop to the pool for reuse.
"""
self.pool.put((loop, thread))

def shutdown(self):
"""
Shutdown all event loop threads in the pool.
"""
while not self.pool.empty():
loop, thread = self.pool.get_nowait()
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()

Check warning on line 45 in fastapi/pools/event_loop.py

View check run for this annotation

Codecov / codecov/patch

fastapi/pools/event_loop.py#L42-L45

Added lines #L42 - L45 were not covered by tests

@contextmanager
def get_event_loop(self) -> Generator[asyncio.AbstractEventLoop, None, None]:
"""
Get an event loop from the pool. If no event loop is available, create a new one.
After the context manager exits, the event loop is returned to the pool for reuse.
"""
loop, thread = self.__get_event_loop_and_thread()
try:
yield loop
finally:
self.__return_event_loop(loop, thread)
52 changes: 52 additions & 0 deletions fastapi/pools/fastapi_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2025 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).

import queue
from collections import defaultdict
from contextlib import contextmanager
from typing import Generator

from odoo.api import Environment

from fastapi import FastAPI


class FastApiAppPool:
def __init__(self):
self._queue_by_db_by_root_path: dict[
str, dict[str, queue.Queue[FastAPI]]
] = defaultdict(lambda: defaultdict(queue.Queue))

def __get_app(self, env: Environment, root_path: str) -> FastAPI:
db_name = env.cr.dbname
try:
return self._queue_by_db_by_root_path[db_name][root_path].get_nowait()
except queue.Empty:
env["fastapi.endpoint"].sudo()
return env["fastapi.endpoint"].sudo().get_app(root_path)

def __return_app(self, env: Environment, app: FastAPI, root_path: str) -> None:
db_name = env.cr.dbname
self._queue_by_db_by_root_path[db_name][root_path].put(app)

@contextmanager
def get_app(
self, root_path: str, env: Environment
) -> Generator[FastAPI, None, None]:
"""Return a FastAPI app to be used in a context manager.
The app is retrieved from the pool if available, otherwise a new one is created.
The app is returned to the pool after the context manager exits.
When used into the FastApiDispatcher class this ensures that the app is reused
across multiple requests but only one request at a time uses an app.
"""
app = self.__get_app(env, root_path)
try:
yield app
finally:
self.__return_app(env, app, root_path)

def invalidate(self, root_path: str, env: Environment) -> None:
db_name = env.cr.dbname
self._queue_by_db_by_root_path[db_name][root_path] = queue.Queue()

Check warning on line 52 in fastapi/pools/fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

fastapi/pools/fastapi_app.py#L51-L52

Added lines #L51 - L52 were not covered by tests

0 comments on commit 8c090cd

Please sign in to comment.