From 48e0c2f89fe80d2c72488bf5207c247f8ced755e Mon Sep 17 00:00:00 2001 From: Felix Uellendall Date: Thu, 10 Mar 2022 12:17:59 +0100 Subject: [PATCH] [EXPERIMENTAL] Add transfer nodes - improve test coverage - add experimental decorator --- .github/workflows/test.yml | 7 +++ README.md | 11 ++++- airflow_diagrams/__init__.py | 7 ++- airflow_diagrams/airflow.py | 62 +++++++++++++++++++++++--- airflow_diagrams/cli.py | 10 ++++- airflow_diagrams/diagram.jinja2 | 2 +- airflow_diagrams/utils.py | 14 +++++- pyproject.toml | 3 ++ tests/test_airflow.py | 79 ++++++++++++++++++++++++++++++--- tests/test_cli.py | 45 +++++++++++++++++-- 10 files changed, 220 insertions(+), 20 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9d7113a..cc406a9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,10 +12,15 @@ jobs: strategy: fail-fast: false matrix: + experimental-features: [false, true] python: ['3.9', '3.10'] include: - python: '3.11.0-alpha - 3.11.0' experimental: true + experimental-features: false + - python: '3.11.0-alpha - 3.11.0' + experimental: true + experimental-features: true steps: - name: Check out repository uses: actions/checkout@v2 @@ -42,6 +47,8 @@ jobs: - name: Install library run: poetry install --no-interaction - name: Run tests + env: + AIRFLOW_DIAGRAMS__EXPERIMENTAL: ${{ matrix.experimental-features }} run: poetry run pytest --cov-report=xml --cov=airflow_diagrams tests/ - name: Upload coverage to Codecov uses: codecov/codecov-action@v2 diff --git a/README.md b/README.md index 62d786f..3d74844 100644 --- a/README.md +++ b/README.md @@ -33,12 +33,17 @@ pip install airflow-diagrams Then just call it like this: ```console -Usage: airflow-diagrams generate [OPTIONS] +Usage: airflow-diagrams generate [OPTIONS] [EXPERIMENTAL] Generates _diagrams.py in directory which contains the definition to create a diagram. Run this file and you will get a rendered diagram. 
+Arguments: + [EXPERIMENTAL] Enable experimental features by setting the variable to + 'true'. [env var: AIRFLOW_DIAGRAMS__EXPERIMENTAL;default: + False] + Options: -d, --airflow-dag-id TEXT The dag id from which to generate the diagram. By default it generates for all. @@ -66,6 +71,10 @@ Options: _Examples of generated diagrams can be found in the [examples](examples) directory._ +## 🧪 Experimental Features + +* **Transfer Nodes**: Convert Airflow transfer operators into two tasks i.e. source & destination grouped in a cluster + ## 🤔 How it Works 1. ℹ️ It connects, by using the official [Apache Airflow Python Client](https://github.com/apache/airflow-client-python), to your Airflow installation to retrieve all DAGs (in case you don't specify any `dag_id`) and all Tasks for the DAG(s). diff --git a/airflow_diagrams/__init__.py b/airflow_diagrams/__init__.py index c91b4dd..a78b407 100644 --- a/airflow_diagrams/__init__.py +++ b/airflow_diagrams/__init__.py @@ -1,8 +1,13 @@ """Top-level package for airflow-diagrams.""" from importlib.metadata import version -from os import getcwd +from os import getcwd, getenv from os.path import dirname, join, realpath __app_name__ = "airflow-diagrams" __version__ = version(__name__) __location__ = realpath(join(getcwd(), dirname(__file__))) +__experimental__ = getenv("AIRFLOW_DIAGRAMS__EXPERIMENTAL", "False").lower() in ( + "true", + "1", + "t", +) diff --git a/airflow_diagrams/airflow.py b/airflow_diagrams/airflow.py index 2853667..6f380c2 100644 --- a/airflow_diagrams/airflow.py +++ b/airflow_diagrams/airflow.py @@ -1,10 +1,12 @@ from dataclasses import dataclass +from re import findall from typing import Generator, Optional from airflow_client.client.api.dag_api import DAGApi from airflow_client.client.api_client import ApiClient, Configuration from airflow_diagrams.class_ref import ClassRef +from airflow_diagrams.utils import experimental @dataclass @@ -50,19 +52,69 @@ def get_tasks(self) -> list[AirflowTask]: :returns: a list 
@experimental
def transfer_nodes(tasks: list[AirflowTask]) -> None:
    """
    Transfer Nodes replaces an Airflow transfer task by two tasks i.e. source & destination clustered.

    A task is treated as a transfer task when its module path contains
    ".transfers." and its class name splits into "<Source>To<Destination>".
    Such a task is removed from the list and two tasks are appended instead:
    "[SOURCE] <task_id>" and "[DESTINATION] <task_id>", both grouped under the
    original task id. Downstream references to a replaced task are redirected
    to its source task.

    Tasks from a transfers module whose class name does not follow the
    "<Source>To<Destination>" pattern are left untouched instead of aborting
    the whole conversion with a ValueError.

    :param tasks: The tasks to modify in place.
    """
    replaced_task_ids: set[str] = set()

    # Iterate over a snapshot because the list is mutated inside the loop.
    for task in list(tasks):
        if ".transfers." not in task.class_ref.module_path:
            continue

        # Split the CamelCase class name into words, e.g. FooToBar -> Foo, To, Bar.
        class_name_words = findall(r"[A-Z][^A-Z]*", task.class_ref.class_name)
        if "To" not in class_name_words:
            continue

        to_index = class_name_words.index("To")
        source_class_name = "".join(class_name_words[:to_index])
        destination_class_name = "".join(class_name_words[to_index + 1 :])
        if not source_class_name or not destination_class_name:
            # "To" at the very start or end leaves one side without a class name.
            continue

        source_task_id = f"[SOURCE] {task.task_id}"
        destination_task_id = f"[DESTINATION] {task.task_id}"
        source = AirflowTask(
            class_ref=ClassRef(
                module_path="",
                class_name=source_class_name,
            ),
            task_id=source_task_id,
            downstream_task_ids=[destination_task_id],
            group_name=task.task_id,
        )
        destination = AirflowTask(
            class_ref=ClassRef(
                module_path="",
                class_name=destination_class_name,
            ),
            task_id=destination_task_id,
            # The destination inherits the outgoing edges of the original task.
            downstream_task_ids=task.downstream_task_ids,
            group_name=task.task_id,
        )
        tasks.extend([source, destination])
        tasks.remove(task)
        replaced_task_ids.add(task.task_id)

    # Redirect edges that pointed at a replaced task to its source node.
    for task in tasks:
        for index, downstream_task_id in enumerate(task.downstream_task_ids):
            if downstream_task_id in replaced_task_ids:
                task.downstream_task_ids[index] = f"[SOURCE] {downstream_task_id}"
def experimental(func):
    """
    Decorate experimental features.

    The decorated function is only executed when ``__experimental__`` is
    truthy. Otherwise the call is a no-op that returns ``None``.

    :param func: The experimental feature to guard.
    :returns: A wrapper carrying the metadata of ``func`` which returns the
        result of ``func`` when experimental features are enabled.
    """
    # Imported locally so the decorator does not depend on a module-level import.
    from functools import wraps

    @wraps(func)  # keep __name__/__doc__ of the decorated feature intact
    def wrapper(*args, **kwargs):
        if not __experimental__:
            return None
        logging.debug("Calling experimental feature: %s", func.__name__)
        # Propagate the result instead of silently discarding it.
        return func(*args, **kwargs)

    return wrapper
downstream_task_ids=[], + ), + ], + ) + assert airflow_api_tree.get_dags(dag_id=dag_id)[0].get_tasks() == [ + AirflowTask( + class_ref=ClassRef( + **dict( + module_path="fizz", + class_name="Fizz", + ) + ), + task_id="test_task_0", + downstream_task_ids=["[SOURCE] test_task_1"], + group_name=None, + ), + AirflowTask( + class_ref=ClassRef( + **dict( + module_path="", + class_name="Foo", + ) + ), + task_id="[SOURCE] test_task_1", + downstream_task_ids=["[DESTINATION] test_task_1"], + group_name="test_task_1", + ), + AirflowTask( + class_ref=ClassRef( + **dict( + module_path="", + class_name="Bar", + ) + ), + task_id="[DESTINATION] test_task_1", + downstream_task_ids=[], + group_name="test_task_1", + ), + ] + + def test_airflow_api_tree_get_dags(airflow_api_tree): """Test getting dags from Airflow API Tree""" airflow_api_tree.dag_api.get_dags.return_value = dict( @@ -66,10 +138,3 @@ def test_airflow_api_tree_get_dags_with_dag_id(airflow_api_tree): AirflowDag(dag_id=dag_id, dag_api=airflow_api_tree.dag_api), ] airflow_api_tree.dag_api.assert_not_called() - - -def test_airflow_dag_eq(airflow_api_tree): - """Test Airflow DAG equality""" - airflow_dag_kwargs = dict(dag_id="test_dag", dag_api=airflow_api_tree.dag_api) - assert AirflowDag(**airflow_dag_kwargs) == AirflowDag(**airflow_dag_kwargs) - assert AirflowDag(**airflow_dag_kwargs) != dict(**airflow_dag_kwargs) diff --git a/tests/test_cli.py b/tests/test_cli.py index 884325c..b4c86d4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,9 +1,12 @@ import pytest from typer.testing import CliRunner -from airflow_diagrams import __app_name__, __version__, cli +from airflow_diagrams import __app_name__, __experimental__, __version__, cli -runner = CliRunner() +if __experimental__: + runner = CliRunner(env={"AIRFLOW_DIAGRAMS__EXPERIMENTAL": "true"}) +else: + runner = CliRunner() @pytest.fixture @@ -36,6 +39,10 @@ def test_version(): assert f"{__app_name__} v{__version__}\n" == result.stdout +@pytest.mark.skipif( + 
__experimental__, + reason="Only run test when experimental features are disabled.", +) def test_generate(mock_dag): """Test end-to-end""" result = runner.invoke(cli.app, ["generate", "--output-path", "generated/"]) @@ -51,6 +58,10 @@ def test_generate(mock_dag): ).replace("\n", "") == result.stdout.replace("\n", "") +@pytest.mark.skipif( + __experimental__, + reason="Only run test when experimental features are disabled.", +) def test_generate_with_progress(mock_dag): """Test end-to-end""" result = runner.invoke( @@ -69,6 +80,26 @@ def test_generate_with_progress(mock_dag): ).replace("\n", "") == result.stdout.replace("\n", "") +@pytest.mark.skipif( + not __experimental__, + reason="Only run test when experimental features are enabled.", +) +def test_generate_experimental(mock_dag): + """Test end-to-end with experimental features enabled""" + result = runner.invoke(cli.app, ["generate", "--output-path", "generated/"]) + assert result.exit_code == 0 + assert ( + "🧪Running with experimental features..\n" + "ℹ️ Retrieving Airflow DAGs...\n" + " ℹ️ Retrieving Airflow Tasks for Airflow DAG test_dag...\n" + "🪄 Processing Airflow DAG test_dag...\n" + " 🪄 Processing Airflow Task test_task (module.path.ClassName) with downstream tasks []...\n" + " 🔮Found match programming.flowchart.Action.\n" + "🎨Generated diagrams file generated/test_dag_diagrams.py.\n" + "Done. 
🎉\n" + ).replace("\n", "") == result.stdout.replace("\n", "") + + def test_generate_with_verbose(mock_dag): """Test that logging level is DEBUG""" result = runner.invoke( @@ -76,9 +107,13 @@ def test_generate_with_verbose(mock_dag): ["generate", "--output-path", "generated/", "--verbose"], ) assert result.exit_code == 0 - assert result.stdout.startswith("💬 Running with verbose output...") + assert result.stdout.startswith("💬Running with verbose output...") +@pytest.mark.skipif( + __experimental__, + reason="Only run test when experimental features are disabled.", +) @pytest.mark.order(after="test_download") def test_generate_from_file(mock_dag): """Test generate from Airflow info file""" @@ -97,6 +132,10 @@ def test_generate_from_file(mock_dag): ).replace("\n", "") == result.stdout.replace("\n", "") +@pytest.mark.skipif( + __experimental__, + reason="Only run test when experimental features are disabled.", +) def test_download(mock_dag): """Test downloading Airflow information""" result = runner.invoke(cli.app, ["download", "generated/airflow_dags.yml"])