Skip to content

Commit

Permalink
HH-221505 small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
712u3 committed Jul 30, 2024
1 parent cd9bfd3 commit 04b84ac
Show file tree
Hide file tree
Showing 31 changed files with 497 additions and 397 deletions.
25 changes: 13 additions & 12 deletions frontik/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from frontik.debug import get_frontik_and_apps_versions
from frontik.handler import PageHandler, get_current_handler
from frontik.handler_asgi import execute_page
from frontik.handler_return_values import ReturnedValueHandlers, get_default_returned_value_handlers
from frontik.integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
from frontik.options import options
from frontik.process import WorkerState
Expand Down Expand Up @@ -73,16 +74,15 @@ class FrontikApplication:
class DefaultConfig:
pass

def __init__(self, app_module_name: Optional[str] = None) -> None:
def __init__(self) -> None:
self.start_time = time.time()

self.app_module_name: Optional[str] = app_module_name
if app_module_name is None: # for tests
app_module = importlib.import_module(self.__class__.__module__)
else:
app_module = importlib.import_module(app_module_name)
self.app_module_name: str = self.__class__.__module__
app_module = importlib.import_module(self.app_module_name)
self.app_root = os.path.dirname(str(app_module.__file__))
self.app_name = app_module.__name__
if options.service_name is None:
options.service_name = self.app_module_name.rsplit('.', 1)[-1]
self.app_name = options.service_name

self.config = self.application_config()

Expand All @@ -99,14 +99,15 @@ def __init__(self, app_module_name: Optional[str] = None) -> None:
master_done = multiprocessing.Value(c_bool, False)
count_down_lock = multiprocessing.Lock()
self.worker_state = WorkerState(init_workers_count_down, master_done, count_down_lock) # type: ignore
self.returned_value_handlers: ReturnedValueHandlers = get_default_returned_value_handlers()

import_all_pages(app_module_name)
import_all_pages(self.app_module_name)

self.ui_methods: dict = {}
self.ui_modules: dict = {}
self.settings: dict = {}

self.app = FrontikAsgiApp()
self.asgi_app = FrontikAsgiApp()

def __call__(self, tornado_request: httputil.HTTPServerRequest) -> Optional[Awaitable[None]]:
# for making more asgi, reimplement tornado.http1connection._server_request_loop and ._read_message
Expand All @@ -118,9 +119,9 @@ async def _serve_tornado_request(
frontik_app: FrontikApplication,
_tornado_request: httputil.HTTPServerRequest,
_request_id: str,
app: FrontikAsgiApp,
asgi_app: FrontikAsgiApp,
) -> None:
status, reason, headers, data = await execute_page(frontik_app, _tornado_request, _request_id, app)
status, reason, headers, data = await execute_page(frontik_app, _tornado_request, _request_id, asgi_app)

