Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HH-220317 do things #718

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions frontik/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,52 @@
from collections.abc import Callable
from ctypes import c_bool, c_int
from threading import Lock
from typing import Optional, Union
from typing import Awaitable, Optional, Union

from aiokafka import AIOKafkaProducer
from fastapi import FastAPI, HTTPException
from http_client import AIOHttpClientWrapper, HttpClientFactory
from http_client import options as http_client_options
from http_client.balancing import RequestBalancerBuilder, Upstream
from lxml import etree
from tornado import httputil

import frontik.producers.json_producer
import frontik.producers.xml_producer
from frontik import integrations, media_types
from frontik.debug import get_frontik_and_apps_versions
from frontik.handler import PageHandler, get_current_handler
from frontik.handler_asgi import execute_page
from frontik.integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
from frontik.options import options
from frontik.process import WorkerState
from frontik.routing import router
from frontik.routing import import_all_pages, router
from frontik.service_discovery import UpstreamManager
from frontik.util import check_request_id, generate_uniq_timestamp_request_id

app_logger = logging.getLogger('app_logger')


class AsgiRouter:
async def __call__(self, scope, receive, send):
assert scope['type'] == 'http'

if 'router' not in scope:
scope['router'] = self

route = scope['route']
scope['endpoint'] = route.endpoint

await route.handle(scope, receive, send)


class FrontikAsgiApp(FastAPI):
def __init__(self) -> None:
super().__init__()
self.router = AsgiRouter() # type: ignore
self.http_client = None


@router.get('/version', cls=PageHandler)
async def get_version(handler: PageHandler = get_current_handler()) -> None:
handler.set_header('Content-Type', 'text/xml')
Expand All @@ -53,10 +76,8 @@ class DefaultConfig:
def __init__(self, app_module_name: Optional[str] = None) -> None:
self.start_time = time.time()

self.fastapi_app = FastAPI()

self.app_module_name: Optional[str] = app_module_name
if app_module_name is None:
if app_module_name is None: # for tests
app_module = importlib.import_module(self.__class__.__module__)
else:
app_module = importlib.import_module(app_module_name)
Expand All @@ -79,6 +100,38 @@ def __init__(self, app_module_name: Optional[str] = None) -> None:
count_down_lock = multiprocessing.Lock()
self.worker_state = WorkerState(init_workers_count_down, master_done, count_down_lock) # type: ignore

import_all_pages(app_module_name)

self.ui_methods: dict = {}
self.ui_modules: dict = {}
self.settings: dict = {}

self.app = FrontikAsgiApp()

def __call__(self, tornado_request: httputil.HTTPServerRequest) -> Optional[Awaitable[None]]:
# for making more asgi, reimplement tornado.http1connection._server_request_loop and ._read_message
request_id = tornado_request.headers.get('X-Request-Id') or generate_uniq_timestamp_request_id()
if options.validate_request_id:
check_request_id(request_id)

async def _serve_tornado_request(
frontik_app: FrontikApplication,
_tornado_request: httputil.HTTPServerRequest,
_request_id: str,
app: FrontikAsgiApp,
) -> None:
status, reason, headers, data = await execute_page(frontik_app, _tornado_request, _request_id, app)

assert _tornado_request.connection is not None
_tornado_request.connection.set_close_callback(None) # type: ignore

start_line = httputil.ResponseStartLine('', status, reason)
future = _tornado_request.connection.write_headers(start_line, headers, data)
_tornado_request.connection.finish()
return await future

return asyncio.create_task(_serve_tornado_request(self, tornado_request, request_id, self.app))

