Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/HH-168926' into EXP-81199
Browse files Browse the repository at this point in the history
  • Loading branch information
HH ReleaseBot committed Oct 8, 2023
2 parents 190acb7 + 3005156 commit e0b8627
Show file tree
Hide file tree
Showing 179 changed files with 3,304 additions and 2,298 deletions.
1 change: 0 additions & 1 deletion examples/example-run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python3
from frontik.server import main


if __name__ == '__main__':
main('./frontik.cfg')
8 changes: 5 additions & 3 deletions examples/example_app/pages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

class Page(frontik.handler.PageHandler):
def get_page(self):
self.json.put({
'text': 'Hello, world!'
})
self.json.put(
{
'text': 'Hello, world!',
},
)
2 changes: 1 addition & 1 deletion examples/example_app/pages/tpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ class Page(frontik.handler.PageHandler):
def get_page(self):
self.set_template('main.html') # This template is located in the `templates` folder
self.json.put(
self.get_url(self.request.host, '/example')
self.get_url(self.request.host, '/example'),
)
161 changes: 82 additions & 79 deletions frontik/app.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,61 @@
from __future__ import annotations

import asyncio
import importlib
import logging
import multiprocessing
import sys
import time
import traceback
from functools import partial
from typing import TYPE_CHECKING
import logging

import aiohttp
import tornado
from http_client import AIOHttpClientWrapper, HttpClientFactory
from http_client import options as http_client_options
from http_client.balancing import RequestBalancerBuilder, Upstream, UpstreamManager
from lxml import etree
from tornado import httputil
from tornado.web import Application, RequestHandler, HTTPError
from http_client import HttpClientFactory, options as http_client_options, AIOHttpClientWrapper
from http_client.balancing import RequestBalancerBuilder, UpstreamManager
from tornado.web import Application, HTTPError, RequestHandler

import frontik.producers.json_producer
import frontik.producers.xml_producer
from frontik import integrations, media_types, request_context
from frontik.integrations.statsd import create_statsd_client
from frontik.debug import DebugTransform
from frontik.debug import DebugTransform, get_frontik_and_apps_versions
from frontik.handler import ErrorHandler
from frontik.integrations.statsd import create_statsd_client
from frontik.loggers import CUSTOM_JSON_EXTRA, JSON_REQUESTS_LOGGER
from frontik.options import options
from frontik.routing import FileMappingRouter, FrontikRouter
from frontik.service_discovery import get_sync_service_discovery, get_async_service_discovery, UpstreamCaches
from frontik.util import generate_uniq_timestamp_request_id, check_request_id
from frontik.version import version as frontik_version
from frontik.service_discovery import UpstreamCaches, get_async_service_discovery, get_sync_service_discovery
from frontik.util import check_request_id, generate_uniq_timestamp_request_id

app_logger = logging.getLogger('http_client')

if TYPE_CHECKING:
from typing import Optional
from collections.abc import Callable
from multiprocessing.sharedctypes import Synchronized
from typing import Any

from aiokafka import AIOKafkaProducer
from tornado import httputil
from tornado.httputil import HTTPServerRequest


def get_frontik_and_apps_versions(application):
versions = etree.Element('versions')

etree.SubElement(versions, 'frontik').text = frontik_version
etree.SubElement(versions, 'tornado').text = tornado.version
etree.SubElement(versions, 'lxml.etree.LXML').text = '.'.join(str(x) for x in etree.LXML_VERSION)
etree.SubElement(versions, 'lxml.etree.LIBXML').text = '.'.join(str(x) for x in etree.LIBXML_VERSION)
etree.SubElement(versions, 'lxml.etree.LIBXSLT').text = '.'.join(str(x) for x in etree.LIBXSLT_VERSION)
etree.SubElement(versions, 'aiohttp').text = aiohttp.__version__
etree.SubElement(versions, 'python').text = sys.version.replace('\n', '')
etree.SubElement(versions, 'event_loop').text = str(type(asyncio.get_event_loop())).split("'")[1]
etree.SubElement(versions, 'application', name=options.app).extend(application.application_version_xml())

