Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HH-212492 use fastapi server/routing #695

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 189 additions & 75 deletions frontik/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from functools import partial
from threading import Lock
from typing import Any, Optional, Union
import json
import inspect
import re

from aiokafka import AIOKafkaProducer
from http_client import AIOHttpClientWrapper, HttpClientFactory
Expand All @@ -24,33 +27,118 @@
import frontik.producers.xml_producer
from frontik import integrations, media_types, request_context
from frontik.debug import DebugTransform, get_frontik_and_apps_versions
from frontik.handler import ErrorHandler, PageHandler
from frontik.handler import PageHandler, FinishPageSignal, RedirectPageSignal, build_error_data
from frontik.handler_return_values import ReturnedValueHandlers, get_default_returned_value_handlers
from frontik.integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
from frontik.loggers import CUSTOM_JSON_EXTRA, JSON_REQUESTS_LOGGER
from frontik.options import options
from frontik.process import WorkerState
from frontik.routing import FileMappingRouter, FrontikRouter
from frontik.routing import routers, normal_routes, regex_mapping, FrontikRouter, FrontikRegexRouter
from frontik.service_discovery import UpstreamManager
from frontik.util import check_request_id, generate_uniq_timestamp_request_id

app_logger = logging.getLogger('http_client')


class VersionHandler(RequestHandler):
def get(self):
self.application: FrontikApplication
self.set_header('Content-Type', 'text/xml')
self.write(
etree.tostring(get_frontik_and_apps_versions(self.application), encoding='utf-8', xml_declaration=True),
)


class StatusHandler(RequestHandler):
def get(self):
self.application: FrontikApplication
self.set_header('Content-Type', media_types.APPLICATION_JSON)
self.finish(self.application.get_current_status())
from fastapi import FastAPI, APIRouter, Request
from fastapi.routing import APIRoute
import pkgutil
from http_client import HttpClient
from starlette.middleware.base import Response
from fastapi import Depends
import os
from inspect import ismodule
from starlette.datastructures import MutableHeaders
from frontik.json_builder import json_decode
from frontik.handler import get_current_handler

app_logger = logging.getLogger('app_logger')

_core_router = FrontikRouter()
router = FrontikRouter()
regex_router = FrontikRegexRouter()
routers.extend((_core_router, router, regex_router))


def setup_page_handler(request: Request, cls: type(PageHandler)):
# create legacy PageHandler and put to request
handler = cls(
request.app.frontik_app,
request.query_params,
request.cookies,
request.headers,
request.state.body_bytes,
request.state.start_time,
request.url.path,
request.state.path_params,
request.client.host,
request.method,
)

request.state.handler = handler
return handler


def _data_to_chunk(data, headers) -> bytes:
if isinstance(data, str):
chunk = data.encode("utf-8")
elif isinstance(data, dict):
chunk = json.dumps(data).replace("</", "<\\/")
chunk = chunk.encode("utf-8")
headers["Content-Type"] = "application/json; charset=UTF-8"
elif isinstance(data, bytes):
chunk = data
else:
raise RuntimeError('unexpected type of chunk')
return chunk


async def core_middleware(request: Request, call_next):
request.state.start_time = time.time()
request.state.body_bytes = await request.body()
bokshitsky marked this conversation as resolved.
Show resolved Hide resolved

request_id = request.headers.get('X-Request-Id') or FrontikApplication.next_request_id()
if options.validate_request_id:
check_request_id(request_id)

with request_context.request_context(request, request_id):
route = normal_routes.get((request.url.path, request.method))
if route is None:
return await call_next(request) # если в нормальных не нашли, пусть фолбэчнется на регекс роутер

page_cls = route[1] # from normal router
request.state.path_params = {}
setup_page_handler(request, page_cls)

_call_next = route[0].get_route_handler()
response = await process_request(request, _call_next)
return response


async def regex_router_fallback(request: Request, _):
for pattern, route, cls in regex_mapping:
route: APIRoute
match = pattern.match(request.url.path)
if match and next(iter(route.methods), None) == request.method:
request.state.path_params = match.groupdict()
setup_page_handler(request, cls)
call_next = route.get_route_handler()
response = await process_request(request, call_next)
return response

rid = request_context.get_request_id()
status, headers, content = build_error_data(rid, 404, 'Not Found')
return Response(status_code=status, headers=headers, content=content)


@_core_router.get('/version', cls=PageHandler)
async def get_version(self=get_current_handler):
self.set_header('Content-Type', 'text/xml')
self.finish(
etree.tostring(get_frontik_and_apps_versions(self.application), encoding='utf-8', xml_declaration=True),
)


@_core_router.get('/status', cls=PageHandler)
async def get_status(self=get_current_handler):
self.set_header('Content-Type', media_types.APPLICATION_JSON)
self.finish(self.application.get_current_status())


class PydevdHandler(RequestHandler):
Expand All @@ -60,8 +148,8 @@ def get(self):
return

try:
debugger_ip = self.get_argument('debugger_ip', self.request.remote_ip)
debugger_port = self.get_argument('debugger_port', '32223')
debugger_ip = self.get_query_argument('debugger_ip', self.request.remote_ip)
debugger_port = self.get_query_argument('debugger_port', '32223')
self.settrace(debugger_ip, int(debugger_port))
self.trace_page(debugger_ip, debugger_port)

Expand All @@ -86,7 +174,7 @@ def error_page(self) -> None:
self.finish(traceback.format_exc())


class FrontikApplication(Application):
class FrontikApplication:
request_id = ''

