Skip to content

Commit

Permalink
HH-212492 use fastapi server/routing
Browse files Browse the repository at this point in the history
  • Loading branch information
712u3 committed Apr 25, 2024
1 parent 1f2e49b commit 117b154
Show file tree
Hide file tree
Showing 15 changed files with 978 additions and 689 deletions.
264 changes: 189 additions & 75 deletions frontik/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from functools import partial
from threading import Lock
from typing import Any, Optional, Union
import json
import inspect
import re

from aiokafka import AIOKafkaProducer
from http_client import AIOHttpClientWrapper, HttpClientFactory
Expand All @@ -24,33 +27,118 @@
import frontik.producers.xml_producer
from frontik import integrations, media_types, request_context
from frontik.debug import DebugTransform, get_frontik_and_apps_versions
from frontik.handler import ErrorHandler, PageHandler
from frontik.handler import PageHandler, FinishPageSignal, RedirectPageSignal, build_error_data
from frontik.handler_return_values import ReturnedValueHandlers, get_default_returned_value_handlers
from frontik.integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
from frontik.loggers import CUSTOM_JSON_EXTRA, JSON_REQUESTS_LOGGER
from frontik.options import options
from frontik.process import WorkerState
from frontik.routing import FileMappingRouter, FrontikRouter
from frontik.routing import routers, normal_routes, regex_mapping, FrontikRouter, FrontikRegexRouter
from frontik.service_discovery import UpstreamManager
from frontik.util import check_request_id, generate_uniq_timestamp_request_id

app_logger = logging.getLogger('http_client')


class VersionHandler(RequestHandler):
def get(self):
self.application: FrontikApplication
self.set_header('Content-Type', 'text/xml')
self.write(
etree.tostring(get_frontik_and_apps_versions(self.application), encoding='utf-8', xml_declaration=True),
)


class StatusHandler(RequestHandler):
def get(self):
self.application: FrontikApplication
self.set_header('Content-Type', media_types.APPLICATION_JSON)
self.finish(self.application.get_current_status())
from fastapi import FastAPI, APIRouter, Request
from fastapi.routing import APIRoute
import pkgutil
from http_client import HttpClient
from starlette.middleware.base import Response
from fastapi import Depends
import os
from inspect import ismodule
from starlette.datastructures import MutableHeaders
from frontik.json_builder import json_decode
from frontik.handler import get_current_handler

app_logger = logging.getLogger('app_logger')

_core_router = FrontikRouter()
router = FrontikRouter()
regex_router = FrontikRegexRouter()
routers.extend((_core_router, router, regex_router))


def setup_page_handler(request: Request, cls: type(PageHandler)):
# create legacy PageHandler and put to request
handler = cls(
request.app.frontik_app,
request.query_params,
request.cookies,
request.headers,
request.state.body_bytes,
request.state.start_time,
request.url.path,
request.state.path_params,
request.client.host,
request.method,
)

request.state.handler = handler
return handler


def _data_to_chunk(data, headers) -> bytes:
if isinstance(data, str):
chunk = data.encode("utf-8")
elif isinstance(data, dict):
chunk = json.dumps(data).replace("</", "<\\/")
chunk = chunk.encode("utf-8")
headers["Content-Type"] = "application/json; charset=UTF-8"
elif isinstance(data, bytes):
chunk = data
else:
raise RuntimeError('unexpected type of chunk')
return chunk


async def core_middleware(request: Request, call_next):
request.state.start_time = time.time()
request.state.body_bytes = await request.body()

request_id = request.headers.get('X-Request-Id') or FrontikApplication.next_request_id()
if options.validate_request_id:
check_request_id(request_id)

with request_context.request_context(request, request_id):
route = normal_routes.get((request.url.path, request.method))
if route is None:
return await call_next(request) # если в нормальных не нашли, пусть фолбэчнется на регекс роутер

