diff --git a/frontik/app.py b/frontik/app.py
index 4e7e63ac0..ee25ec516 100644
--- a/frontik/app.py
+++ b/frontik/app.py
@@ -24,7 +24,6 @@
 from frontik.app_integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
 from frontik.balancing_client import create_http_client
 from frontik.dependencies import clients
-from frontik.handler_asgi import serve_tornado_request
 from frontik.options import options
 from frontik.process import WorkerState
 from frontik.routing import (
@@ -35,13 +34,13 @@
     routers,
 )
 from frontik.service_discovery import MasterServiceDiscovery, ServiceDiscovery, WorkerServiceDiscovery
+from frontik.tornado_connection_handler import LegacyTornadoConnectionHandler, TornadoConnectionHandler
 from frontik.version import version as frontik_version
 
 app_logger = logging.getLogger('app_logger')
 
-_server_tasks = set()
 
-class FrontikApplication:
+class FrontikApplication(httputil.HTTPServerConnectionDelegate):
     request_id = ''
 
     class DefaultConfig:
@@ -88,12 +87,6 @@ def patch_anyio(self) -> None:
         except ImportError:
             pass
 
-    def __call__(self, tornado_request: httputil.HTTPServerRequest) -> None:
-        # for make it more asgi, reimplement tornado.http1connection._server_request_loop and ._read_message
-        task = asyncio.create_task(serve_tornado_request(self, self.asgi_app, tornado_request))
-        _server_tasks.add(task)
-        task.add_done_callback(_server_tasks.discard)
-
     def make_service_discovery(self) -> ServiceDiscovery:
         if self.worker_state.is_master and options.consul_enabled:
             return MasterServiceDiscovery(self.statsd_client, self.app_name)
@@ -186,6 +179,13 @@ def get_frontik_and_apps_versions(self) -> etree.Element:
     def get_kafka_producer(self, producer_name: str) -> Optional[AIOKafkaProducer]:  # pragma: no cover
         pass
 
+    def start_request(
+        self,
+        server_conn: object,
+        request_conn: httputil.HTTPConnection,
+    ) -> TornadoConnectionHandler:
+        return LegacyTornadoConnectionHandler(self, request_conn)
+
 
 def anyio_noop(*_args: Any, **_kwargs: Any) -> None:
     raise RuntimeError(f'trying to use non async {_args[0]}')
diff --git a/frontik/auth.py b/frontik/auth.py
index ffee2f024..932ea0cd4 100644
--- a/frontik/auth.py
+++ b/frontik/auth.py
@@ -1,10 +1,11 @@
 import base64
 from typing import Mapping, MutableMapping, Optional, Union
 
-from tornado import httputil
 from tornado.escape import to_unicode
 from tornado.web import Finish
 
+from frontik.tornado_request import FrontikTornadoServerRequest
+
 DEBUG_AUTH_HEADER_NAME = 'Frontik-Debug-Auth'
 
 
