diff --git a/dags/example_dag_with_taskflow_api.py b/dags/example_dag_with_taskflow_api.py
index 0bd1ba5c0..e5d3c56d3 100644
--- a/dags/example_dag_with_taskflow_api.py
+++ b/dags/example_dag_with_taskflow_api.py
@@ -1,25 +1,68 @@
-# DAG exhibiting task flow paradigm in airflow 2.0
-# https://airflow.apache.org/docs/apache-airflow/2.0.2/tutorial_taskflow_api.html
-# Modified for our use case
-
 import json
+import pendulum
+
 from airflow.decorators import dag, task
-from airflow.utils.dates import days_ago
-# These args will get passed on to each operator
-# You can override them on a per-task basis during operator initialization
+from airflow.models import Variable
+from airflow.hooks.base_hook import BaseHook
+from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
+
+
+def task_failure_slack_alert(context):
+    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
+
+    environment = Variable.get("airflow_environment", default_var="local")
+    if environment not in ["dev", "prod"]:
+        print(f"{environment} mode - skip sending alerts.")
+        return
+
+    ti = context['ti']  # to get the Task Instance
+    TASK_STATE = ti.state
+    TASK_ID = context.get('task_instance').task_id
+    DAG_ID = context.get('task_instance').dag_id
+    EXECUTION_TIME = context.get('execution_date')
+    LOG_URL = context.get('task_instance').log_url
+
+    slack_msg = f"""
+    :red_circle: Task Failed.
+    *Dag*: {DAG_ID}
+    *Task*: {TASK_ID}
+    *Task State*: {TASK_STATE}
+    *Execution Time*: {EXECUTION_TIME}
+    *Log URL*: <{LOG_URL}|*Logs*>
+    """
+
+    SLACK_FAILURE_ALERT_CONN_ID = "slack_data_alerts"
+    CHANNEL = BaseHook.get_connection(SLACK_FAILURE_ALERT_CONN_ID).login
+
+    slack_alert = SlackWebhookOperator(
+        task_id=TASK_ID,
+        slack_webhook_conn_id=SLACK_FAILURE_ALERT_CONN_ID,
+        message=slack_msg,
+        channel=CHANNEL,
+        username='airflow'
+    )
+
+    return slack_alert.execute(context=context)
+
 default_args = {
-    'owner': 'airflow',
+    'owner': 'data platform',
+    'retries': 0,
+    'start_date': pendulum.datetime(2021, 1, 1, tz="UTC")
 }
-@dag(default_args=default_args, schedule_interval="@daily", start_date=days_ago(2), tags=['example'])
-def dag_with_taskflow_api():
-    """
-    ### TaskFlow API Tutorial Documentation
-    This is a simple ETL data pipeline example which demonstrates the use of
-    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
-    Documentation that goes along with the Airflow TaskFlow API tutorial is
-    located
-    [here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
-    """
+
+@dag(
+    dag_id="example_taskflow_api",
+    description="Taskflow api example",
+    schedule=None,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    default_args=default_args,
+    catchup=False,
+    on_success_callback=None,
+    on_failure_callback=task_failure_slack_alert,
+    tags=["example"],
+)
+
+def tutorial_taskflow_api():
     @task()
     def extract():
         """
@@ -32,6 +75,7 @@ def extract():
 
         order_data_dict = json.loads(data_string)
         return order_data_dict
+
     @task(multiple_outputs=True)
     def transform(order_data_dict: dict):
         """
@@ -45,6 +89,7 @@ def transform(order_data_dict: dict):
             total_order_value += value
 
         return {"total_order_value": total_order_value}
+
     @task()
     def load(total_order_value: float):
         """
@@ -53,8 +98,10 @@ def load(total_order_value: float):
         instead of saving it to end user review, just prints it out.
         """
 
-        print("Total order value is: %.2f" % total_order_value)
+        print(f"Total order value is: {total_order_value:.2f}")
+
     order_data = extract()
     order_summary = transform(order_data)
     load(order_summary["total_order_value"])
-dag_with_taskflow_api = dag_with_taskflow_api()
+
+tutorial_taskflow_api()
diff --git a/docker/config/.env.localrunner b/docker/config/.env.localrunner
index 04859fb09..a6b705046 100644
--- a/docker/config/.env.localrunner
+++ b/docker/config/.env.localrunner
@@ -2,10 +2,24 @@
 # Example environment variables using temporary security credentials
 # AWS_ACCESS_KEY_ID=XXXXXXXXXX
 # AWS_SECRET_ACCESS_KEY=YYYYYYYYYYYY
-# AWS_SESSION_TOKEN=ZZZZZZZZZZ
+# AWS_SESSION_TOKEN=ZZZZZZZZZZ
 
 # to change default password you'll need to delete the db-data folder (when running locally)
 DEFAULT_PASSWORD="test"
 S3_DAGS_PATH=""
 S3_PLUGINS_PATH=""
 S3_REQUIREMENTS_PATH=""
+ENVIRONMENT="local"
+
+# Airflow webserver instance name
+AIRFLOW__WEBSERVER__INSTANCE_NAME="DAGs (Local)"
+
+# Airflow variables
+# Example: "AIRFLOW_VAR_{}"
+AIRFLOW_VAR_AIRFLOW_ENVIRONMENT="local"
+
+# Example of JSON format variable
+# AIRFLOW_VAR_SLACK_DMS_MONITOR='{"key":"","value":""}'
+
+# Airflow connections
+# Example: "AIRFLOW_CONN_{}"
diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index 90a2f9902..49eef0d26 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -4,3 +4,4 @@ apache-airflow-providers-snowflake==5.2.1
 apache-airflow-providers-mysql==5.5.1
 apache-airflow-providers-slack==8.5.1
 boto3==1.33.13
+astronomer-cosmos==1.3.2