Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for templating on_failure_callback #252

Merged
merged 28 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
00fd992
Rebasing branch
jroach-astronomer Oct 17, 2024
ad1947c
Enabled pre-commit hooks
jroach-astronomer Oct 17, 2024
90d0ea7
Adding support for default_args, as well as on_failure_callbacks defi…
jroach-astronomer Oct 18, 2024
d231db0
Updating callback to properly point to provider package.
jroach-astronomer Oct 21, 2024
9c3d726
Updating docs, examples for callbacks
jroach-astronomer Oct 23, 2024
c346824
Removing callback leveraging Slack
jroach-astronomer Oct 24, 2024
6b39c07
Changing empty operator to Python operator that simply prints a message
jroach-astronomer Oct 24, 2024
4e9d4db
Adding example_callbacks.py to .airflowignore
jroach-astronomer Oct 24, 2024
54bbe56
Debugging test failures
jroach-astronomer Oct 24, 2024
f69ef0b
Debugging test failures
jroach-astronomer Oct 24, 2024
cc62b7b
Debugging test failures
jroach-astronomer Oct 24, 2024
96c3b83
Debugging test failures
jroach-astronomer Oct 24, 2024
a1f2f46
Debugging test failures
jroach-astronomer Oct 24, 2024
ef8d78f
Attempting to tackle .airflowignore issue
jroach-astronomer Oct 24, 2024
9e9bca5
Attempting to tackle .airflowignore issue
jroach-astronomer Oct 24, 2024
34ce43f
Attempting to tackle .airflowignore issue
jroach-astronomer Oct 24, 2024
c58cba8
Attempting to tackle .airflowignore issue
jroach-astronomer Oct 24, 2024
9a0e6e7
Attempting to tackle .airflowignore issue
jroach-astronomer Oct 24, 2024
bb58452
Attempting to tackle .airflowignore issue
jroach-astronomer Oct 24, 2024
fa7b10f
Attempting to tackle .airflowignore issue
jroach-astronomer Oct 24, 2024
df23132
Attempting to tackle .airflowignore issue
jroach-astronomer Oct 24, 2024
a6ce1bc
Provided implementation for Notifiers
jroach-astronomer Oct 25, 2024
1287cd3
Updating unit tests, making a quick change to integration tests
jroach-astronomer Oct 25, 2024
277c8b2
Reverting integration test changes
jroach-astronomer Oct 25, 2024
e6babe2
Adding version checking for callback imports
jroach-astronomer Oct 25, 2024
3ffc6c5
Adding version checking for callback imports
jroach-astronomer Oct 25, 2024
2bbe1ee
Updating tests, registering mark
jroach-astronomer Oct 30, 2024
7088269
Update tests/test_example_dags.py
pankajastro Oct 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,51 @@ consumer_dag:
bread_type: 'Sourdough'
```
![custom_operators.png](img/custom_operators.png)

### Callbacks
**dag-factory** also supports using "callbacks" at the DAG, Task, and TaskGroup level. These callbacks can be defined in
a few different ways. The first points directly to a Python function that has been defined in the `include/callbacks.py`
file.

```yaml
example_dag1:
on_failure_callback: include.callbacks.example_callback1
...
```

Here, the `on_success_callback` points to first a file, and then to a function name within that file. Notice that this
callback is defined using `default_args`, meaning this callback will be applied to all tasks.

```yaml
example_dag1:
...
default_args:
on_success_callback_file: /usr/local/airflow/include/callbacks.py
on_success_callback_name: example_callback1
```

**dag-factory** users can also leverage provider-built tools when configuring callbacks. In this example, the
`send_slack_notification` function from the Slack provider is used to dispatch a message when a DAG failure occurs. This
function is passed to the `callback` key under `on_failure_callback`. This pattern allows for callback definitions to
take parameters (such as `text`, `channel`, and `username`, as shown here).

**Note that this functionality is currently only supported for `on_failure_callback`'s defined at the DAG-level, or in
`default_args`. Support for other callback types and Task/TaskGroup-level definitions are coming soon.**

```yaml
example_dag1:
on_failure_callback:
callback: airflow.providers.slack.notifications.slack.send_slack_notification
slack_conn_id: example_slack_id
text: |
:red_circle: Task Failed.
This task has failed and needs to be addressed.
Please remediate this issue ASAP.
channel: analytics-alerts
username: Airflow
...
```

## Notes

### HttpSensor (since 1.0.0)
Expand Down
72 changes: 59 additions & 13 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import re
from copy import deepcopy
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Dict, List, Union

from airflow import DAG, configuration
Expand Down Expand Up @@ -178,10 +179,9 @@ def get_dag_params(self) -> Dict[str, Any]:
)

if utils.check_dict_key(dag_params["default_args"], "on_failure_callback"):
if isinstance(dag_params["default_args"]["on_failure_callback"], str):
dag_params["default_args"]["on_failure_callback"]: Callable = import_string(
dag_params["default_args"]["on_failure_callback"]
)
dag_params["default_args"]["on_failure_callback"]: Callable = self.set_callback(
parameters=dag_params["default_args"], callback_type="on_failure_callback"
)

if utils.check_dict_key(dag_params["default_args"], "on_retry_callback"):
if isinstance(dag_params["default_args"]["on_retry_callback"], str):
Expand All @@ -198,8 +198,9 @@ def get_dag_params(self) -> Dict[str, Any]:
dag_params["on_success_callback"]: Callable = import_string(dag_params["on_success_callback"])

if utils.check_dict_key(dag_params, "on_failure_callback"):
if isinstance(dag_params["on_failure_callback"], str):
dag_params["on_failure_callback"]: Callable = import_string(dag_params["on_failure_callback"])
dag_params["on_failure_callback"]: Callable = self.set_callback(
parameters=dag_params, callback_type="on_failure_callback"
)

if utils.check_dict_key(dag_params, "on_success_callback_name") and utils.check_dict_key(
dag_params, "on_success_callback_file"
Expand All @@ -212,9 +213,8 @@ def get_dag_params(self) -> Dict[str, Any]:
if utils.check_dict_key(dag_params, "on_failure_callback_name") and utils.check_dict_key(
dag_params, "on_failure_callback_file"
):
dag_params["on_failure_callback"]: Callable = utils.get_python_callable(
dag_params["on_failure_callback_name"],
dag_params["on_failure_callback_file"],
dag_params["on_failure_callback"] = self.set_callback(
parameters=dag_params, callback_type="on_failure_callback", has_name_and_file=True
)

if utils.check_dict_key(dag_params["default_args"], "on_success_callback_name") and utils.check_dict_key(
Expand All @@ -229,10 +229,8 @@ def get_dag_params(self) -> Dict[str, Any]:
if utils.check_dict_key(dag_params["default_args"], "on_failure_callback_name") and utils.check_dict_key(
dag_params["default_args"], "on_failure_callback_file"
):

dag_params["default_args"]["on_failure_callback"]: Callable = utils.get_python_callable(
dag_params["default_args"]["on_failure_callback_name"],
dag_params["default_args"]["on_failure_callback_file"],
dag_params["default_args"]["on_failure_callback"] = self.set_callback(
parameters=dag_params["default_args"], callback_type="on_failure_callback", has_name_and_file=True
)

if utils.check_dict_key(dag_params, "template_searchpath"):
Expand Down Expand Up @@ -805,3 +803,51 @@ def build(self) -> Dict[str, Union[str, DAG]]:
self.set_dependencies(tasks, tasks_dict, dag_params.get("task_groups", {}), task_groups_dict)

return {"dag_id": dag_params["dag_id"], "dag": dag}

@staticmethod
def set_callback(parameters: Union[dict, str], callback_type: str, has_name_and_file=False) -> Callable:
"""
Update the passed-in config with the callback.

:param parameters:
:param callback_type:
:param has_name_and_file:
:returns: Callable
"""
# There is scenario where a callback is passed in via a file and a name. For the most part, this will be a
# Python callable that is treated similarly to a Python callable that the PythonOperator may leverage. That
# being said, what if this is not a Python callable? What if this is another type?
if has_name_and_file:
return utils.get_python_callable(
python_callable_name=parameters[f"{callback_type}_name"],
python_callable_file=parameters[f"{callback_type}_file"],
)

# If the value stored at parameters[callback_type] is a string, it should be imported under the assumption that
# it is a function that is "ready to be called". If not returning the function, something like this could be
# used to update the config parameters[callback_type] = import_string(parameters[callback_type])
if isinstance(parameters[callback_type], str):
return import_string(parameters[callback_type])

# Otherwise, if the parameter[callback_type] is a dictionary, it should be treated similar to the Python
# callable
elif isinstance(parameters[callback_type], dict):
# Pull the on_failure_callback dictionary from dag_params
on_state_callback_params: dict = parameters[callback_type]

# Check to see if there is a "callback" key in the on_failure_callback dictionary. If there is, parse
# out that callable, and add the parameters
if utils.check_dict_key(on_state_callback_params, "callback"):
if isinstance(on_state_callback_params["callback"], str):
on_state_callback_callable: Callable = import_string(on_state_callback_params["callback"])
del on_state_callback_params["callback"]

# Return the callable, this time, using the params provided in the YAML file, rather than a .py
# file with a callable configured. If not returning the partial, something like this could be used
# to update the config ... parameters[callback_type]: Callable = partial(...)
if hasattr(on_state_callback_callable, "notify"):
return on_state_callback_callable(**on_state_callback_params)

return partial(on_state_callback_callable, **on_state_callback_params)

raise DagFactoryConfigException(f"Invalid type passed to {callback_type}")
Empty file.
16 changes: 16 additions & 0 deletions dev/dags/customized/callables/python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
failure.py

Create a callable that intentionally "fails".

Author: Jake Roach
Date: 2024-10-22
"""


def succeeding_task():
print("Task has executed successfully!")


def failing_task():
raise Exception("Intentionally failing this Task to trigger on_failure_callback.")
Empty file.
11 changes: 11 additions & 0 deletions dev/dags/customized/callbacks/custom_callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""
example_callbacks.py

Author: Jake Roach
Date: 2024-10-22
"""


def output_message(context, param1, param2):
print("A callback has been raised!")
print(f"{param1} ---------- {param2}")
17 changes: 17 additions & 0 deletions dev/dags/example_callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_callbacks.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
28 changes: 28 additions & 0 deletions dev/dags/example_callbacks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
example_callbacks:
default_args:
start_date: "2024-01-01"
on_failure_callback:
callback: airflow.providers.slack.notifications.slack.send_slack_notification
slack_conn_id: slack_conn_id
text: |
:red_circle: Task Failed.
This task has failed and needs to be addressed.
Please remediate this issue ASAP.
channel: "#channel"
schedule_interval: "@daily"
catchup: False
on_failure_callback:
callback: customized.callbacks.custom_callbacks.output_message
param1: param1
param2: param2
tasks:
start:
operator: airflow.operators.python.PythonOperator
python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py
python_callable_name: succeeding_task
end:
operator: airflow.operators.python.PythonOperator
python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py
python_callable_name: failing_task
dependencies:
- start
1 change: 1 addition & 0 deletions dev/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
# Astro Runtime includes the following pre-installed providers packages: https://www.astronomer.io/docs/astro/runtime-image-architecture#provider-packages
apache-airflow-providers-slack
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [

[project.optional-dependencies]
tests = [
"apache-airflow-providers-slack",
"pytest>=6.0",
"pytest-cov",
"pre-commit"
Expand Down Expand Up @@ -95,7 +96,7 @@ universal = true
[tool.pytest.ini_options]
filterwarnings = ["ignore::DeprecationWarning"]
minversion = "6.0"
markers = ["integration"]
markers = ["integration", "callbacks"]

######################################
# THIRD PARTY TOOLS
Expand Down
Loading
Loading