Skip to content

Commit

Permalink
HH-216278 small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
712u3 committed May 18, 2024
1 parent 0e5fe62 commit 906baeb
Show file tree
Hide file tree
Showing 30 changed files with 144 additions and 575 deletions.
23 changes: 14 additions & 9 deletions frontik/app.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import asyncio
import importlib
import logging
import multiprocessing
import os
import time
from collections.abc import Callable
from ctypes import c_bool, c_int
from threading import Lock
from typing import Any, Optional, Union
from typing import Optional, Union

from aiokafka import AIOKafkaProducer
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException
from http_client import AIOHttpClientWrapper, HttpClientFactory
from http_client import options as http_client_options
from http_client.balancing import RequestBalancerBuilder, Upstream
from lxml import etree
from tornado.web import HTTPError

import frontik.producers.json_producer
import frontik.producers.xml_producer
Expand Down Expand Up @@ -49,14 +50,18 @@ class FrontikApplication:
class DefaultConfig:
pass

def __init__(self, app_root: str, **settings: Any) -> None:
def __init__(self, app_module_name: Optional[str] = None) -> None:
self.start_time = time.time()

self.fastapi_app = FastAPI()

self.app = settings.get('app')
self.app_module: str = settings.get('app_module') # type: ignore
self.app_root = app_root
self.app_module_name: Optional[str] = app_module_name
if app_module_name is None:
app_module = importlib.import_module(self.__class__.__module__)
else:
app_module = importlib.import_module(app_module_name)
self.app_root = os.path.dirname(str(app_module.__file__))
self.app_name = app_module.__name__

self.config = self.application_config()

Expand Down Expand Up @@ -122,7 +127,7 @@ async def init(self) -> None:
statsd_client=self.statsd_client,
kafka_producer=kafka_producer,
)
self.http_client_factory = HttpClientFactory(self.app, self.http_client, request_balancer_builder)
self.http_client_factory = HttpClientFactory(self.app_name, self.http_client, request_balancer_builder)
if self.worker_state.single_worker_mode:
self.worker_state.master_done.value = True

