Update quickstart and add feature examples #189

Merged: 4 commits, Jul 15, 2024
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -6,7 +6,7 @@ jobs:
build:
strategy:
matrix:
python: ['3.7', '3.8', '3.9', '3.10']
python: ['3.8', '3.9', '3.10']
runs-on: 'ubuntu-latest'
steps:
- uses: actions/checkout@master
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- Removed support for Python 3.7

## [0.19.0] - 2023-07-19
### Added
164 changes: 128 additions & 36 deletions README.md
@@ -6,39 +6,47 @@
[![Code Style](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)
[![Downloads](https://pepy.tech/badge/dag-factory)](https://pepy.tech/project/dag-factory)

*dag-factory* is a library for dynamically generating [Apache Airflow](https://github.com/apache/incubator-airflow) DAGs from YAML configuration files.
- [Installation](#installation)
- [Usage](#usage)
Welcome to *dag-factory*! *dag-factory* is a library for [Apache Airflow](https://github.com/apache/incubator-airflow) to construct DAGs declaratively via configuration files.

The minimum requirements for **dag-factory** are:
- Python 3.8.0+
- Apache Airflow 2.0+

For a gentle introduction, please take a look at our [Quickstart Guide](#quickstart). For more examples, please see the [examples](/examples) folder.

- [Quickstart](#quickstart)
- [Features](#features)
- [Multiple Configuration Files](#multiple-configuration-files)
- [Dynamically Mapped Tasks](#dynamically-mapped-tasks)
- [Datasets](#datasets)
- [Custom Operators](#custom-operators)
- [Benefits](#benefits)
- [Notes](#notes)
- [HttpSensor (since 0.10.0)](#httpsensor-since-0100)
- [Contributing](#contributing)

## Installation
## Quickstart

To install *dag-factory* run `pip install dag-factory`. It requires Python 3.6.0+ and Apache Airflow 2.0+.
The following example demonstrates how to create a simple DAG using *dag-factory*. We will generate a DAG with three tasks, where `task_2` and `task_3` depend on `task_1`.
All three tasks use the `BashOperator` to run simple bash commands.

## Usage
![screenshot](/img/quickstart_dag.png)

After installing *dag-factory* in your Airflow environment, there are two steps to creating DAGs. First, we need to create a YAML configuration file. For example:
1. To install *dag-factory*, run the following pip command in your Airflow environment:
```bash
pip install dag-factory
```

2. Create a YAML configuration file called `config_file.yml` and save it within your Airflow DAGs folder:
```yaml
example_dag1:
  default_args:
    owner: 'example_owner'
    start_date: 2018-01-01  # or '2 days'
    end_date: 2018-01-05
    retries: 1
    retry_delay_sec: 300
    start_date: '2024-01-01'
  schedule_interval: '0 3 * * *'
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 60
  default_view: 'tree'  # or 'graph', 'duration', 'gantt', 'landing_times'
  orientation: 'LR'  # or 'TB', 'RL', 'BT'
  catchup: False
  description: 'this is an example dag!'
  on_success_callback_name: print_hello
  on_success_callback_file: /usr/local/airflow/dags/print_hello.py
  on_failure_callback_name: print_hello
  on_failure_callback_file: /usr/local/airflow/dags/print_hello.py
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
@@ -52,32 +60,116 @@ example_dag1:
      bash_command: 'echo 3'
      dependencies: [task_1]
```
We are setting the execution order of the tasks by specifying the `dependencies` key.

Then in the DAGs folder in your Airflow environment you need to create a python file like this:
3. In the same folder, create a Python file called `generate_dags.py`. This file generates the DAGs from the configuration file and is a one-time setup;
you won't need to modify it unless you add more configuration files or change the configuration file name.

```python
from airflow import DAG  ## by default, this is needed for the dagbag to parse this file
import dagfactory
from pathlib import Path

config_file = Path.cwd() / "dags/config_file.yml"
dag_factory = dagfactory.DagFactory(config_file)

dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())
```

And this DAG will be generated and ready to run in Airflow!
After a few moments, the DAG will be generated and ready to run in Airflow. Unpause the DAG in the Airflow UI and watch the tasks execute!

If you have several configuration files you can import them like this:
![screenshot](/img/quickstart_gantt.png)

Please look at the [examples](/examples) folder for more examples.

## Features

### Multiple Configuration Files
If you want to split your DAG configuration across multiple files, give the configuration files a common suffix and load them with `load_yaml_dags`:
```python
# 'airflow' word is required for the dagbag to parse this file
from dagfactory import load_yaml_dags

load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml'])
```

![screenshot](/img/example_dag.png)
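
For illustration, configuration files whose names end with the given suffix are picked up. A hypothetical `etl_dag.yaml` (this file name and DAG are made-up examples, not part of the project) could look like this:

```yaml
# etl_dag.yaml -- hypothetical file; its name ends with the 'dag.yaml' suffix
# passed to load_yaml_dags above, so it is loaded automatically.
etl_dag:
  default_args:
    owner: 'example_owner'
    retries: 1
    start_date: '2024-01-01'
  schedule_interval: '@daily'
  tasks:
    extract:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo extract'
    load:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo load'
      dependencies: [extract]
```
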
### Dynamically Mapped Tasks
If you want to create a dynamic number of tasks at runtime, use dynamic task mapping: add an `expand` key with the mapped arguments to the task in the configuration file, and optionally a `partial` key for arguments shared across all mapped task instances.

```yaml
...
tasks:
  request:
    operator: airflow.operators.python.PythonOperator
    python_callable_name: example_task_mapping
    python_callable_file: /usr/local/airflow/dags/expand_tasks.py # this file should contain the python callable
  process:
    operator: airflow.operators.python_operator.PythonOperator
    python_callable_name: expand_task
    python_callable_file: /usr/local/airflow/dags/expand_tasks.py
    partial:
      op_kwargs:
        test_id: "test"
    expand:
      op_args:
        request.output
    dependencies: [request]
```
![mapped_tasks_example.png](img/mapped_tasks_example.png)
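
The callable file is not shown in full in this diff; a minimal sketch of what `/usr/local/airflow/dags/expand_tasks.py` could contain is below (the body of `example_task_mapping` is illustrative; `expand_task` mirrors `examples/expand_tasks.py` further down in this pull request):

```python
# expand_tasks.py -- sketch of the callable file referenced in the config above.
def example_task_mapping():
    # Illustrative: each element becomes the op_args of one mapped "process" task instance.
    return [[1], [2], [3]]


def expand_task(x, test_id):
    print(test_id)
    print(x)
    return [x]
```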

### Datasets
**dag-factory** supports scheduling DAGs via [Apache Airflow Datasets](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html).

To use datasets, specify the `outlets` key on the producing tasks in the configuration file; `outlets` is a list of strings representing the dataset URIs.
In the consumer DAG, set the `schedule` key to the list of dataset URIs it should be scheduled against.
The consumer DAG runs once all of those datasets have been updated.

```yaml
producer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG producer simple datasets"
  schedule_interval: "0 5 * * *"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      outlets: [ 's3://bucket_example/raw/dataset1.json' ]
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [ task_1 ]
      outlets: [ 's3://bucket_example/raw/dataset2.json' ]
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ]
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
```
![datasets_example.png](img/datasets_example.png)
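
For intuition, the `schedule` list of URI strings above corresponds roughly to the hand-written Airflow DAG below. This is a sketch of an equivalent DAG, assuming an Airflow version with Dataset support, not the exact code dag-factory generates:

```python
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

# Rough hand-written equivalent of consumer_dag above: each URI string in the
# YAML `schedule` list becomes an Airflow Dataset that the DAG is scheduled on.
with DAG(
    dag_id="consumer_dag",
    start_date=datetime(2024, 1, 1),
    schedule=[
        Dataset("s3://bucket_example/raw/dataset1.json"),
        Dataset("s3://bucket_example/raw/dataset2.json"),
    ],
):
    BashOperator(task_id="task_1", bash_command="echo 'consumer datasets'")
```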

### Custom Operators
**dag-factory** supports custom operators. To use one, set the `operator` key to the import path of your custom operator in the configuration file, and add any additional parameters the operator requires.

```yaml
...
tasks:
  begin:
    operator: airflow.operators.dummy_operator.DummyOperator
  make_bread_1:
    operator: customized.operators.breakfast_operators.MakeBreadOperator
    bread_type: 'Sourdough'
```
![custom_operators.png](img/custom_operators.png)
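
A custom operator only needs to accept its extra YAML keys as constructor arguments. A minimal sketch of what the `customized.operators.breakfast_operators` module referenced above might contain (hypothetical; your real operator may differ):

```python
# breakfast_operators.py -- hypothetical sketch of the custom operator used above.
from airflow.models.baseoperator import BaseOperator


class MakeBreadOperator(BaseOperator):
    """Toy operator; extra YAML keys under the task (here `bread_type`) arrive as kwargs."""

    def __init__(self, bread_type: str, **kwargs):
        super().__init__(**kwargs)
        self.bread_type = bread_type

    def execute(self, context):
        print(f"Making {self.bread_type} bread")
```
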
## Notes

### HttpSensor (since 0.10.0)
@@ -88,23 +180,23 @@ The following example shows `response_check` logic in a python file:

```yaml
task_2:
  operator: airflow.sensors.http_sensor.HttpSensor
  http_conn_id: 'test-http'
  method: 'GET'
  response_check_name: check_sensor
  response_check_file: /path/to/example1/http_conn.py
  dependencies: [task_1]
```
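
The `response_check_name` / `response_check_file` pair points at an ordinary Python function that receives the HTTP response. A minimal sketch of what `/path/to/example1/http_conn.py` might contain (illustrative; any callable returning a boolean works):

```python
# http_conn.py -- sketch of the response_check callable referenced above.
def check_sensor(response):
    # Return True to mark the sensor check as passed, False to keep poking.
    return "ok" in response.text
```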

The `response_check` logic can also be provided as a lambda:

```yaml
task_2:
  operator: airflow.sensors.http_sensor.HttpSensor
  http_conn_id: 'test-http'
  method: 'GET'
  response_check_lambda: 'lambda response: "ok" in response.text'
  dependencies: [task_1]
```

## Benefits
1 change: 1 addition & 0 deletions dagfactory/__init__.py
@@ -1,2 +1,3 @@
"""Modules and methods to export for easier access"""

from .dagfactory import DagFactory, load_yaml_dags
1 change: 1 addition & 0 deletions dagfactory/__version__.py
@@ -1,2 +1,3 @@
"""Module contains the version of dag-factory"""

__version__ = "0.19.0"
55 changes: 24 additions & 31 deletions dagfactory/dagbuilder.py
@@ -1,4 +1,5 @@
"""Module contains code for generating tasks and constructing a DAG"""

# pylint: disable=ungrouped-imports
import os
import re
@@ -197,34 +198,26 @@ def get_dag_params(self) -> Dict[str, Any]:

if utils.check_dict_key(dag_params["default_args"], "sla_miss_callback"):
if isinstance(dag_params["default_args"]["sla_miss_callback"], str):
dag_params["default_args"][
"sla_miss_callback"
]: Callable = import_string(
dag_params["default_args"]["sla_miss_callback"]
dag_params["default_args"]["sla_miss_callback"]: Callable = (
import_string(dag_params["default_args"]["sla_miss_callback"])
)

if utils.check_dict_key(dag_params["default_args"], "on_success_callback"):
if isinstance(dag_params["default_args"]["on_success_callback"], str):
dag_params["default_args"][
"on_success_callback"
]: Callable = import_string(
dag_params["default_args"]["on_success_callback"]
dag_params["default_args"]["on_success_callback"]: Callable = (
import_string(dag_params["default_args"]["on_success_callback"])
)

if utils.check_dict_key(dag_params["default_args"], "on_failure_callback"):
if isinstance(dag_params["default_args"]["on_failure_callback"], str):
dag_params["default_args"][
"on_failure_callback"
]: Callable = import_string(
dag_params["default_args"]["on_failure_callback"]
dag_params["default_args"]["on_failure_callback"]: Callable = (
import_string(dag_params["default_args"]["on_failure_callback"])
)

if utils.check_dict_key(dag_params["default_args"], "on_retry_callback"):
if isinstance(dag_params["default_args"]["on_retry_callback"], str):
dag_params["default_args"][
"on_retry_callback"
]: Callable = import_string(
dag_params["default_args"]["on_retry_callback"]
dag_params["default_args"]["on_retry_callback"]: Callable = (
import_string(dag_params["default_args"]["on_retry_callback"])
)

if utils.check_dict_key(dag_params, "sla_miss_callback"):
@@ -351,11 +344,11 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator:
" python_callable_file: !!python/name:my_module.my_func"
)
if not task_params.get("python_callable"):
task_params[
"python_callable"
]: Callable = utils.get_python_callable(
task_params["python_callable_name"],
task_params["python_callable_file"],
task_params["python_callable"]: Callable = (
utils.get_python_callable(
task_params["python_callable_name"],
task_params["python_callable_file"],
)
)
# remove dag-factory specific parameters
# Airflow 2.0 doesn't allow these to be passed to operator
@@ -419,10 +412,10 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator:
del task_params["response_check_name"]
del task_params["response_check_file"]
else:
task_params[
"response_check"
]: Callable = utils.get_python_callable_lambda(
task_params["response_check_lambda"]
task_params["response_check"]: Callable = (
utils.get_python_callable_lambda(
task_params["response_check_lambda"]
)
)
# remove dag-factory specific parameters
# Airflow 2.0 doesn't allow these to be passed to operator
@@ -669,18 +662,18 @@ def set_dependencies(
group_id = conf["task_group"].group_id
name = f"{group_id}.{name}"
if conf.get("dependencies"):
source: Union[
BaseOperator, "TaskGroup"
] = tasks_and_task_groups_instances[name]
source: Union[BaseOperator, "TaskGroup"] = (
tasks_and_task_groups_instances[name]
)
for dep in conf["dependencies"]:
if tasks_and_task_groups_config[dep].get("task_group"):
group_id = tasks_and_task_groups_config[dep][
"task_group"
].group_id
dep = f"{group_id}.{dep}"
dep: Union[
BaseOperator, "TaskGroup"
] = tasks_and_task_groups_instances[dep]
dep: Union[BaseOperator, "TaskGroup"] = (
tasks_and_task_groups_instances[dep]
)
source.set_upstream(dep)

@staticmethod
1 change: 1 addition & 0 deletions dagfactory/dagfactory.py
@@ -1,4 +1,5 @@
"""Module contains code for loading a DagFactory config and generating DAGs"""

import logging
import os
from itertools import chain
5 changes: 2 additions & 3 deletions dagfactory/utils.py
@@ -1,4 +1,5 @@
"""Module contains various utilities used by dag-factory"""

import ast
import importlib.util
import os
@@ -212,9 +213,7 @@ def check_template_searchpath(template_searchpath: Union[str, List[str]]) -> bool:
return False


def get_expand_partial_kwargs(
task_params: Dict[str, Any]
) -> Tuple[
def get_expand_partial_kwargs(task_params: Dict[str, Any]) -> Tuple[
Dict[str, Any],
Dict[str, Union[Dict[str, Any], Any]],
Dict[str, Union[Dict[str, Any], Any]],
2 changes: 1 addition & 1 deletion examples/expand_tasks.py
@@ -5,4 +5,4 @@ def example_task_mapping():
def expand_task(x, test_id):
print(test_id)
print(x)
return [x]
return [x]
Binary file added img/custom_operators.png
Binary file added img/datasets_example.png
Binary file removed img/example_dag.png
Binary file added img/mapped_tasks_example.png
Binary file added img/quickstart_dag.png
Binary file added img/quickstart_gantt.png