def create_upstream_manager(
self,
upstreams: dict[str, Upstream],
Expand Down Expand Up @@ -164,3 +217,6 @@ def get_current_status(self) -> dict[str, str]:

def get_kafka_producer(self, producer_name: str) -> Optional[AIOKafkaProducer]: # pragma: no cover
pass

def log_request(self, tornado_handler: PageHandler) -> None:
pass
43 changes: 28 additions & 15 deletions frontik/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from tornado.escape import to_unicode
from tornado.web import Finish

from frontik.options import options

if TYPE_CHECKING:
from tornado import httputil

from frontik.handler import PageHandler

DEBUG_AUTH_HEADER_NAME = 'Frontik-Debug-Auth'
Expand All @@ -17,8 +21,8 @@ class DebugUnauthorizedError(Finish):
pass


def passed_basic_auth(handler: PageHandler, login: Optional[str], passwd: Optional[str]) -> bool:
auth_header = handler.get_header('Authorization')
def passed_basic_auth(tornado_request: httputil.HTTPServerRequest, login: Optional[str], passwd: Optional[str]) -> bool:
auth_header = tornado_request.headers.get('Authorization')
if auth_header and auth_header.startswith('Basic '):
method, auth_b64 = auth_header.split(' ')
try:
Expand All @@ -30,21 +34,30 @@ def passed_basic_auth(handler: PageHandler, login: Optional[str], passwd: Option
return False


def check_debug_auth(handler: PageHandler, login: Optional[str], password: Optional[str]) -> None:
"""
:type handler: tornado.web.RequestHandler
:return: None or tuple(http_code, headers)
"""
debug_auth_header = handler.get_header(DEBUG_AUTH_HEADER_NAME)
def check_debug_auth(
tornado_request: httputil.HTTPServerRequest, login: Optional[str], password: Optional[str]
) -> Optional[str]:
debug_auth_header = tornado_request.headers.get(DEBUG_AUTH_HEADER_NAME)
if debug_auth_header is not None:
debug_access = debug_auth_header == f'{login}:{password}'
if not debug_access:
handler.set_header('WWW-Authenticate', f'{DEBUG_AUTH_HEADER_NAME}-Header realm="Secure Area"')
handler.set_status(http.client.UNAUTHORIZED)
handler.finish()
return f'{DEBUG_AUTH_HEADER_NAME}-Header realm="Secure Area"'
else:
debug_access = passed_basic_auth(handler, login, password)
debug_access = passed_basic_auth(tornado_request, login, password)
if not debug_access:
handler.set_header('WWW-Authenticate', 'Basic realm="Secure Area"')
handler.set_status(http.client.UNAUTHORIZED)
handler.finish()
return 'Basic realm="Secure Area"'
return None


def check_debug_auth_or_finish(
handler: PageHandler, login: Optional[str] = None, password: Optional[str] = None
) -> None:
if options.debug:
return
login = login or options.debug_login
password = password or options.debug_password
fail_header = check_debug_auth(handler.request, login, password)
if fail_header:
handler.set_header('WWW-Authenticate', fail_header)
handler.set_status(http.client.UNAUTHORIZED)
handler.finish()
57 changes: 57 additions & 0 deletions frontik/balancing_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import time
from functools import partial
from typing import Annotated

from fastapi import Depends, Request
from http_client import HttpClient, RequestBuilder
from http_client.request_response import USER_AGENT_HEADER

from frontik import request_context
from frontik.auth import DEBUG_AUTH_HEADER_NAME
from frontik.debug import DEBUG_HEADER_NAME
from frontik.timeout_tracking import get_timeout_checker
from frontik.util import make_url

OUTER_TIMEOUT_MS_HEADER = 'X-Outer-Timeout-Ms'


def modify_http_client_request(request: Request, balanced_request: RequestBuilder) -> None:
balanced_request.headers['x-request-id'] = request_context.get_request_id()
balanced_request.headers[OUTER_TIMEOUT_MS_HEADER] = f'{balanced_request.request_timeout * 1000:.0f}'

outer_timeout = request.headers.get(OUTER_TIMEOUT_MS_HEADER.lower())
if outer_timeout:
timeout_checker = get_timeout_checker(
request.headers.get(USER_AGENT_HEADER.lower()),
float(outer_timeout),
request['start_time'],
)
timeout_checker.check(balanced_request)

if request['pass_debug']:
balanced_request.headers[DEBUG_HEADER_NAME] = 'true'

# debug_timestamp is added to avoid caching of debug responses
balanced_request.path = make_url(balanced_request.path, debug_timestamp=int(time.time()))

for header_name in ('Authorization', DEBUG_AUTH_HEADER_NAME):
authorization = request.headers.get(header_name.lower())
if authorization is not None:
balanced_request.headers[header_name] = authorization


def get_http_client(modify_request_hook=None):
def _get_http_client(request: Request) -> HttpClient:
hook = modify_request_hook or partial(modify_http_client_request, request)

http_client = request['http_client_factory'].get_http_client(
modify_http_request_hook=hook,
debug_enabled=request['debug_enabled'],
)

return http_client

return _get_http_client


HttpClientT = Annotated[HttpClient, Depends(get_http_client())]
Loading
Loading