Skip to content

Commit

Permalink
HH-196544 add dependency graph
Browse files Browse the repository at this point in the history
  • Loading branch information
712u3 committed Nov 16, 2023
1 parent cf8dc65 commit 36c728a
Show file tree
Hide file tree
Showing 20 changed files with 969 additions and 615 deletions.
82 changes: 82 additions & 0 deletions docs/dependency_injection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
## Dependency injection

Dependency injection is a pattern when a function receives other functions that it requires,
instead of creating them internally.
In frontik implementation, dependencies are simple functions,
which run after `RequestHandler.prepare` and before* handler code is executed.
Dependencies are great for running common actions before actual request processing takes place.

Here is what a dependencies may look like:

```python
from frontik.dependency_manager import dep


async def get_session_dependency(handler: PageHandler) -> Session:
token = handler.get_cookie('token')
return await session_client.get_session(token)


class Page(PageHandler):
# Can be used on class level
dependencies = (another_dependency,)

async def get_page(self, session=dep(get_session_dependency)):
self.json.put({'result': session})
```

Dependency can be sync or async functions. When page is executed all ready to run
async dependencies run in parallel with asyncio.gather(). If something finishes the page
(call self.finish() or raise Exception), then we stop executing the remaining dependencies

Dependencies can depend on another dependencies, thus we have a dependency graph.
Within one execution of a graph, the same dependencies will be executed once.
Sameness is determined by {function.__module__}.{function.__name__}
Dependencies can come from factories, then it turns out that there are several different dependencies
with the same name. In this case the one that is specified explicitly in the method arg or
in class level will be taken, the rest from the graph depths will be discarded


There is an opportunity to specify priorities for dependencies:
```python
from frontik.dependency_manager import dep


async def get_session_dependency(handler: PageHandler) -> Session:
token = handler.get_cookie('token')
return await session_client.get_session(token)


class Page(PageHandler):
# Can be used on class level
dependencies = (another_dependency,)
_priority_dependency_names: list[str] = [
side_dependency,
another_dependency,
]

async def get_page(self, session=dep(get_session_dependency)):
self.json.put({'result': session})
```
If any of the _priority_dependency_names are present in the current graph,
they will be executed before all the other dependencies sequentially.
In the given example `another_dependency` -> `get_session_dependency` -> `get_page`


*It is also possible to specify "async" dependencies:
```python
from frontik.dependency_manager import dep, async_deps


async def get_session_dependency(handler: PageHandler) -> Session:
token = handler.get_cookie('token')
return await session_client.get_session(token)


class Page(PageHandler):
@async_deps([get_session_dependency])
async def get_page(self):
self.json.put({'result': 'done'})
```
The passed list will not block the execution of the page_method, so they can be executed in parallel

2 changes: 2 additions & 0 deletions docs/preprocessors.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Preprocessors

Deprecated, see https://github.com/hhru/frontik/blob/master/docs/dependency_injection.md

The first step of page generation is preprocessing. Preprocessors are simple functions, which run after
`RequestHandler.prepare` and before handler code is executed. Preprocessors are great for running common actions
before actual request processing takes place.
Expand Down
61 changes: 61 additions & 0 deletions frontik/dependency_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from frontik.dependency_manager.dependencies import DependencyMarker
from frontik.dependency_manager.graph_builder import build_sub_graph, get_dependency_graph
from frontik.dependency_manager.graph_runner import execute_graph
from frontik.preprocessors import DependencyGroupMarker, Preprocessor

if TYPE_CHECKING:
from collections.abc import Callable

from frontik.handler import PageHandler


def dep(dependency: Preprocessor | Callable | list[Callable]) -> Any:
"""
add dependency to page_method, it will be run before page_method and provide result
async def get_page(self, session=dep(get_session)):
...
"""
if isinstance(dependency, Preprocessor) and not isinstance(dependency.preprocessor_function, DependencyGroupMarker):
return DependencyMarker(dependency.preprocessor_function)

if isinstance(dependency, list):
return DependencyGroupMarker(dependency)

if callable(dependency):
return DependencyMarker(dependency)

msg = 'Bad dependency type, only func or list[func]'
raise ValueError(msg)


def async_deps(async_dependencies: list[Callable]) -> Callable:
"""
add dependencies that will be run in parallel with page_method
@async_dep([get_session, get_data])
async def get_page(self):
...
"""

def decorator(execute_page_method: Callable) -> Callable:
setattr(execute_page_method, '_async_deps', async_dependencies)
return execute_page_method

return decorator


async def build_and_run_sub_graph(handler: PageHandler, functions_to_run: list) -> None:
sub_graph = build_sub_graph(handler, functions_to_run)
await execute_graph(handler, sub_graph)


async def execute_page_method_with_dependencies(handler: PageHandler, page_method: Any) -> Any:
main_graph = get_dependency_graph(page_method.__func__, handler.__class__)
setattr(handler, '_main_graph', main_graph)
await execute_graph(handler, main_graph)
return main_graph.root_dep.result
94 changes: 94 additions & 0 deletions frontik/dependency_manager/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from __future__ import annotations

import asyncio
from graphlib import TopologicalSorter
from typing import TYPE_CHECKING

from frontik.preprocessors import make_full_name

if TYPE_CHECKING:
from collections.abc import Callable

from frontik.handler import PageHandler


class DependencyMarker:
def __init__(self, func: Callable) -> None:
self.func = func


class Dependency:
def __init__(self, func: Callable) -> None:
self.func = func
self.args: list = []
self.result = None
self.finished = False
self.task: asyncio.Task | None = None
self.waited = True

async def run(self) -> None:
"""
replace self.args with the result of completed sub_dependencies and run self.func
if sub_dependency is not finished raise RuntimeError
"""
if self.finished:
return

for i, arg in enumerate(self.args):
if isinstance(arg, Dependency):
if not arg.finished:
raise RuntimeError(f'Graph corrupted, run {self}, before finishing {arg}')
self.args[i] = arg.result

if asyncio.iscoroutinefunction(self.func):
if self.waited:
self.result = await self.func(*self.args)
else:
asyncio.create_task(self.func(*self.args))
else:
self.result = self.func(*self.args)
self.finished = True

def __repr__(self):
return make_full_name(self.func)


class DependencyGraph:
"""
known_deps - to prevent re-registration of function multiple times
registered_deps - to make correct dependency_links in case of building a sub_graph
dependency_links - links dict for build TopologicalSorter
handler_cls - special argument type for using special dependencies for example get_handler()
"""

def __init__(self, root_dep: Dependency, handler_cls: type) -> None:
self.root_dep: Dependency = root_dep
self.known_deps: dict[str, Dependency] = {}
self.registered_deps: set[Dependency] = set()
self.dependency_links: dict[Dependency, set[Dependency]] = {root_dep: set()}
self.handler_cls: type = handler_cls
self.topological_sorter: TopologicalSorter[Dependency] | None = None
self.special_deps: set[Dependency] = set()

def build_topological_sorter(self) -> None:
self.topological_sorter = TopologicalSorter(self.dependency_links)
self.topological_sorter.prepare()

async def run_dependency(self, dependency: Dependency) -> None:
await dependency.run()
if self.topological_sorter is None:
raise RuntimeError('There is no topological_sorter in dependency graph')
self.topological_sorter.done(dependency)


def make_stub_dependency() -> Dependency:
def stub():
pass

dependency = Dependency(stub)
dependency.finished = True
return dependency


def get_handler(handler: PageHandler) -> PageHandler:
return handler
Loading

0 comments on commit 36c728a

Please sign in to comment.