Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for templating on_failure_callback #252

Merged
merged 28 commits into from
Oct 30, 2024

Conversation

jroach-astronomer
Copy link
Member

@jroach-astronomer jroach-astronomer commented Oct 17, 2024

Add support for templating in on_failure_callback. Requester is specifically looking to support templated parameters that do not need to be stored in a .py file.

Here's an example (from the issue submitted by @matveykortsev) for reference:

from airflow.providers.slack.notifications.slack import send_slack_notification
'on_failure_callback': [
        send_slack_notification(
            slack_conn_id='slack',
            text="""
                :red_circle: Task Failed. 
                *Task*: {{ ti.task_id }}  
                *Dag*: {{ ti.dag_id }} 
                *Execution Time*: {{ ti.execution_date }}  
                *Log Url*: {{ ti.log_url }} 
                """,
            channel="analytics-alerts",
            username="Airflow",
        )
    ],

Closes: #209

@jroach-astronomer jroach-astronomer changed the title Rebasing branch issue-209: Adding enhanced functionality for on_failure_callback Oct 17, 2024
@codecov-commenter
Copy link

codecov-commenter commented Oct 17, 2024

Codecov Report

Attention: Patch coverage is 95.23810% with 1 line in your changes missing coverage. Please review.

Project coverage is 93.95%. Comparing base (1c999f5) to head (7088269).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
dagfactory/dagbuilder.py 95.23% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #252      +/-   ##
==========================================
- Coverage   93.96%   93.95%   -0.02%     
==========================================
  Files           8        8              
  Lines         630      645      +15     
==========================================
+ Hits          592      606      +14     
- Misses         38       39       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@jroach-astronomer
Copy link
Member Author

@tatiana, @pankajastro - should be all set to merge. Thanks!

@tatiana tatiana changed the title issue-209: Adding enhanced functionality for on_failure_callback Add support for templating on_failure_callback Oct 17, 2024
@pankajastro pankajastro self-assigned this Oct 17, 2024
@pankajastro
Copy link
Contributor

@jroach-astronomer and I connected to discuss this PR, and we decided to add an example in the README.md for this feature. We will include this in the 0.2.0 release.

@pankajastro pankajastro added this to the DAG Factory 0.20.0 milestone Oct 18, 2024
@jroach-astronomer
Copy link
Member Author

Updated to include documentation in the README.md, as well as support for callbacks defined at default_args level. Also integrates a recent change to allow for callbacks defined using a file and name.

@pankajastro
Copy link
Contributor

Hey @jroach-astronomer, I'm wondering if you were able to test it. I tried to test this for Slack but got the below error