Expand All @@ -141,7 +146,7 @@ def get_current_status(self) -> dict[str, str]:
not_started_workers = self.worker_state.init_workers_count_down.value
master_done = self.worker_state.master_done.value
if not_started_workers > 0 or not master_done:
raise HTTPError(
raise HTTPException(
500,
f'some workers are not started not_started_workers={not_started_workers}, master_done={master_done}',
)
Expand Down
3 changes: 1 addition & 2 deletions frontik/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import frontik.xml_util
from frontik import media_types, request_context
from frontik.loggers import BufferedHandler
from frontik.options import options
from frontik.version import version as frontik_version
from frontik.xml_util import dict_to_xml

Expand Down Expand Up @@ -492,6 +491,6 @@ def get_frontik_and_apps_versions(application: FrontikApplication) -> etree.Elem
etree.SubElement(versions, 'aiohttp').text = aiohttp.__version__
etree.SubElement(versions, 'python').text = sys.version.replace('\n', '')
etree.SubElement(versions, 'event_loop').text = str(type(asyncio.get_event_loop())).split("'")[1]
etree.SubElement(versions, 'application', name=options.app).extend(application.application_version_xml())
etree.SubElement(versions, 'application', name=application.app_name).extend(application.application_version_xml())

return versions
175 changes: 17 additions & 158 deletions frontik/futures.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
from __future__ import annotations

import asyncio
import logging
import time
from functools import partial, wraps
from typing import TYPE_CHECKING, Optional
from typing import Optional

from tornado.concurrent import Future
from tornado.ioloop import IOLoop

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any

async_logger = logging.getLogger('frontik.futures')

Expand All @@ -23,165 +14,33 @@ class AbortAsyncGroup(Exception):
# AsyncGroup will become legacy in future releases
# It will be replaced with FutureGroup
class AsyncGroup:
"""
Grouping of several async requests and final callback in such way that final callback is invoked
after the last request is finished.
If any callback throws an exception, all pending callbacks would be aborted and finish_cb
would not be automatically called.
"""

def __init__(self, finish_cb: Callable, name: Optional[str] = None) -> None:
self._counter = 0
self._finish_cb: Optional[Callable] = finish_cb
def __init__(self, name: Optional[str] = None) -> None:
self._finished = False
self._name = name
self._future: Future = Future()
self._start_time = time.time()
self._futures: list[Future] = []

def is_finished(self) -> bool:
return self._finished

def abort(self) -> None:
if self._finished:
return

async_logger.info('aborting %s', self)
self._finished = True
if not self._future.done():
self._future.set_exception(AbortAsyncGroup())

def finish(self) -> None:
def add_future(self, future: Future) -> None:
if self._finished:
async_logger.warning('trying to finish already finished %s', self)
return None

self._finished = True
self._future.set_result(None)
raise RuntimeError('finish group is finished')
self._futures.append(future)

async def finish(self) -> None:
try:
if self._finish_cb is not None:
self._finish_cb()
await asyncio.gather(*self._futures)
finally:
# prevent possible cycle references
self._finish_cb = None

return None

def try_finish(self) -> None:
if self._counter == 0:
self.finish()

def try_finish_async(self):
"""Executes finish_cb in next IOLoop iteration"""
if self._counter == 0:
IOLoop.current().add_callback(self.finish)

def _inc(self) -> None:
if self._finished:
async_logger.info('ignoring adding callback in %s', self)
raise AbortAsyncGroup()

self._counter += 1

def _dec(self) -> None:
self._counter -= 1

def add(self, intermediate_cb: Callable, exception_handler: Optional[Callable] = None) -> Callable:
self._inc()

@wraps(intermediate_cb)
def new_cb(*args, **kwargs):
if self._finished:
async_logger.info('ignoring executing callback in %s', self)
return

try:
self._dec()
intermediate_cb(*args, **kwargs)
except Exception as ex:
self.abort()
if exception_handler is not None:
exception_handler(ex)
else:
raise
self._finished = True

self.try_finish()

return new_cb

def add_notification(self) -> Callable:
self._inc()

def new_cb(*args, **kwargs):
self._dec()
self.try_finish()

return new_cb

@staticmethod
def _handle_future(callback, future):
future.result()
callback()

def add_future(self, future: Future) -> None:
future.add_done_callback(partial(self._handle_future, self.add_notification()))
self._futures.append(future)
def done(self) -> bool:
return self._finished

def get_finish_future(self) -> Future:
return self._future
def pending(self) -> bool:
return not self._finished and len(self._futures) != 0

def get_gathering_future(self) -> Future:
return asyncio.gather(*self._futures)
def abort(self) -> None:
for future in self._futures:
if not future.done():
future.cancel()
self._finished = True

def __str__(self):
return f'AsyncGroup(name={self._name}, finished={self._finished})'


def future_fold(
future: Future,
result_mapper: Optional[Callable] = None,
exception_mapper: Optional[Callable] = None,
) -> Future:
"""
Creates a new future with result or exception processed by result_mapper and exception_mapper.
If result_mapper or exception_mapper raises an exception, it will be set as an exception for the resulting future.
Any of the mappers can be None — then the result or exception is left as is.
"""

res_future: Future = Future()

def _process(func: Optional[Callable], value: Any) -> None:
try:
processed = func(value) if func is not None else value
except Exception as e:
res_future.set_exception(e)
return
res_future.set_result(processed)

def _on_ready(wrapped_future):
exception = wrapped_future.exception()
if exception is not None:
if not callable(exception_mapper):

def default_exception_func(error):
raise error

_process(default_exception_func, exception)
else:
_process(exception_mapper, exception)
else:
_process(result_mapper, future.result())

IOLoop.current().add_future(future, callback=_on_ready)
return res_future


def future_map(future, func):
return future_fold(future, result_mapper=func)


def future_map_exception(future, func):
return future_fold(future, exception_mapper=func)
Loading

0 comments on commit 906baeb

Please sign in to comment.