return versions
from frontik.handler import PageHandler
from frontik.integrations.statsd import StatsDClient, StatsDClientStub
from frontik.service_discovery import UpstreamUpdateListener


class VersionHandler(RequestHandler):
def get(self):
self.application: FrontikApplication
self.set_header('Content-Type', 'text/xml')
self.write(
etree.tostring(get_frontik_and_apps_versions(self.application), encoding='utf-8', xml_declaration=True)
etree.tostring(get_frontik_and_apps_versions(self.application), encoding='utf-8', xml_declaration=True),
)


class StatusHandler(RequestHandler):
def get(self):
self.application: FrontikApplication
self.set_header('Content-Type', media_types.APPLICATION_JSON)
self.finish(self.application.get_current_status())

Expand All @@ -82,19 +75,20 @@ def get(self):
except BaseException:
self.error_page()

def settrace(self, debugger_ip, debugger_port):
def settrace(self, debugger_ip: str | None, debugger_port: int) -> None:
import pydevd

pydevd.settrace(debugger_ip, port=debugger_port, stdoutToServer=True, stderrToServer=True, suspend=False)

def trace_page(self, ip, port):
def trace_page(self, ip: str | None, port: str) -> None:
self.set_header('Content-Type', media_types.TEXT_PLAIN)
self.finish(f'Connected to debug server at {ip}:{port}')

def already_tracing_page(self):
def already_tracing_page(self) -> None:
self.set_header('Content-Type', media_types.TEXT_PLAIN)
self.finish('App is already in tracing mode, try to restart service')

def error_page(self):
def error_page(self) -> None:
self.set_header('Content-Type', media_types.TEXT_PLAIN)
self.finish(traceback.format_exc())

Expand All @@ -105,7 +99,7 @@ class FrontikApplication(Application):
class DefaultConfig:
pass

def __init__(self, **settings):
def __init__(self, app_root: str, **settings: Any) -> None:
self.start_time = time.time()

tornado_settings = settings.get('tornado_settings')
Expand All @@ -115,22 +109,22 @@ def __init__(self, **settings):
self.config = self.application_config()
self.app = settings.get('app')
self.app_module = settings.get('app_module')
self.app_root = settings.get('app_root')
self.app_root = app_root

self.xml = frontik.producers.xml_producer.XMLProducerFactory(self)
self.json = frontik.producers.json_producer.JsonProducerFactory(self)

self.available_integrations = None
self.tornado_http_client: Optional[AIOHttpClientWrapper] = None
self.http_client_factory: Optional[HttpClientFactory] = None
self.upstream_manager = None
self.upstreams = {}
self.children_pipes = {}
self.upstream_update_listener = None
self.available_integrations: list[integrations.Integration] = []
self.tornado_http_client: AIOHttpClientWrapper | None = None
self.http_client_factory: HttpClientFactory
self.upstream_manager: UpstreamManager = None
self.upstreams: dict[str, Upstream] = {}
self.children_pipes: dict[int, Any] = {}
self.upstream_update_listener: UpstreamUpdateListener
self.router = FrontikRouter(self)
self.init_workers_count_down = multiprocessing.Value('i', options.workers)
self.init_workers_count_down: Synchronized = multiprocessing.Value('i', options.workers) # type: ignore