page_cls = route[1] # from normal router
request.state.path_params = {}
setup_page_handler(request, page_cls)

_call_next = route[0].get_route_handler()
response = await process_request(request, _call_next)
return response


async def regex_router_fallback(request: Request, _):
for pattern, route, cls in regex_mapping:
route: APIRoute
match = pattern.match(request.url.path)
if match and next(iter(route.methods), None) == request.method:
request.state.path_params = match.groupdict()
setup_page_handler(request, cls)
call_next = route.get_route_handler()
response = await process_request(request, call_next)
return response

rid = request_context.get_request_id()
status, headers, content = build_error_data(rid, 404, 'Not Found')
return Response(status_code=status, headers=headers, content=content)


@_core_router.get('/version', cls=PageHandler)
async def get_version(self=get_current_handler):
self.set_header('Content-Type', 'text/xml')
self.finish(
etree.tostring(get_frontik_and_apps_versions(self.application), encoding='utf-8', xml_declaration=True),
)


@_core_router.get('/status', cls=PageHandler)
async def get_status(self=get_current_handler):
self.set_header('Content-Type', media_types.APPLICATION_JSON)
self.finish(self.application.get_current_status())


class PydevdHandler(RequestHandler):
Expand All @@ -60,8 +148,8 @@ def get(self):
return

try:
debugger_ip = self.get_argument('debugger_ip', self.request.remote_ip)
debugger_port = self.get_argument('debugger_port', '32223')
debugger_ip = self.get_query_argument('debugger_ip', self.request.remote_ip)
debugger_port = self.get_query_argument('debugger_port', '32223')
self.settrace(debugger_ip, int(debugger_port))
self.trace_page(debugger_ip, debugger_port)

Expand All @@ -86,7 +174,7 @@ def error_page(self) -> None:
self.finish(traceback.format_exc())


class FrontikApplication(Application):
class FrontikApplication:
request_id = ''

class DefaultConfig:
Expand All @@ -107,15 +195,6 @@ def __init__(self, app_root: str, **settings: Any) -> None:
self.available_integrations: list[integrations.Integration] = []
self.tornado_http_client: Optional[AIOHttpClientWrapper] = None
self.http_client_factory: HttpClientFactory
self.router = FrontikRouter(self)

core_handlers: list[Any] = [
(r'/version/?', VersionHandler),
(r'/status/?', StatusHandler),
(r'.*', self.router),
]
if options.debug:
core_handlers.insert(0, (r'/pydevd/?', PydevdHandler))

self.statsd_client: Union[StatsDClient, StatsDClientStub] = create_statsd_client(options, self)

Expand All @@ -126,8 +205,6 @@ def __init__(self, app_root: str, **settings: Any) -> None:

self.returned_value_handlers: ReturnedValueHandlers = get_default_returned_value_handlers()

super().__init__(core_handlers)

