Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/HH-235648' into EXP-101494
Browse files Browse the repository at this point in the history
  • Loading branch information
HH ReleaseBot committed Oct 30, 2024
2 parents add3d3d + 0cce557 commit a4a0703
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 35 deletions.
18 changes: 9 additions & 9 deletions frontik/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from frontik.app_integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
from frontik.balancing_client import create_http_client
from frontik.dependencies import clients
from frontik.handler_asgi import serve_tornado_request
from frontik.options import options
from frontik.process import WorkerState
from frontik.routing import (
Expand All @@ -35,13 +34,13 @@
routers,
)
from frontik.service_discovery import MasterServiceDiscovery, ServiceDiscovery, WorkerServiceDiscovery
from frontik.tornado_connection_handler import LegacyTornadoConnectionHandler, TornadoConnectionHandler
from frontik.version import version as frontik_version

app_logger = logging.getLogger('app_logger')
_server_tasks = set()


class FrontikApplication:
class FrontikApplication(httputil.HTTPServerConnectionDelegate):
request_id = ''

class DefaultConfig:
Expand Down Expand Up @@ -88,12 +87,6 @@ def patch_anyio(self) -> None:
except ImportError:
pass

def __call__(self, tornado_request: httputil.HTTPServerRequest) -> None:
# for make it more asgi, reimplement tornado.http1connection._server_request_loop and ._read_message
task = asyncio.create_task(serve_tornado_request(self, self.asgi_app, tornado_request))
_server_tasks.add(task)
task.add_done_callback(_server_tasks.discard)

def make_service_discovery(self) -> ServiceDiscovery:
if self.worker_state.is_master and options.consul_enabled:
return MasterServiceDiscovery(self.statsd_client, self.app_name)
Expand Down Expand Up @@ -186,6 +179,13 @@ def get_frontik_and_apps_versions(self) -> etree.Element:
def get_kafka_producer(self, producer_name: str) -> Optional[AIOKafkaProducer]: # pragma: no cover
pass

def start_request(
self,
server_conn: object,
request_conn: httputil.HTTPConnection,
) -> TornadoConnectionHandler:
return LegacyTornadoConnectionHandler(self, request_conn)


def anyio_noop(*_args: Any, **_kwargs: Any) -> None:
raise RuntimeError(f'trying to use non async {_args[0]}')
Expand Down
5 changes: 3 additions & 2 deletions frontik/auth.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import base64
from typing import Mapping, MutableMapping, Optional, Union

from tornado import httputil
from tornado.escape import to_unicode
from tornado.web import Finish

from frontik.tornado_request import FrontikTornadoServerRequest

DEBUG_AUTH_HEADER_NAME = 'Frontik-Debug-Auth'