@@ -41,6 +42,6 @@ def check_debug_auth_by_headers(
 
 
 def check_debug_auth(
-    tornado_request: httputil.HTTPServerRequest, login: Optional[str], password: Optional[str]
+    tornado_request: FrontikTornadoServerRequest, login: Optional[str], password: Optional[str]
 ) -> Optional[dict]:
     return check_debug_auth_by_headers(tornado_request.headers, login, password)
diff --git a/frontik/debug.py b/frontik/debug.py
index 042cdc8b5..96c15d7e4 100644
--- a/frontik/debug.py
+++ b/frontik/debug.py
@@ -19,9 +19,8 @@
 
 from lxml import etree
 from lxml.builder import E
-from tornado import httputil
 from tornado.escape import to_unicode, utf8
-from tornado.httputil import HTTPHeaders, HTTPServerRequest
+from tornado.httputil import HTTPHeaders
 
 from frontik import media_types, request_context
 from frontik.auth import check_debug_auth
@@ -37,6 +36,7 @@
     from http_client.request_response import RequestBuilder, RequestResult
 
     from frontik.app import FrontikApplication
+    from frontik.tornado_request import FrontikTornadoServerRequest
 
 debug_log = logging.getLogger('frontik.debug')
 
@@ -388,7 +388,7 @@ def is_inherited(self) -> bool:
         return self.debug_mode.inherited
 
     def transform_chunk(
-        self, tornado_request: httputil.HTTPServerRequest, response: FrontikResponse
+        self, tornado_request: FrontikTornadoServerRequest, response: FrontikResponse
     ) -> FrontikResponse:
         if not self.is_enabled():
             return response
@@ -406,7 +406,7 @@ def transform_chunk(
         debug_log_data.set('code', str(int(response.status_code)))
         debug_log_data.set('handler-name', handler_name if handler_name else 'unknown handler')
         debug_log_data.set('started', _format_number(tornado_request._start_time))
-        debug_log_data.set('request-id', str(tornado_request.request_id))  # type: ignore
+        debug_log_data.set('request-id', str(tornado_request.request_id))
         debug_log_data.set('stages-total', _format_number((time.time() - tornado_request._start_time) * 1000))
 
         try:
@@ -468,7 +468,7 @@
 
 
 class DebugMode:
-    def __init__(self, tornado_request: HTTPServerRequest) -> None:
+    def __init__(self, tornado_request: FrontikTornadoServerRequest) -> None:
         self.debug_value = get_cookie_or_param_from_request(tornado_request, 'debug')
         self.notpl = get_cookie_or_param_from_request(tornado_request, 'notpl')
         self.notrl = get_cookie_or_param_from_request(tornado_request, 'notrl')
@@ -492,7 +492,9 @@ def __init__(self, tornado_request: HTTPServerRequest) -> None:
         if self.inherited:
             debug_log.debug('debug mode is inherited due to %s request header', DEBUG_HEADER_NAME)
 
-    def require_debug_access(self, tornado_request: HTTPServerRequest, auth_failed: Optional[bool] = None) -> None:
+    def require_debug_access(
+        self, tornado_request: FrontikTornadoServerRequest, auth_failed: Optional[bool] = None
+    ) -> None:
         if auth_failed is True:
             self.auth_failed = True
             return
diff --git a/frontik/handler_asgi.py b/frontik/handler_asgi.py
index eeb470634..d08238451 100644
--- a/frontik/handler_asgi.py
+++ b/frontik/handler_asgi.py
@@ -8,7 +8,6 @@
 from typing import TYPE_CHECKING
 
 from tornado import httputil
-from tornado.httputil import HTTPServerRequest
 
 from frontik import request_context
 from frontik.debug import DebugMode, DebugTransform
@@ -21,6 +20,7 @@
 
 if TYPE_CHECKING:
     from frontik.app import FrontikApplication, FrontikAsgiApp
+    from frontik.tornado_request import FrontikTornadoServerRequest
 
 CHARSET = 'utf-8'
 log = logging.getLogger('handler')
@@ -29,7 +29,7 @@
 async def serve_tornado_request(
     frontik_app: FrontikApplication,
     asgi_app: FrontikAsgiApp,
-    tornado_request: httputil.HTTPServerRequest,
+    tornado_request: FrontikTornadoServerRequest,
 ) -> None:
     with ExitStack() as stack:
         integrations: dict[str, IntegrationDto] = {
@@ -64,7 +64,7 @@ async def serve_tornado_request(
 async def process_request(
     frontik_app: FrontikApplication,
     asgi_app: FrontikAsgiApp,
-    tornado_request: HTTPServerRequest,
+    tornado_request: FrontikTornadoServerRequest,
     integrations: dict[str, IntegrationDto],
 ) -> FrontikResponse:
     if integrations.get('request_limiter', IntegrationDto()).get_value() is False:
@@ -91,7 +91,7 @@ async def process_request(
 async def execute_asgi_page(
     frontik_app: FrontikApplication,
     asgi_app: FrontikAsgiApp,
-    tornado_request: HTTPServerRequest,
+    tornado_request: FrontikTornadoServerRequest,
     scope: dict,
     debug_mode: DebugMode,
     integrations: dict[str, IntegrationDto],
@@ -119,10 +119,19 @@ async def execute_asgi_page(
 
     async def receive():
         await asyncio.sleep(0)
+
+        if tornado_request.finished and tornado_request.body_chunks.empty():
+            return {
+                'body': b'',
+                'type': 'http.request',
+                'more_body': False,
+            }
+
+        chunk = await tornado_request.body_chunks.get()
         return {
-            'body': tornado_request.body,
+            'body': chunk,
             'type': 'http.request',
-            'more_body': False,
+            'more_body': not tornado_request.finished or not tornado_request.body_chunks.empty(),
         }
 
     async def send(message):
@@ -157,7 +166,7 @@ async def send(message):
     return response
 
 
-def make_debug_mode(frontik_app: FrontikApplication, tornado_request: HTTPServerRequest) -> DebugMode:
+def make_debug_mode(frontik_app: FrontikApplication, tornado_request: FrontikTornadoServerRequest) -> DebugMode:
     debug_mode = DebugMode(tornado_request)
 
     if not debug_mode.need_auth:
@@ -184,12 +193,12 @@ def _on_connection_close(tornado_request, process_request_task, integrations):
     process_request_task.cancel()  # serve_tornado_request will be interrupted with CanceledError
 
 
-def log_request(tornado_request: httputil.HTTPServerRequest, status_code: int) -> None:
+def log_request(tornado_request: FrontikTornadoServerRequest, status_code: int) -> None:
     # frontik.request_context can't be used in case when client has closed connection
    request_time = int(1000.0 * tornado_request.request_time())
     extra = {
         'ip': tornado_request.remote_ip,
-        'rid': tornado_request.request_id,  # type: ignore
+        'rid': tornado_request.request_id,
         'status': status_code,
         'time': request_time,
         'method': tornado_request.method,
diff --git a/frontik/server_tasks.py b/frontik/server_tasks.py
new file mode 100644
index 000000000..43e46cb67
--- /dev/null
+++ b/frontik/server_tasks.py
@@ -0,0 +1,4 @@
+import asyncio
+import typing
+
+_server_tasks: typing.Set[asyncio.Task] = set()
diff --git a/frontik/tornado_connection_handler.py b/frontik/tornado_connection_handler.py
new file mode 100644
index 000000000..d89d85c16
--- /dev/null
+++ b/frontik/tornado_connection_handler.py
@@ -0,0 +1,78 @@
+import asyncio
+import typing
+from typing import Awaitable, Optional, Union
+
+from tornado import httputil
+
+from frontik.handler_asgi import serve_tornado_request
+from frontik.server_tasks import _server_tasks
+from frontik.tornado_request import FrontikTornadoServerRequest
+
+if typing.TYPE_CHECKING:
+    from frontik.app import FrontikApplication
+
+
+class TornadoConnectionHandler(httputil.HTTPMessageDelegate):
+    def __init__(
+        self,
+        frontik_app: 'FrontikApplication',
+        request_conn: httputil.HTTPConnection,
+    ) -> None:
+        self.connection = request_conn
+        self.frontik_app = frontik_app
+        self.request = None  # type: Optional[FrontikTornadoServerRequest]
+
+    def headers_received(
+        self,
+        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
+        headers: httputil.HTTPHeaders,
+    ) -> None:
+        self.request = FrontikTornadoServerRequest(
+            connection=self.connection,
+            start_line=typing.cast(httputil.RequestStartLine, start_line),
+            headers=headers,
+        )
+        self.process_request()
+
+    def process_request(self) -> None:
+        self._process_request()
+
+    def _process_request(self) -> Optional[Awaitable[None]]:
+        assert self.request is not None
+        task = asyncio.create_task(serve_tornado_request(self.frontik_app, self.frontik_app.asgi_app, self.request))
+        _server_tasks.add(task)
+        task.add_done_callback(_server_tasks.discard)
+        return task
+
+    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
+        assert self.request is not None
+        task = asyncio.create_task(self.request.body_chunks.put(chunk))
+        return task
+
+    def finish(self) -> None:
+        assert self.request is not None
+        self.request.finished = True
+
+    def on_connection_close(self) -> None:
+        assert self.request is not None
+        self.request.finished = True
+
+
+class LegacyTornadoConnectionHandler(TornadoConnectionHandler):
+    """Used because of
+    1. Debug (requires request.body)
+    2. FrontikPageHandler (full of legacy shit)
+    """
+
+    def process_request(self) -> None: ...
+
+    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
+        assert self.request is not None
+        self.request.body += chunk
+        return super().data_received(chunk)
+
+    def finish(self) -> None:
+        assert self.request is not None
+        self.request._parse_body()
+        super().finish()
+        self._process_request()
diff --git a/frontik/tornado_request.py b/frontik/tornado_request.py
new file mode 100644
index 000000000..e541dfe4f
--- /dev/null
+++ b/frontik/tornado_request.py
@@ -0,0 +1,11 @@
+import asyncio
+
+from tornado import httputil
+
+
+class FrontikTornadoServerRequest(httputil.HTTPServerRequest):
+    def __init__(self, *args, **kwargs) -> None:  # type: ignore
+        super().__init__(*args, **kwargs)
+        self.body_chunks: asyncio.Queue = asyncio.Queue(maxsize=100)
+        self.request_id = None
+        self.finished = False
diff --git a/poetry.lock b/poetry.lock
index 48a244120..f87b5012d 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
 
 [[package]]
 name = "aiohttp"
@@ -556,7 +556,7 @@ protobuf = ["grpcio-tools (>=1.67.0)"]
 
 [[package]]
 name = "http-client"
-version = "2.1.20"
+version = "2.1.21"
 description = "Balancing http client around aiohttp"
 optional = false
 python-versions = "~=3.9"
@@ -572,8 +572,8 @@ yarl = "1.9.2"
 [package.source]
 type = "git"
 url = "https://github.com/hhru/balancing-http-client.git"
-reference = "2.1.20"
-resolved_reference = "faa3aaed9e059752c030910d50d23eed8459ebf9"
+reference = "2.1.21"
+resolved_reference = "8d4058fdba61aee19485c3a399ca247f4e7da366"
 
 [[package]]
 name = "idna"
@@ -1537,6 +1537,17 @@ url = "https://github.com/hhru/python-consul2"
 reference = "v0.2.10"
 resolved_reference = "a9c9256832fdaae5beaea4e3925c9fb7b0adf47e"
 
+[[package]]
+name = "python-multipart"
+version = "0.0.16"
+description = "A streaming multipart parser for Python"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "python_multipart-0.0.16-py3-none-any.whl", hash = "sha256:c2759b7b976ef3937214dfb592446b59dfaa5f04682a076f78b117c94776d87a"},
+    {file = "python_multipart-0.0.16.tar.gz", hash = "sha256:8dee37b88dab9b59922ca173c35acb627cc12ec74019f5cd4578369c6df36554"},
+]
+
 [[package]]
 name = "requests"
 version = "2.32.3"
@@ -2020,4 +2031,4 @@ testing = ["tornado-httpclient-mock"]
 [metadata]
 lock-version = "2.0"
 python-versions = "~=3.9"
-content-hash = "465518e3be0c246eb7ccae461bb49c6ca5772ed0d1dbc859941f65100a7f9b18"
+content-hash = "6988c9a80f4addc8baa237a88f791e3f45d6b8d8e0db55ee96f7839fdfec5333"
diff --git a/pyproject.toml b/pyproject.toml
index 790811c37..9addf58ee 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -22,7 +22,7 @@ lxml = '4.9.2'
 pydantic = '^2.3.0'
 tornado = '6.3.3'
 orjson = '*'
-http-client = {git = 'https://github.com/hhru/balancing-http-client.git', tag = '2.1.20'}
+http-client = {git = 'https://github.com/hhru/balancing-http-client.git', tag = '2.1.21'}
 python-consul2-hh = {git = 'https://github.com/hhru/python-consul2', tag = 'v0.2.10'}
 opentelemetry-sdk = '1.25.0'
 opentelemetry-api = '1.25.0'
@@ -34,6 +34,7 @@ fastapi = '0.115.2'
 aiokafka = '0.8.1'
 sentry-sdk = '2.7.0'
 tornado-httpclient-mock = '0.2.3'
+python-multipart = "^0.0.16"
 
 [tool.poetry.group.test.dependencies]
 pytest = '8.1.1'
diff --git a/tests/test_kafka_integration.py b/tests/test_kafka_integration.py
index 1c20d40fe..a0a51d142 100644
--- a/tests/test_kafka_integration.py
+++ b/tests/test_kafka_integration.py
@@ -12,7 +12,7 @@
 from frontik.testing import FrontikTestBase
 
 
-class TestKafkaProducer:
+class KafkaProducerMock:
     def __init__(self) -> None:
         self.data: list[dict[str, dict]] = []
         self.request_id = None
@@ -34,7 +34,7 @@ def disable_and_get_data(self):
 class KafkaApplication(FrontikApplication):
     async def init(self):
         await super().init()
-        self.http_client_factory.request_engine_builder.kafka_producer = TestKafkaProducer()
+        self.http_client_factory.request_engine_builder.kafka_producer = KafkaProducerMock()
 
 
 @router.get('/kafka')
diff --git a/tests/test_request.py b/tests/test_request.py
new file mode 100644
index 000000000..33772ce45
--- /dev/null
+++ b/tests/test_request.py
@@ -0,0 +1,22 @@
+from fastapi import Request
+from fastapi.responses import ORJSONResponse, Response
+
+from frontik.routing import router
+from frontik.testing import FrontikTestBase
+
+DATA = {'body_arg': 'value'}
+
+
+@router.post('/echo')
+async def echo_handler(request: Request) -> Response:
+    return ORJSONResponse(content=dict(await request.form()))
+
+
+class TestStreamingRequest(FrontikTestBase):
+    async def test_post_request_with_body(self):
+        response = await self.fetch(
+            method='POST',
+            path='/echo',
+            data=DATA,
+        )
+        assert response.data == DATA
diff --git a/tests/test_streaming_request.py b/tests/test_streaming_request.py
new file mode 100644
index 000000000..c64c7474d
--- /dev/null
+++ b/tests/test_streaming_request.py
@@ -0,0 +1,32 @@
+import math
+
+from fastapi import Request
+from fastapi.responses import Response
+
+from frontik.media_types import TEXT_PLAIN
+from frontik.routing import router
+from frontik.testing import FrontikTestBase
+
+DATA = b'x' * 1_000_000
+
+
+@router.post('/streaming_request')
+async def get_page(request: Request) -> Response:
+    chunks_count = 0
+    async for chunk in request.stream():
+        if chunk != b'':
+            chunks_count += 1
+
+    assert chunks_count == math.ceil(len(DATA) / 65536)
+
+    return Response(headers={'Content-type': TEXT_PLAIN})
+
+
+class TestStreamingRequest(FrontikTestBase):
+    async def test_streaming_request(self):
+        await self.fetch(
+            method='POST',
+            path='/streaming_request',
+            data={'field': 'value'},
+            files={'file_file': [{'filename': 'file_name', 'body': DATA}]},
+        )
diff --git a/tests/test_streaming.py b/tests/test_streaming_response.py
similarity index 93%
rename from tests/test_streaming.py
rename to tests/test_streaming_response.py
index d4f8e11a5..2421fe1bb 100644
--- a/tests/test_streaming.py
+++ b/tests/test_streaming_response.py
@@ -16,7 +16,7 @@ async def iterable() -> AsyncIterable:
     return StreamingResponse(content=iterable(), headers={'Content-type': TEXT_PLAIN})
 
 
-class TestStreaming(FrontikTestBase):
+class TestStreamingResponse(FrontikTestBase):
     async def test_streaming_response(self):
         response = await self.fetch('/stream')
         assert response.headers['content-type'] == 'text/plain'