class DefaultConfig:
Expand All @@ -107,15 +195,6 @@ def __init__(self, app_root: str, **settings: Any) -> None:
self.available_integrations: list[integrations.Integration] = []
self.tornado_http_client: Optional[AIOHttpClientWrapper] = None
self.http_client_factory: HttpClientFactory
self.router = FrontikRouter(self)

core_handlers: list[Any] = [
(r'/version/?', VersionHandler),
(r'/status/?', StatusHandler),
(r'.*', self.router),
]
if options.debug:
core_handlers.insert(0, (r'/pydevd/?', PydevdHandler))

self.statsd_client: Union[StatsDClient, StatsDClientStub] = create_statsd_client(options, self)

Expand All @@ -126,8 +205,6 @@ def __init__(self, app_root: str, **settings: Any) -> None:

self.returned_value_handlers: ReturnedValueHandlers = get_default_returned_value_handlers()

super().__init__(core_handlers)

def create_upstream_manager(
self,
upstreams: dict[str, Upstream],
Expand All @@ -146,8 +223,6 @@ def create_upstream_manager(
self.upstream_manager.send_updates() # initial full state sending

async def init(self) -> None:
self.transforms.insert(0, partial(DebugTransform, self)) # type: ignore

self.available_integrations, integration_futures = integrations.load_integrations(self)
await asyncio.gather(*[future for future in integration_futures if future])

Expand Down Expand Up @@ -182,36 +257,6 @@ async def init(self) -> None:
if self.worker_state.single_worker_mode:
self.worker_state.master_done.value = True

def find_handler(self, request, **kwargs):
request_id = request.headers.get('X-Request-Id')
if request_id is None:
request_id = FrontikApplication.next_request_id()
if options.validate_request_id:
check_request_id(request_id)

def wrapped_in_context(func: Callable) -> Callable:
def wrapper(*args, **kwargs):
with request_context.request_context(request, request_id):
return func(*args, **kwargs)

return wrapper

delegate: httputil.HTTPMessageDelegate = wrapped_in_context(super().find_handler)(request, **kwargs)
delegate.headers_received = wrapped_in_context(delegate.headers_received) # type: ignore
delegate.data_received = wrapped_in_context(delegate.data_received) # type: ignore
delegate.finish = wrapped_in_context(delegate.finish) # type: ignore
delegate.on_connection_close = wrapped_in_context(delegate.on_connection_close) # type: ignore

return delegate

def reverse_url(self, name: str, *args: Any, **kwargs: Any) -> str:
return self.router.reverse_url(name, *args, **kwargs)

def application_urls(self) -> list[tuple]:
return [('', FileMappingRouter(importlib.import_module(f'{self.app_module}.pages')))]

def application_404_handler(self, request: HTTPServerRequest) -> tuple[type[PageHandler], dict]:
return ErrorHandler, {'status_code': 404}

def application_config(self) -> DefaultConfig:
return FrontikApplication.DefaultConfig()
Expand Down Expand Up @@ -248,19 +293,15 @@ def get_current_status(self) -> dict[str, str]:

return {'uptime': uptime_value, 'datacenter': http_client_options.datacenter}

def log_request(self, handler):
if not options.log_json:
super().log_request(handler)
return

request_time = int(1000.0 * handler.request.request_time())
def log_request(self, handler, request: Request):
request_time = int(1000.0 * (time.time() - handler.request_start_time))
extra = {
'ip': handler.request.remote_ip,
'ip': request.client.host,
'rid': request_context.get_request_id(),
'status': handler.get_status(),
'time': request_time,
'method': handler.request.method,
'uri': handler.request.uri,
'method': request.method,
'uri': str(request.url),
}

handler_name = request_context.get_handler_name()
Expand All @@ -271,3 +312,76 @@ def log_request(self, handler):

def get_kafka_producer(self, producer_name: str) -> Optional[AIOKafkaProducer]: # pragma: no cover
pass


async def process_request(request, call_next):
handler = request.state.handler
status = 200
headers = {}
content = None

try:
request_context.set_handler(handler)

handler.prepare()
handler.stages_logger.commit_stage('prepare')
_response = await call_next(request)

handler._handler_finished_notification()
await handler.finish_group.get_gathering_future()
await handler.finish_group.get_finish_future()
handler.stages_logger.commit_stage('page')

render_result = await handler._postprocess()
handler.stages_logger.commit_stage('postprocess')

headers = handler.resp_headers
status = handler.get_status()

debug_transform = DebugTransform(request.app.frontik_app, request)
if debug_transform.is_enabled():
chunk = _data_to_chunk(render_result, headers)
status, headers, render_result = debug_transform.transform_chunk(status, headers, chunk)

content = render_result

except FinishPageSignal as finish_ex:
handler._handler_finished_notification()
headers = handler.resp_headers
chunk = _data_to_chunk(finish_ex.data, headers)
status = handler.get_status()
content = chunk

except RedirectPageSignal as redirect_ex:
handler._handler_finished_notification()
headers = handler.resp_headers
url = redirect_ex.url
status = redirect_ex.status
headers["Location"] = url.encode('utf-8')

except Exception as ex:
try:
status, headers, content = await handler._handle_request_exception(ex)
except Exception as exc:
app_logger.exception(f'request processing has failed')
status, headers, content = build_error_data(handler.request_id)

finally:
handler.cleanup()

if status in (204, 304) or (100 <= status < 200):
for h in ('Content-Encoding', 'Content-Language', 'Content-Type'):
if h in headers:
headers.pop(h)
content = None

response = Response(status_code=status, headers=headers, content=content)

for key, values in handler.resp_cookies.items():
response.set_cookie(key, **values)

handler.finish_group.abort()
request.app.frontik_app.log_request(handler, request)
handler.on_finish(status)

return response
Loading
Loading