diff --git a/docs/dependency_injection.md b/docs/dependency_injection.md new file mode 100644 index 000000000..204c609eb --- /dev/null +++ b/docs/dependency_injection.md @@ -0,0 +1,82 @@ +## Dependency injection + +Dependency injection is a pattern when a function receives other functions that it requires, +instead of creating them internally. +In frontik implementation, dependencies are simple functions, +which run after `RequestHandler.prepare` and before* handler code is executed. +Dependencies are great for running common actions before actual request processing takes place. + +Here is what a dependencies may look like: + +```python +from frontik.dependency_manager import dep + + +async def get_session_dependency(handler: PageHandler) -> Session: + token = handler.get_cookie('token') + return await session_client.get_session(token) + + +class Page(PageHandler): + # Can be used on class level + dependencies = (another_dependency,) + + async def get_page(self, session=dep(get_session_dependency)): + self.json.put({'result': session}) +``` + +Dependency can be sync or async functions. When page is executed all ready to run +async dependencies run in parallel with asyncio.gather(). If something finishes the page +(call self.finish() or raise Exception), then we stop executing the remaining dependencies + +Dependencies can depend on another dependencies, thus we have a dependency graph. +Within one execution of a graph, the same dependencies will be executed once. +Sameness is determined by {function.__module__}.{function.__name__} +Dependencies can come from factories, then it turns out that there are several different dependencies +with the same name. In this case the one that is specified explicitly in the method arg or +in class level will be taken, the rest from the graph depths will be discarded + + +There is an opportunity to specify priorities for dependencies: +```python +from frontik.dependency_manager import dep + + +async def get_session_dependency(handler: PageHandler) -> Session: + token = handler.get_cookie('token') + return await session_client.get_session(token) + + +class Page(PageHandler): + # Can be used on class level + dependencies = (another_dependency,) + _priority_dependency_names: list[str] = [ + side_dependency, + another_dependency, + ] + + async def get_page(self, session=dep(get_session_dependency)): + self.json.put({'result': session}) +``` +If any of the _priority_dependency_names are present in the current graph, +they will be executed before all the other dependencies sequentially. +In the given example `another_dependency` -> `get_session_dependency` -> `get_page` + + +*It is also possible to specify "async" dependencies: +```python +from frontik.dependency_manager import dep, async_deps + + +async def get_session_dependency(handler: PageHandler) -> Session: + token = handler.get_cookie('token') + return await session_client.get_session(token) + + +class Page(PageHandler): + @async_deps([get_session_dependency]) + async def get_page(self): + self.json.put({'result': 'done'}) +``` +The passed list will not block the execution of the page_method, so they can be executed in parallel + diff --git a/docs/preprocessors.md b/docs/preprocessors.md index 1c69b2cae..a73f82652 100644 --- a/docs/preprocessors.md +++ b/docs/preprocessors.md @@ -1,5 +1,7 @@ ## Preprocessors +Deprecated, see https://github.com/hhru/frontik/blob/master/docs/dependency_injection.md + The first step of page generation is preprocessing. Preprocessors are simple functions, which run after `RequestHandler.prepare` and before handler code is executed. Preprocessors are great for running common actions before actual request processing takes place. diff --git a/frontik/dependency_manager/__init__.py b/frontik/dependency_manager/__init__.py new file mode 100644 index 000000000..62dd4e7c6 --- /dev/null +++ b/frontik/dependency_manager/__init__.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from frontik.dependency_manager.dependencies import DependencyMarker +from frontik.dependency_manager.graph_builder import build_sub_graph, get_dependency_graph +from frontik.dependency_manager.graph_runner import execute_graph +from frontik.preprocessors import DependencyGroupMarker, Preprocessor + +if TYPE_CHECKING: + from collections.abc import Callable + + from frontik.handler import PageHandler + + +def dep(dependency: Preprocessor | Callable | list[Callable]) -> Any: + """ + add dependency to page_method, it will be run before page_method and provide result + + async def get_page(self, session=dep(get_session)): + ... + """ + if isinstance(dependency, Preprocessor) and not isinstance(dependency.preprocessor_function, DependencyGroupMarker): + return DependencyMarker(dependency.preprocessor_function) + + if isinstance(dependency, list): + return DependencyGroupMarker(dependency) + + if callable(dependency): + return DependencyMarker(dependency) + + msg = 'Bad dependency type, only func or list[func]' + raise ValueError(msg) + + +def async_deps(async_dependencies: list[Callable]) -> Callable: + """ + add dependencies that will be run in parallel with page_method + + @async_dep([get_session, get_data]) + async def get_page(self): + ... + """ + + def decorator(execute_page_method: Callable) -> Callable: + setattr(execute_page_method, '_async_deps', async_dependencies) + return execute_page_method + + return decorator + + +async def build_and_run_sub_graph(handler: PageHandler, functions_to_run: list) -> None: + sub_graph = build_sub_graph(handler, functions_to_run) + await execute_graph(handler, sub_graph) + + +async def execute_page_method_with_dependencies(handler: PageHandler, page_method: Any) -> Any: + main_graph = get_dependency_graph(page_method.__func__, handler.__class__) + setattr(handler, '_main_graph', main_graph) + await execute_graph(handler, main_graph) + return main_graph.root_dep.result diff --git a/frontik/dependency_manager/dependencies.py b/frontik/dependency_manager/dependencies.py new file mode 100644 index 000000000..fdd6ce56e --- /dev/null +++ b/frontik/dependency_manager/dependencies.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import asyncio +from graphlib import TopologicalSorter +from typing import TYPE_CHECKING + +from frontik.preprocessors import make_full_name + +if TYPE_CHECKING: + from collections.abc import Callable + + from frontik.handler import PageHandler + + +class DependencyMarker: + def __init__(self, func: Callable) -> None: + self.func = func + + +class Dependency: + def __init__(self, func: Callable) -> None: + self.func = func + self.args: list = [] + self.result = None + self.finished = False + self.task: asyncio.Task | None = None + self.waited = True + + async def run(self) -> None: + """ + replace self.args with the result of completed sub_dependencies and run self.func + if sub_dependency is not finished raise RuntimeError + """ + if self.finished: + return + + for i, arg in enumerate(self.args): + if isinstance(arg, Dependency): + if not arg.finished: + raise RuntimeError(f'Graph corrupted, run {self}, before finishing {arg}') + self.args[i] = arg.result + + if asyncio.iscoroutinefunction(self.func): + if self.waited: + self.result = await self.func(*self.args) + else: + asyncio.create_task(self.func(*self.args)) + else: + self.result = self.func(*self.args) + self.finished = True + + def __repr__(self): + return make_full_name(self.func) + + +class DependencyGraph: + """ + known_deps - to prevent re-registration of function multiple times + registered_deps - to make correct dependency_links in case of building a sub_graph + dependency_links - links dict for build TopologicalSorter + handler_cls - special argument type for using special dependencies for example get_handler() + """ + + def __init__(self, root_dep: Dependency, handler_cls: type) -> None: + self.root_dep: Dependency = root_dep + self.known_deps: dict[str, Dependency] = {} + self.registered_deps: set[Dependency] = set() + self.dependency_links: dict[Dependency, set[Dependency]] = {root_dep: set()} + self.handler_cls: type = handler_cls + self.topological_sorter: TopologicalSorter[Dependency] | None = None + self.special_deps: set[Dependency] = set() + + def build_topological_sorter(self) -> None: + self.topological_sorter = TopologicalSorter(self.dependency_links) + self.topological_sorter.prepare() + + async def run_dependency(self, dependency: Dependency) -> None: + await dependency.run() + if self.topological_sorter is None: + raise RuntimeError('There is no topological_sorter in dependency graph') + self.topological_sorter.done(dependency) + + +def make_stub_dependency() -> Dependency: + def stub(): + pass + + dependency = Dependency(stub) + dependency.finished = True + return dependency + + +def get_handler(handler: PageHandler) -> PageHandler: + return handler diff --git a/frontik/dependency_manager/graph_builder.py b/frontik/dependency_manager/graph_builder.py new file mode 100644 index 000000000..65fa889ef --- /dev/null +++ b/frontik/dependency_manager/graph_builder.py @@ -0,0 +1,251 @@ +from __future__ import annotations + +import inspect +from copy import copy, deepcopy +from itertools import chain +from typing import TYPE_CHECKING, Any + +from frontik.dependency_manager.dependencies import ( + Dependency, + DependencyGraph, + DependencyMarker, + get_handler, + make_stub_dependency, +) +from frontik.preprocessors import ( + DependencyGroupMarker, + Preprocessor, + get_all_preprocessors_functions, + get_simple_preprocessors_functions, + make_full_name, +) + +if TYPE_CHECKING: + from collections.abc import Callable, Generator, Iterable + + from frontik.handler import PageHandler + + +def build_sub_graph(handler: PageHandler, dependencies_to_run: list) -> DependencyGraph: + """ + building sub_graph + duplicated dependencies will be taken from main graph + """ + root_dep = make_stub_dependency() + sub_graph = DependencyGraph(root_dep, handler.__class__) + + main_graph: DependencyGraph = getattr(handler, '_main_graph') + sub_graph.known_deps = main_graph.known_deps + + # collect dependencies which defined explicitly + shallow_dependencies = _get_shallow_functions(dependencies_to_run) + _register_side_dependencies(sub_graph, root_dep, shallow_dependencies, deep_scan=False) + + # collect all dependencies with deep_scan + all_dependencies = _get_all_functions(dependencies_to_run) + _register_side_dependencies(sub_graph, root_dep, all_dependencies, deep_scan=True) + + _set_priority_links(handler.__class__._priority_dependency_names, sub_graph) + sub_graph.build_topological_sorter() + return sub_graph + + +def _get_shallow_functions(dependencies_to_run: list) -> Generator: + for dependency_item in dependencies_to_run: + if not isinstance(dependency_item, DependencyGroupMarker): + yield dependency_item + + +def _get_all_functions(dependencies_to_run: list) -> Generator: + for dependency_item in dependencies_to_run: + if isinstance(dependency_item, DependencyGroupMarker): + yield from dependency_item.deps + else: + yield dependency_item + + +def get_dependency_graph(page_method_func: Callable, handler_cls: type) -> DependencyGraph: + """ + build meta_graph or make deepcopy as main_graph if meta_graph existed + + register dependencies from page_method_func args + register legacy preprocessors and class level dependencies + add extra links for handler priority_list + """ + if hasattr(page_method_func, '_meta_graph'): + return deepcopy(page_method_func._meta_graph) + + root_dep = Dependency(page_method_func) + meta_graph = DependencyGraph(root_dep, handler_cls) + + handler_dependencies = getattr(handler_cls, 'dependencies', []) + simple_preprocessors = chain(get_simple_preprocessors_functions(page_method_func), handler_dependencies) + all_preprocessors = chain(get_all_preprocessors_functions(page_method_func), handler_dependencies) + + # collect dependencies which defined explicitly + _register_dependency_params(meta_graph, root_dep, add_to_args=False, deep_scan=False) + _register_side_dependencies(meta_graph, root_dep, simple_preprocessors, deep_scan=False) + + # collect all dependencies with deep_scan + _register_dependency_params(meta_graph, root_dep, add_to_args=True, deep_scan=True) + _register_side_dependencies(meta_graph, root_dep, all_preprocessors, deep_scan=True) + + async_dependencies = getattr(page_method_func, '_async_deps', []) + _register_async_dependencies(meta_graph, async_dependencies) + + priorities = getattr(handler_cls, '_priority_dependency_names', []) + _set_priority_links(priorities, meta_graph) + + meta_graph.build_topological_sorter() + setattr(page_method_func, '_meta_graph', meta_graph) + return deepcopy(meta_graph) + + +def _register_side_dependencies( + graph: DependencyGraph, + root_dep: Dependency, + side_dependencies: Iterable, + deep_scan: bool, +) -> None: + for function_or_preprocessor in side_dependencies: + dependency = _make_dependency_for_graph(graph, function_or_preprocessor, deep_scan=deep_scan) + if deep_scan: + _register_sub_dependency(graph, root_dep, dependency, add_to_args=False) + + +def _register_async_dependencies(graph: DependencyGraph, async_dependencies: Iterable) -> None: + root_dep = make_stub_dependency() + for dependency_function in async_dependencies: + dependency = _make_dependency_for_graph(graph, dependency_function, deep_scan=True) + dependency.waited = False + _register_sub_dependency(graph, root_dep, dependency, add_to_args=False) + + +def _set_priority_links(priority_list: list[str], graph: DependencyGraph) -> None: + """ + add extra links for handler priority_list + + filter priority_list against registered dependencies + link each with each in a chain + link remaining registered dependencies on last one from priority_list + """ + priority_filtered: list[Dependency] = [] + for func_name in priority_list: + if func_name not in graph.known_deps: + continue + priority_dep = graph.known_deps[func_name] + if priority_dep in graph.registered_deps and priority_dep not in priority_filtered: + priority_filtered.append(priority_dep) + + if len(priority_filtered) > 1: + for i in range(len(priority_filtered) - 1): + cur_dep = priority_filtered[i] + next_dep = priority_filtered[i + 1] + + if next_dep not in graph.dependency_links: + graph.dependency_links[next_dep] = {cur_dep} + continue + + if cur_dep not in graph.dependency_links[next_dep]: + graph.dependency_links[next_dep].add(cur_dep) + + if len(priority_filtered) > 0: + last_priority_dep = priority_filtered[-1] + should_depends_on_last = copy(graph.registered_deps) + + for d in priority_filtered: + should_depends_on_last.discard(d) + for sd in graph.dependency_links.get(d, set()): + should_depends_on_last.discard(sd) + + for d in should_depends_on_last: + if d not in graph.dependency_links: + graph.dependency_links[d] = {last_priority_dep} + elif last_priority_dep not in graph.dependency_links[d]: + graph.dependency_links[d].add(last_priority_dep) + + +def _register_dependency_params( + graph: DependencyGraph, + dependency: Dependency, + add_to_args: bool, + deep_scan: bool, +) -> None: + signature_params = inspect.signature(dependency.func).parameters + + for param_name, param in signature_params.items(): + if isinstance(param.default, DependencyMarker): + sub_dependency = _make_dependency_for_graph(graph, param.default.func, deep_scan) + if deep_scan: + _register_sub_dependency(graph, dependency, sub_dependency, add_to_args) + continue + + elif issubclass(graph.handler_cls, param.annotation): + sub_dependency = _make_dependency_for_graph(graph, get_handler, deep_scan) + graph.special_deps.add(sub_dependency) + if deep_scan: + _register_sub_dependency(graph, dependency, sub_dependency, add_to_args) + continue + + elif param_name == 'self': + sub_dependency = _make_dependency_for_graph(graph, get_handler, deep_scan) + graph.special_deps.add(sub_dependency) + if deep_scan: + _register_sub_dependency(graph, dependency, sub_dependency, add_to_args) + + else: + raise ValueError(f'Only dependencies or handler could be in params, dep:{dependency} param:{param}') + + +def _register_sub_dependency( + graph: DependencyGraph, + dependency: Dependency, + sub_dependency: Dependency, + add_to_args: bool, +) -> None: + """ + register sub dependency + add to parent dependency args (if it was in signature) + add link to graph + deep scan sub_dependency parameters + """ + if sub_dependency not in graph.registered_deps: + graph.registered_deps.add(sub_dependency) + need_add_to_args = len(sub_dependency.args) == 0 + if sub_dependency not in graph.special_deps: + _register_dependency_params(graph, sub_dependency, need_add_to_args, True) + + if add_to_args: + dependency.args.append(sub_dependency) + + if dependency in graph.dependency_links: + graph.dependency_links[dependency].add(sub_dependency) + else: + graph.dependency_links[dependency] = {sub_dependency} + + +def _make_dependency_for_graph(graph: DependencyGraph, function_or_preprocessor: Any, deep_scan: bool) -> Dependency: + """ + make dependency from function + if function is Preprocessor, then take underlying function + duplicates would be avoided based on known_deps from graph + + if there are two different dependency with same name (factory's dependency), then leave only first + if they both are in signature explicitly, raise ValueError('Dependency conflict') + """ + if isinstance(function_or_preprocessor, Preprocessor): + function_or_preprocessor = function_or_preprocessor.preprocessor_function + + function_name = make_full_name(function_or_preprocessor) + + if function_name in graph.known_deps: + sub_dependency = graph.known_deps[function_name] + + if sub_dependency.func != function_or_preprocessor and not deep_scan: + raise ValueError(f'Dependency conflict {sub_dependency.func} != {function_or_preprocessor}') + + else: + sub_dependency = Dependency(function_or_preprocessor) + graph.known_deps[function_name] = sub_dependency + + return sub_dependency diff --git a/frontik/dependency_manager/graph_runner.py b/frontik/dependency_manager/graph_runner.py new file mode 100644 index 000000000..fc27f8fa4 --- /dev/null +++ b/frontik/dependency_manager/graph_runner.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from frontik.dependency_manager.dependencies import Dependency, DependencyGraph + from frontik.handler import PageHandler + + +async def run_special_dependencies(handler: PageHandler, graph: DependencyGraph) -> None: + for dependency in graph.special_deps: + dependency.result = dependency.func(handler) + dependency.finished = True + + +async def execute_graph(handler: PageHandler, graph: DependencyGraph) -> None: + await run_special_dependencies(handler, graph) + + pending_tasks: set[asyncio.Task] = set() + topological_sorter = graph.topological_sorter + if topological_sorter is None: + raise RuntimeError('There is no topological_sorter in dependency graph') + + while topological_sorter.is_active(): + dependencies_to_run: tuple[Dependency, ...] = topological_sorter.get_ready() + + if handler.is_finished(): + for p in pending_tasks: + p.cancel() + return + + for dependency in dependencies_to_run: + task = asyncio.create_task(graph.run_dependency(dependency)) + pending_tasks.add(task) + + if pending_tasks: + done, pending = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED) + for d in done: + if d.exception() is not None: + raise d.exception() # type: ignore + pending_tasks.remove(d) + for p in pending: + pending_tasks.add(p) diff --git a/frontik/handler.py b/frontik/handler.py index 528478693..f03bbf1f7 100644 --- a/frontik/handler.py +++ b/frontik/handler.py @@ -4,7 +4,6 @@ import http.client import json import logging -import math import re import time from asyncio import Task @@ -27,18 +26,18 @@ from frontik import media_types, request_context from frontik.auth import DEBUG_AUTH_HEADER_NAME from frontik.debug import DEBUG_HEADER_NAME, DebugMode +from frontik.dependency_manager import execute_page_method_with_dependencies from frontik.futures import AbortAsyncGroup, AsyncGroup from frontik.http_status import ALLOWED_STATUSES, CLIENT_CLOSED_REQUEST from frontik.loggers.stages import StagesLogger from frontik.options import options -from frontik.preprocessors import _get_preprocessor_name, _get_preprocessors, _unwrap_preprocessors from frontik.timeout_tracking import get_timeout_checker from frontik.util import gather_dict, make_url from frontik.validator import BaseValidationModel, Validators from frontik.version import version as frontik_version if TYPE_CHECKING: - from collections.abc import Callable, Coroutine, Reversible + from collections.abc import Callable, Coroutine, Iterable from http_client import HttpClient from tornado.httputil import HTTPServerRequest @@ -94,8 +93,8 @@ def _fail_fast_policy(fail_fast: bool, waited: bool, host: str, path: str) -> bo class PageHandler(RequestHandler): - preprocessors: Reversible = () - _priority_preprocessor_names: list = [] + dependencies: Iterable = () + _priority_dependency_names: list[str] = [] def __init__(self, application: FrontikApplication, request: HTTPServerRequest, **kwargs: Any) -> None: self.name = self.__class__.__name__ @@ -107,8 +106,6 @@ def __init__(self, application: FrontikApplication, request: HTTPServerRequest, super().__init__(application, request, **kwargs) - self._launched_preprocessors: list = [] - self._preprocessor_futures: list | None = [] self._exception_hooks: list = [] self.statsd_client: StatsDClient | StatsDClientStub @@ -370,24 +367,8 @@ def options(self, *args, **kwargs): async def _execute_page(self, page_handler_method: Callable[[], Coroutine[Any, Any, None]]) -> None: self.stages_logger.commit_stage('prepare') - preprocessors = _get_preprocessors(page_handler_method.__func__) # type: ignore - def _prioritise_preprocessor_by_list(preprocessor): - name = _get_preprocessor_name(preprocessor) - if name in self._priority_preprocessor_names: - return self._priority_preprocessor_names.index(name) - else: - return math.inf - - preprocessors.sort(key=_prioritise_preprocessor_by_list) - preprocessors_to_run = _unwrap_preprocessors(self.preprocessors) + preprocessors - preprocessors_completed = await self._run_preprocessors(preprocessors_to_run) - - if not preprocessors_completed: - self.log.info('page was already finished, skipping page method') - return - - await page_handler_method() + await execute_page_method_with_dependencies(self, page_handler_method) self._handler_finished_notification() await self.finish_group.get_gathering_future() @@ -644,17 +625,7 @@ def finish(self, chunk: str | bytes | dict | None = None) -> Future[None]: self.cleanup() return finish_future - # Preprocessors and postprocessors - - def add_preprocessor_future(self, future): - if self._preprocessor_futures is None: - msg = 'preprocessors chain is already finished, calling add_preprocessor_future at this time is incorrect' - raise Exception( - msg, - ) - - self._preprocessor_futures.append(future) - return future + # postprocessors def set_mandatory_header(self, name: str, value: str) -> None: self._mandatory_headers[name] = value @@ -681,38 +652,6 @@ def clear_cookie(self, name: str, path: str = "/", domain: str | None = None) -> del self._mandatory_cookies[name] super().clear_cookie(name, path=path, domain=domain) - def was_preprocessor_called(self, preprocessor: Any) -> bool: - return preprocessor.preprocessor_name in self._launched_preprocessors - - async def _run_preprocessor_function(self, preprocessor_function: Callable) -> None: - if asyncio.iscoroutinefunction(preprocessor_function): - await preprocessor_function(self) - else: - preprocessor_function(self) - self._launched_preprocessors.append(_get_preprocessor_name(preprocessor_function)) - - async def run_preprocessor(self, preprocessor): - if self._finished: - self.log.info('page was already finished, cannot init preprocessor') - return False - await self._run_preprocessor_function(preprocessor.function) - - async def _run_preprocessors(self, preprocessor_functions: list) -> bool: - for p in preprocessor_functions: - await self._run_preprocessor_function(p) - if self._finished: - self.log.info('page was already finished, breaking preprocessors chain') - return False - await asyncio.gather(*self._preprocessor_futures) # type: ignore - - self._preprocessor_futures = None - - if self._finished: - self.log.info('page was already finished, breaking preprocessors chain') - return False - - return True - async def _run_postprocessors(self, postprocessors: list) -> bool: for p in postprocessors: if asyncio.iscoroutinefunction(p): diff --git a/frontik/preprocessors.py b/frontik/preprocessors.py index 796003a80..2f8b0e389 100644 --- a/frontik/preprocessors.py +++ b/frontik/preprocessors.py @@ -1,93 +1,59 @@ -from __future__ import annotations +from collections.abc import Callable, Generator +from typing import Any -import asyncio -from functools import wraps -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from collections.abc import Callable, Reversible - from typing import Any +class DependencyGroupMarker: + __name__ = 'dep_group' + def __init__(self, deps: list[Callable]) -> None: + self.deps = deps -def _get_preprocessor_name(preprocessor_function: Any) -> str: - return f'{preprocessor_function.__module__}.{preprocessor_function.__name__}' +class Preprocessor: + """Deprecated, use frontik.dependency_manager.Dependency""" -def preprocessor(function_or_list: Callable | Reversible[Callable]) -> Callable: - """Creates a preprocessor decorator for `PageHandler.get_page`, `PageHandler.post_page` etc. + def __init__(self, preprocessor_function: Callable | DependencyGroupMarker) -> None: + self.preprocessor_function = preprocessor_function - Preprocessor is a function that accepts handler instance as its only parameter. - Preprocessor can return a ``Future`` (any other value is ignored) and is considered - finished when this ``Future`` is resolved. + @property + def preprocessor_name(self) -> str: + return make_full_name(self.preprocessor_function) - Several ``@preprocessor`` decorators are executed sequentially. + def __call__(self, page_func: Callable) -> Callable: + setattr(page_func, '_preprocessors', [*get_preprocessors(page_func), self.preprocessor_function]) + return page_func - Usage:: - @preprocessor - def get_a(handler): - future = Future() - # Do something asynchronously - yield future - @preprocessor - def get_b(handler): - # Do something - return None +def preprocessor(preprocessor_function: Callable | DependencyGroupMarker) -> Preprocessor: + """Deprecated, use frontik.dependency_manager.Dependency""" + return Preprocessor(preprocessor_function) - class Page(PageHandler): - @get_a - @get_b - # Can also be rewritten as: - # @preprocessor([get_a, get_b]) - def get_page(self): - pass - When the ``Future`` returned by ``get_a`` is resolved, ``get_b`` is called. - Finally, after ``get_b`` is executed, ``get_page`` will be called. - """ - - def preprocessor_decorator(func: Callable) -> Callable: - if callable(function_or_list): - _register_preprocessors(func, [function_or_list]) - else: - for dep in reversed(function_or_list): - dep(func) - - return func - - if callable(function_or_list): - dep_name = function_or_list.__name__ - preprocessor_decorator.preprocessor_name = _get_preprocessor_name(function_or_list) # type: ignore - preprocessor_decorator.function = function_or_list # type: ignore - else: - dep_name = str([f.__name__ for f in function_or_list]) - preprocessor_decorator.func_name = f'preprocessor_decorator({dep_name})' # type: ignore - - return preprocessor_decorator - - -def _get_preprocessors(func: Callable) -> list: +def get_preprocessors(func: Callable) -> list: return getattr(func, '_preprocessors', []) -def _unwrap_preprocessors(preprocessors: Reversible) -> list: - return _get_preprocessors(preprocessor(preprocessors)(lambda: None)) +def get_simple_preprocessors_functions(func: Callable) -> Generator: + for preproc in getattr(func, '_preprocessors', []): + if not isinstance(preproc, DependencyGroupMarker): + yield preproc -def _register_preprocessors(func: Callable, preprocessors: list[Callable]) -> None: - func._preprocessors = preprocessors + _get_preprocessors(func) # type: ignore +def get_all_preprocessors_functions(func: Callable) -> Generator: + for preproc in getattr(func, '_preprocessors', []): + if isinstance(preproc, DependencyGroupMarker): + for func_or_preproc in preproc.deps: + if isinstance(func_or_preproc, Preprocessor): + yield func_or_preproc.preprocessor_function + else: + yield func_or_preproc + else: + yield preproc def make_preprocessors_names_list(preprocessors_list: list) -> list[str]: - return [p.preprocessor_name for p in preprocessors_list] - - -def _wrap_async_func_to_tornado_coroutine(func): - @wraps(func) - def wrapper(*args, **kwargs): - return asyncio.create_task(func(*args, **kwargs)) + return [p.preprocessor_name if isinstance(p, Preprocessor) else make_full_name(p) for p in preprocessors_list] - wrapper.__wrapped__ = func # type: ignore - wrapper.__tornado_coroutine__ = True # type: ignore - return wrapper +def make_full_name(func: Callable | Any) -> str: + return f'{func.__module__}.{func.__name__}' diff --git a/pyproject.toml b/pyproject.toml index e02103eb4..052cacacc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -104,9 +104,9 @@ select = [ ] preview = true ignore = [ - 'N818', - 'B028', - 'B904' + 'N818', 'EM101', 'EM102', + 'B028', 'B904', + 'B008', 'B009', 'B010', ] [tool.ruff.per-file-ignores] diff --git a/tests/projects/test_app/pages/fail_fast/__init__.py b/tests/projects/test_app/pages/fail_fast/__init__.py index 94e81d7a7..b1acd5ce5 100644 --- a/tests/projects/test_app/pages/fail_fast/__init__.py +++ b/tests/projects/test_app/pages/fail_fast/__init__.py @@ -4,7 +4,7 @@ @preprocessor -def get_page_preprocessor(handler): +def get_page_preprocessor(handler: PageHandler) -> None: handler.json.put({'preprocessor': True}) diff --git a/tests/projects/test_app/pages/preprocessors/__init__.py b/tests/projects/test_app/pages/preprocessors/__init__.py deleted file mode 100644 index ac2ed7fee..000000000 --- a/tests/projects/test_app/pages/preprocessors/__init__.py +++ /dev/null @@ -1,81 +0,0 @@ -import asyncio -import time -from collections.abc import Callable - -from tornado.concurrent import Future - -from frontik.handler import PageHandler -from frontik.preprocessors import preprocessor - - -def pp0(name: str) -> Callable: - @preprocessor - def pp(handler): - handler.run.append(name) - - return pp - - -@preprocessor -async def pp1(handler): - handler.run.append('pp1-before') - - ready_future: Future = Future() - ready_future.set_result('pp1-between') - result = await ready_future - - handler.run.append(result) - - wait_future: Future = Future() - handler.add_timeout(time.time() + 0.1, lambda: wait_future.set_result('pp1-after')) - result = await wait_future - - handler.run.append(result) - - -@preprocessor -async def pp2(handler): - future: Future = Future() - - async def put_request() -> None: - res = await handler.put_url(handler.request.host, handler.request.path) - handler.json.put({'put_request_finished': True}) - future.set_result(res.data) - - handler.run_task(put_request()) - handler.run.append('pp2') - handler.pp2_future = future - - await future - - -@preprocessor -async def pp3(handler): - handler.run.append('pp3') - result = await handler.pp2_future - handler.json.put(result) - - -class Page(PageHandler): - preprocessors = [pp0('pp01'), pp0('pp02')] - - def prepare(self): - super().prepare() - - self.run: list[str] = [] - self.json.put({'run': self.run}) - - self.add_postprocessor(self.postprocessor) - - @pp1 - @preprocessor([pp2, pp3]) - async def get_page(self): - self.run.append('get_page') - - async def put_page(self): - self.text = {'put_request_preprocessors': self.run} - - @staticmethod - async def postprocessor(handler): - handler.json.put({'postprocessor': True}) - await asyncio.sleep(0.1) diff --git a/tests/projects/test_app/pages/preprocessors/aborted.py b/tests/projects/test_app/pages/preprocessors/aborted.py deleted file mode 100644 index 3a797ea13..000000000 --- a/tests/projects/test_app/pages/preprocessors/aborted.py +++ /dev/null @@ -1,59 +0,0 @@ -from tornado.web import HTTPError - -from frontik.handler import FinishWithPostprocessors, HTTPErrorWithPostprocessors, PageHandler -from frontik.preprocessors import preprocessor - - -@preprocessor -def pp_before(handler): - handler.run.append('before') - - -@preprocessor -async def pp(handler): - async def post_request() -> None: - await handler.put_url(handler.request.host, handler.request.path) - handler.json.put({'put_request_finished': True}) - - future = handler.run_task(post_request()) - handler.run.append('pp') - - if handler.get_argument('raise_error', 'false') != 'false': - raise HTTPError(400) - elif handler.get_argument('raise_custom_error', 'false') != 'false': - handler.json.replace({'custom_error': True}) - raise HTTPErrorWithPostprocessors(400) - elif handler.get_argument('abort_preprocessors', 'false') != 'false': - raise FinishWithPostprocessors(wait_finish_group=True) - elif handler.get_argument('abort_preprocessors_nowait', 'false') != 'false': - raise FinishWithPostprocessors(wait_finish_group=False) - elif handler.get_argument('redirect', 'false') != 'false': - handler.redirect(handler.request.host + handler.request.path + '?redirected=true') - elif handler.get_argument('finish', 'false') != 'false': - handler.finish('finished') - else: - await future - - -@preprocessor -def pp_after(handler): - handler.run.append('after') - - -class Page(PageHandler): - def prepare(self): - super().prepare() - - self.run: list[str] = [] - self.json.put({'run': self.run}) - - self.add_postprocessor(lambda handler: handler.json.put({'postprocessor': True})) - - @pp_before - @pp - @pp_after - async def get_page(self): - self.run.append('get_page') - - async def put_page(self): - pass diff --git a/tests/projects/test_app/pages/preprocessors/aborted_nonblocking_group.py b/tests/projects/test_app/pages/preprocessors/aborted_nonblocking_group.py deleted file mode 100644 index 5275fdf2d..000000000 --- a/tests/projects/test_app/pages/preprocessors/aborted_nonblocking_group.py +++ /dev/null @@ -1,34 +0,0 @@ -from frontik.handler import FinishWithPostprocessors, PageHandler -from frontik.preprocessors import preprocessor - - -@preprocessor -def pp1(handler): - handler.set_header('content-type', 'text/plain') - - -@preprocessor -def pp2(handler): - async def pp2_coro() -> None: - await handler.post_url(handler.request.host, handler.request.uri + '&from=pp') - - if handler.get_argument('finish', None): - handler.set_status(400) - handler.finish('DONE_IN_PP') - - elif handler.get_argument('abort', None): - raise FinishWithPostprocessors() - - handler.add_preprocessor_future(pp2_coro()) - - -class Page(PageHandler): - @pp1 - @pp2 - async def get_page(self): - # Page handler method must not be called - self.set_status(404) - - @pp1 - async def post_page(self): - pass diff --git a/tests/projects/test_app/pages/preprocessors/preprocessor_future_return.py b/tests/projects/test_app/pages/preprocessors/preprocessor_future_return.py deleted file mode 100644 index 57cdb0f81..000000000 --- a/tests/projects/test_app/pages/preprocessors/preprocessor_future_return.py +++ /dev/null @@ -1,28 +0,0 @@ -from tornado.concurrent import Future - -from frontik.handler import PageHandler -from frontik.preprocessors import preprocessor - - -@preprocessor -async def pp1(handler): - handler.future = Future() - await handler.post_url(handler.request.host, handler.request.uri) - handler.future.set_result(True) - handler.future_result = 'test' - - -@preprocessor -async def pp2(handler): - await handler.future - handler.json.put({'test': handler.future_result}) - - -class Page(PageHandler): - @pp1 - @pp2 - async def get_page(self): - pass - - async def post_page(self): - pass diff --git a/tests/projects/test_app/pages/preprocessors/preprocessor_futures.py b/tests/projects/test_app/pages/preprocessors/preprocessor_futures.py deleted file mode 100644 index d317598f2..000000000 --- a/tests/projects/test_app/pages/preprocessors/preprocessor_futures.py +++ /dev/null @@ -1,56 +0,0 @@ -import time -from collections.abc import Callable - -from tornado.concurrent import Future - -from frontik.handler import PageHandler -from frontik.preprocessors import preprocessor - - -def waiting_preprocessor(sleep_time_sec: float, preprocessor_name: str, add_preprocessor_future: bool) -> Callable: - @preprocessor - def pp(handler): - def _put_to_completed(): - handler.completed_preprocessors = getattr(handler, 'completed_preprocessors', []) - handler.completed_preprocessors.append(preprocessor_name) - wait_future.set_result(preprocessor_name) - - wait_future: Future = Future() - handler.add_timeout(time.time() + sleep_time_sec, _put_to_completed) - - if add_preprocessor_future: - handler.add_preprocessor_future(wait_future) - - return pp - - -@preprocessor -async def pp_1(handler): - def add_preprocessor(): - handler.add_preprocessor_future(Future()) - - def _done(_): - handler.add_timeout( - time.time() + 0.2, - handler.finish_group.add(add_preprocessor, handler._handle_request_exception), - ) - - future: Future = Future() - handler.add_future(future, handler.finish_group.add(_done)) - future.set_result(None) - await future - - -class Page(PageHandler): - @waiting_preprocessor(0.7, "should_finish_after_page_finish", False) - @waiting_preprocessor(0.5, "should_finish_third", True) - @waiting_preprocessor(0.1, "should_finish_first", False) - @waiting_preprocessor(0.3, "should_finish_second", True) - @waiting_preprocessor(0.9, "should_finish_after_page_finish", False) - async def get_page(self): - assert hasattr(self, 'completed_preprocessors') - self.json.put({'preprocessors': self.completed_preprocessors}) - - @pp_1 - async def post_page(self): - pass diff --git a/tests/projects/test_app/pages/preprocessors/priority_preprocessors.py b/tests/projects/test_app/pages/preprocessors/priority_preprocessors.py deleted file mode 100644 index 9214500df..000000000 --- a/tests/projects/test_app/pages/preprocessors/priority_preprocessors.py +++ /dev/null @@ -1,37 +0,0 @@ -from frontik.handler import PageHandler -from frontik.preprocessors import make_preprocessors_names_list, preprocessor - - -@preprocessor -def pp0(handler): - handler.called_preprocessors = ['pp0'] - - -@preprocessor -def pp1(handler): - handler.called_preprocessors.append('pp1') - - -@preprocessor -def pp2(handler): - handler.called_preprocessors.append('pp2') - - -@preprocessor -def pp3(handler): - handler.called_preprocessors.append('pp3') - - -class Page(PageHandler): - preprocessors = [pp0] - _priority_preprocessor_names = make_preprocessors_names_list([pp2, pp1]) - - @pp1 - @pp3 - @pp2 - async def get_page(self): - self.json.put( - { - 'order': self.called_preprocessors, # type: ignore - }, - ) diff --git a/tests/projects/test_app/pages/preprocessors/was_async_preprocessor_called.py b/tests/projects/test_app/pages/preprocessors/was_async_preprocessor_called.py deleted file mode 100644 index cb1c1bc6f..000000000 --- a/tests/projects/test_app/pages/preprocessors/was_async_preprocessor_called.py +++ /dev/null @@ -1,38 +0,0 @@ -from frontik.handler import PageHandler -from frontik.preprocessors import preprocessor - - -@preprocessor -async def pp0(handler): - pass - - -@preprocessor -async def pp1(handler): - pass - - -@preprocessor -async def pp2(handler): - pass - - -@preprocessor -async def pp3(handler): - pass - - -class Page(PageHandler): - preprocessors = [pp0] - - @pp1 - @pp2 - async def get_page(self): - self.json.put( - { - 'pp0': self.was_preprocessor_called(pp0), - 'pp1': self.was_preprocessor_called(pp1), - 'pp2': self.was_preprocessor_called(pp2), - 'pp3': self.was_preprocessor_called(pp3), - }, - ) diff --git a/tests/projects/test_app/pages/preprocessors/was_preprocessor_called.py b/tests/projects/test_app/pages/preprocessors/was_preprocessor_called.py deleted file mode 100644 index 2f3e7d1c9..000000000 --- a/tests/projects/test_app/pages/preprocessors/was_preprocessor_called.py +++ /dev/null @@ -1,38 +0,0 @@ -from frontik.handler import PageHandler -from frontik.preprocessors import preprocessor - - -@preprocessor -def pp0(handler): - pass - - -@preprocessor -def pp1(handler): - pass - - -@preprocessor -def pp2(handler): - pass - - -@preprocessor -def pp3(handler): - pass - - -class Page(PageHandler): - preprocessors = [pp0] - - @pp1 - @pp2 - async def get_page(self): - self.json.put( - { - 'pp0': self.was_preprocessor_called(pp0), - 'pp1': self.was_preprocessor_called(pp1), - 'pp2': self.was_preprocessor_called(pp2), - 'pp3': self.was_preprocessor_called(pp3), - }, - ) diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py new file mode 100644 index 000000000..1109e4ca7 --- /dev/null +++ b/tests/test_dependencies.py @@ -0,0 +1,189 @@ +import asyncio +from collections.abc import Callable + +import pytest + +from frontik.dependency_manager import async_deps, build_and_run_sub_graph, dep, execute_page_method_with_dependencies +from frontik.handler import PageHandler + + +class TestPageHandler(PageHandler): + _priority_dependency_names: list[str] = [] + x = '0' + + def __init__(self) -> None: + self.finished = False + + def is_finished(self): + return self.finished + + +DEP_LOG = [] + + +async def get_session(handler: TestPageHandler) -> str: + DEP_LOG.append('get_session') + await asyncio.sleep(0.1) + return 'session' + handler.x + + +def check_session(handler: TestPageHandler, _session: str = dep(get_session)) -> str: + DEP_LOG.append('check_session') + return 'check' + handler.x + + +async def get_some_data(handler: TestPageHandler) -> str: + DEP_LOG.append('get_some_data') + await asyncio.sleep(0.1) + return 'data' + handler.x + + +def dep_factory(closure_param: int) -> Callable: + def internal_dep() -> int: + DEP_LOG.append(f'internal_dep_{closure_param}') + return closure_param + + return internal_dep + + +def dep_group(data: int = dep(dep_factory(2)), _: str = dep(check_session), __: str = dep(get_some_data)) -> int: + DEP_LOG.append('dep_group') + return data + + +async def exception_dep() -> None: + DEP_LOG.append('exception_dep') + msg = 'stub_error' + raise ArithmeticError(msg) + + +async def finisher_dep(handler: TestPageHandler) -> None: + DEP_LOG.append('finisher_dep') + handler.finished = True + + +async def dep_with_subgraph(handler: TestPageHandler) -> None: + await build_and_run_sub_graph(handler, [finisher_dep]) + + +class SimpleHandler(TestPageHandler): + x = '1' + + async def get_page(self, session=dep(get_session), check=dep(check_session), data=dep(get_some_data)): + DEP_LOG.append('get_page') + return f'{session}_{check}_{data}' + + async def post_page(self, group=dep(dep_group), data=dep(dep_factory(1))): + DEP_LOG.append('post_page') + return f'{group}_{data}' + + async def put_page(self, data1=dep(dep_factory(1)), data2=dep(dep_factory(2))): + DEP_LOG.append('put_page') + return f'{data1}_{data2}' + + +class PriorityHandler(TestPageHandler): + _priority_dependency_names: list[str] = [ + 'tests.test_dependencies.internal_dep', + 'tests.test_dependencies.get_some_data', + 'tests.test_dependencies.finisher_dep', + ] + + async def get_page(self, session=dep(get_session), check=dep(check_session), data=dep(get_some_data)): + DEP_LOG.append('get_page') + return f'{session}_{check}_{data}' + + async def post_page(self, _=dep(exception_dep)): + pass + + async def put_page(self, group=dep(dep_group), data=dep(dep_factory(1)), _=dep(finisher_dep)): + DEP_LOG.append('put_page') + return f'{group}_{data}' + + +class SubGraphHandler(TestPageHandler): + dependencies = [dep_factory(1)] + _priority_dependency_names: list[str] = [ + 'tests.test_dependencies.internal_dep', + 'tests.test_dependencies.get_some_data', + 'tests.test_dependencies.finisher_dep', + ] + + async def get_page(self, data=dep(get_some_data)): + await build_and_run_sub_graph(self, [check_session]) + return data + + async def post_page(self, data1=dep(dep_group), data2=dep(dep_with_subgraph)): + return f'{data1}_{data2}' + + +class AsyncDependencyHandler(TestPageHandler): + @async_deps([check_session]) + async def get_page(self): + DEP_LOG.append('get_page') + + +class TestDependencies: + @staticmethod + def setup_method(): + DEP_LOG.clear() + + async def test_simple_dependencies(self): + handler = SimpleHandler() + res = await asyncio.wait_for(execute_page_method_with_dependencies(handler, handler.get_page), timeout=0.15) + assert len(DEP_LOG) == 4 + assert DEP_LOG.index('check_session') > DEP_LOG.index('get_session') + assert res == 'session1_check1_data1' + + async def test_dep_group(self): + handler = SimpleHandler() + res = await asyncio.wait_for(execute_page_method_with_dependencies(handler, handler.post_page), timeout=0.15) + assert len(DEP_LOG) == 6 + assert DEP_LOG.index('check_session') > DEP_LOG.index('get_session') + assert res == '1_1' + + async def test_dep_conflict(self): + handler = SimpleHandler() + with pytest.raises(ValueError, match=r'Dependency conflict .*'): + await execute_page_method_with_dependencies(handler, handler.put_page) + assert len(DEP_LOG) == 0 + + async def test_deps_with_priority(self): + handler = PriorityHandler() + res = await execute_page_method_with_dependencies(handler, handler.get_page) + assert len(DEP_LOG) == 4 + assert DEP_LOG[0] == 'get_some_data' + assert 'internal_dep' not in DEP_LOG + assert DEP_LOG.index('check_session') > DEP_LOG.index('get_session') + assert res == 'session0_check0_data0' + + async def test_exception_in_dep(self): + handler = PriorityHandler() + with pytest.raises(ArithmeticError, match=r'stub_error'): + await execute_page_method_with_dependencies(handler, handler.post_page) + + async def test_dep_with_finisher(self): + handler = PriorityHandler() + res = await execute_page_method_with_dependencies(handler, handler.put_page) + assert len(DEP_LOG) == 3 + assert DEP_LOG[0] == 'internal_dep_1' + assert DEP_LOG[1] == 'get_some_data' + assert DEP_LOG[2] == 'finisher_dep' + assert res is None + + async def test_subgraph_in_page(self): + handler = SubGraphHandler() + res = await execute_page_method_with_dependencies(handler, handler.get_page) + assert ['internal_dep_1', 'get_some_data', 'get_session', 'check_session'] == DEP_LOG + assert res == 'data0' + + async def test_subgraph_in_dep(self): + handler = SubGraphHandler() + res = await execute_page_method_with_dependencies(handler, handler.post_page) + assert ['internal_dep_1', 'get_some_data', 'get_session', 'finisher_dep'] == DEP_LOG + assert res is None + + async def test_async_deps(self): + handler = AsyncDependencyHandler() + await execute_page_method_with_dependencies(handler, handler.get_page) + assert ['get_page', 'get_session', 'check_session'] == DEP_LOG diff --git a/tests/test_preprocessors.py b/tests/test_preprocessors.py index 2ae4ce1e9..b46b194cc 100644 --- a/tests/test_preprocessors.py +++ b/tests/test_preprocessors.py @@ -1,102 +1,199 @@ -import json -import unittest - -import requests - -from tests.instances import frontik_test_app - - -class TestPreprocessors(unittest.TestCase): - def test_preprocessors(self): - response_json = frontik_test_app.get_page_json('preprocessors') - self.assertEqual( - response_json, - { - 'run': ['pp01', 'pp02', 'pp1-before', 'pp1-between', 'pp1-after', 'pp2', 'pp3', 'get_page'], - 'put_request_finished': True, - 'put_request_preprocessors': ['pp01', 'pp02'], - 'postprocessor': True, - }, - ) - - def test_preprocessor_futures(self): - response_json = frontik_test_app.get_page_json('preprocessors/preprocessor_futures') - self.assertEqual( - response_json, - {'preprocessors': ['should_finish_first', 'should_finish_second', 'should_finish_third']}, - ) - - def test_was_preprocessor_called(self): - response_json = frontik_test_app.get_page_json('preprocessors/was_preprocessor_called') - self.assertEqual( - response_json, - { - 'pp0': True, - 'pp1': True, - 'pp2': True, - 'pp3': False, - }, - ) - - def test_was_async_preprocessor_called(self): - response_json = frontik_test_app.get_page_json('preprocessors/was_async_preprocessor_called') - self.assertEqual( - response_json, - { - 'pp0': True, - 'pp1': True, - 'pp2': True, - 'pp3': False, - }, - ) - - def test_priority_preprocessors(self): - response_json = frontik_test_app.get_page_json('preprocessors/priority_preprocessors') - self.assertEqual(response_json, {'order': ['pp0', 'pp2', 'pp1', 'pp3']}) - - def test_add_preprocessor_future_after_preprocessors(self) -> None: - response = frontik_test_app.get_page('preprocessors/preprocessor_futures', method=requests.post) - self.assertEqual(response.status_code, 500) - - def test_add_preprocessor_future_return_value(self): - response_json = frontik_test_app.get_page_json('preprocessors/preprocessor_future_return') - self.assertEqual(response_json, {'test': 'test'}) - - def test_preprocessors_abort(self): - response_json = frontik_test_app.get_page_json('preprocessors/aborted?abort_preprocessors=true') - self.assertEqual(response_json, {'run': ['before', 'pp'], 'put_request_finished': True, 'postprocessor': True}) - - def test_preprocessors_abort_nowait(self): - response_json = frontik_test_app.get_page_json('preprocessors/aborted?abort_preprocessors_nowait=true') - self.assertEqual(response_json, {'run': ['before', 'pp'], 'postprocessor': True}) - - def test_preprocessors_raise_error(self): - response = frontik_test_app.get_page('preprocessors/aborted?raise_error=true') - self.assertEqual(response.status_code, 400) - self.assertEqual(response.content, b'