test test test #649

Closed · wants to merge 2 commits
1 change: 0 additions & 1 deletion examples/example-run.py
@@ -1,6 +1,5 @@
 #!/usr/bin/env python3
 from frontik.server import main

-
 if __name__ == '__main__':
     main('./frontik.cfg')
8 changes: 5 additions & 3 deletions examples/example_app/pages/__init__.py
@@ -3,6 +3,8 @@

 class Page(frontik.handler.PageHandler):
     def get_page(self):
-        self.json.put({
-            'text': 'Hello, world!'
-        })
+        self.json.put(
+            {
+                'text': 'Hello, world!',
+            },
+        )
2 changes: 1 addition & 1 deletion examples/example_app/pages/tpl.py
@@ -5,5 +5,5 @@ class Page(frontik.handler.PageHandler):
     def get_page(self):
         self.set_template('main.html')  # This template is located in the `templates` folder
         self.json.put(
-            self.get_url(self.request.host, '/example')
+            self.get_url(self.request.host, '/example'),
         )
160 changes: 81 additions & 79 deletions frontik/app.py
@@ -1,68 +1,60 @@
 from __future__ import annotations

 import asyncio
 import importlib
+import logging
 import multiprocessing
 import sys
 import time
 import traceback
 from functools import partial
 from typing import TYPE_CHECKING
-import logging

-import aiohttp
-import tornado
+from http_client import AIOHttpClientWrapper, HttpClientFactory
+from http_client import options as http_client_options
+from http_client.balancing import RequestBalancerBuilder, Upstream, UpstreamManager
 from lxml import etree
-from tornado import httputil
-from tornado.web import Application, RequestHandler, HTTPError
-from http_client import HttpClientFactory, options as http_client_options, AIOHttpClientWrapper
-from http_client.balancing import RequestBalancerBuilder, UpstreamManager
+from tornado.web import Application, HTTPError, RequestHandler

 import frontik.producers.json_producer
 import frontik.producers.xml_producer
 from frontik import integrations, media_types, request_context
-from frontik.integrations.statsd import create_statsd_client
-from frontik.debug import DebugTransform
+from frontik.debug import DebugTransform, get_frontik_and_apps_versions
 from frontik.handler import ErrorHandler
+from frontik.integrations.statsd import create_statsd_client
 from frontik.loggers import CUSTOM_JSON_EXTRA, JSON_REQUESTS_LOGGER
 from frontik.options import options
 from frontik.routing import FileMappingRouter, FrontikRouter
-from frontik.service_discovery import get_sync_service_discovery, get_async_service_discovery, UpstreamCaches
-from frontik.util import generate_uniq_timestamp_request_id, check_request_id
-from frontik.version import version as frontik_version
+from frontik.service_discovery import UpstreamCaches, get_async_service_discovery, get_sync_service_discovery
+from frontik.util import check_request_id, generate_uniq_timestamp_request_id

 app_logger = logging.getLogger('http_client')

 if TYPE_CHECKING:
-    from typing import Optional
+    from collections.abc import Callable
+    from multiprocessing.sharedctypes import Synchronized
+    from typing import Any

     from aiokafka import AIOKafkaProducer
+    from tornado import httputil
+    from tornado.httputil import HTTPServerRequest

-
-def get_frontik_and_apps_versions(application):
-    versions = etree.Element('versions')
-
-    etree.SubElement(versions, 'frontik').text = frontik_version
-    etree.SubElement(versions, 'tornado').text = tornado.version
-    etree.SubElement(versions, 'lxml.etree.LXML').text = '.'.join(str(x) for x in etree.LXML_VERSION)
-    etree.SubElement(versions, 'lxml.etree.LIBXML').text = '.'.join(str(x) for x in etree.LIBXML_VERSION)
-    etree.SubElement(versions, 'lxml.etree.LIBXSLT').text = '.'.join(str(x) for x in etree.LIBXSLT_VERSION)
-    etree.SubElement(versions, 'aiohttp').text = aiohttp.__version__
-    etree.SubElement(versions, 'python').text = sys.version.replace('\n', '')
-    etree.SubElement(versions, 'event_loop').text = str(type(asyncio.get_event_loop())).split("'")[1]
-    etree.SubElement(versions, 'application', name=options.app).extend(application.application_version_xml())
-
-    return versions
+    from frontik.integrations.statsd import StatsDClient, StatsDClientStub
+    from frontik.service_discovery import UpstreamUpdateListener


 class VersionHandler(RequestHandler):
     def get(self):
+        self.application: FrontikApplication
         self.set_header('Content-Type', 'text/xml')
         self.write(
-            etree.tostring(get_frontik_and_apps_versions(self.application), encoding='utf-8', xml_declaration=True)
+            etree.tostring(get_frontik_and_apps_versions(self.application), encoding='utf-8', xml_declaration=True),
         )


 class StatusHandler(RequestHandler):
     def get(self):
+        self.application: FrontikApplication
         self.set_header('Content-Type', media_types.APPLICATION_JSON)
         self.finish(self.application.get_current_status())

@@ -82,19 +74,20 @@ def get(self):
         except BaseException:
             self.error_page()

-    def settrace(self, debugger_ip, debugger_port):
+    def settrace(self, debugger_ip: str | None, debugger_port: int) -> None:
         import pydevd

         pydevd.settrace(debugger_ip, port=debugger_port, stdoutToServer=True, stderrToServer=True, suspend=False)

-    def trace_page(self, ip, port):
+    def trace_page(self, ip: str | None, port: str) -> None:
         self.set_header('Content-Type', media_types.TEXT_PLAIN)
         self.finish(f'Connected to debug server at {ip}:{port}')

-    def already_tracing_page(self):
+    def already_tracing_page(self) -> None:
         self.set_header('Content-Type', media_types.TEXT_PLAIN)
         self.finish('App is already in tracing mode, try to restart service')

-    def error_page(self):
+    def error_page(self) -> None:
         self.set_header('Content-Type', media_types.TEXT_PLAIN)
         self.finish(traceback.format_exc())

@@ -105,7 +98,7 @@ class FrontikApplication(Application):
     class DefaultConfig:
         pass

-    def __init__(self, **settings):
+    def __init__(self, **settings: Any) -> None:
         self.start_time = time.time()

         tornado_settings = settings.get('tornado_settings')
@@ -115,22 +108,22 @@ def __init__(self, **settings):
         self.config = self.application_config()
         self.app = settings.get('app')
         self.app_module = settings.get('app_module')
-        self.app_root = settings.get('app_root')
+        self.app_root: str = settings.get('app_root')  # type: ignore

         self.xml = frontik.producers.xml_producer.XMLProducerFactory(self)
         self.json = frontik.producers.json_producer.JsonProducerFactory(self)

-        self.available_integrations = None
-        self.tornado_http_client: Optional[AIOHttpClientWrapper] = None
-        self.http_client_factory: Optional[HttpClientFactory] = None
-        self.upstream_manager = None
-        self.upstreams = {}
-        self.children_pipes = {}
-        self.upstream_update_listener = None
+        self.available_integrations: list[integrations.Integration] = []
+        self.tornado_http_client: AIOHttpClientWrapper | None = None
+        self.http_client_factory: HttpClientFactory = None  # type: ignore
+        self.upstream_manager: UpstreamManager = None
+        self.upstreams: dict[str, Upstream] = {}
+        self.children_pipes: dict[int, Any] = {}
+        self.upstream_update_listener: UpstreamUpdateListener = None  # type: ignore
         self.router = FrontikRouter(self)
-        self.init_workers_count_down = multiprocessing.Value('i', options.workers)
+        self.init_workers_count_down: Synchronized = multiprocessing.Value('i', options.workers)  # type: ignore

-        core_handlers = [
+        core_handlers: list[Any] = [
             (r'/version/?', VersionHandler),
             (r'/status/?', StatusHandler),
             (r'.*', self.router),
@@ -139,17 +132,21 @@ def __init__(self, **settings):
         if options.debug:
            core_handlers.insert(0, (r'/pydevd/?', PydevdHandler))

-        statsd_client = create_statsd_client(options, self)
-        sync_service_discovery = get_sync_service_discovery(options, statsd_client)
-        self.service_discovery_client = get_async_service_discovery(options, statsd_client) \
-            if options.workers == 1 else sync_service_discovery
-        self.upstream_caches = UpstreamCaches(self.children_pipes, self.upstreams, sync_service_discovery) \
-            if options.consul_enabled else UpstreamCaches(self.children_pipes, self.upstreams)
+        self.statsd_client: StatsDClient | StatsDClientStub = create_statsd_client(options, self)
+        sync_service_discovery = get_sync_service_discovery(options, self.statsd_client)
+        self.service_discovery_client = (
+            get_async_service_discovery(options, self.statsd_client) if options.workers == 1 else sync_service_discovery
+        )
+        self.upstream_caches = (
+            UpstreamCaches(self.children_pipes, self.upstreams, sync_service_discovery)
+            if options.consul_enabled
+            else UpstreamCaches(self.children_pipes, self.upstreams)
+        )

         super().__init__(core_handlers, **tornado_settings)

-    async def init(self):
-        self.transforms.insert(0, partial(DebugTransform, self))
+    async def init(self) -> None:
+        self.transforms.insert(0, partial(DebugTransform, self))  # type: ignore

         self.available_integrations, integration_futures = integrations.load_integrations(self)
         await asyncio.gather(*[future for future in integration_futures if future])
@@ -162,17 +159,22 @@ async def init(self):
         if kafka_cluster and kafka_cluster not in options.kafka_clusters:
             app_logger.warning(
                 'kafka cluster for http client metrics "%s" is not present in "kafka_clusters" option, '
-                'metrics will be disabled', kafka_cluster
+                'metrics will be disabled',
+                kafka_cluster,
             )
         else:
             app_logger.info('kafka metrics are %s', 'enabled' if send_metrics_to_kafka else 'disabled')

-        kafka_producer = self.get_kafka_producer(kafka_cluster) if send_metrics_to_kafka else None
+        kafka_producer = (
+            self.get_kafka_producer(kafka_cluster) if send_metrics_to_kafka and kafka_cluster is not None else None
+        )

         self.upstream_manager = UpstreamManager(self.upstreams)
-        request_balancer_builder = RequestBalancerBuilder(self.upstream_manager,
-                                                          statsd_client=self.statsd_client,
-                                                          kafka_producer=kafka_producer)
+        request_balancer_builder = RequestBalancerBuilder(
+            self.upstream_manager,
+            statsd_client=self.statsd_client,
+            kafka_producer=kafka_producer,
+        )
         self.http_client_factory = HttpClientFactory(self.app, self.tornado_http_client, request_balancer_builder)

     def find_handler(self, request, **kwargs):
@@ -182,7 +184,7 @@ def find_handler(self, request, **kwargs):
         if options.validate_request_id:
             check_request_id(request_id)

-        def wrapped_in_context(func):
+        def wrapped_in_context(func: Callable) -> Callable:
             def wrapper(*args, **kwargs):
                 token = request_context.initialize(request, request_id)

@@ -194,52 +196,52 @@ def wrapper(*args, **kwargs):
             return wrapper

         delegate: httputil.HTTPMessageDelegate = wrapped_in_context(super().find_handler)(request, **kwargs)
-        delegate.headers_received = wrapped_in_context(delegate.headers_received)
-        delegate.data_received = wrapped_in_context(delegate.data_received)
-        delegate.finish = wrapped_in_context(delegate.finish)
-        delegate.on_connection_close = wrapped_in_context(delegate.on_connection_close)
+        delegate.headers_received = wrapped_in_context(delegate.headers_received)  # type: ignore
+        delegate.data_received = wrapped_in_context(delegate.data_received)  # type: ignore
+        delegate.finish = wrapped_in_context(delegate.finish)  # type: ignore
+        delegate.on_connection_close = wrapped_in_context(delegate.on_connection_close)  # type: ignore

         return delegate

-    def reverse_url(self, name, *args, **kwargs):
+    def reverse_url(self, name: str, *args: Any, **kwargs: Any) -> str:
         return self.router.reverse_url(name, *args, **kwargs)

-    def application_urls(self):
-        return [
-            ('', FileMappingRouter(importlib.import_module(f'{self.app_module}.pages')))
-        ]
+    def application_urls(self) -> list[tuple]:
+        return [('', FileMappingRouter(importlib.import_module(f'{self.app_module}.pages')))]

-    def application_404_handler(self, request):
+    def application_404_handler(self, request: HTTPServerRequest) -> tuple:
         return ErrorHandler, {'status_code': 404}

-    def application_config(self):
+    def application_config(self) -> DefaultConfig:
         return FrontikApplication.DefaultConfig()

-    def application_version_xml(self):
+    def application_version_xml(self) -> list[etree.Element]:
         version = etree.Element('version')
         version.text = 'unknown'
         return [version]

-    def application_version(self):
+    def application_version(self) -> str | None:
         return None

     @staticmethod
-    def next_request_id():
+    def next_request_id() -> str:
         FrontikApplication.request_id = generate_uniq_timestamp_request_id()
         return FrontikApplication.request_id

-    def get_current_status(self):
+    def get_current_status(self) -> dict[str, str]:
         if self.init_workers_count_down.value > 0:
-            raise HTTPError(500, f'some workers are not started '
-                                 f'init_workers_count_down={self.init_workers_count_down.value}')
+            raise HTTPError(
+                500,
+                f'some workers are not started init_workers_count_down={self.init_workers_count_down.value}',
+            )

         cur_uptime = time.time() - self.start_time
         if cur_uptime < 60:
-            uptime_value = '{:.2f} seconds'.format(cur_uptime)
+            uptime_value = f'{cur_uptime:.2f} seconds'
         elif cur_uptime < 3600:
-            uptime_value = '{:.2f} minutes'.format(cur_uptime / 60)
+            uptime_value = f'{cur_uptime / 60:.2f} minutes'
         else:
-            uptime_value = '{:.2f} hours and {:.2f} minutes'.format(cur_uptime / 3600, (cur_uptime % 3600) / 60)
+            uptime_value = f'{cur_uptime / 3600:.2f} hours and {(cur_uptime % 3600) / 60:.2f} minutes'

         return {
             'uptime': uptime_value,

@@ -267,5 +269,5 @@ def log_request(self, handler):

         JSON_REQUESTS_LOGGER.info('', extra={CUSTOM_JSON_EXTRA: extra})

-    def get_kafka_producer(self, producer_name: str) -> 'Optional[AIOKafkaProducer]':  # pragma: no cover
+    def get_kafka_producer(self, producer_name: str) -> AIOKafkaProducer | None:  # pragma: no cover
         pass
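
A note on the `wrapped_in_context` pattern in `find_handler` above: every callback of the `HTTPMessageDelegate` is wrapped so that it runs with the request's context initialized and reset around the call, which is what lets logging and downstream code see the current request id without it being passed explicitly. A minimal self-contained sketch of the same idea, assuming a single `contextvars.ContextVar` in place of frontik's `request_context` module (`request_id_var` and `handle` are illustrative names, not frontik API):

import contextvars
from collections.abc import Callable

# Illustrative stand-in for frontik.request_context: one request-scoped variable.
request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar('request_id')


def wrapped_in_context(func: Callable, request_id: str) -> Callable:
    # Set the request id before the callback runs; restore the previous value after.
    def wrapper(*args, **kwargs):
        token = request_id_var.set(request_id)
        try:
            return func(*args, **kwargs)
        finally:
            request_id_var.reset(token)

    return wrapper


def handle(data: str) -> None:
    # Code called from a wrapped callback sees the request id without it being passed in.
    print(f'[{request_id_var.get()}] {data}')


wrapped = wrapped_in_context(handle, request_id='a1b2c3')
wrapped('headers received')  # prints: [a1b2c3] headers received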
16 changes: 11 additions & 5 deletions frontik/auth.py
@@ -1,17 +1,23 @@
+from __future__ import annotations
+
 import base64
 import http.client
+from typing import TYPE_CHECKING

 from tornado.escape import to_unicode
 from tornado.web import Finish

+if TYPE_CHECKING:
+    from frontik.handler import PageHandler
+
 DEBUG_AUTH_HEADER_NAME = 'Frontik-Debug-Auth'


 class DebugUnauthorizedError(Finish):
     pass


-def passed_basic_auth(handler, login, passwd):
+def passed_basic_auth(handler: PageHandler, login: str | None, passwd: str | None) -> bool:
     auth_header = handler.request.headers.get('Authorization')
     if auth_header and auth_header.startswith('Basic '):
         method, auth_b64 = auth_header.split(' ')
@@ -24,22 +30,22 @@ def passed_basic_auth(handler, login, passwd):
     return False


-def check_debug_auth(handler, login, password):
+def check_debug_auth(handler: PageHandler, login: str | None, password: str | None) -> None:
     """
     :type handler: tornado.web.RequestHandler
     :return: None or tuple(http_code, headers)
     """
     header_name = DEBUG_AUTH_HEADER_NAME
     debug_auth_header = handler.request.headers.get(header_name)
     if debug_auth_header is not None:
-        debug_access = (debug_auth_header == f'{login}:{password}')
+        debug_access = debug_auth_header == f'{login}:{password}'
         if not debug_access:
             handler.set_header('WWW-Authenticate', f'{header_name}-Header realm="Secure Area"')
             handler.set_status(http.client.UNAUTHORIZED)
-            raise DebugUnauthorizedError()
+            raise DebugUnauthorizedError
     else:
         debug_access = passed_basic_auth(handler, login, password)
         if not debug_access:
             handler.set_header('WWW-Authenticate', 'Basic realm="Secure Area"')
             handler.set_status(http.client.UNAUTHORIZED)
-            raise DebugUnauthorizedError()
+            raise DebugUnauthorizedError
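
Worth spelling out the control flow above: `check_debug_auth` does not return a status, despite what its stale docstring suggests; on failure it raises `DebugUnauthorizedError`, a `tornado.web.Finish` subclass, so Tornado ends the response with the 401 status and `WWW-Authenticate` header that were already set. A usage sketch under that assumption (the page class and credentials are made up for illustration; only `PageHandler` and `check_debug_auth` come from this diff):

import frontik.handler
from frontik.auth import check_debug_auth


class Page(frontik.handler.PageHandler):
    def get_page(self):
        # Raises DebugUnauthorizedError on bad credentials, finishing the response
        # with 401 and WWW-Authenticate already set; there is no return value to check.
        check_debug_auth(self, login='debug', password='secret')
        self.json.put({'access': 'granted'})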