Skip to content

Commit

Permalink
HH-220317 get tornado back
Browse files Browse the repository at this point in the history
  • Loading branch information
712u3 committed Jul 1, 2024
1 parent c3782c0 commit d47be3a
Show file tree
Hide file tree
Showing 72 changed files with 1,605 additions and 1,159 deletions.
66 changes: 61 additions & 5 deletions frontik/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,52 @@
from collections.abc import Callable
from ctypes import c_bool, c_int
from threading import Lock
from typing import Optional, Union
from typing import Awaitable, Optional, Union

from aiokafka import AIOKafkaProducer
from fastapi import FastAPI, HTTPException
from http_client import AIOHttpClientWrapper, HttpClientFactory
from http_client import options as http_client_options
from http_client.balancing import RequestBalancerBuilder, Upstream
from lxml import etree
from tornado import httputil

import frontik.producers.json_producer
import frontik.producers.xml_producer
from frontik import integrations, media_types
from frontik.debug import get_frontik_and_apps_versions
from frontik.handler import PageHandler, get_current_handler
from frontik.handler_asgi import execute_page
from frontik.integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
from frontik.options import options
from frontik.process import WorkerState
from frontik.routing import router
from frontik.routing import import_all_pages, router
from frontik.service_discovery import UpstreamManager
from frontik.util import check_request_id, generate_uniq_timestamp_request_id

app_logger = logging.getLogger('app_logger')


class AsgiRouter:
async def __call__(self, scope, receive, send):
assert scope['type'] == 'http'

if 'router' not in scope:
scope['router'] = self

route = scope['route']
scope['endpoint'] = route.endpoint

await route.handle(scope, receive, send)


class FrontikAsgiApp(FastAPI):
def __init__(self) -> None:
super().__init__()
self.router = AsgiRouter() # type: ignore
self.http_client = None


@router.get('/version', cls=PageHandler)
async def get_version(handler: PageHandler = get_current_handler()) -> None:
handler.set_header('Content-Type', 'text/xml')
Expand All @@ -53,10 +76,8 @@ class DefaultConfig:
def __init__(self, app_module_name: Optional[str] = None) -> None:
self.start_time = time.time()

self.fastapi_app = FastAPI()

self.app_module_name: Optional[str] = app_module_name
if app_module_name is None:
if app_module_name is None: # for tests
app_module = importlib.import_module(self.__class__.__module__)
else:
app_module = importlib.import_module(app_module_name)
Expand All @@ -79,6 +100,38 @@ def __init__(self, app_module_name: Optional[str] = None) -> None:
count_down_lock = multiprocessing.Lock()
self.worker_state = WorkerState(init_workers_count_down, master_done, count_down_lock) # type: ignore

import_all_pages(app_module_name)

self.ui_methods: dict = {}
self.ui_modules: dict = {}
self.settings: dict = {}

self.app = FrontikAsgiApp()

def __call__(self, tornado_request: httputil.HTTPServerRequest) -> Optional[Awaitable[None]]:
# for making more asgi, reimplement tornado.http1connection._server_request_loop and ._read_message
request_id = tornado_request.headers.get('X-Request-Id') or generate_uniq_timestamp_request_id()
if options.validate_request_id:
check_request_id(request_id)

async def _serve_tornado_request(
frontik_app: FrontikApplication,
_tornado_request: httputil.HTTPServerRequest,
_request_id: str,
app: FrontikAsgiApp,
) -> None:
status, reason, headers, data = await execute_page(frontik_app, _tornado_request, _request_id, app)

assert _tornado_request.connection is not None
_tornado_request.connection.set_close_callback(None) # type: ignore

start_line = httputil.ResponseStartLine('', status, reason)
future = _tornado_request.connection.write_headers(start_line, headers, data)
_tornado_request.connection.finish()
return await future

return asyncio.create_task(_serve_tornado_request(self, tornado_request, request_id, self.app))