Expand Down Expand Up @@ -41,6 +42,6 @@ def check_debug_auth_by_headers(


def check_debug_auth(
tornado_request: httputil.HTTPServerRequest, login: Optional[str], password: Optional[str]
tornado_request: FrontikTornadoServerRequest, login: Optional[str], password: Optional[str]
) -> Optional[dict]:
return check_debug_auth_by_headers(tornado_request.headers, login, password)
14 changes: 8 additions & 6 deletions frontik/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@

from lxml import etree
from lxml.builder import E
from tornado import httputil
from tornado.escape import to_unicode, utf8
from tornado.httputil import HTTPHeaders, HTTPServerRequest
from tornado.httputil import HTTPHeaders

from frontik import media_types, request_context
from frontik.auth import check_debug_auth
Expand All @@ -37,6 +36,7 @@
from http_client.request_response import RequestBuilder, RequestResult

from frontik.app import FrontikApplication
from frontik.tornado_request import FrontikTornadoServerRequest

debug_log = logging.getLogger('frontik.debug')

Expand Down Expand Up @@ -388,7 +388,7 @@ def is_inherited(self) -> bool:
return self.debug_mode.inherited

def transform_chunk(
self, tornado_request: httputil.HTTPServerRequest, response: FrontikResponse
self, tornado_request: FrontikTornadoServerRequest, response: FrontikResponse
) -> FrontikResponse:
if not self.is_enabled():
return response
Expand All @@ -406,7 +406,7 @@ def transform_chunk(
debug_log_data.set('code', str(int(response.status_code)))
debug_log_data.set('handler-name', handler_name if handler_name else 'unknown handler')
debug_log_data.set('started', _format_number(tornado_request._start_time))
debug_log_data.set('request-id', str(tornado_request.request_id)) # type: ignore
debug_log_data.set('request-id', str(tornado_request.request_id))
debug_log_data.set('stages-total', _format_number((time.time() - tornado_request._start_time) * 1000))

try:
Expand Down Expand Up @@ -468,7 +468,7 @@ def transform_chunk(


class DebugMode:
def __init__(self, tornado_request: HTTPServerRequest) -> None:
def __init__(self, tornado_request: FrontikTornadoServerRequest) -> None:
self.debug_value = get_cookie_or_param_from_request(tornado_request, 'debug')
self.notpl = get_cookie_or_param_from_request(tornado_request, 'notpl')
self.notrl = get_cookie_or_param_from_request(tornado_request, 'notrl')
Expand All @@ -492,7 +492,9 @@ def __init__(self, tornado_request: HTTPServerRequest) -> None:
if self.inherited:
debug_log.debug('debug mode is inherited due to %s request header', DEBUG_HEADER_NAME)

def require_debug_access(self, tornado_request: HTTPServerRequest, auth_failed: Optional[bool] = None) -> None:
def require_debug_access(
self, tornado_request: FrontikTornadoServerRequest, auth_failed: Optional[bool] = None
) -> None:
if auth_failed is True:
self.auth_failed = True
return
Expand Down
27 changes: 18 additions & 9 deletions frontik/handler_asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from typing import TYPE_CHECKING

from tornado import httputil
from tornado.httputil import HTTPServerRequest

from frontik import request_context
from frontik.debug import DebugMode, DebugTransform
Expand All @@ -21,6 +20,7 @@

if TYPE_CHECKING:
from frontik.app import FrontikApplication, FrontikAsgiApp
from frontik.tornado_request import FrontikTornadoServerRequest

CHARSET = 'utf-8'
log = logging.getLogger('handler')
Expand All @@ -29,7 +29,7 @@
async def serve_tornado_request(
frontik_app: FrontikApplication,
asgi_app: FrontikAsgiApp,
tornado_request: httputil.HTTPServerRequest,
tornado_request: FrontikTornadoServerRequest,
) -> None:
with ExitStack() as stack:
integrations: dict[str, IntegrationDto] = {
Expand Down Expand Up @@ -64,7 +64,7 @@ async def serve_tornado_request(
async def process_request(
frontik_app: FrontikApplication,
asgi_app: FrontikAsgiApp,
tornado_request: HTTPServerRequest,
tornado_request: FrontikTornadoServerRequest,
integrations: dict[str, IntegrationDto],
) -> FrontikResponse:
if integrations.get('request_limiter', IntegrationDto()).get_value() is False:
Expand All @@ -91,7 +91,7 @@ async def process_request(
async def execute_asgi_page(
frontik_app: FrontikApplication,
asgi_app: FrontikAsgiApp,
tornado_request: HTTPServerRequest,
tornado_request: FrontikTornadoServerRequest,
scope: dict,
debug_mode: DebugMode,
integrations: dict[str, IntegrationDto],
Expand Down Expand Up @@ -119,10 +119,19 @@ async def execute_asgi_page(

async def receive():
await asyncio.sleep(0)

if tornado_request.finished and tornado_request.body_chunks.empty():
return {
'body': b'',
'type': 'http.request',
'more_body': False,
}

chunk = await tornado_request.body_chunks.get()
return {
'body': tornado_request.body,
'body': chunk,
'type': 'http.request',
'more_body': False,
'more_body': not tornado_request.finished or not tornado_request.body_chunks.empty(),
}

async def send(message):
Expand Down Expand Up @@ -157,7 +166,7 @@ async def send(message):
return response


def make_debug_mode(frontik_app: FrontikApplication, tornado_request: HTTPServerRequest) -> DebugMode:
def make_debug_mode(frontik_app: FrontikApplication, tornado_request: FrontikTornadoServerRequest) -> DebugMode:
debug_mode = DebugMode(tornado_request)

if not debug_mode.need_auth:
Expand All @@ -184,12 +193,12 @@ def _on_connection_close(tornado_request, process_request_task, integrations):
process_request_task.cancel() # serve_tornado_request will be interrupted with CanceledError


def log_request(tornado_request: httputil.HTTPServerRequest, status_code: int) -> None:
def log_request(tornado_request: FrontikTornadoServerRequest, status_code: int) -> None:
# frontik.request_context can't be used in case when client has closed connection
request_time = int(1000.0 * tornado_request.request_time())
extra = {
'ip': tornado_request.remote_ip,
'rid': tornado_request.request_id, # type: ignore
'rid': tornado_request.request_id,
'status': status_code,
'time': request_time,
'method': tornado_request.method,
Expand Down
4 changes: 4 additions & 0 deletions frontik/server_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import asyncio
import typing

_server_tasks: typing.Set[asyncio.Task] = set()
78 changes: 78 additions & 0 deletions frontik/tornado_connection_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import asyncio
import typing
from typing import Awaitable, Optional, Union

from tornado import httputil

from frontik.handler_asgi import serve_tornado_request
from frontik.server_tasks import _server_tasks
from frontik.tornado_request import FrontikTornadoServerRequest

if typing.TYPE_CHECKING:
from frontik.app import FrontikApplication


class TornadoConnectionHandler(httputil.HTTPMessageDelegate):
def __init__(
self,
frontik_app: 'FrontikApplication',
request_conn: httputil.HTTPConnection,
) -> None:
self.connection = request_conn
self.frontik_app = frontik_app
self.request = None # type: Optional[FrontikTornadoServerRequest]

def headers_received(
self,
start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
headers: httputil.HTTPHeaders,
) -> None:
self.request = FrontikTornadoServerRequest(
connection=self.connection,
start_line=typing.cast(httputil.RequestStartLine, start_line),
headers=headers,
)
self.process_request()

def process_request(self) -> None:
self._process_request()

def _process_request(self) -> Optional[Awaitable[None]]:
assert self.request is not None
task = asyncio.create_task(serve_tornado_request(self.frontik_app, self.frontik_app.asgi_app, self.request))
_server_tasks.add(task)
task.add_done_callback(_server_tasks.discard)
return task

def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
assert self.request is not None
task = asyncio.create_task(self.request.body_chunks.put(chunk))
return task

def finish(self) -> None:
assert self.request is not None
self.request.finished = True

def on_connection_close(self) -> None:
assert self.request is not None
self.request.finished = True


class LegacyTornadoConnectionHandler(TornadoConnectionHandler):
"""Used because of
1. Debug (requires request.body)
2. FrontikPageHandler (full of legacy shit)
"""

def process_request(self) -> None: ...

def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
assert self.request is not None
self.request.body += chunk
return super().data_received(chunk)

def finish(self) -> None:
assert self.request is not None
self.request._parse_body()
super().finish()
self._process_request()
11 changes: 11 additions & 0 deletions frontik/tornado_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import asyncio

from tornado import httputil


class FrontikTornadoServerRequest(httputil.HTTPServerRequest):
def __init__(self, *args, **kwargs) -> None: # type: ignore
super().__init__(*args, **kwargs)
self.body_chunks: asyncio.Queue = asyncio.Queue(maxsize=100)
self.request_id = None
self.finished = False
21 changes: 16 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ lxml = '4.9.2'
pydantic = '^2.3.0'
tornado = '6.3.3'
orjson = '*'
http-client = {git = 'https://github.com/hhru/balancing-http-client.git', tag = '2.1.20'}
http-client = {git = 'https://github.com/hhru/balancing-http-client.git', tag = '2.1.21'}
python-consul2-hh = {git = 'https://github.com/hhru/python-consul2', tag = 'v0.2.10'}
opentelemetry-sdk = '1.25.0'
opentelemetry-api = '1.25.0'
Expand All @@ -34,6 +34,7 @@ fastapi = '0.115.2'
aiokafka = '0.8.1'
sentry-sdk = '2.7.0'
tornado-httpclient-mock = '0.2.3'
python-multipart = "^0.0.16"

[tool.poetry.group.test.dependencies]
pytest = '8.1.1'
Expand Down
4 changes: 2 additions & 2 deletions tests/test_kafka_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from frontik.testing import FrontikTestBase


class TestKafkaProducer:
class KafkaProducerMock:
def __init__(self) -> None:
self.data: list[dict[str, dict]] = []
self.request_id = None
Expand All @@ -34,7 +34,7 @@ def disable_and_get_data(self):
class KafkaApplication(FrontikApplication):
async def init(self):
await super().init()
self.http_client_factory.request_engine_builder.kafka_producer = TestKafkaProducer()
self.http_client_factory.request_engine_builder.kafka_producer = KafkaProducerMock()


@router.get('/kafka')
Expand Down
Loading

0 comments on commit a4a0703

Please sign in to comment.