From ee5d2c4b985a4770464e47c0ec56b4f06a271b94 Mon Sep 17 00:00:00 2001 From: Leonid Vinogradov Date: Thu, 24 Oct 2024 12:42:21 +0300 Subject: [PATCH] HH-234831 small fixes --- examples/example_app/pages/example.py | 4 +- frontik/app.py | 32 +++++ frontik/balancing_client.py | 39 +++--- frontik/debug.py | 2 +- frontik/dependencies/__init__.py | 40 ++++-- frontik/frontik_response.py | 2 +- frontik/handler_asgi.py | 75 +++-------- frontik/request_integrations/__init__.py | 4 +- frontik/request_integrations/clients.py | 13 ++ frontik/request_integrations/request_id.py | 2 +- .../request_integrations/request_limiter.py | 2 +- frontik/request_integrations/server_timing.py | 13 -- frontik/routing.py | 32 ++++- frontik/testing.py | 4 + pyproject.toml | 4 +- tests/__init__.py | 29 +++++ tests/instances.py | 29 +---- .../pages/different_datacenter.py | 4 +- .../pages/no_available_backend.py | 4 +- .../balancer_app/pages/no_retry_error.py | 4 +- .../balancer_app/pages/no_retry_timeout.py | 4 +- .../balancer_app/pages/profile_with_retry.py | 4 +- .../pages/profile_without_retry.py | 4 +- .../balancer_app/pages/requests_count.py | 4 +- .../balancer_app/pages/retry_connect.py | 4 +- .../pages/retry_connect_timeout.py | 4 +- .../balancer_app/pages/retry_count_limit.py | 6 +- .../balancer_app/pages/retry_error.py | 4 +- .../pages/retry_non_idempotent_503.py | 4 +- .../balancer_app/pages/retry_on_timeout.py | 4 +- .../projects/balancer_app/pages/slow_start.py | 4 +- .../pages/speculative_no_retry.py | 4 +- .../balancer_app/pages/speculative_retry.py | 4 +- .../v1/kv/host/hostname/weight/weight.py | 18 +++ .../pages/v1/kv/upstream/upstream.py | 18 +++ tests/projects/test_app/__init__.py | 24 ---- .../test_app/pages/http_client/proxy_code.py | 4 +- tests/projects/test_app/pages/kafka.py | 23 ---- tests/projects/test_app/pages/log.py | 22 ---- tests/test_balancer.py | 3 +- tests/test_consul_registration.py | 68 ---------- tests/test_debug.py | 6 +- tests/test_default_urls.py | 38 ++---- tests/test_file_logging.py | 34 ----- tests/test_frontik_testing.py | 6 +- tests/test_http_client.py | 56 ++++++--- tests/test_http_client_keep_alive.py | 3 +- tests/test_kafka_integration.py | 67 +++++++++- tests/test_logging.py | 117 +++++++++++++++--- tests/test_no_debug_mode.py | 2 +- tests/test_routing.py | 26 +++- tests/test_service_discovery.py | 27 +++- tests/test_statsd_integration.py | 4 +- tests/test_telemetry.py | 4 +- 54 files changed, 529 insertions(+), 433 deletions(-) create mode 100644 frontik/request_integrations/clients.py delete mode 100644 frontik/request_integrations/server_timing.py create mode 100644 tests/projects/consul_mock_app/pages/v1/kv/host/hostname/weight/weight.py create mode 100644 tests/projects/consul_mock_app/pages/v1/kv/upstream/upstream.py delete mode 100644 tests/projects/test_app/pages/kafka.py delete mode 100644 tests/projects/test_app/pages/log.py delete mode 100644 tests/test_consul_registration.py delete mode 100644 tests/test_file_logging.py diff --git a/examples/example_app/pages/example.py b/examples/example_app/pages/example.py index 2a58ac595..de4f26f21 100644 --- a/examples/example_app/pages/example.py +++ b/examples/example_app/pages/example.py @@ -1,8 +1,8 @@ -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router @router.get('/example') -async def example_page(http_client: HttpClientT) -> dict: +async def example_page(http_client: HttpClient) -> dict: result = await http_client.get_url('http://example.com', '/') return {'example': result.status_code} diff --git a/frontik/app.py b/frontik/app.py index e1e64c3c1..6927afbf3 100644 --- a/frontik/app.py +++ b/frontik/app.py @@ -17,15 +17,20 @@ from http_client import options as http_client_options from http_client.balancing import RequestBalancerBuilder from lxml import etree +from starlette.types import ASGIApp, Receive, Scope, Send from tornado import httputil from frontik import app_integrations from frontik.app_integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client +from frontik.balancing_client import create_http_client +from frontik.dependencies import clients from frontik.handler_asgi import serve_tornado_request from frontik.options import options from frontik.process import WorkerState from frontik.routing import ( import_all_pages, + method_not_allowed_router, + not_found_router, router, routers, ) @@ -202,6 +207,8 @@ def __init__(self, frontik_app: FrontikApplication) -> None: self.get_frontik_and_apps_versions = frontik_app.get_frontik_and_apps_versions self.statsd_client = frontik_app.statsd_client + self.add_middleware(FrontikMiddleware) + @router.get('/version') async def get_version(request: Request) -> Response: @@ -212,3 +219,28 @@ async def get_version(request: Request) -> Response: @router.get('/status') async def get_status(request: Request) -> ORJSONResponse: return ORJSONResponse(request.app.get_current_status()) + + +class FrontikMiddleware: + def __init__(self, app: ASGIApp) -> None: + self.app = app + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope['type'] != 'http': + await self.app(scope, receive, send) + return + + clients.get()['http_client'] = create_http_client(scope) + clients.get()['app_config'] = scope['app'].config + clients.get()['statsd_client'] = scope['app'].statsd_client + await self.app(scope, receive, send) + + +@not_found_router.get('__not_found') +async def default_404() -> Response: + return Response(status_code=404) + + +@method_not_allowed_router.get('__method_not_allowed') +async def default_405(request: Request) -> Response: + return Response(status_code=405, headers={'Allow': ', '.join(request.scope['allowed_methods'])}) diff --git a/frontik/balancing_client.py b/frontik/balancing_client.py index 237669d51..d8500927a 100644 --- a/frontik/balancing_client.py +++ b/frontik/balancing_client.py @@ -1,8 +1,9 @@ import time -from fastapi import Request from http_client import HttpClient, RequestBuilder from http_client.request_response import USER_AGENT_HEADER +from starlette.datastructures import Headers +from starlette.types import Scope from frontik import request_context from frontik.auth import DEBUG_AUTH_HEADER_NAME @@ -13,44 +14,40 @@ OUTER_TIMEOUT_MS_HEADER = 'X-Outer-Timeout-Ms' -def modify_http_client_request(request: Request, balanced_request: RequestBuilder) -> None: +def modify_http_client_request(scope: Scope, balanced_request: RequestBuilder) -> None: + headers = Headers(scope=scope) balanced_request.headers['x-request-id'] = request_context.get_request_id() balanced_request.headers[OUTER_TIMEOUT_MS_HEADER] = f'{balanced_request.request_timeout * 1000:.0f}' - outer_timeout = request.headers.get(OUTER_TIMEOUT_MS_HEADER.lower()) + outer_timeout = headers.get(OUTER_TIMEOUT_MS_HEADER.lower()) if outer_timeout: timeout_checker = get_timeout_checker( - request.headers.get(USER_AGENT_HEADER.lower()), + headers.get(USER_AGENT_HEADER.lower()), float(outer_timeout), - request['start_time'], + scope['start_time'], ) timeout_checker.check(balanced_request) - if request['debug_mode'].pass_debug: + if scope['debug_mode'].pass_debug: balanced_request.headers[DEBUG_HEADER_NAME] = 'true' # debug_timestamp is added to avoid caching of debug responses balanced_request.path = make_url(balanced_request.path, debug_timestamp=int(time.time())) for header_name in ('Authorization', DEBUG_AUTH_HEADER_NAME): - authorization = request.headers.get(header_name.lower()) + authorization = headers.get(header_name.lower()) if authorization is not None: balanced_request.headers[header_name] = authorization -def get_http_client(modify_request_hook=None): - async def _get_http_client(request: Request) -> HttpClient: - def hook(balanced_request): - if modify_request_hook is not None: - modify_request_hook(balanced_request) +def create_http_client(scope: Scope) -> HttpClient: + def hook(balanced_request): + if (local_hook := scope.get('_http_client_hook')) is not None: + local_hook(balanced_request) - modify_http_client_request(request, balanced_request) + modify_http_client_request(scope, balanced_request) - http_client = request.app.http_client_factory.get_http_client( - modify_http_request_hook=hook, - debug_enabled=request['debug_mode'].enabled, - ) - - return http_client - - return _get_http_client + return scope['app'].http_client_factory.get_http_client( + modify_http_request_hook=hook, + debug_enabled=scope['debug_mode'].enabled, + ) diff --git a/frontik/debug.py b/frontik/debug.py index be97796a0..47b74e516 100644 --- a/frontik/debug.py +++ b/frontik/debug.py @@ -398,7 +398,7 @@ def transform_chunk( else: wrap_headers = {'Content-Type': media_types.APPLICATION_XML, DEBUG_HEADER_NAME: 'true'} - chunk = b'Streamable response' if response.data_written else _data_to_chunk(response.body) + chunk = b'Streamable response' if response.headers_written else _data_to_chunk(response.body) start_time = time.time() handler_name = request_context.get_handler_name() diff --git a/frontik/dependencies/__init__.py b/frontik/dependencies/__init__.py index 9b49318ce..6d03f6c1e 100644 --- a/frontik/dependencies/__init__.py +++ b/frontik/dependencies/__init__.py @@ -1,20 +1,38 @@ +import contextvars from typing import Annotated, Any -from fastapi import Depends, Request -from http_client import HttpClient +import http_client +from fastapi import Depends -from frontik.app_integrations.statsd import StatsDClient -from frontik.balancing_client import get_http_client +from frontik.app_integrations import statsd +clients: contextvars.ContextVar = contextvars.ContextVar('clients') -async def get_app_config(request: Request) -> Any: - return request.app.config +def get_app_config() -> Any: + return clients.get().get('app_config') -async def get_statsd_client(request: Request) -> StatsDClient: - return request.app.statsd_client +async def _get_app_config() -> Any: + return get_app_config() -StatsDClientT = Annotated[StatsDClient, Depends(get_statsd_client)] -AppConfig = Annotated[Any, Depends(get_app_config)] -HttpClientT = Annotated[HttpClient, Depends(get_http_client())] + +def get_http_client() -> http_client.HttpClient: + return clients.get().get('http_client') + + +async def _get_http_client() -> http_client.HttpClient: + return get_http_client() + + +def get_statsd_client() -> statsd.StatsDClient: + return clients.get().get('statsd_client') + + +async def _get_statsd_client() -> statsd.StatsDClient: + return get_statsd_client() + + +StatsDClient = Annotated[statsd.StatsDClient, Depends(_get_statsd_client)] +AppConfig = Annotated[Any, Depends(_get_app_config)] +HttpClient = Annotated[http_client.HttpClient, Depends(_get_http_client)] diff --git a/frontik/frontik_response.py b/frontik/frontik_response.py index 89e9069ec..bea292d99 100644 --- a/frontik/frontik_response.py +++ b/frontik/frontik_response.py @@ -26,7 +26,7 @@ def __init__( self.status_code = status_code self.body = body self._reason = reason - self.data_written = False + self.headers_written = False @property def reason(self) -> str: diff --git a/frontik/handler_asgi.py b/frontik/handler_asgi.py index 12527b033..96b58b2cb 100644 --- a/frontik/handler_asgi.py +++ b/frontik/handler_asgi.py @@ -5,20 +5,19 @@ import logging from contextlib import ExitStack from functools import partial -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING -from fastapi.routing import APIRoute from tornado import httputil from tornado.httputil import HTTPServerRequest -from frontik import media_types, request_context +from frontik import request_context from frontik.debug import DebugMode, DebugTransform from frontik.frontik_response import FrontikResponse from frontik.http_status import CLIENT_CLOSED_REQUEST from frontik.loggers import CUSTOM_JSON_EXTRA, JSON_REQUESTS_LOGGER from frontik.request_integrations import get_integrations from frontik.request_integrations.integrations_dto import IntegrationDto -from frontik.routing import find_route, get_allowed_methods +from frontik.routing import find_route if TYPE_CHECKING: from frontik.app import FrontikApplication, FrontikAsgiApp @@ -51,7 +50,7 @@ async def serve_tornado_request( assert tornado_request.connection is not None tornado_request.connection.set_close_callback(None) # type: ignore - if not response.data_written: + if not response.headers_written: for integration in integrations.values(): integration.set_response(response) @@ -74,20 +73,18 @@ async def process_request( debug_mode = make_debug_mode(frontik_app, tornado_request) if debug_mode.auth_failed(): assert debug_mode.failed_auth_header is not None - return make_debug_auth_failed_response(debug_mode.failed_auth_header) + return FrontikResponse( + status_code=http.client.UNAUTHORIZED, headers={'WWW-Authenticate': debug_mode.failed_auth_header} + ) assert tornado_request.method is not None scope = find_route(tornado_request.path, tornado_request.method) - route: Optional[APIRoute] = scope['route'] + tornado_request._path_format = scope['route'].path_format # type: ignore - if route is None: - response = await make_not_found_response(frontik_app, tornado_request, debug_mode, scope) - else: - tornado_request._path_format = route.path_format # type: ignore - response = await execute_asgi_page(frontik_app, asgi_app, tornado_request, scope, debug_mode, integrations) + response = await execute_asgi_page(frontik_app, asgi_app, tornado_request, scope, debug_mode, integrations) - if debug_mode.enabled and not response.data_written: + if debug_mode.enabled and not response.headers_written: debug_transform = DebugTransform(frontik_app, debug_mode) response = debug_transform.transform_chunk(tornado_request, response) @@ -131,19 +128,19 @@ async def receive(): 'more_body': False, } - async def send(data): + async def send(message): assert tornado_request.connection is not None - if data['type'] == 'http.response.start': - response.status_code = int(data['status']) - for h in data['headers']: + if message['type'] == 'http.response.start': + response.status_code = int(message['status']) + for h in message['headers']: if len(h) == 2: response.headers.add(h[0].decode(CHARSET), h[1].decode(CHARSET)) - elif data['type'] == 'http.response.body': - chunk = data['body'] - if debug_mode.enabled or not data.get('more_body'): + elif message['type'] == 'http.response.body': + chunk = message['body'] + if debug_mode.enabled or not message.get('more_body'): response.body += chunk - elif not response.data_written: + elif not response.headers_written: for integration in integrations.values(): integration.set_response(response) @@ -152,39 +149,17 @@ async def send(data): headers=response.headers, chunk=chunk, ) - response.data_written = True + response.headers_written = True else: await tornado_request.connection.write(chunk) else: - raise RuntimeError(f'Unsupported response type "{data["type"]}" for asgi app') + raise RuntimeError(f'Unsupported response type "{message["type"]}" for asgi app') await asgi_app(scope, receive, send) return response -async def make_not_found_response( - frontik_app: FrontikApplication, - tornado_request: httputil.HTTPServerRequest, - debug_mode: DebugMode, - scope: dict, -) -> FrontikResponse: - allowed_methods = get_allowed_methods(scope) - - if allowed_methods and hasattr(frontik_app, 'method_not_allowed_handler'): - return await frontik_app.method_not_allowed_handler( - tornado_request, debug_mode, path_params={'allowed_methods': allowed_methods} - ) - - if allowed_methods: - return FrontikResponse(status_code=405, headers={'Allow': ', '.join(allowed_methods)}) - - if hasattr(frontik_app, 'not_found_handler'): - return await frontik_app.not_found_handler(tornado_request, debug_mode, path_params={}) - - return build_error_data(404, 'Not Found') - - def make_debug_mode(frontik_app: FrontikApplication, tornado_request: HTTPServerRequest) -> DebugMode: debug_mode = DebugMode(tornado_request) @@ -199,16 +174,6 @@ def make_debug_mode(frontik_app: FrontikApplication, tornado_request: HTTPServer return debug_mode -def make_debug_auth_failed_response(auth_header: str) -> FrontikResponse: - return FrontikResponse(status_code=http.client.UNAUTHORIZED, headers={'WWW-Authenticate': auth_header}) - - -def build_error_data(status_code: int = 500, message: Optional[str] = 'Internal Server Error') -> FrontikResponse: - headers = {'Content-Type': media_types.TEXT_HTML} - data = f'{status_code}: {message}{status_code}: {message}'.encode() - return FrontikResponse(status_code=status_code, headers=headers, body=data) - - def _on_connection_close(tornado_request, process_request_task, integrations): request_id = integrations.get('request_id', IntegrationDto()).get_value() with request_context.request_context(request_id): diff --git a/frontik/request_integrations/__init__.py b/frontik/request_integrations/__init__.py index 5ca5862bc..c6f10077a 100644 --- a/frontik/request_integrations/__init__.py +++ b/frontik/request_integrations/__init__.py @@ -1,13 +1,13 @@ +from frontik.request_integrations.clients import clients_ctx from frontik.request_integrations.request_id import request_id_ctx from frontik.request_integrations.request_limiter import request_limiter -from frontik.request_integrations.server_timing import server_timing from frontik.request_integrations.telemetry import otel_instrumentation_ctx _integrations: list = [ ('request_id', request_id_ctx), ('request_limiter', request_limiter), - ('server_timing', server_timing), ('telemetry', otel_instrumentation_ctx), + ('clients', clients_ctx), ] diff --git a/frontik/request_integrations/clients.py b/frontik/request_integrations/clients.py new file mode 100644 index 000000000..3d9a5c3b9 --- /dev/null +++ b/frontik/request_integrations/clients.py @@ -0,0 +1,13 @@ +from contextlib import contextmanager + +from frontik.dependencies import clients +from frontik.request_integrations.integrations_dto import IntegrationDto + + +@contextmanager +def clients_ctx(_frontik_app, _tornado_request): + token = clients.set({}) + try: + yield IntegrationDto() + finally: + clients.reset(token) diff --git a/frontik/request_integrations/request_id.py b/frontik/request_integrations/request_id.py index 6b6ae9309..1b6d25c2e 100644 --- a/frontik/request_integrations/request_id.py +++ b/frontik/request_integrations/request_id.py @@ -7,7 +7,7 @@ @contextmanager -def request_id_ctx(_, tornado_request): +def request_id_ctx(_frontik_app, tornado_request): request_id = tornado_request.headers.get('X-Request-Id') or generate_uniq_timestamp_request_id() if options.validate_request_id: check_request_id(request_id) diff --git a/frontik/request_integrations/request_limiter.py b/frontik/request_integrations/request_limiter.py index f13b4d326..20fb345ca 100644 --- a/frontik/request_integrations/request_limiter.py +++ b/frontik/request_integrations/request_limiter.py @@ -46,7 +46,7 @@ def release(self) -> None: @contextmanager -def request_limiter(frontik_app, _): +def request_limiter(frontik_app, _tornado_request): active_limit = ActiveHandlersLimit(frontik_app.statsd_client) dto = IntegrationDto(active_limit.acquired) try: diff --git a/frontik/request_integrations/server_timing.py b/frontik/request_integrations/server_timing.py deleted file mode 100644 index 0c42b51f0..000000000 --- a/frontik/request_integrations/server_timing.py +++ /dev/null @@ -1,13 +0,0 @@ -from contextlib import contextmanager - -from frontik.request_integrations.integrations_dto import IntegrationDto - - -@contextmanager -def server_timing(_, tornado_request): - dto = IntegrationDto() - yield dto - if dto.response is not None: - dto.response.headers.add( - 'Server-Timing', f'frontik;desc="frontik execution time";dur={tornado_request.request_time()!s}' - ) diff --git a/frontik/routing.py b/frontik/routing.py index 3fa2aad25..57035d62a 100644 --- a/frontik/routing.py +++ b/frontik/routing.py @@ -85,6 +85,8 @@ def import_all_pages(app_module: str) -> None: router = FastAPIRouter(include_in_app=False) regex_router = FrontikRegexRouter() +not_found_router = APIRouter() +method_not_allowed_router = APIRouter() def _find_fastapi_route(scope: dict) -> Optional[APIRoute]: @@ -95,6 +97,20 @@ def _find_fastapi_route(scope: dict) -> Optional[APIRoute]: scope['route'] = route return route + route_path = scope['path'] + if route_path != '/': + if route_path.endswith('/'): + scope['path'] = scope['path'].rstrip('/') + else: + scope['path'] = scope['path'] + '/' + + for route in _fastapi_routes: + match, child_scope = route.matches(scope) + if match == Match.FULL: + scope.update(child_scope) + scope['route'] = route + return route + return None @@ -117,18 +133,28 @@ def find_route(path: str, method: str) -> dict: 'route': route, 'path_params': path_params, } - if route is not None: - scope['endpoint'] = route.endpoint if route is None: route = _find_fastapi_route(scope) if route is None and method == 'HEAD': - return find_route(path, 'GET') + scope = find_route(path, 'GET') + route = scope['route'] if route is None: routing_logger.error('match for request url %s "%s" not found', method, path) + allowed_methods = get_allowed_methods(scope) + if len(allowed_methods) > 0: + scope['allowed_methods'] = allowed_methods + route = method_not_allowed_router.routes[-1] + else: + route = not_found_router.routes[-1] + + scope['route'] = route + + scope['method'] = next(iter(route.methods)) + scope['endpoint'] = route.endpoint return scope diff --git a/frontik/testing.py b/frontik/testing.py index 49c3f5a72..871e64f3e 100644 --- a/frontik/testing.py +++ b/frontik/testing.py @@ -1,5 +1,6 @@ import asyncio import json +import logging import re from collections.abc import Callable from typing import Any, Optional, Union @@ -20,6 +21,8 @@ from frontik.options import options from frontik.util import bind_socket, make_url, safe_template +log = logging.getLogger('server') + class FrontikTestBase: @pytest.fixture(scope='class') @@ -43,6 +46,7 @@ async def _run_app(self, frontik_app, _bind_socket): http_server = HTTPServer(frontik_app, xheaders=options.xheaders, max_body_size=options.max_body_size) http_server.add_sockets([_bind_socket]) + log.info('Successfully inited application %s', frontik_app.app_name) yield diff --git a/pyproject.toml b/pyproject.toml index 57eba64f8..3410834a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -133,8 +133,8 @@ known-third-party = ['http_client'] 'FBT', # Don't care about booleans as positional arguments in tests, e.g. via @pytest.mark.parametrize() 'PLR2004', # Magic value used in comparison 'S311', 'S105', - 'PT009', 'PT027', - 'ANN201', + 'PT009', 'PT027', 'B011', + 'ANN201', 'PLC2701', 'RUF001', 'E722','ERA001','SIM108', # should be fixed diff --git a/tests/__init__.py b/tests/__init__.py index 1e6bd495b..06489b072 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,3 +1,32 @@ +import base64 import os +import random +import socket +from itertools import chain + +from tornado.escape import to_unicode, utf8 FRONTIK_ROOT = os.path.dirname(os.path.dirname(__file__)) + + +def create_basic_auth_header(credentials: str) -> str: + return f'Basic {to_unicode(base64.b64encode(utf8(credentials)))}' + + +def find_free_port(from_port: int = 9000, to_port: int = 10000) -> int: + random_start = random.randint(from_port, to_port) + + for port in chain(range(random_start, to_port), range(from_port, random_start)): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.bind(('', port)) + break + except Exception: + pass + finally: + s.close() + else: + msg = f'No empty port in range {from_port}..{to_port} for frontik test instance' + raise AssertionError(msg) + + return port diff --git a/tests/instances.py b/tests/instances.py index 344aff1b0..a64c8fa52 100644 --- a/tests/instances.py +++ b/tests/instances.py @@ -1,13 +1,9 @@ from __future__ import annotations -import base64 import json -import random -import socket import subprocess import sys import time -from itertools import chain from typing import TYPE_CHECKING, Optional import requests @@ -15,7 +11,7 @@ from tornado.escape import to_unicode, utf8 from frontik import options -from tests import FRONTIK_ROOT +from tests import FRONTIK_ROOT, find_free_port if TYPE_CHECKING: from builtins import function @@ -35,29 +31,6 @@ def _run_command(command: str, port: int) -> subprocess.Popen: return subprocess.Popen(executable.split()) -def find_free_port(from_port: int = 9000, to_port: int = 10000) -> int: - random_start = random.randint(from_port, to_port) - - for port in chain(range(random_start, to_port), range(from_port, random_start)): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - s.bind(('', port)) - break - except Exception: - pass - finally: - s.close() - else: - msg = f'No empty port in range {from_port}..{to_port} for frontik test instance' - raise AssertionError(msg) - - return port - - -def create_basic_auth_header(credentials: str) -> str: - return f'Basic {to_unicode(base64.b64encode(utf8(credentials)))}' - - class FrontikTestInstance: def __init__(self, command: str, *, allow_to_create_log_files: bool = False) -> None: if not allow_to_create_log_files and options.LOG_DIR_OPTION_NAME in command: diff --git a/tests/projects/balancer_app/pages/different_datacenter.py b/tests/projects/balancer_app/pages/different_datacenter.py index c0263dd46..5daa2e9d4 100644 --- a/tests/projects/balancer_app/pages/different_datacenter.py +++ b/tests/projects/balancer_app/pages/different_datacenter.py @@ -3,13 +3,13 @@ from http_client.request_response import NoAvailableServerException from tornado.web import HTTPError -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server @router.get('/different_datacenter') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: free_server = get_server(request, 'free') free_server.datacenter = 'dc1' normal_server = get_server(request, 'normal') diff --git a/tests/projects/balancer_app/pages/no_available_backend.py b/tests/projects/balancer_app/pages/no_available_backend.py index 49bfd79d1..746a4b3b9 100644 --- a/tests/projects/balancer_app/pages/no_available_backend.py +++ b/tests/projects/balancer_app/pages/no_available_backend.py @@ -2,13 +2,13 @@ from http_client.balancing import Upstream from http_client.request_response import NoAvailableServerException -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app.pages import check_all_requests_done @router.get('/no_available_backend') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstreams = request.app.service_discovery.get_upstreams_unsafe() no_available_backend = 'no_available_backend' upstreams[no_available_backend] = Upstream(no_available_backend, {}, []) diff --git a/tests/projects/balancer_app/pages/no_retry_error.py b/tests/projects/balancer_app/pages/no_retry_error.py index aa5e1aa04..1b3f6aaf6 100644 --- a/tests/projects/balancer_app/pages/no_retry_error.py +++ b/tests/projects/balancer_app/pages/no_retry_error.py @@ -1,14 +1,14 @@ from fastapi import Request from http_client.balancing import Upstream -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server from tests.projects.balancer_app.pages import check_all_requests_done @router.get('/no_retry_error') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstreams = request.app.service_discovery.get_upstreams_unsafe() no_retry_error = 'no_retry_error' upstreams[no_retry_error] = Upstream(no_retry_error, {}, [get_server(request, 'broken')]) diff --git a/tests/projects/balancer_app/pages/no_retry_timeout.py b/tests/projects/balancer_app/pages/no_retry_timeout.py index dc1761f24..85eb767d7 100644 --- a/tests/projects/balancer_app/pages/no_retry_timeout.py +++ b/tests/projects/balancer_app/pages/no_retry_timeout.py @@ -3,14 +3,14 @@ from fastapi import Request from http_client.balancing import Upstream -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server from tests.projects.balancer_app.pages import check_all_requests_done @router.get('/no_retry_timeout') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstreams = request.app.service_discovery.get_upstreams_unsafe() no_retry_timeout = 'no_retry_timeout' upstreams[no_retry_timeout] = Upstream(no_retry_timeout, {}, [get_server(request, 'broken')]) diff --git a/tests/projects/balancer_app/pages/profile_with_retry.py b/tests/projects/balancer_app/pages/profile_with_retry.py index 86bc48cd6..3eee1b13d 100644 --- a/tests/projects/balancer_app/pages/profile_with_retry.py +++ b/tests/projects/balancer_app/pages/profile_with_retry.py @@ -2,13 +2,13 @@ from http_client.balancing import Upstream, UpstreamConfig from tornado.web import HTTPError -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server @router.get('/profile_with_retry') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: servers = [get_server(request, 'broken'), get_server(request, 'normal')] profile_with_retry = 'profile_with_retry' upstream_config = { diff --git a/tests/projects/balancer_app/pages/profile_without_retry.py b/tests/projects/balancer_app/pages/profile_without_retry.py index e18b032cf..1248269ee 100644 --- a/tests/projects/balancer_app/pages/profile_without_retry.py +++ b/tests/projects/balancer_app/pages/profile_without_retry.py @@ -1,13 +1,13 @@ from fastapi import Request from http_client.balancing import Upstream, UpstreamConfig -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server @router.get('/profile_without_retry') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: servers = [get_server(request, 'broken'), get_server(request, 'normal')] profile_without_retry = 'profile_without_retry' upstream_config = { diff --git a/tests/projects/balancer_app/pages/requests_count.py b/tests/projects/balancer_app/pages/requests_count.py index 39575a043..0c9bc3753 100644 --- a/tests/projects/balancer_app/pages/requests_count.py +++ b/tests/projects/balancer_app/pages/requests_count.py @@ -3,14 +3,14 @@ from fastapi import Request from http_client.balancing import Upstream -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server from tests.projects.balancer_app.pages import check_all_requests_done, check_all_servers_occupied @router.get('/requests_count') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstreams = request.app.service_discovery.get_upstreams_unsafe() requests_count_async = 'requests_count_async' upstreams[requests_count_async] = Upstream(requests_count_async, {}, [get_server(request, 'normal')]) diff --git a/tests/projects/balancer_app/pages/retry_connect.py b/tests/projects/balancer_app/pages/retry_connect.py index 34840ade2..d3e4d2caf 100644 --- a/tests/projects/balancer_app/pages/retry_connect.py +++ b/tests/projects/balancer_app/pages/retry_connect.py @@ -2,7 +2,7 @@ from http_client.balancing import Upstream from tornado.web import HTTPError -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from frontik.util import gather_list from tests.projects.balancer_app import get_server @@ -10,7 +10,7 @@ @router.get('/retry_connect') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstreams = request.app.service_discovery.get_upstreams_unsafe() retry_connect = 'retry_connect' upstreams[retry_connect] = Upstream( diff --git a/tests/projects/balancer_app/pages/retry_connect_timeout.py b/tests/projects/balancer_app/pages/retry_connect_timeout.py index 34669fc7d..061435591 100644 --- a/tests/projects/balancer_app/pages/retry_connect_timeout.py +++ b/tests/projects/balancer_app/pages/retry_connect_timeout.py @@ -2,7 +2,7 @@ from http_client.balancing import Upstream from tornado.web import HTTPError -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from frontik.util import gather_list from tests.projects.balancer_app import get_server @@ -10,7 +10,7 @@ @router.get('/retry_connect_timeout') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: retry_connect_timeout = 'retry_connect_timeout' upstreams = request.app.service_discovery.get_upstreams_unsafe() upstreams[retry_connect_timeout] = Upstream(retry_connect_timeout, {}, [get_server(request, 'normal')]) diff --git a/tests/projects/balancer_app/pages/retry_count_limit.py b/tests/projects/balancer_app/pages/retry_count_limit.py index 622d65e31..ba0c05a83 100644 --- a/tests/projects/balancer_app/pages/retry_count_limit.py +++ b/tests/projects/balancer_app/pages/retry_count_limit.py @@ -1,14 +1,14 @@ from fastapi import Request from http_client.balancing import Upstream, UpstreamConfig -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router -from tests.instances import find_free_port +from tests import find_free_port from tests.projects.balancer_app import get_server_with_port @router.get('/retry_count_limit') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: retry_count_limit = 'retry_count_limit' upstream = Upstream( retry_count_limit, diff --git a/tests/projects/balancer_app/pages/retry_error.py b/tests/projects/balancer_app/pages/retry_error.py index 0f13cb2f0..8790c1fe7 100644 --- a/tests/projects/balancer_app/pages/retry_error.py +++ b/tests/projects/balancer_app/pages/retry_error.py @@ -2,7 +2,7 @@ from http_client.balancing import Upstream from tornado.web import HTTPError -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from frontik.util import gather_list from tests.projects.balancer_app import get_server @@ -10,7 +10,7 @@ @router.get('/retry_error') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstreams = request.app.service_discovery.get_upstreams_unsafe() retry_error = 'retry_error' upstreams[retry_error] = Upstream(retry_error, {}, [get_server(request, 'broken'), get_server(request, 'normal')]) diff --git a/tests/projects/balancer_app/pages/retry_non_idempotent_503.py b/tests/projects/balancer_app/pages/retry_non_idempotent_503.py index 297dcfe75..b6a7b2de0 100644 --- a/tests/projects/balancer_app/pages/retry_non_idempotent_503.py +++ b/tests/projects/balancer_app/pages/retry_non_idempotent_503.py @@ -2,7 +2,7 @@ from http_client.balancing import Upstream, UpstreamConfig from tornado.web import HTTPError -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from frontik.util import gather_list from tests.projects.balancer_app import get_server @@ -10,7 +10,7 @@ @router.get('/retry_non_idempotent_503') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstream_config = {Upstream.DEFAULT_PROFILE: UpstreamConfig(retry_policy={503: {'idempotent': 'true'}})} upstreams = request.app.service_discovery.get_upstreams_unsafe() retry_non_idempotent = 'retry_non_idempotent_503' diff --git a/tests/projects/balancer_app/pages/retry_on_timeout.py b/tests/projects/balancer_app/pages/retry_on_timeout.py index 73914bcbe..4910495e0 100644 --- a/tests/projects/balancer_app/pages/retry_on_timeout.py +++ b/tests/projects/balancer_app/pages/retry_on_timeout.py @@ -2,14 +2,14 @@ from http_client.balancing import Upstream from tornado.web import HTTPError -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server from tests.projects.balancer_app.pages import check_all_requests_done @router.get('/retry_on_timeout') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstreams = request.app.service_discovery.get_upstreams_unsafe() retry_on_timeout = 'retry_on_timeout' upstreams[retry_on_timeout] = Upstream( diff --git a/tests/projects/balancer_app/pages/slow_start.py b/tests/projects/balancer_app/pages/slow_start.py index b230bcd9c..a7c6426b7 100644 --- a/tests/projects/balancer_app/pages/slow_start.py +++ b/tests/projects/balancer_app/pages/slow_start.py @@ -3,14 +3,14 @@ from fastapi import Request from http_client.balancing import Server, Upstream, UpstreamConfig -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server from tests.projects.balancer_app.pages import check_all_requests_done, check_all_servers_were_occupied @router.get('/slow_start') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: server = get_server(request, 'normal') server.weight = 5 diff --git a/tests/projects/balancer_app/pages/speculative_no_retry.py b/tests/projects/balancer_app/pages/speculative_no_retry.py index ecd75bf36..46852fc47 100644 --- a/tests/projects/balancer_app/pages/speculative_no_retry.py +++ b/tests/projects/balancer_app/pages/speculative_no_retry.py @@ -1,13 +1,13 @@ from fastapi import Request from http_client.balancing import Upstream -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server @router.get('/speculative_no_retry') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstreams = request.app.service_discovery.get_upstreams_unsafe() speculative_no_retry = 'speculative_no_retry' upstreams[speculative_no_retry] = Upstream( diff --git a/tests/projects/balancer_app/pages/speculative_retry.py b/tests/projects/balancer_app/pages/speculative_retry.py index d32439263..f55900e14 100644 --- a/tests/projects/balancer_app/pages/speculative_retry.py +++ b/tests/projects/balancer_app/pages/speculative_retry.py @@ -2,13 +2,13 @@ from http_client.balancing import Upstream from tornado.web import HTTPError -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router from tests.projects.balancer_app import get_server @router.get('/speculative_retry') -async def get_page(request: Request, http_client: HttpClientT) -> str: +async def get_page(request: Request, http_client: HttpClient) -> str: upstreams = request.app.service_discovery.get_upstreams_unsafe() speculative_retry = 'speculative_retry' upstreams[speculative_retry] = Upstream( diff --git a/tests/projects/consul_mock_app/pages/v1/kv/host/hostname/weight/weight.py b/tests/projects/consul_mock_app/pages/v1/kv/host/hostname/weight/weight.py new file mode 100644 index 000000000..979a207f5 --- /dev/null +++ b/tests/projects/consul_mock_app/pages/v1/kv/host/hostname/weight/weight.py @@ -0,0 +1,18 @@ +from fastapi.responses import JSONResponse + +from frontik.routing import regex_router + + +@regex_router.get(r'^/v1/kv/host/([a-zA-Z\-_0-9\.:\-]+)/weight') +async def get_page(): + return JSONResponse([{'Value': 'NTU=', 'CreateIndex': 1, 'ModifyIndex': 1}], headers={'X-Consul-Index': '1'}) + + +@regex_router.put(r'^/v1/kv/host/([a-zA-Z\-_0-9\.:\-]+)/weight') +async def put_page(): + pass + + +@regex_router.post(r'^/v1/kv/host/([a-zA-Z\-_0-9\.:\-]+)/weight') +async def post_page(): + pass diff --git a/tests/projects/consul_mock_app/pages/v1/kv/upstream/upstream.py b/tests/projects/consul_mock_app/pages/v1/kv/upstream/upstream.py new file mode 100644 index 000000000..0da5af204 --- /dev/null +++ b/tests/projects/consul_mock_app/pages/v1/kv/upstream/upstream.py @@ -0,0 +1,18 @@ +from fastapi.responses import JSONResponse + +from frontik.routing import router + + +@router.get('/v1/kv/upstream/') +async def get_page(): + return JSONResponse([{'Value': None, 'CreateIndex': 1, 'ModifyIndex': 1}], headers={'X-Consul-Index': '1'}) + + +@router.put('/v1/kv/upstream/') +async def put_page(): + pass + + +@router.post('/v1/kv/upstream/') +async def post_page(): + pass diff --git a/tests/projects/test_app/__init__.py b/tests/projects/test_app/__init__.py index f7b3c6810..bbbf063a1 100644 --- a/tests/projects/test_app/__init__.py +++ b/tests/projects/test_app/__init__.py @@ -1,4 +1,3 @@ -import json import logging from frontik.app import FrontikApplication @@ -11,31 +10,8 @@ def __init__(self): bootstrap_logger('custom_logger', logging.DEBUG, False) super().__init__() - async def init(self): - await super().init() - self.http_client_factory.request_engine_builder.kafka_producer = TestKafkaProducer() - def application_config(self): return config def application_version_xml(self): return config.version - - -class TestKafkaProducer: - def __init__(self) -> None: - self.data: list[dict[str, dict]] = [] - self.request_id = None - - async def send(self, topic, value=None): - json_data = json.loads(value) - - if json_data['requestId'] == self.request_id: - self.data.append({topic: json_data}) - - def enable_for_request_id(self, request_id): - self.request_id = request_id - - def disable_and_get_data(self): - self.request_id = None - return self.data diff --git a/tests/projects/test_app/pages/http_client/proxy_code.py b/tests/projects/test_app/pages/http_client/proxy_code.py index bffb630fb..6b1ef96c4 100644 --- a/tests/projects/test_app/pages/http_client/proxy_code.py +++ b/tests/projects/test_app/pages/http_client/proxy_code.py @@ -1,10 +1,10 @@ from fastapi import Response -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.routing import router @router.get('/http_client/proxy_code') -async def get_page(port: str, http_client: HttpClientT): +async def get_page(port: str, http_client: HttpClient): result = await http_client.get_url('http://127.0.0.1:' + port, '') return Response(str(result.status_code)) diff --git a/tests/projects/test_app/pages/kafka.py b/tests/projects/test_app/pages/kafka.py deleted file mode 100644 index 79733e20f..000000000 --- a/tests/projects/test_app/pages/kafka.py +++ /dev/null @@ -1,23 +0,0 @@ -import asyncio - -from fastapi import HTTPException, Request -from fastapi.responses import JSONResponse - -from frontik.dependencies import HttpClientT -from frontik.routing import router - - -@router.get('/kafka') -async def get_page(http_client: HttpClientT, request: Request) -> JSONResponse: - rid = request.scope['tornado_request'].request_id - http_client.request_engine_builder.kafka_producer.enable_for_request_id(rid) - - await http_client.post_url(request.headers.get('host'), request.url.path) - await asyncio.sleep(0.1) - - return JSONResponse(*http_client.request_engine_builder.kafka_producer.disable_and_get_data()) - - -@router.post('/kafka') -async def post_page(): - raise HTTPException(500) diff --git a/tests/projects/test_app/pages/log.py b/tests/projects/test_app/pages/log.py deleted file mode 100644 index 16a463830..000000000 --- a/tests/projects/test_app/pages/log.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging - -from frontik.routing import router - -handler_logger = logging.getLogger('handler') -custom_logger = logging.getLogger('custom_logger') - - -@router.get('/log') -async def get_page(): - handler_logger.debug('debug') - handler_logger.info('info') - - try: - raise Exception('test') - except Exception: - handler_logger.exception('exception') - handler_logger.error('error', stack_info=True) - - handler_logger.critical('critical') - - custom_logger.fatal('fatal') diff --git a/tests/test_balancer.py b/tests/test_balancer.py index d22bab758..17c607c3c 100644 --- a/tests/test_balancer.py +++ b/tests/test_balancer.py @@ -1,4 +1,5 @@ -from tests.instances import find_free_port, frontik_balancer_app, frontik_broken_balancer_app +from tests import find_free_port +from tests.instances import frontik_balancer_app, frontik_broken_balancer_app class TestHttpError: diff --git a/tests/test_consul_registration.py b/tests/test_consul_registration.py deleted file mode 100644 index a560586db..000000000 --- a/tests/test_consul_registration.py +++ /dev/null @@ -1,68 +0,0 @@ -import time - -from tests import FRONTIK_ROOT -from tests.instances import FrontikTestInstance, common_frontik_start_options - -FRONTIK_RUN = f'{FRONTIK_ROOT}/frontik-test' -TEST_PROJECTS = f'{FRONTIK_ROOT}/tests/projects' - - -class TestConsulRegistration: - def setup_method(self): - self.consul_mock = FrontikTestInstance( - f'{FRONTIK_RUN} --app_class=tests.projects.consul_mock_app.TestApplication {common_frontik_start_options} ' - f' --config={TEST_PROJECTS}/frontik_consul_mock.cfg', - ) - self.consul_mock.start() - self.frontik_single_worker_app = FrontikTestInstance( - f'{FRONTIK_RUN} --app_class=tests.projects.no_debug_app.TestApplication {common_frontik_start_options} ' - f' --config={TEST_PROJECTS}/frontik_no_debug.cfg --consul_port={self.consul_mock.port} ' - f' --consul_enabled=True' - f' --fail_start_on_empty_upstream=False', - ) - self.frontik_multiple_worker_app = FrontikTestInstance( - f'{FRONTIK_RUN} --app_class=tests.projects.no_debug_app.TestApplication {common_frontik_start_options} ' - f' --config={TEST_PROJECTS}/frontik_no_debug.cfg --consul_port={self.consul_mock.port} --workers=3' - f' --consul_enabled=True' - f' --fail_start_on_empty_upstream=False', - ) - self.frontik_multiple_worker_app_timeout_barrier = FrontikTestInstance( - f'{FRONTIK_RUN} --app_class=tests.projects.no_debug_app.TestApplication {common_frontik_start_options} ' - f' --config={TEST_PROJECTS}/frontik_no_debug.cfg --consul_port={self.consul_mock.port} --workers=3' - f' --init_workers_timeout_sec=0' - f' --consul_enabled=True' - f' --fail_start_on_empty_upstream=False', - ) - - def teardown_method(self): - self.frontik_single_worker_app.stop() - self.frontik_multiple_worker_app.stop() - self.frontik_multiple_worker_app_timeout_barrier.stop() - self.consul_mock.stop() - - def test_single_worker_registration(self): - self.frontik_single_worker_app.start() - self.frontik_single_worker_app.stop() - registration_call_count = self.consul_mock.get_page_json('/call_registration_stat')['put_page'] - assert registration_call_count == 1, 'Application should register only once' - - def test_multiple_worker_registration(self): - self.frontik_multiple_worker_app.start() - self.frontik_multiple_worker_app.stop() - registration_call_count = self.consul_mock.get_page_json('/call_registration_stat')['put_page'] - assert registration_call_count == 1, 'Application should register only once' - - def test_multiple_worker_not_registration(self): - self.frontik_multiple_worker_app_timeout_barrier.start_with_check(lambda _: None) - - for _i in range(50): - time.sleep(0.1) - if not self.frontik_multiple_worker_app_timeout_barrier.is_alive(): - break - else: - raise Exception("application didn't stop") - - registration_call_count = self.consul_mock.get_page_json('/call_registration_stat') - assert registration_call_count == {}, 'Application should not register' - - self.frontik_multiple_worker_app_timeout_barrier.stop() diff --git a/tests/test_debug.py b/tests/test_debug.py index 9b228f942..4a43d81bd 100644 --- a/tests/test_debug.py +++ b/tests/test_debug.py @@ -11,17 +11,17 @@ from tornado.escape import to_unicode from frontik import media_types -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.options import options from frontik.routing import router from frontik.testing import FrontikTestBase -from tests.instances import create_basic_auth_header +from tests import create_basic_auth_header logger = logging.getLogger('handler') @router.get('/debug') -async def get_debug_page(http_client: HttpClientT, request: Request, no_recursion: str = 'false') -> None: +async def get_debug_page(http_client: HttpClient, request: Request, no_recursion: str = 'false') -> None: logger.debug('debug: starting debug page') def _exception_trace() -> None: diff --git a/tests/test_default_urls.py b/tests/test_default_urls.py index 59f8572c3..277d03a59 100644 --- a/tests/test_default_urls.py +++ b/tests/test_default_urls.py @@ -1,34 +1,20 @@ import json -import unittest -from tests.instances import frontik_re_app, frontik_test_app +from frontik.testing import FrontikTestBase -class TestDefaultUrls(unittest.TestCase): - def test_version(self) -> None: - xml = frontik_test_app.get_page_xml('/version') - test_app_version = xml.xpath('application[@name="tests.projects.test_app"]/app-version/@number')[0] +class TestDefaultUrls(FrontikTestBase): + async def test_version(self) -> None: + xml = await self.fetch_xml('/version') + test_app_version = xml.xpath('application[@name="frontik.app"]/version')[0] - self.assertEqual(xml.tag, 'versions') - self.assertEqual('last version', test_app_version) + assert xml.tag == 'versions' + assert test_app_version.text == 'unknown' - def test_unknown_version(self) -> None: - xml = frontik_re_app.get_page_xml('/version') - re_app_version = xml.findtext('application[@name="tests.projects.re_app"]/version') + async def test_status(self) -> None: + response = await self.fetch('/status') - self.assertEqual('unknown', re_app_version) + assert response.headers['Content-Type'].startswith('application/json') - def test_no_version(self) -> None: - xml = frontik_re_app.get_page_xml('/version') - re_app_version = xml.findtext('application[@name="tests.projects.re_app"]/version') - - self.assertEqual(xml.tag, 'versions') - self.assertEqual(re_app_version, 'unknown') - - def test_status(self) -> None: - response = frontik_test_app.get_page('/status') - - self.assertTrue(response.headers['Content-Type'].startswith('application/json')) - - json_response = json.loads(response.content) - self.assertIn('uptime', json_response) + json_response = json.loads(response.raw_body) + assert 'uptime' in json_response diff --git a/tests/test_file_logging.py b/tests/test_file_logging.py deleted file mode 100644 index 9d594d334..000000000 --- a/tests/test_file_logging.py +++ /dev/null @@ -1,34 +0,0 @@ -import os -import shutil -import tempfile -import unittest - -from tests import FRONTIK_ROOT -from tests.instances import FrontikTestInstance, common_frontik_start_options - -FRONTIK_RUN = f'{FRONTIK_ROOT}/frontik-test' -TEST_PROJECTS = f'{FRONTIK_ROOT}/tests/projects' - - -class TestLogToFile(unittest.TestCase): - def setUp(self) -> None: - self.tmp_log_dir = tempfile.mkdtemp() - self.service = FrontikTestInstance( - f'{FRONTIK_RUN} --app_class=tests.projects.consul_mock_app.TestApplication {common_frontik_start_options} ' - f' --config={TEST_PROJECTS}/frontik_consul_mock.cfg --log_dir={self.tmp_log_dir} --log_level=debug', - allow_to_create_log_files=True, - ) - - def tearDown(self) -> None: - self.service.stop() - shutil.rmtree(self.tmp_log_dir, ignore_errors=True) - - def test_log_dir_is_not_empty(self) -> None: - self.service.start() - self.service.stop() - dir_contents = os.listdir(self.tmp_log_dir) - if not dir_contents: - self.fail('No log files') - empty_files = [f for f in dir_contents if os.stat(os.path.join(self.tmp_log_dir, f)).st_size == 0] - if empty_files: - self.fail(f'Empty log files: {empty_files}') diff --git a/tests/test_frontik_testing.py b/tests/test_frontik_testing.py index 089394d7c..1c0c04246 100644 --- a/tests/test_frontik_testing.py +++ b/tests/test_frontik_testing.py @@ -2,7 +2,7 @@ from fastapi import Request, Response -from frontik.dependencies import AppConfig, HttpClientT +from frontik.dependencies import AppConfig, HttpClient from frontik.routing import router from frontik.testing import FrontikTestBase from frontik.util import gather_list @@ -10,7 +10,7 @@ @router.get('/sum_values') -async def sum_values_page(config: AppConfig, http_client: HttpClientT) -> int: +async def sum_values_page(config: AppConfig, http_client: HttpClient) -> int: result = 0 service_host = config.serviceHost @@ -29,7 +29,7 @@ async def check_config_page(config: AppConfig) -> Response: @router.post('/json_stub') -async def post_page(request: Request, http_client: HttpClientT) -> Any: +async def post_page(request: Request, http_client: HttpClient) -> Any: result = await http_client.delete_url('http://backend', request.url.path, fail_fast=True) return result.data diff --git a/tests/test_http_client.py b/tests/test_http_client.py index f3a0a8413..686c09c1e 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -3,25 +3,22 @@ import re from typing import Any -from fastapi import Depends, Request, Response +import pytest +from fastapi import Request, Response from fastapi.responses import JSONResponse from tornado.escape import to_unicode from frontik import media_types -from frontik.balancing_client import get_http_client -from frontik.dependencies import HttpClientT +from frontik.app import FrontikApplication +from frontik.dependencies import HttpClient from frontik.loggers import JSON_FORMATTER from frontik.routing import router from frontik.testing import FrontikTestBase from frontik.util import any_to_bytes, any_to_unicode -def modify_http_client_request(balanced_request): - balanced_request.headers['X-Foo'] = 'Bar' - - @router.get('/http_client/custom_headers') -async def custom_headers_get_page(request: Request, http_client=Depends(get_http_client(modify_http_client_request))): +async def custom_headers_get_page(request: Request, http_client: HttpClient): result = await http_client.post_url(request.headers.get('host'), request.url.path) return result.data @@ -32,7 +29,7 @@ async def custom_headers_post_page(request: Request): @router.get('/http_client/fibonacci') -async def fibonacci_page(n: int, request: Request, http_client: HttpClientT): +async def fibonacci_page(n: int, request: Request, http_client: HttpClient): if n < 2: return Response('1', headers={'Content-Type': media_types.TEXT_PLAIN}) @@ -48,7 +45,7 @@ async def fibonacci_page(n: int, request: Request, http_client: HttpClientT): @router.get('/http_client/raise_error') -async def unicode_page(request: Request, http_client: HttpClientT): +async def unicode_page(request: Request, http_client: HttpClient): try: await http_client.post_url(request.headers.get('host'), '/a-вот') except UnicodeEncodeError: @@ -75,7 +72,7 @@ async def unicode_page(request: Request, http_client: HttpClientT): @router.get('/http_client/post_url') -async def post_url_get_page(request: Request, http_client: HttpClientT): +async def post_url_get_page(request: Request, http_client: HttpClient): result = await http_client.post_url(request.headers.get('host'), request.url.path, data=FIELDS, files=FILES) if not result.failed: return result.data @@ -117,7 +114,7 @@ async def post_url_post_page(request: Request): @router.get('/http_client/parse_response') -async def parse_response_get_page(request: Request, http_client: HttpClientT): +async def parse_response_get_page(request: Request, http_client: HttpClient): res = {} result = await http_client.post_url(request.headers.get('host'), request.url.path, parse_on_error=True) res.update(result.data) @@ -147,7 +144,7 @@ async def parse_response_delete_page(): @router.get('/http_client/parse_error') -async def parse_error_get_page(request: Request, http_client: HttpClientT): +async def parse_error_get_page(request: Request, http_client: HttpClient): el_result = await http_client.post_url(request.headers.get('host'), request.url.path + '?mode=xml') element = el_result.data if element is not None: @@ -169,7 +166,7 @@ async def parse_error_post_page(mode: str): @router.get('/http_client/post_simple') -async def post_simple_get_page(request: Request, http_client: HttpClientT): +async def post_simple_get_page(request: Request, http_client: HttpClient): result = await http_client.post_url(request.headers.get('host'), request.url.path) return Response(result.data) @@ -180,12 +177,39 @@ async def post_simple_post_page(): @router.get('/http_client/long_page_request') -async def long_request_page(http_client: HttpClientT, request: Request): +async def long_request_page(http_client: HttpClient, request: Request): result = await http_client.post_url(request.headers.get('host'), request.url.path, request_timeout=0.5) return {'error_received': result.failed} +def modify_http_client_request(balanced_request): + balanced_request.headers['X-Foo'] = 'Bar' + + +class HttpclientHookMiddleware: + def __init__(self, app) -> None: + self.app = app + + async def __call__(self, scope, receive, send) -> None: + if scope['type'] != 'http': + await self.app(scope, receive, send) + return + + scope['_http_client_hook'] = modify_http_client_request + await self.app(scope, receive, send) + + +class ApplicationWithHttpClientHook(FrontikApplication): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.asgi_app.add_middleware(HttpclientHookMiddleware) + + class TestHttpClient(FrontikTestBase): + @pytest.fixture(scope='class') + def frontik_app(self) -> FrontikApplication: + return ApplicationWithHttpClientHook() + async def test_post_url_simple(self): response = await self.fetch('/http_client/post_simple') assert response.raw_body == b'post_url success' @@ -234,7 +258,7 @@ async def long_page() -> None: @router.get('/long_request') -async def long_request(http_client: HttpClientT, request: Request) -> None: +async def long_request(http_client: HttpClient, request: Request) -> None: await http_client.get_url(request.headers.get('host'), '/long_page') diff --git a/tests/test_http_client_keep_alive.py b/tests/test_http_client_keep_alive.py index 4f90996c5..e0ffcd172 100644 --- a/tests/test_http_client_keep_alive.py +++ b/tests/test_http_client_keep_alive.py @@ -2,7 +2,8 @@ from contextlib import closing from typing import Any, Optional -from tests.instances import find_free_port, frontik_test_app +from tests import find_free_port +from tests.instances import frontik_test_app class TestHTTPClientKeepAlive: diff --git a/tests/test_kafka_integration.py b/tests/test_kafka_integration.py index 38f66c0ac..1c20d40fe 100644 --- a/tests/test_kafka_integration.py +++ b/tests/test_kafka_integration.py @@ -1,11 +1,68 @@ -from tests.instances import frontik_test_app +import asyncio +import json +import pytest +from fastapi import HTTPException, Request +from fastapi.responses import JSONResponse -class TestKafkaIntegration: - def test_kafka(self): - response_json = frontik_test_app.get_page_json('/kafka') +from frontik.app import FrontikApplication +from frontik.dependencies import HttpClient +from frontik.options import options +from frontik.routing import router +from frontik.testing import FrontikTestBase - assert response_json['metrics_requests']['app'] == 'test_app' + +class TestKafkaProducer: + def __init__(self) -> None: + self.data: list[dict[str, dict]] = [] + self.request_id = None + + async def send(self, topic, value=None): + json_data = json.loads(value) + + if json_data['requestId'] == self.request_id: + self.data.append({topic: json_data}) + + def enable_for_request_id(self, request_id): + self.request_id = request_id + + def disable_and_get_data(self): + self.request_id = None + return self.data + + +class KafkaApplication(FrontikApplication): + async def init(self): + await super().init() + self.http_client_factory.request_engine_builder.kafka_producer = TestKafkaProducer() + + +@router.get('/kafka') +async def get_page(http_client: HttpClient, request: Request) -> JSONResponse: + rid = request.scope['tornado_request'].request_id + http_client.request_engine_builder.kafka_producer.enable_for_request_id(rid) + + await http_client.post_url(request.headers.get('host'), request.url.path) + await asyncio.sleep(0.1) + + return JSONResponse(*http_client.request_engine_builder.kafka_producer.disable_and_get_data()) + + +@router.post('/kafka') +async def post_page(): + raise HTTPException(500) + + +class TestKafkaIntegration(FrontikTestBase): + @pytest.fixture(scope='class') + def frontik_app(self) -> FrontikApplication: + options.service_name = 'test_kafka_integration' + return KafkaApplication() + + async def test_kafka(self): + response_json = await self.fetch_json('/kafka') + + assert response_json['metrics_requests']['app'] == 'test_kafka_integration' assert response_json['metrics_requests']['dc'] == 'externalRequest' assert 'hostname' in response_json['metrics_requests'] assert 'requestId' in response_json['metrics_requests'] diff --git a/tests/test_logging.py b/tests/test_logging.py index 934231961..f88b7e972 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -1,20 +1,62 @@ import json +import logging +import os import re +import shutil import socket +import tempfile from collections import defaultdict +from logging.handlers import SysLogHandler +from typing import Any import pytest from tornado.escape import to_unicode +from frontik.loggers import _configure_file, _configure_syslog, bootstrap_logger +from frontik.options import options +from frontik.routing import router +from frontik.testing import FrontikTestBase from tests import FRONTIK_ROOT -from tests.instances import FrontikTestInstance FRONTIK_RUN = f'{FRONTIK_ROOT}/frontik-test' TEST_PROJECTS = f'{FRONTIK_ROOT}/tests/projects' +handler_logger = logging.getLogger('handler') +custom_logger = logging.getLogger('custom_logger') -class TestSyslog: - test_app: FrontikTestInstance +def add_syslog_handler_for_logger(logger_name: str) -> None: + handler = _configure_syslog(logger_name)[0] + handler.setLevel(logging.DEBUG) + logging.getLogger(logger_name).addHandler(handler) + + +def add_syslog_handler_for_root_logger() -> None: + handler = _configure_syslog('service')[0] + handler.setLevel(logging.DEBUG) + logging.root.addHandler(handler) + + +def remove_syslog_handler_from_logger(logger_name: str) -> None: + logger = logging.getLogger(logger_name) + logger.handlers = [handler for handler in handler_logger.handlers if not isinstance(handler, SysLogHandler)] + + +@router.get('/log') +async def get_page(): + handler_logger.debug('debug') + handler_logger.info('info') + + try: + raise Exception('test') + except Exception: + handler_logger.exception('exception') + handler_logger.error('error') # stack_info = True should be added + + handler_logger.critical('critical') + custom_logger.fatal('fatal') + + +class TestSyslog(FrontikTestBase): s: socket.socket @classmethod @@ -25,18 +67,31 @@ def setup_class(cls): port = cls.s.getsockname()[1] - cls.test_app = FrontikTestInstance( - f'{FRONTIK_RUN} --app_class=tests.projects.test_app.TestApplication ' - f'--config={TEST_PROJECTS}/frontik_debug.cfg --syslog=true --consul_enabled=False --syslog_host=127.0.0.1 ' - f'--syslog_tag=test --log_level=debug --syslog_port={port}', - ) + options.syslog = True + options.syslog_port = port + options.syslog_tag = 'test' + options.service_name = 'app' + + add_syslog_handler_for_logger('server') + add_syslog_handler_for_root_logger() + add_syslog_handler_for_logger('requests') + add_syslog_handler_for_logger('handler') + bootstrap_logger('custom_logger', logger_level=logging.DEBUG, use_json_formatter=False) @classmethod def teardown_class(cls): - cls.test_app.stop() + options.syslog = False - def test_send_to_syslog(self): - self.test_app.get_page('log') + remove_syslog_handler_from_logger('server') + remove_syslog_handler_from_logger('service') + remove_syslog_handler_from_logger('requests') + remove_syslog_handler_from_logger('handler') + remove_syslog_handler_from_logger('custom_logger') + + cls.s.close() + + async def test_send_to_syslog(self): + await self.fetch('/log') logs = [] @@ -53,12 +108,10 @@ def test_send_to_syslog(self): syslog_line_regexp = r'<(?P\d+)>(?P[^:]+): (?P.*)\x00' parsed_logs = defaultdict(list) for log in logs: - assert re.match(syslog_line_regexp, log) - match = re.match(syslog_line_regexp, log) - if match is not None: - priority, tag, message = match.groups() - parsed_logs[tag].append({'priority': priority, 'message': message}) + assert match is not None + priority, tag, message = match.groups() + parsed_logs[tag].append({'priority': priority, 'message': message}) expected_service_logs = [ {'priority': '14', 'message': {'lvl': 'INFO', 'logger': r'handler', 'msg': 'requested url: /log'}}, @@ -79,7 +132,6 @@ def test_send_to_syslog(self): 'lvl': 'ERROR', 'logger': r'handler', 'msg': 'error', - 'exception': r".*handler_logger\.error\('error', stack_info=True\)", }, }, {'priority': '10', 'message': {'lvl': 'CRITICAL', 'logger': r'handler', 'msg': 'critical'}}, @@ -93,7 +145,7 @@ def test_send_to_syslog(self): 'message': { 'lvl': 'INFO', 'logger': r'server', - 'msg': r'starting application tests\.projects\.test_app', + 'msg': r'Successfully inited application app', }, }, ] @@ -113,7 +165,7 @@ def test_send_to_syslog(self): { 'priority': '10', 'message': r'\[\d+\] [\d-]+ [\d:,]+ CRITICAL ' - r'custom_logger\.tests\.projects\.test_app\.pages\.log\.get_page: fatal', + r'custom_logger\.tests\.test_logging\.get_page\.tests\.test_logging\.get_page: fatal', # seems weird }, ] @@ -144,3 +196,30 @@ def assert_text_logs_match(expected_logs: list, parsed_logs: list) -> None: break else: pytest.fail(f'Log message not found: {expected_log}') + + +class TestLogToFile(FrontikTestBase): + tmp_log_dir: str + handler: Any + + @classmethod + def setup_class(cls): + cls.tmp_log_dir = tempfile.mkdtemp() + options.log_dir = cls.tmp_log_dir + cls.handler = _configure_file('server')[0] + logging.getLogger('server').addHandler(cls.handler) + + @classmethod + def teardown_class(cls): + shutil.rmtree(cls.tmp_log_dir, ignore_errors=True) + options.log_dir = None + logging.getLogger('server').removeHandler(cls.handler) + + def test_log_dir_is_not_empty(self) -> None: + dir_contents = os.listdir(self.tmp_log_dir) + if not dir_contents: + assert False, 'No log files' + + empty_files = [f for f in dir_contents if os.stat(os.path.join(self.tmp_log_dir, f)).st_size == 0] + if empty_files: + assert False, f'Empty log files: {empty_files}' diff --git a/tests/test_no_debug_mode.py b/tests/test_no_debug_mode.py index 864f4fe73..88a2429f0 100644 --- a/tests/test_no_debug_mode.py +++ b/tests/test_no_debug_mode.py @@ -9,7 +9,7 @@ from frontik.options import options from frontik.routing import router from frontik.testing import FrontikTestBase -from tests.instances import create_basic_auth_header +from tests import create_basic_auth_header @router.get('/simple') diff --git a/tests/test_routing.py b/tests/test_routing.py index bbac37cc6..cf039ccf0 100644 --- a/tests/test_routing.py +++ b/tests/test_routing.py @@ -5,17 +5,22 @@ @router.get('/simple') -async def get_page1() -> str: +async def simple_page() -> str: + return 'ok' + + +@router.get('/simple_slash/') +async def simple_slash_page() -> str: return 'ok' @regex_router.get('/id/(?P[^/]+)') -async def get_page2(request: Request) -> str: +async def id_page(request: Request) -> str: return str(request.path_params.get('id')) @router.get('/nested/nested/nested') -async def get_page3() -> str: +async def nested_page() -> str: return 'OK' @@ -36,6 +41,15 @@ async def test_not_found(self): response = await self.fetch('/not_exists') assert response.status_code == 404 - async def test_filemapping_404_on_dot_in_url(self): - response = await self.fetch('/nested/nested.nested') - assert response.status_code == 404 + async def test_ending_slash(self): + response = await self.fetch('/simple') + assert response.status_code == 200 + + response = await self.fetch('/simple/') + assert response.status_code == 200 + + response = await self.fetch('/simple_slash/') + assert response.status_code == 200 + + response = await self.fetch('/simple_slash') + assert response.status_code == 200 diff --git a/tests/test_service_discovery.py b/tests/test_service_discovery.py index 6a0c6c311..565befa37 100644 --- a/tests/test_service_discovery.py +++ b/tests/test_service_discovery.py @@ -1,3 +1,5 @@ +import time + from tests import FRONTIK_ROOT from tests.instances import FrontikTestInstance, common_frontik_start_options @@ -6,7 +8,7 @@ class TestServiceDiscovery: - def setup_method(self) -> None: + def setup_method(self): self.consul_mock = FrontikTestInstance( f'{FRONTIK_RUN} --app_class=tests.projects.consul_mock_app.TestApplication {common_frontik_start_options} ' f' --config={TEST_PROJECTS}/frontik_consul_mock.cfg', @@ -24,10 +26,18 @@ def setup_method(self) -> None: f' --consul_enabled=True' f' --fail_start_on_empty_upstream=False', ) + self.frontik_multiple_worker_app_timeout_barrier = FrontikTestInstance( + f'{FRONTIK_RUN} --app_class=tests.projects.no_debug_app.TestApplication {common_frontik_start_options} ' + f' --config={TEST_PROJECTS}/frontik_no_debug.cfg --consul_port={self.consul_mock.port} --workers=3' + f' --init_workers_timeout_sec=0' + f' --consul_enabled=True' + f' --fail_start_on_empty_upstream=False', + ) def teardown_method(self) -> None: self.frontik_single_worker_app.stop() self.frontik_multiple_worker_app.stop() + self.frontik_multiple_worker_app_timeout_barrier.stop() self.consul_mock.stop() def test_single_worker_de_registration(self): @@ -45,3 +55,18 @@ def test_multiple_worker_de_registration(self): assert registration_call_count == 1, 'Application should register only once' deregistration_call_count = self.consul_mock.get_page_json('/call_deregistration_stat')['put_page'] assert deregistration_call_count == 1, 'Application should deregister only once' + + def test_multiple_worker_not_registration(self): + self.frontik_multiple_worker_app_timeout_barrier.start_with_check(lambda _: None) + + for _i in range(50): + time.sleep(0.1) + if not self.frontik_multiple_worker_app_timeout_barrier.is_alive(): + break + else: + raise Exception("application didn't stop") + + registration_call_count = self.consul_mock.get_page_json('/call_registration_stat') + assert registration_call_count == {}, 'Application should not register' + + self.frontik_multiple_worker_app_timeout_barrier.stop() diff --git a/tests/test_statsd_integration.py b/tests/test_statsd_integration.py index 327b15124..2e261bd04 100644 --- a/tests/test_statsd_integration.py +++ b/tests/test_statsd_integration.py @@ -4,14 +4,14 @@ from tornado.escape import to_unicode from frontik.app import FrontikApplication -from frontik.dependencies import StatsDClientT +from frontik.dependencies import StatsDClient from frontik.options import options from frontik.routing import router from frontik.testing import FrontikTestBase @router.get('/statsd') -async def get_page(statsd_client: StatsDClientT) -> None: +async def get_page(statsd_client: StatsDClient) -> None: statsd_client.count('count_metric', 10, tag1='tag1', tag2='tag2') statsd_client.gauge('gauge_metric', 100, tag='tag3') statsd_client.time('time_metric', 1000, tag='tag4') diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index b24e48683..4dec64251 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -10,7 +10,7 @@ from frontik import request_context from frontik.app import FrontikApplication from frontik.app_integrations.telemetry import FrontikIdGenerator, get_netloc, make_otel_provider -from frontik.dependencies import HttpClientT +from frontik.dependencies import HttpClient from frontik.options import options from frontik.routing import router from frontik.testing import FrontikTestBase @@ -67,7 +67,7 @@ def test_get_netloc(self) -> None: @router.get('/page_a') -async def get_page_a(request: Request, http_client: HttpClientT) -> None: +async def get_page_a(request: Request, http_client: HttpClient) -> None: await http_client.get_url(request.headers.get('host'), '/page_b')