assert _tornado_request.connection is not None
_tornado_request.connection.set_close_callback(None) # type: ignore
Expand All @@ -130,7 +131,7 @@ async def _serve_tornado_request(
_tornado_request.connection.finish()
return await future

return asyncio.create_task(_serve_tornado_request(self, tornado_request, request_id, self.app))
return asyncio.create_task(_serve_tornado_request(self, tornado_request, request_id, self.asgi_app))

def create_upstream_manager(
self,
Expand Down
2 changes: 1 addition & 1 deletion frontik/balancing_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def modify_http_client_request(request: Request, balanced_request: RequestBuilde


def get_http_client(modify_request_hook=None):
def _get_http_client(request: Request) -> HttpClient:
async def _get_http_client(request: Request) -> HttpClient:
hook = modify_request_hook or partial(modify_http_client_request, request)

http_client = request['http_client_factory'].get_http_client(
Expand Down
4 changes: 3 additions & 1 deletion frontik/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ def get_frontik_and_apps_versions(application: FrontikApplication) -> etree.Elem
etree.SubElement(versions, 'aiohttp').text = aiohttp.__version__
etree.SubElement(versions, 'python').text = sys.version.replace('\n', '')
etree.SubElement(versions, 'event_loop').text = str(type(asyncio.get_event_loop())).split("'")[1]
etree.SubElement(versions, 'application', name=application.app_name).extend(application.application_version_xml())
etree.SubElement(versions, 'application', name=application.app_module_name).extend(
application.application_version_xml()
)

return versions
72 changes: 49 additions & 23 deletions frontik/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from frontik.json_builder import FrontikJsonDecodeError, json_decode
from frontik.loggers import CUSTOM_JSON_EXTRA, JSON_REQUESTS_LOGGER
from frontik.loggers.stages import StagesLogger
from frontik.options import options
from frontik.timeout_tracking import get_timeout_checker
from frontik.util import gather_dict, make_url
from frontik.validator import BaseValidationModel, Validators
Expand All @@ -45,6 +46,7 @@
from tornado.httputil import HTTPHeaders, HTTPServerRequest

from frontik.app import FrontikApplication
from frontik.handler_return_values import ReturnedValue, ReturnedValueHandlers
from frontik.integrations.statsd import StatsDClient, StatsDClientStub


Expand All @@ -53,6 +55,14 @@ def __init__(self, wait_finish_group: bool = False) -> None:
self.wait_finish_group = wait_finish_group


class RedirectSignal(Exception):
pass


class FinishSignal(Exception):
pass


class HTTPErrorWithPostprocessors(tornado.web.HTTPError):
pass

Expand Down Expand Up @@ -92,6 +102,8 @@ def _fail_fast_policy(fail_fast: bool, waited: bool, host: str, path: str) -> bo


class PageHandler(RequestHandler):
returned_value_handlers: ReturnedValueHandlers = []

def __init__(
self,
application: FrontikApplication,
Expand All @@ -109,6 +121,9 @@ def __init__(
self.route = route
self.debug_mode = debug_mode
self.path_params = path_params
for _name, _value in path_params.items():
if _value:
request.arguments.setdefault(_name, []).append(_value) # type: ignore

super().__init__(application, request) # type: ignore

Expand All @@ -117,6 +132,9 @@ def __init__(
for integration in application.available_integrations:
integration.initialize_handler(self)

if not self.returned_value_handlers:
self.returned_value_handlers = list(application.returned_value_handlers)

self.stages_logger = StagesLogger(request._start_time, self.statsd_client)

self._render_postprocessors: list = []
Expand Down Expand Up @@ -314,7 +332,8 @@ def redirect(self, url: str, *args: Any, allow_protocol_relative: bool = False,
# This is only reachable under certain configurations.
raise tornado.web.HTTPError(403, 'cannot redirect path with two initial slashes')
self.log.info('redirecting to: %s', url)
return super().redirect(url, *args, **kwargs)
super().redirect(url, *args, **kwargs)
raise RedirectSignal()

@property
def json_body(self):
Expand Down Expand Up @@ -346,12 +365,25 @@ def add_future(cls, future: Future, callback: Callable) -> None:

# Requests handling

async def my_execute(self) -> tuple[int, str, HTTPHeaders, bytes]:
async def execute(self) -> tuple[int, str, HTTPHeaders, bytes]:
if (
self.request.method
not in (
'GET',
'HEAD',
'OPTIONS',
)
and options.xsrf_cookies
):
self.check_xsrf_cookie()
await super()._execute([], b'', b'')

try:
await super()._execute([], b'', b'')
except Exception as ex:
self._handle_request_exception(ex)
return await self.handler_result_future # status, reason, headers, chunk
return await asyncio.wait_for(self.handler_result_future, timeout=5.0)
except TimeoutError:
self.log.error('handler was never finished')
self.send_error()
return self.handler_result_future.result()

async def get(self, *args, **kwargs):
await self._execute_page()
Expand All @@ -368,9 +400,6 @@ async def delete(self, *args, **kwargs):
async def head(self, *args, **kwargs):
await self._execute_page()

def options(self, *args, **kwargs):
self.return_405()

async def _execute_page(self) -> None:
self.stages_logger.commit_stage('prepare')

Expand All @@ -388,7 +417,10 @@ async def _execute_page(self) -> None:
raise RuntimeError(f'dependency solving failed: {errors}')

assert self.route.dependant.call is not None
await self.route.dependant.call(**values)
returned_value: ReturnedValue = await self.route.dependant.call(**values)

for returned_value_handler in self.returned_value_handlers:
returned_value_handler(self, returned_value)

self._handler_finished_notification()
await self.finish_group.get_gathering_future()
Expand All @@ -398,12 +430,6 @@ async def _execute_page(self) -> None:
if render_result is not None:
self.write(render_result)

def return_405(self) -> None:
allowed_methods = [name for name in ('get', 'post', 'put', 'delete') if f'{name}_page' in vars(self.__class__)]
self.set_header('Allow', ', '.join(allowed_methods))
self.set_status(405)
self.finish()

def get_page_fail_fast(self, request_result: RequestResult) -> None:
self.__return_error(request_result.status_code, error_info={'is_fail_fast': True})

Expand Down Expand Up @@ -517,6 +543,10 @@ def _handle_request_exception(self, e: BaseException) -> None:
if self._finished and not isinstance(e, Finish):
return

if isinstance(e, FinishSignal):
# Not an error; request was finished explicitly
return

if isinstance(e, FailFastError):
request = e.failed_result.request

Expand Down Expand Up @@ -626,12 +656,12 @@ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> Future[None
content_length = sum(len(part) for part in self._write_buffer)
self.set_header('Content-Length', content_length)

future = self.flush(include_footers=True)
self._flush()
self._finished = True
self.on_finish()
return future
raise FinishSignal()

def flush(self, include_footers: bool = False) -> Future[None]:
def _flush(self) -> None:
assert self.request.connection is not None
chunk = b''.join(self._write_buffer)
self._write_buffer = []
Expand All @@ -646,10 +676,6 @@ def flush(self, include_footers: bool = False) -> Future[None]:

self.handler_result_future.set_result((self._status_code, self._reason, self._headers, chunk))

future = Future() # type: Future[None]
future.set_result(None)
return future

# postprocessors

def set_mandatory_header(self, name: str, value: str) -> None:
Expand Down
28 changes: 17 additions & 11 deletions frontik/handler_asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@


async def execute_page(
frontik_app: FrontikApplication, tornado_request: httputil.HTTPServerRequest, request_id: str, app: FrontikAsgiApp
frontik_app: FrontikApplication,
tornado_request: httputil.HTTPServerRequest,
request_id: str,
asgi_app: FrontikAsgiApp,
) -> tuple[int, str, HTTPHeaders, bytes]:
with request_context.request_context(request_id), request_limiter(frontik_app.statsd_client) as accepted:
log.info('requested url: %s', tornado_request.uri)
Expand All @@ -40,7 +43,7 @@ async def execute_page(
assert debug_mode.failed_auth_header is not None
status, reason, headers, data = make_debug_auth_failed_response(debug_mode.failed_auth_header)
elif route is None:
status, reason, headers, data = make_not_found_response(frontik_app, tornado_request.path)
status, reason, headers, data = await make_not_found_response(frontik_app, tornado_request)
else:
request_context.set_handler_name(f'{route.endpoint.__module__}.{route.endpoint.__name__}')

Expand All @@ -53,7 +56,7 @@ async def execute_page(
scope, receive, send = convert_tornado_request_to_asgi(
frontik_app, tornado_request, route, path_params, debug_mode, result
)
await app(scope, receive, send)
await asgi_app(scope, receive, send)

status = result['status']
reason = httputil.responses.get(status, 'Unknown')
Expand All @@ -78,19 +81,23 @@ async def execute_page(
return status, reason, headers, data


def make_not_found_response(frontik_app: FrontikApplication, path: str) -> tuple[int, str, HTTPHeaders, bytes]:
allowed_methods = get_allowed_methods(path)
async def make_not_found_response(
frontik_app: FrontikApplication, tornado_request: httputil.HTTPServerRequest
) -> tuple[int, str, HTTPHeaders, bytes]:
allowed_methods = get_allowed_methods(tornado_request.path.strip('/'))
default_headers = get_default_headers()

if allowed_methods:
status = 405
headers = get_default_headers()
headers['Allow'] = ', '.join(allowed_methods)
headers = {'Allow': ', '.join(allowed_methods)}
data = b''
elif hasattr(frontik_app, 'application_404_handler'):
status, headers, data = frontik_app.application_404_handler()
status, headers, data = await frontik_app.application_404_handler(tornado_request)
else:
status, headers, data = build_error_data(404, 'Not Found')

default_headers.update(headers)

reason = httputil.responses.get(status, 'Unknown')
return status, reason, HTTPHeaders(headers), data

Expand All @@ -114,8 +121,7 @@ def make_not_accepted_response() -> tuple[int, str, HTTPHeaders, bytes]:
def build_error_data(
status_code: int = 500, message: Optional[str] = 'Internal Server Error'
) -> tuple[int, dict, bytes]:
headers = get_default_headers()
headers['Content-Type'] = media_types.TEXT_HTML
headers = {'Content-Type': media_types.TEXT_HTML}
data = f'<html><title>{status_code}: {message}</title><body>{status_code}: {message}</body></html>'.encode()
return status_code, headers, data

Expand All @@ -129,7 +135,7 @@ async def legacy_process_request(
debug_mode: DebugMode,
) -> tuple[int, str, HTTPHeaders, bytes]:
handler: PageHandler = page_cls(frontik_app, tornado_request, route, debug_mode, path_params)
return await handler.my_execute()
return await handler.execute()


def convert_tornado_request_to_asgi(
Expand Down
28 changes: 28 additions & 0 deletions frontik/handler_return_values.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from collections.abc import Callable, Collection
from typing import Any

from pydantic import BaseModel

from frontik import media_types
from frontik.handler import PageHandler
from frontik.json_builder import json_encode

ReturnedValue = Any
ReturnedValueHandler = Callable[[PageHandler, ReturnedValue], Any]
ReturnedValueHandlers = Collection[ReturnedValueHandler]


def write_json_response_from_dict(handler: PageHandler, value: Any) -> None:
if isinstance(value, dict):
handler.set_header('Content-Type', media_types.APPLICATION_JSON)
handler.text = json_encode(value)


def write_json_response_from_pydantic(handler: PageHandler, value: BaseModel) -> None:
if isinstance(value, BaseModel):
handler.set_header('Content-Type', media_types.APPLICATION_JSON)
handler.text = value.model_dump_json()


def get_default_returned_value_handlers() -> ReturnedValueHandlers:
return [write_json_response_from_dict, write_json_response_from_pydantic]
2 changes: 1 addition & 1 deletion frontik/integrations/statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,6 @@ def create_statsd_client(options: Options, app: FrontikApplication) -> Union[Sta
options.statsd_port,
options.statsd_default_periodic_send_interval_sec,
options.statsd_max_udp_size,
app=app.app_name,
app=app.app_module_name,
)
return statsd_client
2 changes: 1 addition & 1 deletion frontik/json_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def to_bytes(self) -> bytes:
return json_encode_bytes(self._concat_chunks())


def get_json_builder(request: Request) -> JsonBuilder:
async def get_json_builder(request: Request) -> JsonBuilder:
return request['json_builder']


Expand Down
Loading

0 comments on commit 04b84ac

Please sign in to comment.