Skip to content

Commit

Permalink
HH-230352 make request_integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
712u3 committed Sep 16, 2024
1 parent 9487dbb commit 044c403
Show file tree
Hide file tree
Showing 26 changed files with 309 additions and 944 deletions.
9 changes: 5 additions & 4 deletions frontik/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

import frontik.producers.json_producer
import frontik.producers.xml_producer
from frontik import integrations, media_types
from frontik import app_integrations, media_types, request_integrations
from frontik.app_integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
from frontik.debug import get_frontik_and_apps_versions
from frontik.handler import PageHandler, get_current_handler
from frontik.handler_asgi import serve_tornado_request
from frontik.handler_return_values import ReturnedValueHandlers, get_default_returned_value_handlers
from frontik.integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
from frontik.options import options
from frontik.process import WorkerState
from frontik.routing import (
Expand Down Expand Up @@ -73,7 +73,7 @@ def __init__(self, app_module_name: Optional[str] = None) -> None:
self.xml = frontik.producers.xml_producer.XMLProducerFactory(self)
self.json = frontik.producers.json_producer.JsonProducerFactory(self)

self.available_integrations: list[integrations.Integration] = []
self.available_integrations: list[app_integrations.Integration] = []
self.http_client_factory: HttpClientFactory

self.statsd_client: Union[StatsDClient, StatsDClientStub] = create_statsd_client(options, self)
Expand Down Expand Up @@ -106,8 +106,9 @@ def make_service_discovery(self) -> ServiceDiscovery:
return WorkerServiceDiscovery(self.worker_state.initial_shared_data)

async def install_integrations(self) -> None:
self.available_integrations, integration_futures = integrations.load_integrations(self)
self.available_integrations, integration_futures = app_integrations.load_integrations(self)
await asyncio.gather(*[future for future in integration_futures if future])
request_integrations.load_integrations()

self.service_discovery = self.make_service_discovery()
self.http_client_factory = self.make_http_client_factory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def initialize_handler(self, handler: PageHandler) -> None:
def load_integrations(app: FrontikApplication) -> tuple[list[Integration], list[Future]]:
for _importer, module_name, _is_package in pkgutil.iter_modules(__path__):
try:
importlib.import_module(f'frontik.integrations.{module_name}')
importlib.import_module(f'frontik.app_integrations.{module_name}')
except Exception as e:
integrations_logger.info('%s integration is not available: %s', module_name, e)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from tornado.ioloop import PeriodicCallback

from frontik.integrations import Integration, integrations_logger
from frontik.app_integrations import Integration, integrations_logger
from frontik.options import options

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from aiokafka import AIOKafkaProducer
from tornado import gen

from frontik.integrations import Integration
from frontik.app_integrations import Integration
from frontik.options import options

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from sentry_sdk.integrations.stdlib import StdlibIntegration
from tornado.web import HTTPError

from frontik.integrations import Integration, integrations_logger
from frontik.integrations.sentry_tornado_integration import TornadoIntegration
from frontik.app_integrations import Integration, integrations_logger
from frontik.app_integrations.sentry_tornado_integration import TornadoIntegration
from frontik.options import options

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sentry_sdk

from frontik.app import FrontikApplication
from frontik.integrations import Integration, integrations_logger
from frontik.app_integrations import Integration, integrations_logger
from frontik.loggers import bootstrap_logger
from frontik.options import options

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from functools import partial
from typing import TYPE_CHECKING, Optional, Union

from frontik.integrations import Integration, integrations_logger
from frontik.app_integrations import Integration, integrations_logger

if TYPE_CHECKING:
from asyncio import Future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,18 @@
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation import aiohttp_client
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore
from opentelemetry.propagate import set_global_textmap
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import IdGenerator, TracerProvider
from opentelemetry.sdk.trace import Span as SpanImpl
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.util.http import ExcludeList

from frontik import request_context
from frontik.integrations import Integration, integrations_logger, tornado
from frontik.app_integrations import Integration, integrations_logger
from frontik.options import options

if TYPE_CHECKING:
Expand All @@ -31,18 +30,12 @@
import aiohttp
from http_client.request_response import RequestBuilder
from opentelemetry.trace import Span
from opentelemetry.util import types

from frontik.app import FrontikApplication

log = logging.getLogger('telemetry')
# change log-level, because mainly detach context produce exception on Tornado 5. Will be deleted, when up Tornado to 6
logging.getLogger('opentelemetry.context').setLevel(logging.CRITICAL)
set_global_textmap(TraceContextTextMapPropagator())

tornado._excluded_urls = ExcludeList([*list(tornado._excluded_urls._excluded_urls), '/status'])
excluded_span_attributes = ['tornado.handler']


def make_otel_provider(app: FrontikApplication) -> TracerProvider:
resource = Resource(
Expand All @@ -61,60 +54,60 @@ def make_otel_provider(app: FrontikApplication) -> TracerProvider:
return provider


class TelemetryIntegration(Integration):
def __init__(self):
self.aiohttp_instrumentor = aiohttp_client.AioHttpClientInstrumentor()
self.tornado_instrumentor = tornado.TornadoInstrumentor()
TelemetryIntegration.patch_span_impl()
class FrontikServerInstrumentor(BaseInstrumentor):
patched_handlers: list = []
original_handler_new = None

@staticmethod
def patch_span_impl() -> None:
set_attribute = SpanImpl.set_attribute
def _instrument(self, **kwargs):
tracer_provider = kwargs.get('tracer_provider')
self.tracer = trace.get_tracer(
'frontik',
'0.0.1',
tracer_provider,
schema_url='https://opentelemetry.io/schemas/1.11.0',
)

def patched_set_attribute(self: SpanImpl, key: str, value: types.AttributeValue) -> None:
if key not in excluded_span_attributes:
return set_attribute(self, key, value)
def _uninstrument(self, **kwargs):
pass

def instrumentation_dependencies(self):
return []

SpanImpl.set_attribute = patched_set_attribute # type: ignore

def initialize_app(self, app: FrontikApplication) -> Optional[Future]:
class TelemetryIntegration(Integration):
def __init__(self):
self.aiohttp_instrumentor = aiohttp_client.AioHttpClientInstrumentor()
self.frontik_instrumentor = FrontikServerInstrumentor()

def initialize_app(self, frontik_app: FrontikApplication) -> Optional[Future]:
if not options.opentelemetry_enabled:
return None

integrations_logger.info('start telemetry')
provider = make_otel_provider(app)
provider = make_otel_provider(frontik_app)

otlp_exporter = OTLPSpanExporter(endpoint=options.opentelemetry_collector_url, insecure=True)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)

self.aiohttp_instrumentor.instrument(request_hook=_client_request_hook, response_hook=_client_response_hook)
self.tornado_instrumentor.instrument(server_request_hook=_server_request_hook)
self.frontik_instrumentor.instrument()
frontik_app.otel_tracer = self.frontik_instrumentor.tracer # type: ignore
return None

def deinitialize_app(self, app: FrontikApplication) -> Optional[Future]:
if not options.opentelemetry_enabled:
return None

integrations_logger.info('stop telemetry')
self.frontik_instrumentor.uninstrument()
self.aiohttp_instrumentor.uninstrument()
self.tornado_instrumentor.uninstrument()
return None

def initialize_handler(self, handler):
pass


def _server_request_hook(span, handler):
if (handler_name := request_context.get_handler_name()) is not None:
method_path, method_name = handler_name.rsplit('.', 1)
span.update_name(f'{method_path}.{method_name}')
span.set_attribute(SpanAttributes.CODE_FUNCTION, method_name)
span.set_attribute(SpanAttributes.CODE_NAMESPACE, method_path)

span.set_attribute(SpanAttributes.HTTP_TARGET, handler.request.uri)


def _client_request_hook(span: Span, params: aiohttp.TraceRequestStartParams) -> None:
if not span or not span.is_recording():
return
Expand Down
18 changes: 12 additions & 6 deletions frontik/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from asyncio.futures import Future
from functools import partial, wraps
from http import HTTPStatus
from inspect import iscoroutinefunction
from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar, Union, overload

import tornado.web
Expand Down Expand Up @@ -47,8 +48,8 @@
from tornado.httputil import HTTPHeaders, HTTPServerRequest

from frontik.app import FrontikApplication
from frontik.app_integrations.statsd import StatsDClient, StatsDClientStub
from frontik.handler_return_values import ReturnedValue, ReturnedValueHandlers
from frontik.integrations.statsd import StatsDClient, StatsDClientStub


class FinishWithPostprocessors(Exception):
Expand Down Expand Up @@ -585,11 +586,15 @@ async def _handle_exception(self, e: BaseException) -> None:
)