def create_upstream_manager(
self,
upstreams: dict[str, Upstream],
Expand Down Expand Up @@ -164,3 +217,6 @@ def get_current_status(self) -> dict[str, str]:

def get_kafka_producer(self, producer_name: str) -> Optional[AIOKafkaProducer]: # pragma: no cover
pass

def log_request(self, tornado_handler: PageHandler) -> None:
pass
43 changes: 28 additions & 15 deletions frontik/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from tornado.escape import to_unicode
from tornado.web import Finish

from frontik.options import options

if TYPE_CHECKING:
from tornado import httputil

from frontik.handler import PageHandler

DEBUG_AUTH_HEADER_NAME = 'Frontik-Debug-Auth'
Expand All @@ -17,8 +21,8 @@ class DebugUnauthorizedError(Finish):
pass


def passed_basic_auth(handler: PageHandler, login: Optional[str], passwd: Optional[str]) -> bool:
auth_header = handler.get_header('Authorization')
def passed_basic_auth(tornado_request: httputil.HTTPServerRequest, login: Optional[str], passwd: Optional[str]) -> bool:
auth_header = tornado_request.headers.get('Authorization')
if auth_header and auth_header.startswith('Basic '):
method, auth_b64 = auth_header.split(' ')
try:
Expand All @@ -30,21 +34,30 @@ def passed_basic_auth(handler: PageHandler, login: Optional[str], passwd: Option
return False


def check_debug_auth(handler: PageHandler, login: Optional[str], password: Optional[str]) -> None:
"""
:type handler: tornado.web.RequestHandler
:return: None or tuple(http_code, headers)
"""
debug_auth_header = handler.get_header(DEBUG_AUTH_HEADER_NAME)
def check_debug_auth(
tornado_request: httputil.HTTPServerRequest, login: Optional[str], password: Optional[str]
) -> Optional[str]:
debug_auth_header = tornado_request.headers.get(DEBUG_AUTH_HEADER_NAME)
if debug_auth_header is not None:
debug_access = debug_auth_header == f'{login}:{password}'
if not debug_access:
handler.set_header('WWW-Authenticate', f'{DEBUG_AUTH_HEADER_NAME}-Header realm="Secure Area"')
handler.set_status(http.client.UNAUTHORIZED)
handler.finish()
return f'{DEBUG_AUTH_HEADER_NAME}-Header realm="Secure Area"'
else:
debug_access = passed_basic_auth(handler, login, password)
debug_access = passed_basic_auth(tornado_request, login, password)
if not debug_access:
handler.set_header('WWW-Authenticate', 'Basic realm="Secure Area"')
handler.set_status(http.client.UNAUTHORIZED)
handler.finish()
return 'Basic realm="Secure Area"'
return None


def check_debug_auth_or_finish(
handler: PageHandler, login: Optional[str] = None, password: Optional[str] = None
) -> None:
if options.debug:
return
login = login or options.debug_login
password = password or options.debug_password
fail_header = check_debug_auth(handler.request, login, password)
if fail_header:
handler.set_header('WWW-Authenticate', fail_header)
handler.set_status(http.client.UNAUTHORIZED)
handler.finish()
57 changes: 57 additions & 0 deletions frontik/balancing_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import time
from functools import partial
from typing import Annotated

from fastapi import Depends, Request
from http_client import HttpClient, RequestBuilder
from http_client.request_response import USER_AGENT_HEADER

from frontik import request_context
from frontik.auth import DEBUG_AUTH_HEADER_NAME
from frontik.debug import DEBUG_HEADER_NAME
from frontik.timeout_tracking import get_timeout_checker
from frontik.util import make_url

OUTER_TIMEOUT_MS_HEADER = 'X-Outer-Timeout-Ms'


def modify_http_client_request(request: Request, balanced_request: RequestBuilder) -> None:
balanced_request.headers['x-request-id'] = request_context.get_request_id()
balanced_request.headers[OUTER_TIMEOUT_MS_HEADER] = f'{balanced_request.request_timeout * 1000:.0f}'

outer_timeout = request.headers.get(OUTER_TIMEOUT_MS_HEADER.lower())
if outer_timeout:
timeout_checker = get_timeout_checker(
request.headers.get(USER_AGENT_HEADER.lower()),
float(outer_timeout),
request['start_time'],
)
timeout_checker.check(balanced_request)

if request['pass_debug']:
balanced_request.headers[DEBUG_HEADER_NAME] = 'true'

# debug_timestamp is added to avoid caching of debug responses
balanced_request.path = make_url(balanced_request.path, debug_timestamp=int(time.time()))

for header_name in ('Authorization', DEBUG_AUTH_HEADER_NAME):
authorization = request.headers.get(header_name.lower())
if authorization is not None:
balanced_request.headers[header_name] = authorization


def get_http_client(modify_request_hook=None):
def _get_http_client(request: Request) -> HttpClient:
hook = modify_request_hook or partial(modify_http_client_request, request)

http_client = request['http_client_factory'].get_http_client(
modify_http_request_hook=hook,
debug_enabled=request['debug_enabled'],
)

return http_client

return _get_http_client


HttpClientT = Annotated[HttpClient, Depends(get_http_client())]
Loading

0 comments on commit d47be3a

Please sign in to comment.