def create_upstream_manager(
self,
upstreams: dict[str, Upstream],
Expand All @@ -146,8 +223,6 @@ def create_upstream_manager(
self.upstream_manager.send_updates() # initial full state sending

async def init(self) -> None:
self.transforms.insert(0, partial(DebugTransform, self)) # type: ignore

self.available_integrations, integration_futures = integrations.load_integrations(self)
await asyncio.gather(*[future for future in integration_futures if future])

Expand Down Expand Up @@ -182,36 +257,6 @@ async def init(self) -> None:
if self.worker_state.single_worker_mode:
self.worker_state.master_done.value = True

def find_handler(self, request, **kwargs):
request_id = request.headers.get('X-Request-Id')
if request_id is None:
request_id = FrontikApplication.next_request_id()
if options.validate_request_id:
check_request_id(request_id)

def wrapped_in_context(func: Callable) -> Callable:
def wrapper(*args, **kwargs):
with request_context.request_context(request, request_id):
return func(*args, **kwargs)

return wrapper

delegate: httputil.HTTPMessageDelegate = wrapped_in_context(super().find_handler)(request, **kwargs)
delegate.headers_received = wrapped_in_context(delegate.headers_received) # type: ignore
delegate.data_received = wrapped_in_context(delegate.data_received) # type: ignore
delegate.finish = wrapped_in_context(delegate.finish) # type: ignore
delegate.on_connection_close = wrapped_in_context(delegate.on_connection_close) # type: ignore

return delegate

def reverse_url(self, name: str, *args: Any, **kwargs: Any) -> str:
return self.router.reverse_url(name, *args, **kwargs)

def application_urls(self) -> list[tuple]:
return [('', FileMappingRouter(importlib.import_module(f'{self.app_module}.pages')))]

def application_404_handler(self, request: HTTPServerRequest) -> tuple[type[PageHandler], dict]:
return ErrorHandler, {'status_code': 404}

def application_config(self) -> DefaultConfig:
return FrontikApplication.DefaultConfig()
Expand Down Expand Up @@ -248,19 +293,15 @@ def get_current_status(self) -> dict[str, str]:

return {'uptime': uptime_value, 'datacenter': http_client_options.datacenter}

def log_request(self, handler):
if not options.log_json:
super().log_request(handler)
return

request_time = int(1000.0 * handler.request.request_time())
def log_request(self, handler, request: Request):
request_time = int(1000.0 * (time.time() - handler.request_start_time))
extra = {
'ip': handler.request.remote_ip,
'ip': request.client.host,
'rid': request_context.get_request_id(),
'status': handler.get_status(),
'time': request_time,
'method': handler.request.method,
'uri': handler.request.uri,
'method': request.method,
'uri': str(request.url),
}

handler_name = request_context.get_handler_name()
Expand All @@ -271,3 +312,76 @@ def log_request(self, handler):

def get_kafka_producer(self, producer_name: str) -> Optional[AIOKafkaProducer]: # pragma: no cover
pass


async def process_request(request, call_next):
handler = request.state.handler
status = 200
headers = {}
content = None

try:
request_context.set_handler(handler)

handler.prepare()
handler.stages_logger.commit_stage('prepare')
_response = await call_next(request)

handler._handler_finished_notification()
await handler.finish_group.get_gathering_future()
await handler.finish_group.get_finish_future()
handler.stages_logger.commit_stage('page')

render_result = await handler._postprocess()
handler.stages_logger.commit_stage('postprocess')

headers = handler.resp_headers
status = handler.get_status()

debug_transform = DebugTransform(request.app.frontik_app, request)
if debug_transform.is_enabled():
chunk = _data_to_chunk(render_result, headers)
status, headers, render_result = debug_transform.transform_chunk(status, headers, chunk)

content = render_result

except FinishPageSignal as finish_ex:
handler._handler_finished_notification()
headers = handler.resp_headers
chunk = _data_to_chunk(finish_ex.data, headers)
status = handler.get_status()
content = chunk

except RedirectPageSignal as redirect_ex:
handler._handler_finished_notification()
headers = handler.resp_headers
url = redirect_ex.url
status = redirect_ex.status
headers["Location"] = url.encode('utf-8')

except Exception as ex:
try:
status, headers, content = await handler._handle_request_exception(ex)
except Exception as exc:
app_logger.exception(f'request processing has failed')
status, headers, content = build_error_data(handler.request_id)

finally:
handler.cleanup()

if status in (204, 304) or (100 <= status < 200):
for h in ('Content-Encoding', 'Content-Language', 'Content-Type'):
if h in headers:
headers.pop(h)
content = None

response = Response(status_code=status, headers=headers, content=content)

for key, values in handler.resp_cookies.items():
response.set_cookie(key, **values)

handler.finish_group.abort()
request.app.frontik_app.log_request(handler, request)
handler.on_finish(status)

return response
Loading

0 comments on commit 117b154

Please sign in to comment.