try:
# this block should ends with finish() call
error_method_name = f'{self.request.method.lower()}_page_fail_fast' # type: ignore
method = getattr(self, error_method_name, None)
if callable(method):
if iscoroutinefunction(method):
await method(e.failed_result)
self.finish()
elif callable(method):
method(e.failed_result)
self.finish()
else:
await self.__return_error(e.failed_result.status_code, error_info={'is_fail_fast': True})
return
Expand All @@ -600,8 +605,8 @@ async def _handle_exception(self, e: BaseException) -> None:
await self._send_error(exception=e)

async def _send_error(self, status_code: int = 500, exception: Any = None, **kwargs: Any) -> None:
"""`send_error` is adapted to support `write_error` that can call
`finish` asynchronously.
"""
`send_error` shouldn't raise any exception
"""
self.stages_logger.commit_stage('page')
if exception is not None:
Expand All @@ -627,7 +632,7 @@ async def _send_error(self, status_code: int = 500, exception: Any = None, **kwa

async def _write_error(self, status_code: int = 500, **kwargs: Any) -> None:
"""
`write_error` can call `finish` asynchronously if HTTPErrorWithPostprocessors is raised.
`write_error` must ends with finish() call
"""
exception = kwargs['exc_info'][1] if 'exc_info' in kwargs else None

Expand Down Expand Up @@ -1039,10 +1044,11 @@ def _execute_http_client_method(


def log_request(tornado_request: httputil.HTTPServerRequest, status_code: int) -> None:
# frontik.request_context can't be used in case when client has closed connection
request_time = int(1000.0 * tornado_request.request_time())
extra = {
'ip': tornado_request.remote_ip,
'rid': request_context.get_request_id(),
'rid': tornado_request.request_id, # type: ignore
'status': status_code,
'time': request_time,
'method': tornado_request.method,
Expand Down
60 changes: 26 additions & 34 deletions frontik/handler_asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import http.client
import logging
from contextlib import ExitStack
from functools import partial
from typing import TYPE_CHECKING, Optional

Expand All @@ -13,11 +14,10 @@
from frontik.debug import DebugMode, DebugTransform
from frontik.frontik_response import FrontikResponse
from frontik.handler import PageHandler, log_request
from frontik.handler_active_limit import request_limiter
from frontik.http_status import CLIENT_CLOSED_REQUEST
from frontik.options import options
from frontik.request_integrations import get_integrations
from frontik.request_integrations.integrations_dto import IntegrationDto
from frontik.routing import find_route, get_allowed_methods, method_not_allowed_router, not_found_router
from frontik.util import check_request_id, generate_uniq_timestamp_request_id

if TYPE_CHECKING:
from frontik.app import FrontikApplication, FrontikAsgiApp
Expand All @@ -31,59 +31,46 @@ async def serve_tornado_request(
asgi_app: FrontikAsgiApp,
tornado_request: httputil.HTTPServerRequest,
) -> None:
request_id = tornado_request.headers.get('X-Request-Id') or generate_uniq_timestamp_request_id()
if options.validate_request_id:
check_request_id(request_id)
tornado_request.request_id = request_id # type: ignore

with request_context.request_context(request_id):
with ExitStack() as stack:
integrations: dict[str, IntegrationDto] = {
ctx_name: stack.enter_context(ctx(frontik_app, tornado_request)) for ctx_name, ctx in get_integrations()
}
log.info('requested url: %s', tornado_request.uri)

process_request_task = asyncio.create_task(process_request(frontik_app, asgi_app, tornado_request))

process_request_task = asyncio.create_task(
process_request(frontik_app, asgi_app, tornado_request, integrations)
)
assert tornado_request.connection is not None
tornado_request.connection.set_close_callback( # type: ignore
partial(_on_connection_close, tornado_request, process_request_task)
partial(_on_connection_close, tornado_request, process_request_task, integrations)
)

response = await process_request_task
log_request(tornado_request, response.status_code)

assert tornado_request.connection is not None
tornado_request.connection.set_close_callback(None) # type: ignore

if getattr(tornado_request, 'canceled', False):
return None

if not response.data_written:
for integration in integrations.values():
integration.set_response(response)

start_line = httputil.ResponseStartLine('', response.status_code, response.reason)
await tornado_request.connection.write_headers(start_line, response.headers, response.body)

log_request(tornado_request, response.status_code)
tornado_request.connection.finish()


async def process_request(
frontik_app: FrontikApplication,
asgi_app: FrontikAsgiApp,
tornado_request: httputil.HTTPServerRequest,
tornado_request: HTTPServerRequest,
integrations: dict[str, IntegrationDto],
) -> FrontikResponse:
with request_limiter(frontik_app.statsd_client) as accepted:
if not accepted:
response = make_not_accepted_response()
else:
response = await execute_page(frontik_app, asgi_app, tornado_request)
response.headers.add(
'Server-Timing', f'frontik;desc="frontik execution time";dur={tornado_request.request_time()!s}'
)

if integrations.get('request_limiter', IntegrationDto()).get_value() is False:
response = make_not_accepted_response()
return response


async def execute_page(
frontik_app: FrontikApplication,
asgi_app: FrontikAsgiApp,
tornado_request: HTTPServerRequest,
) -> FrontikResponse:
debug_mode = make_debug_mode(frontik_app, tornado_request)
if debug_mode.auth_failed():
assert debug_mode.failed_auth_header is not None
Expand Down Expand Up @@ -244,6 +231,11 @@ async def execute_tornado_page(
return FrontikResponse(status_code=status_code, headers=headers, body=body)


def _on_connection_close(tornado_request, process_request_task):
process_request_task.cancel()
def _on_connection_close(tornado_request, process_request_task, integrations):
response = FrontikResponse(CLIENT_CLOSED_REQUEST)
for integration in integrations.values():
integration.set_response(response)

log_request(tornado_request, CLIENT_CLOSED_REQUEST)
setattr(tornado_request, 'canceled', False)
process_request_task.cancel() # serve_tornado_request will be interrupted with CanceledError
Loading

0 comments on commit 044c403

Please sign in to comment.