-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
- Loading branch information
Showing
20 changed files
with
969 additions
and
615 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
## Dependency injection | ||
|
||
Dependency injection is a pattern when a function receives other functions that it requires, | ||
instead of creating them internally. | ||
In frontik implementation, dependencies are simple functions, | ||
which run after `RequestHandler.prepare` and before* handler code is executed. | ||
Dependencies are great for running common actions before actual request processing takes place. | ||
|
||
Here is what a dependencies may look like: | ||
|
||
```python | ||
from frontik.dependency_manager import dep | ||
|
||
|
||
async def get_session_dependency(handler: PageHandler) -> Session: | ||
token = handler.get_cookie('token') | ||
return await session_client.get_session(token) | ||
|
||
|
||
class Page(PageHandler): | ||
# Can be used on class level | ||
dependencies = (another_dependency,) | ||
|
||
async def get_page(self, session=dep(get_session_dependency)): | ||
self.json.put({'result': session}) | ||
``` | ||
|
||
Dependency can be sync or async functions. When page is executed all ready to run | ||
async dependencies run in parallel with asyncio.gather(). If something finishes the page | ||
(call self.finish() or raise Exception), then we stop executing the remaining dependencies | ||
|
||
Dependencies can depend on another dependencies, thus we have a dependency graph. | ||
Within one execution of a graph, the same dependencies will be executed once. | ||
Sameness is determined by {function.__module__}.{function.__name__} | ||
Dependencies can come from factories, then it turns out that there are several different dependencies | ||
with the same name. In this case the one that is specified explicitly in the method arg or | ||
in class level will be taken, the rest from the graph depths will be discarded | ||
|
||
|
||
There is an opportunity to specify priorities for dependencies: | ||
```python | ||
from frontik.dependency_manager import dep | ||
|
||
|
||
async def get_session_dependency(handler: PageHandler) -> Session: | ||
token = handler.get_cookie('token') | ||
return await session_client.get_session(token) | ||
|
||
|
||
class Page(PageHandler): | ||
# Can be used on class level | ||
dependencies = (another_dependency,) | ||
_priority_dependency_names: list[str] = [ | ||
side_dependency, | ||
another_dependency, | ||
] | ||
|
||
async def get_page(self, session=dep(get_session_dependency)): | ||
self.json.put({'result': session}) | ||
``` | ||
If any of the _priority_dependency_names are present in the current graph, | ||
they will be executed before all the other dependencies sequentially. | ||
In the given example `another_dependency` -> `get_session_dependency` -> `get_page` | ||
|
||
|
||
*It is also possible to specify "async" dependencies: | ||
```python | ||
from frontik.dependency_manager import dep, async_deps | ||
|
||
|
||
async def get_session_dependency(handler: PageHandler) -> Session: | ||
token = handler.get_cookie('token') | ||
return await session_client.get_session(token) | ||
|
||
|
||
class Page(PageHandler): | ||
@async_deps([get_session_dependency]) | ||
async def get_page(self): | ||
self.json.put({'result': 'done'}) | ||
``` | ||
The passed list will not block the execution of the page_method, so they can be executed in parallel | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
from __future__ import annotations | ||
|
||
from typing import TYPE_CHECKING, Any | ||
|
||
from frontik.dependency_manager.dependencies import DependencyMarker | ||
from frontik.dependency_manager.graph_builder import build_sub_graph, get_dependency_graph | ||
from frontik.dependency_manager.graph_runner import execute_graph | ||
from frontik.preprocessors import DependencyGroupMarker, Preprocessor | ||
|
||
if TYPE_CHECKING: | ||
from collections.abc import Callable | ||
|
||
from frontik.handler import PageHandler | ||
|
||
|
||
def dep(dependency: Preprocessor | Callable | list[Callable]) -> Any: | ||
""" | ||
add dependency to page_method, it will be run before page_method and provide result | ||
async def get_page(self, session=dep(get_session)): | ||
... | ||
""" | ||
if isinstance(dependency, Preprocessor) and not isinstance(dependency.preprocessor_function, DependencyGroupMarker): | ||
return DependencyMarker(dependency.preprocessor_function) | ||
|
||
if isinstance(dependency, list): | ||
return DependencyGroupMarker(dependency) | ||
|
||
if callable(dependency): | ||
return DependencyMarker(dependency) | ||
|
||
msg = 'Bad dependency type, only func or list[func]' | ||
raise ValueError(msg) | ||
|
||
|
||
def async_deps(async_dependencies: list[Callable]) -> Callable: | ||
""" | ||
add dependencies that will be run in parallel with page_method | ||
@async_dep([get_session, get_data]) | ||
async def get_page(self): | ||
... | ||
""" | ||
|
||
def decorator(execute_page_method: Callable) -> Callable: | ||
setattr(execute_page_method, '_async_deps', async_dependencies) | ||
return execute_page_method | ||
|
||
return decorator | ||
|
||
|
||
async def build_and_run_sub_graph(handler: PageHandler, functions_to_run: list) -> None: | ||
sub_graph = build_sub_graph(handler, functions_to_run) | ||
await execute_graph(handler, sub_graph) | ||
|
||
|
||
async def execute_page_method_with_dependencies(handler: PageHandler, page_method: Any) -> Any: | ||
main_graph = get_dependency_graph(page_method.__func__, handler.__class__) | ||
setattr(handler, '_main_graph', main_graph) | ||
await execute_graph(handler, main_graph) | ||
return main_graph.root_dep.result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
from graphlib import TopologicalSorter | ||
from typing import TYPE_CHECKING | ||
|
||
from frontik.preprocessors import make_full_name | ||
|
||
if TYPE_CHECKING: | ||
from collections.abc import Callable | ||
|
||
from frontik.handler import PageHandler | ||
|
||
|
||
class DependencyMarker: | ||
def __init__(self, func: Callable) -> None: | ||
self.func = func | ||
|
||
|
||
class Dependency: | ||
def __init__(self, func: Callable) -> None: | ||
self.func = func | ||
self.args: list = [] | ||
self.result = None | ||
self.finished = False | ||
self.task: asyncio.Task | None = None | ||
self.waited = True | ||
|
||
async def run(self) -> None: | ||
""" | ||
replace self.args with the result of completed sub_dependencies and run self.func | ||
if sub_dependency is not finished raise RuntimeError | ||
""" | ||
if self.finished: | ||
return | ||
|
||
for i, arg in enumerate(self.args): | ||
if isinstance(arg, Dependency): | ||
if not arg.finished: | ||
raise RuntimeError(f'Graph corrupted, run {self}, before finishing {arg}') | ||
self.args[i] = arg.result | ||
|
||
if asyncio.iscoroutinefunction(self.func): | ||
if self.waited: | ||
self.result = await self.func(*self.args) | ||
else: | ||
asyncio.create_task(self.func(*self.args)) | ||
else: | ||
self.result = self.func(*self.args) | ||
self.finished = True | ||
|
||
def __repr__(self): | ||
return make_full_name(self.func) | ||
|
||
|
||
class DependencyGraph: | ||
""" | ||
known_deps - to prevent re-registration of function multiple times | ||
registered_deps - to make correct dependency_links in case of building a sub_graph | ||
dependency_links - links dict for build TopologicalSorter | ||
handler_cls - special argument type for using special dependencies for example get_handler() | ||
""" | ||
|
||
def __init__(self, root_dep: Dependency, handler_cls: type) -> None: | ||
self.root_dep: Dependency = root_dep | ||
self.known_deps: dict[str, Dependency] = {} | ||
self.registered_deps: set[Dependency] = set() | ||
self.dependency_links: dict[Dependency, set[Dependency]] = {root_dep: set()} | ||
self.handler_cls: type = handler_cls | ||
self.topological_sorter: TopologicalSorter[Dependency] | None = None | ||
self.special_deps: set[Dependency] = set() | ||
|
||
def build_topological_sorter(self) -> None: | ||
self.topological_sorter = TopologicalSorter(self.dependency_links) | ||
self.topological_sorter.prepare() | ||
|
||
async def run_dependency(self, dependency: Dependency) -> None: | ||
await dependency.run() | ||
if self.topological_sorter is None: | ||
raise RuntimeError('There is no topological_sorter in dependency graph') | ||
self.topological_sorter.done(dependency) | ||
|
||
|
||
def make_stub_dependency() -> Dependency: | ||
def stub(): | ||
pass | ||
|
||
dependency = Dependency(stub) | ||
dependency.finished = True | ||
return dependency | ||
|
||
|
||
def get_handler(handler: PageHandler) -> PageHandler: | ||
return handler |
Oops, something went wrong.