core_handlers = [
core_handlers: list[Any] = [
(r'/version/?', VersionHandler),
(r'/status/?', StatusHandler),
(r'.*', self.router),
Expand All @@ -139,17 +133,21 @@ def __init__(self, **settings):
if options.debug:
core_handlers.insert(0, (r'/pydevd/?', PydevdHandler))

statsd_client = create_statsd_client(options, self)
sync_service_discovery = get_sync_service_discovery(options, statsd_client)
self.service_discovery_client = get_async_service_discovery(options, statsd_client) \
if options.workers == 1 else sync_service_discovery
self.upstream_caches = UpstreamCaches(self.children_pipes, self.upstreams, sync_service_discovery) \
if options.consul_enabled else UpstreamCaches(self.children_pipes, self.upstreams)
self.statsd_client: StatsDClient | StatsDClientStub = create_statsd_client(options, self)
sync_service_discovery = get_sync_service_discovery(options, self.statsd_client)
self.service_discovery_client = (
get_async_service_discovery(options, self.statsd_client) if options.workers == 1 else sync_service_discovery
)
self.upstream_caches = (
UpstreamCaches(self.children_pipes, self.upstreams, sync_service_discovery)
if options.consul_enabled
else UpstreamCaches(self.children_pipes, self.upstreams)
)

super().__init__(core_handlers, **tornado_settings)

async def init(self):
self.transforms.insert(0, partial(DebugTransform, self))
async def init(self) -> None:
self.transforms.insert(0, partial(DebugTransform, self)) # type: ignore

self.available_integrations, integration_futures = integrations.load_integrations(self)
await asyncio.gather(*[future for future in integration_futures if future])
Expand All @@ -162,17 +160,22 @@ async def init(self):
if kafka_cluster and kafka_cluster not in options.kafka_clusters:
app_logger.warning(
'kafka cluster for http client metrics "%s" is not present in "kafka_clusters" option, '
'metrics will be disabled', kafka_cluster
'metrics will be disabled',
kafka_cluster,
)
else:
app_logger.info('kafka metrics are %s', 'enabled' if send_metrics_to_kafka else 'disabled')

kafka_producer = self.get_kafka_producer(kafka_cluster) if send_metrics_to_kafka else None
kafka_producer = (
self.get_kafka_producer(kafka_cluster) if send_metrics_to_kafka and kafka_cluster is not None else None
)

self.upstream_manager = UpstreamManager(self.upstreams)
request_balancer_builder = RequestBalancerBuilder(self.upstream_manager,
statsd_client=self.statsd_client,
kafka_producer=kafka_producer)
request_balancer_builder = RequestBalancerBuilder(
self.upstream_manager,
statsd_client=self.statsd_client,
kafka_producer=kafka_producer,
)
self.http_client_factory = HttpClientFactory(self.app, self.tornado_http_client, request_balancer_builder)

def find_handler(self, request, **kwargs):
Expand All @@ -182,7 +185,7 @@ def find_handler(self, request, **kwargs):
if options.validate_request_id:
check_request_id(request_id)

def wrapped_in_context(func):
def wrapped_in_context(func: Callable) -> Callable:
def wrapper(*args, **kwargs):
token = request_context.initialize(request, request_id)

Expand All @@ -194,52 +197,52 @@ def wrapper(*args, **kwargs):
return wrapper

delegate: httputil.HTTPMessageDelegate = wrapped_in_context(super().find_handler)(request, **kwargs)
delegate.headers_received = wrapped_in_context(delegate.headers_received)
delegate.data_received = wrapped_in_context(delegate.data_received)
delegate.finish = wrapped_in_context(delegate.finish)
delegate.on_connection_close = wrapped_in_context(delegate.on_connection_close)
delegate.headers_received = wrapped_in_context(delegate.headers_received) # type: ignore
delegate.data_received = wrapped_in_context(delegate.data_received) # type: ignore
delegate.finish = wrapped_in_context(delegate.finish) # type: ignore
delegate.on_connection_close = wrapped_in_context(delegate.on_connection_close) # type: ignore

return delegate

def reverse_url(self, name, *args, **kwargs):
def reverse_url(self, name: str, *args: Any, **kwargs: Any) -> str:
return self.router.reverse_url(name, *args, **kwargs)

def application_urls(self):
return [
('', FileMappingRouter(importlib.import_module(f'{self.app_module}.pages')))
]
def application_urls(self) -> list[tuple]:
return [('', FileMappingRouter(importlib.import_module(f'{self.app_module}.pages')))]

def application_404_handler(self, request):
def application_404_handler(self, request: HTTPServerRequest) -> tuple[type[PageHandler], dict]:
return ErrorHandler, {'status_code': 404}

def application_config(self):
def application_config(self) -> DefaultConfig:
return FrontikApplication.DefaultConfig()

def application_version_xml(self):
def application_version_xml(self) -> list[etree.Element]:
version = etree.Element('version')
version.text = 'unknown'
return [version]

def application_version(self):
def application_version(self) -> str | None:
return None

@staticmethod
def next_request_id():
def next_request_id() -> str:
FrontikApplication.request_id = generate_uniq_timestamp_request_id()
return FrontikApplication.request_id

def get_current_status(self):
def get_current_status(self) -> dict[str, str]:
if self.init_workers_count_down.value > 0:
raise HTTPError(500, f'some workers are not started '
f'init_workers_count_down={self.init_workers_count_down.value}')
raise HTTPError(
500,
f'some workers are not started init_workers_count_down={self.init_workers_count_down.value}',
)

cur_uptime = time.time() - self.start_time
if cur_uptime < 60:
uptime_value = '{:.2f} seconds'.format(cur_uptime)
uptime_value = f'{cur_uptime:.2f} seconds'
elif cur_uptime < 3600:
uptime_value = '{:.2f} minutes'.format(cur_uptime / 60)
uptime_value = f'{cur_uptime / 60:.2f} minutes'
else:
uptime_value = '{:.2f} hours and {:.2f} minutes'.format(cur_uptime / 3600, (cur_uptime % 3600) / 60)
uptime_value = f'{cur_uptime / 3600:.2f} hours and {(cur_uptime % 3600) / 60:.2f} minutes'

return {
'uptime': uptime_value,
Expand Down Expand Up @@ -267,5 +270,5 @@ def log_request(self, handler):

JSON_REQUESTS_LOGGER.info('', extra={CUSTOM_JSON_EXTRA: extra})

def get_kafka_producer(self, producer_name: str) -> 'Optional[AIOKafkaProducer]': # pragma: no cover
def get_kafka_producer(self, producer_name: str) -> AIOKafkaProducer | None: # pragma: no cover
pass
12 changes: 9 additions & 3 deletions frontik/auth.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
from __future__ import annotations

import base64
import http.client
from typing import TYPE_CHECKING

from tornado.escape import to_unicode
from tornado.web import Finish

if TYPE_CHECKING:
from frontik.handler import PageHandler

DEBUG_AUTH_HEADER_NAME = 'Frontik-Debug-Auth'


class DebugUnauthorizedError(Finish):
pass


def passed_basic_auth(handler, login, passwd):
def passed_basic_auth(handler: PageHandler, login: str | None, passwd: str | None) -> bool:
auth_header = handler.request.headers.get('Authorization')
if auth_header and auth_header.startswith('Basic '):
method, auth_b64 = auth_header.split(' ')
Expand All @@ -24,15 +30,15 @@ def passed_basic_auth(handler, login, passwd):
return False


def check_debug_auth(handler, login, password):
def check_debug_auth(handler: PageHandler, login: str | None, password: str | None) -> None:
"""
:type handler: tornado.web.RequestHandler
:return: None or tuple(http_code, headers)
"""
header_name = DEBUG_AUTH_HEADER_NAME
debug_auth_header = handler.request.headers.get(header_name)
if debug_auth_header is not None:
debug_access = (debug_auth_header == f'{login}:{password}')
debug_access = debug_auth_header == f'{login}:{password}'
if not debug_access:
handler.set_header('WWW-Authenticate', f'{header_name}-Header realm="Secure Area"')
handler.set_status(http.client.UNAUTHORIZED)
Expand Down
Loading

0 comments on commit e0b8627

Please sign in to comment.