HH-208976 refactor master-worker pipes
712u3 committed Mar 19, 2024
1 parent a3f93d3 commit d6881cd
Showing 68 changed files with 1,073 additions and 1,166 deletions.
6 changes: 1 addition & 5 deletions examples/example_app/pages/__init__.py
@@ -3,8 +3,4 @@

 class Page(frontik.handler.PageHandler):
     def get_page(self):
-        self.json.put(
-            {
-                'text': 'Hello, world!',
-            },
-        )
+        self.json.put({'text': 'Hello, world!'})
4 changes: 1 addition & 3 deletions examples/example_app/pages/tpl.py
@@ -4,6 +4,4 @@
 class Page(frontik.handler.PageHandler):
     def get_page(self):
         self.set_template('main.html')  # This template is located in the `templates` folder
-        self.json.put(
-            self.get_url(self.request.host, '/example'),
-        )
+        self.json.put(self.get_url(self.request.host, '/example'))
95 changes: 49 additions & 46 deletions frontik/app.py
@@ -1,49 +1,41 @@
 from __future__ import annotations

 import asyncio
 import importlib
 import logging
 import multiprocessing
 import sys
 import time
 import traceback
+from collections.abc import Callable
+from ctypes import c_bool, c_int
 from functools import partial
-from typing import TYPE_CHECKING, Optional
+from threading import Lock
+from typing import Any, Optional, Union

+from aiokafka import AIOKafkaProducer
 from http_client import AIOHttpClientWrapper, HttpClientFactory
 from http_client import options as http_client_options
-from http_client.balancing import RequestBalancerBuilder, Upstream, UpstreamManager
+from http_client.balancing import RequestBalancerBuilder, Upstream
 from lxml import etree
+from tornado import httputil
+from tornado.httputil import HTTPServerRequest
 from tornado.web import Application, HTTPError, RequestHandler

 import frontik.producers.json_producer
 import frontik.producers.xml_producer
 from frontik import integrations, media_types, request_context
 from frontik.debug import DebugTransform, get_frontik_and_apps_versions
-from frontik.handler import ErrorHandler
+from frontik.handler import ErrorHandler, PageHandler
 from frontik.handler_return_values import ReturnedValueHandlers, get_default_returned_value_handlers
-from frontik.integrations.statsd import create_statsd_client
+from frontik.integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
 from frontik.loggers import CUSTOM_JSON_EXTRA, JSON_REQUESTS_LOGGER
 from frontik.options import options
+from frontik.process import WorkerState
 from frontik.routing import FileMappingRouter, FrontikRouter
-from frontik.service_discovery import UpstreamCaches, get_async_service_discovery, get_sync_service_discovery
+from frontik.service_discovery import UpstreamManager
 from frontik.util import check_request_id, generate_uniq_timestamp_request_id

 app_logger = logging.getLogger('http_client')

-if TYPE_CHECKING:
-    from collections.abc import Callable
-    from multiprocessing.sharedctypes import Synchronized
-    from typing import Any
-
-    from aiokafka import AIOKafkaProducer
-    from tornado import httputil
-    from tornado.httputil import HTTPServerRequest
-
-    from frontik.handler import PageHandler
-    from frontik.integrations.statsd import StatsDClient, StatsDClientStub
-    from frontik.service_discovery import UpstreamUpdateListener


class VersionHandler(RequestHandler):
    def get(self):
@@ -115,36 +107,43 @@ def __init__(self, app_root: str, **settings: Any) -> None:
        self.available_integrations: list[integrations.Integration] = []
        self.tornado_http_client: Optional[AIOHttpClientWrapper] = None
        self.http_client_factory: HttpClientFactory
+        self.upstream_manager: UpstreamManager = None
-        self.upstreams: dict[str, Upstream] = {}
-        self.children_pipes: dict[int, Any] = {}
-        self.upstream_update_listener: UpstreamUpdateListener
        self.router = FrontikRouter(self)
-        self.init_workers_count_down: Synchronized = multiprocessing.Value('i', options.workers)  # type: ignore

        core_handlers: list[Any] = [
            (r'/version/?', VersionHandler),
            (r'/status/?', StatusHandler),
            (r'.*', self.router),
        ]

        if options.debug:
            core_handlers.insert(0, (r'/pydevd/?', PydevdHandler))

-        self.statsd_client: StatsDClient | StatsDClientStub = create_statsd_client(options, self)
-        sync_service_discovery = get_sync_service_discovery(options, self.statsd_client)
-        self.service_discovery_client = (
-            get_async_service_discovery(options, self.statsd_client) if options.workers == 1 else sync_service_discovery
-        )
-        self.upstream_caches = (
-            UpstreamCaches(self.children_pipes, self.upstreams, sync_service_discovery)
-            if options.consul_enabled
-            else UpstreamCaches(self.children_pipes, self.upstreams)
-        )
+        self.statsd_client: Union[StatsDClient, StatsDClientStub] = create_statsd_client(options, self)

+        init_workers_count_down = multiprocessing.Value(c_int, options.workers)
+        master_done = multiprocessing.Value(c_bool, False)
+        count_down_lock = multiprocessing.Lock()
+        self.worker_state = WorkerState(init_workers_count_down, master_done, count_down_lock)  # type: ignore

        self.returned_value_handlers: ReturnedValueHandlers = get_default_returned_value_handlers()

-        tornado_settings = settings.get('tornado_settings') or {}
-        super().__init__(core_handlers, **tornado_settings)
+        super().__init__(core_handlers)

+    def create_upstream_manager(
+        self,
+        upstreams: dict[str, Upstream],
+        upstreams_lock: Optional[Lock],
+        send_to_all_workers: Optional[Callable],
+        with_consul: bool,
+    ) -> None:
+        self.upstream_manager = UpstreamManager(
+            upstreams,
+            self.statsd_client,
+            upstreams_lock,
+            send_to_all_workers,
+            with_consul,
+        )
+
+        self.upstream_manager.send_updates()  # initial full state sending

    async def init(self) -> None:
        self.transforms.insert(0, partial(DebugTransform, self))  # type: ignore
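The counter, flag, and lock built in `__init__` above replace the old per-application `init_workers_count_down` attribute: master-worker bookkeeping now lives in one `WorkerState` object from `frontik.process`, which is not part of this diff. A minimal sketch of what such a container could look like, assuming a dataclass over `multiprocessing` primitives; the `children_pipes` field and the `single_worker_mode` default are assumptions (the diff only shows both names being read):

```python
# Hypothetical sketch of frontik.process.WorkerState; the real definition
# is not shown in this diff.
from ctypes import c_bool, c_int
from dataclasses import dataclass, field
from multiprocessing import Lock, Value
from multiprocessing.sharedctypes import Synchronized
from typing import Any


@dataclass
class WorkerState:
    init_workers_count_down: Synchronized  # workers that have not finished init yet
    master_done: Synchronized              # master finished its own setup
    count_down_lock: Any                   # guards the counter decrement
    single_worker_mode: bool = True        # assumption: flipped off when forking workers
    children_pipes: dict = field(default_factory=dict)  # assumption: worker pid -> pipe


# Construction mirrors the diff above:
state = WorkerState(Value(c_int, 4), Value(c_bool, False), Lock())

# A worker that finished initialization could report readiness like this:
with state.count_down_lock:
    state.init_workers_count_down.value -= 1
```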
@@ -170,13 +169,18 @@ async def init(self) -> None:
            self.get_kafka_producer(kafka_cluster) if send_metrics_to_kafka and kafka_cluster is not None else None
        )

-        self.upstream_manager = UpstreamManager(self.upstreams)
+        with_consul = self.worker_state.single_worker_mode and options.consul_enabled
+        self.create_upstream_manager({}, None, None, with_consul)
        self.upstream_manager.register_service()

        request_balancer_builder = RequestBalancerBuilder(
-            self.upstream_manager,
+            self.upstream_manager.get_upstreams(),
            statsd_client=self.statsd_client,
            kafka_producer=kafka_producer,
        )
        self.http_client_factory = HttpClientFactory(self.app, self.tornado_http_client, request_balancer_builder)
+        if self.worker_state.single_worker_mode:
+            self.worker_state.master_done.value = True

    def find_handler(self, request, **kwargs):
        request_id = request.headers.get('X-Request-Id')
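`create_upstream_manager` now takes an optional `send_to_all_workers` callable (passed as `None` in single-worker mode above). Per the commit title, in multi-worker mode this is presumably how the master pushes upstream updates through pipes to its forked workers. A sketch of such a fan-out, assuming pickled/byte messages over `multiprocessing.Pipe` connections; the helper name and wire format are assumptions, not taken from this diff:

```python
# Hypothetical fan-out helper for master -> worker upstream updates.
import logging
from multiprocessing.connection import Connection

log = logging.getLogger('http_client')


def make_send_to_all_workers(children_pipes: dict[int, Connection]):
    def send_to_all_workers(message: bytes) -> None:
        # Push the serialized upstream snapshot to every worker pipe;
        # a dead worker's pipe raises, which we log and skip.
        for worker_pid, pipe in children_pipes.items():
            try:
                pipe.send_bytes(message)
            except (BrokenPipeError, OSError):
                log.warning('failed to send upstreams update to worker %s', worker_pid)

    return send_to_all_workers
```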
@@ -226,10 +230,12 @@ def next_request_id() -> str:
        return FrontikApplication.request_id

    def get_current_status(self) -> dict[str, str]:
-        if self.init_workers_count_down.value > 0:
+        not_started_workers = self.worker_state.init_workers_count_down.value
+        master_done = self.worker_state.master_done.value
+        if not_started_workers > 0 or not master_done:
            raise HTTPError(
                500,
-                f'some workers are not started init_workers_count_down={self.init_workers_count_down.value}',
+                f'some workers are not started not_started_workers={not_started_workers}, master_done={master_done}',
            )

        cur_uptime = time.time() - self.start_time
@@ -240,10 +246,7 @@ def get_current_status(self) -> dict[str, str]:
        else:
            uptime_value = f'{cur_uptime / 3600:.2f} hours and {(cur_uptime % 3600) / 60:.2f} minutes'

-        return {
-            'uptime': uptime_value,
-            'datacenter': http_client_options.datacenter,
-        }
+        return {'uptime': uptime_value, 'datacenter': http_client_options.datacenter}

    def log_request(self, handler):
        if not options.log_json:
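With this change `/status` answers 500 until every worker has counted down and the master is done, so the endpoint can double as a readiness probe. An illustrative polling sketch; the URL, port, and timings are made up for the example:

```python
# Illustrative readiness poll against the /status endpoint.
import time
import urllib.error
import urllib.request


def wait_until_ready(url: str = 'http://127.0.0.1:8080/status', timeout: float = 30.0) -> bool:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url) as response:
                if response.status == 200:
                    return True  # all workers started, master done
        except urllib.error.HTTPError:
            pass  # 500 while workers are still starting
        except urllib.error.URLError:
            pass  # server socket not up yet
        time.sleep(0.5)
    return False
```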
7 changes: 1 addition & 6 deletions frontik/dependency_manager.py
@@ -45,11 +45,6 @@ async def execute_page_method_with_dependencies(handler: Any, get_page_method: C

    route = get_page_method._route  # type: ignore

-    await solve_dependencies(
-        request=request,
-        dependant=route.dependant,
-        body=None,
-        dependency_overrides_provider=None,
-    )
+    await solve_dependencies(request=request, dependant=route.dependant, body=None, dependency_overrides_provider=None)

    return await get_page_method()
2 changes: 1 addition & 1 deletion frontik/doc.py
@@ -21,7 +21,7 @@ def _is_valid_element(node: Any) -> bool:


class Doc:
-    __slots__ = ('root_node', 'data')
+    __slots__ = ('data', 'root_node')

    def __init__(self, root_node: Any = 'doc') -> None:
        if isinstance(root_node, str):
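Several `__slots__` tuples in this commit (here and in `GCStats` below) are merely re-sorted alphabetically, most likely to satisfy a linter; slot order has no runtime effect. A quick demonstration:

```python
# Slot order is cosmetic: both classes behave identically and
# reject undeclared attributes the same way.
class A:
    __slots__ = ('root_node', 'data')

class B:
    __slots__ = ('data', 'root_node')

a, b = A(), B()
a.data, b.data = 1, 1  # both accept declared slots
try:
    a.other = 2  # AttributeError: 'A' object has no attribute 'other'
except AttributeError:
    pass
```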
12 changes: 5 additions & 7 deletions frontik/handler.py
@@ -12,7 +12,7 @@

import tornado.httputil
import tornado.web
-from fastapi import Request  # noqa
+from fastapi import Request
from http_client.request_response import USER_AGENT_HEADER, FailFastError, RequestBuilder, RequestResult
from pydantic import BaseModel, ValidationError
from tornado.ioloop import IOLoop
@@ -171,12 +171,10 @@ def require_debug_access(self, login: Optional[str] = None, passwd: Optional[str
            self._debug_access = debug_access

    def set_default_headers(self):
-        self._headers = tornado.httputil.HTTPHeaders(
-            {
-                'Server': f'Frontik/{frontik_version}',
-                'X-Request-Id': self.request_id,
-            },
-        )
+        self._headers = tornado.httputil.HTTPHeaders({
+            'Server': f'Frontik/{frontik_version}',
+            'X-Request-Id': self.request_id,
+        })

    def decode_argument(self, value: bytes, name: Optional[str] = None) -> str:
        try:
5 changes: 1 addition & 4 deletions frontik/handler_return_values.py
@@ -25,7 +25,4 @@ def write_json_response_from_pydantic(handler: PageHandler, value: BaseModel) ->


def get_default_returned_value_handlers() -> ReturnedValueHandlers:
-    return [
-        write_json_response_from_dict,
-        write_json_response_from_pydantic,
-    ]
+    return [write_json_response_from_dict, write_json_response_from_pydantic]
2 changes: 1 addition & 1 deletion frontik/integrations/gc_metrics_collector.py
@@ -35,7 +35,7 @@ def initialize_handler(self, handler):


class GCStats:
-    __slots__ = ('start', 'duration', 'count', 'max_stw')
+    __slots__ = ('count', 'duration', 'max_stw', 'start')

    def __init__(self) -> None:
        self.start: float = 0
4 changes: 1 addition & 3 deletions frontik/integrations/sentry.py
@@ -27,9 +27,7 @@ def initialize_app(self, app: FrontikApplication) -> Optional[Future]:
            max_breadcrumbs=options.sentry_max_breadcrumbs,
            default_integrations=False,
            auto_enabling_integrations=False,
-            integrations=[
-                TornadoIntegration(),
-            ],
+            integrations=[TornadoIntegration()],
            ignore_errors=[HTTPError, FailFastError],
        )

4 changes: 1 addition & 3 deletions frontik/integrations/telemetry.py
@@ -87,9 +87,7 @@ def initialize_app(self, app: FrontikApplication) -> Optional[Future]:

        self.aiohttp_instrumentor.instrument(request_hook=_client_request_hook, response_hook=_client_response_hook)

-        self.tornado_instrumentor.instrument(
-            server_request_hook=_server_request_hook,
-        )
+        self.tornado_instrumentor.instrument(server_request_hook=_server_request_hook)

        return None

@@ -4,7 +4,7 @@
LogLevelOverride = namedtuple('LogLevelOverride', ['logger_name', 'log_level'])


-class LogLevelOverrideExtension(metaclass=abc.ABCMeta):
+class LogLevelOverrideExtension(abc.ABC):
    @abc.abstractmethod
    async def load_log_level_overrides(self) -> list[LogLevelOverride]:
        pass
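`abc.ABC` is simply a convenience base class whose metaclass is `abc.ABCMeta`, so this rewrite is behavior-preserving:

```python
import abc

# Both spellings produce the same metaclass and the same
# "can't instantiate abstract class" enforcement.
class ViaMeta(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def load(self): ...

class ViaBase(abc.ABC):
    @abc.abstractmethod
    def load(self): ...

assert type(ViaMeta) is type(ViaBase) is abc.ABCMeta
```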