Broken DAG: [/usr/local/airflow/dags/example_dag_factory.py]
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/module_loading.py", line 42, in import_string
    return getattr(module, class_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: module 'airflow.providers.slack.notifications' has no attribute 'slack import send_slack_notification'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/astro/.local/lib/python3.12/site-packages/dagfactory/dagbuilder.py", line 842, in set_callback
    on_state_callback_callable: Callable = import_string(on_state_callback_params["callback"])
                                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/module_loading.py", line 44, in import_string
    raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class')
ImportError: Module "airflow.providers.slack.notifications" does not define a "slack import send_slack_notification" attribute/class

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/astro/.local/lib/python3.12/site-packages/dagfactory/dagfactory.py", line 158, in clean_dags
    dags: Dict[str, Any] = self.build_dags()
                           ^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.12/site-packages/dagfactory/dagfactory.py", line 112, in build_dags
    raise DagFactoryException(f"Failed to generate dag {dag_name}. verify config is correct") from err
dagfactory.exceptions.DagFactoryException: Failed to generate dag example_dag. verify config is correct

YAML

default:
  default_args:
    owner: "default_owner"
    start_date: 2018-03-01
    end_date: 2018-03-05
    retries: 1
    retry_delay_sec: 300
    #on_success_callback_name: print_hello_from_callback
    #on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py
    on_failure_callback:
      callback: airflow.providers.slack.notifications.slack import send_slack_notification
      text: |
          :red_circle: Task Failed.
          This task has failed and needs to be addressed.
          Please remediate this issue ASAP.
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "0 1 * * *"
  on_failure_callback_name: print_hello_from_callback
  on_failure_callback_file: $CONFIG_ROOT_DIR/print_hello.py

example_dag:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "this is an example dag"
  schedule_interval: "0 3 * * *"
  render_template_as_native_obj: True
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: print_hello
      python_callable_file: $CONFIG_ROOT_DIR/print_hello.py
      dependencies: [task_1]

Based on the example here: https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/notifications/slackwebhook_notifier_howto_guide.html look like we may need to handle the notification import a bit differently or I'm doing something wrong it testing?

@jroach-astronomer
Copy link
Member Author

@pankajastro, good catch, there is a typo in my example. It should be 'airflow.providers.slack.notifications.slack.send_slack_notification' for that callback.

I will change that and push a new commit.

@jroach-astronomer
Copy link
Member Author

@pankajastro, good catch, there is a typo in my example. It should be 'airflow.providers.slack.notifications.slack.send_slack_notification' for that callback.

I will change that and push a new commit.

Update pushed!

@pankajastro
Copy link
Contributor

I am getting the below error. Do you have yml which I can use for testing

[2024-10-21, 20:42:10 UTC] {taskinstance.py:3311} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/customized/operators/breakfast_operators.py", line 13, in execute
    raise Exception("error")
Exception: error
[2024-10-21, 20:42:10 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=example_breadfast, task_id=make_bread_1, run_id=manual__2024-10-21T20:42:06.309471+00:00, execution_date=20241021T204206, start_date=20241021T204210, end_date=20241021T204210
[2024-10-21, 20:42:10 UTC] {taskinstance.py:1563} INFO - Executing callback at index 0: partial
[2024-10-21, 20:42:10 UTC] {taskinstance.py:1567} ERROR - Error in callback at index 0: partial
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3159, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3183, in _execute_task
    return _execute_task(self, context, task_orig)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/customized/operators/breakfast_operators.py", line 13, in execute
    raise Exception("error")
Exception: error
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 1565, in _run_finished_callback
    callback(context)
TypeError: SlackNotifier.__init__() takes 1 positional argument but 2 positional arguments (and 4 keyword-only arguments) were given

@tatiana
Copy link
Collaborator

tatiana commented Oct 21, 2024

We are moving this to the 0.21 milestone since @jroach-astronomer is on leave until after the next release. We can have pre-releases with this change as soon as the issue pointed out by @pankajastro is solved.

@jroach-astronomer
Copy link
Member Author

jroach-astronomer commented Oct 24, 2024

@pankajastro, @tatiana, I've updated the dev/dags/ directory to include the example_callbacks.py file. However, I'm running into an issue when it comes to adding example_callbacks.py to the .airflowignore for the integration tests. It seems to be exactly what was mentioned in this issue (apache/airflow#23532), as it's only happening for Airflow Version == 2.3. Any thoughts here?

@jroach-astronomer
Copy link
Member Author

@pankajastro, I've gone ahead and testing the changes using the Astro project in the dev/ directory. Here was the issue! I did not include a slack_conn_id in the on_failure_callback definition (hence the parameter exception).

Those 3 failing integration tests are due to the known symlink issue in Airflow 2.3.0. We'll need to resolve that in order for the integration tests to run.

I'm also going to complete adding this callback functionality to all callbacks (at the DAG and Task-level, include success, retry, etc.).

@jroach-astronomer
Copy link
Member Author

@pankajastro, @tatiana, wanted to check in on integration tests. Please see the comment above. Thanks!

@pankajastro
Copy link
Contributor

pankajastro commented Oct 28, 2024

Those 3 failing integration tests are due to the known symlink issue in Airflow 2.3.0. We'll need to resolve that in order for the integration tests to run.

I think we can skip this particular test for Airflow 2.3.0 you may have to adjust a bit https://github.com/astronomer/dag-factory/blob/main/tests/test_example_dags.py

@pankajastro
Copy link
Contributor

Which Slack channel are you using to test this? We may need to configure it in CI. Has it already been configured?

@pankajastro
Copy link
Contributor

I'm able to test below YML DAG with slack webhook notification

example_callbacks:
  default_args:
    start_date: "2024-01-01"
    on_failure_callback:
      callback: airflow.providers.slack.notifications.slack_webhook.send_slack_webhook_notification
      slack_webhook_conn_id: slack_conn_id
      text: "Hello World!"
  schedule_interval: "@daily"
  catchup: False
  tasks:
    start:
      operator: airflow.operators.python.PythonOperator
      python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py
      python_callable_name: succeeding_task
    end:
      operator: airflow.operators.python.PythonOperator
      python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py
      python_callable_name: failing_task
      dependencies:
        - start

Copy link
Contributor

@pankajastro pankajastro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, we may need to fix failure CI!

dev/dags/dags Outdated Show resolved Hide resolved
@jroach-astronomer jroach-astronomer requested a review from a team as a code owner October 30, 2024 11:34
Copy link
Contributor

@pankajastro pankajastro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work @jroach-astronomer 🎉

@pankajastro pankajastro merged commit cce8f05 into astronomer:main Oct 30, 2024
67 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for templating in on_failure_callback
4 participants