-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HH-216278 small fixes #703
+157
−586
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,8 @@ | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
import logging | ||
import time | ||
from functools import partial, wraps | ||
from typing import TYPE_CHECKING, Optional | ||
from typing import Optional | ||
|
||
from tornado.concurrent import Future | ||
from tornado.ioloop import IOLoop | ||
|
||
if TYPE_CHECKING: | ||
from collections.abc import Callable | ||
from typing import Any | ||
|
||
async_logger = logging.getLogger('frontik.futures') | ||
|
||
|
@@ -23,165 +14,33 @@ class AbortAsyncGroup(Exception): | |
# AsyncGroup will become legacy in future releases | ||
# It will be replaced with FutureGroup | ||
class AsyncGroup: | ||
""" | ||
Grouping of several async requests and final callback in such way that final callback is invoked | ||
after the last request is finished. | ||
If any callback throws an exception, all pending callbacks would be aborted and finish_cb | ||
would not be automatically called. | ||
""" | ||
|
||
def __init__(self, finish_cb: Callable, name: Optional[str] = None) -> None: | ||
self._counter = 0 | ||
self._finish_cb: Optional[Callable] = finish_cb | ||
def __init__(self, name: Optional[str] = None) -> None: | ||
self._finished = False | ||
self._name = name | ||
self._future: Future = Future() | ||
self._start_time = time.time() | ||
self._futures: list[Future] = [] | ||
|
||
def is_finished(self) -> bool: | ||
return self._finished | ||
|
||
def abort(self) -> None: | ||
if self._finished: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. уже давно все логи заспамлены лишними ошибками AbortAsyncGroup |
||
return | ||
|
||
async_logger.info('aborting %s', self) | ||
self._finished = True | ||
if not self._future.done(): | ||
self._future.set_exception(AbortAsyncGroup()) | ||
|
||
def finish(self) -> None: | ||
def add_future(self, future: Future) -> None: | ||
if self._finished: | ||
async_logger.warning('trying to finish already finished %s', self) | ||
return None | ||
|
||
self._finished = True | ||
self._future.set_result(None) | ||
raise RuntimeError('finish group is finished') | ||
self._futures.append(future) | ||
|
||
async def finish(self) -> None: | ||
try: | ||
if self._finish_cb is not None: | ||
self._finish_cb() | ||
await asyncio.gather(*self._futures) | ||
finally: | ||
# prevent possible cycle references | ||
self._finish_cb = None | ||
|
||
return None | ||
|
||
def try_finish(self) -> None: | ||
if self._counter == 0: | ||
self.finish() | ||
|
||
def try_finish_async(self): | ||
"""Executes finish_cb in next IOLoop iteration""" | ||
if self._counter == 0: | ||
IOLoop.current().add_callback(self.finish) | ||
|
||
def _inc(self) -> None: | ||
if self._finished: | ||
async_logger.info('ignoring adding callback in %s', self) | ||
raise AbortAsyncGroup() | ||
|
||
self._counter += 1 | ||
|
||
def _dec(self) -> None: | ||
self._counter -= 1 | ||
|
||
def add(self, intermediate_cb: Callable, exception_handler: Optional[Callable] = None) -> Callable: | ||
self._inc() | ||
|
||
@wraps(intermediate_cb) | ||
def new_cb(*args, **kwargs): | ||
if self._finished: | ||
async_logger.info('ignoring executing callback in %s', self) | ||
return | ||
|
||
try: | ||
self._dec() | ||
intermediate_cb(*args, **kwargs) | ||
except Exception as ex: | ||
self.abort() | ||
if exception_handler is not None: | ||
exception_handler(ex) | ||
else: | ||
raise | ||
self._finished = True | ||
|
||
self.try_finish() | ||
|
||
return new_cb | ||
|
||
def add_notification(self) -> Callable: | ||
self._inc() | ||
|
||
def new_cb(*args, **kwargs): | ||
self._dec() | ||
self.try_finish() | ||
|
||
return new_cb | ||
|
||
@staticmethod | ||
def _handle_future(callback, future): | ||
future.result() | ||
callback() | ||
|
||
def add_future(self, future: Future) -> None: | ||
future.add_done_callback(partial(self._handle_future, self.add_notification())) | ||
self._futures.append(future) | ||
def done(self) -> bool: | ||
return self._finished | ||
|
||
def get_finish_future(self) -> Future: | ||
return self._future | ||
def pending(self) -> bool: | ||
return not self._finished and len(self._futures) != 0 | ||
|
||
def get_gathering_future(self) -> Future: | ||
return asyncio.gather(*self._futures) | ||
def abort(self) -> None: | ||
for future in self._futures: | ||
if not future.done(): | ||
future.cancel() | ||
self._finished = True | ||
|
||
def __str__(self): | ||
return f'AsyncGroup(name={self._name}, finished={self._finished})' | ||
|
||
|
||
def future_fold( | ||
future: Future, | ||
result_mapper: Optional[Callable] = None, | ||
exception_mapper: Optional[Callable] = None, | ||
) -> Future: | ||
""" | ||
Creates a new future with result or exception processed by result_mapper and exception_mapper. | ||
If result_mapper or exception_mapper raises an exception, it will be set as an exception for the resulting future. | ||
Any of the mappers can be None — then the result or exception is left as is. | ||
""" | ||
|
||
res_future: Future = Future() | ||
|
||
def _process(func: Optional[Callable], value: Any) -> None: | ||
try: | ||
processed = func(value) if func is not None else value | ||
except Exception as e: | ||
res_future.set_exception(e) | ||
return | ||
res_future.set_result(processed) | ||
|
||
def _on_ready(wrapped_future): | ||
exception = wrapped_future.exception() | ||
if exception is not None: | ||
if not callable(exception_mapper): | ||
|
||
def default_exception_func(error): | ||
raise error | ||
|
||
_process(default_exception_func, exception) | ||
else: | ||
_process(exception_mapper, exception) | ||
else: | ||
_process(result_mapper, future.result()) | ||
|
||
IOLoop.current().add_future(future, callback=_on_ready) | ||
return res_future | ||
|
||
|
||
def future_map(future, func): | ||
return future_fold(future, result_mapper=func) | ||
|
||
|
||
def future_map_exception(future, func): | ||
return future_fold(future, exception_mapper=func) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
в тестах неудобно по 100 параметров задавать app/app_class/app_module/app_root + мупи очень тяжело было объяснить что где-то что-то не None
сделал 1 параметр