Skip to content

Commit

Permalink
Add pubsub and slack example operators
Browse files Browse the repository at this point in the history
  • Loading branch information
haroldwoo committed Apr 4, 2022
1 parent 160c64d commit f19c6b8
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 10 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ build:
clean: stop
docker-compose rm -f
rm -rf logs/*
if [ -f airflow-worker.pid ]; then rm airflow-worker.pid; fi

shell:
docker-compose run web bash
Expand Down
3 changes: 3 additions & 0 deletions bin/run
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ init_connections() {
gcp_conn=(
"google_cloud_airflow_dataproc"
"google_cloud_airflow_gke"
"google_cloud_airflow_pubsub"
"google_cloud_derived_datasets"
"google_cloud_prio_a"
"google_cloud_prio_admin"
Expand Down Expand Up @@ -177,6 +178,8 @@ init_variables() {

airflow variables set "fivetran_acoustic_contact_export_connector_id" "dummy_connector_id"
airflow variables set "fivetran_acoustic_contact_export_list_id" "00000"

airflow variables set "slack_secret_token" "slack_secret_token"
}

[ $# -lt 1 ] && usage
Expand Down
34 changes: 34 additions & 0 deletions dags/example_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from airflow import DAG
from datetime import datetime, timedelta
from utils.tags import Tag

from airflow.providers.google.cloud.operators.pubsub import PubSubPublishMessageOperator

default_args = {
"owner": "[email protected]",
"email": ["[email protected]"],
"depends_on_past": False,
"start_date": datetime(2022, 3, 23),
"email_on_failure": True,
"email_on_retry": True,
"retries": 2,
"retry_delay": timedelta(minutes=30),
}

m1 = {'data': b'Hello, World!',
'attributes': {'type': 'greeting'}
}
m2 = {'data': b'Knock, knock'}
m3 = {'attributes': {'foo': ''}}

tags = [Tag.ImpactTier.tier_3]
dag = DAG("example_pubsub", default_args=default_args, schedule_interval="@daily", tags=tags,)

publish_task = PubSubPublishMessageOperator(
task_id="publish_to_pubsub",
project_id="moz-fx-data-airflow-prod-88e0",
topic="airflow-glam-triggers",
gcp_conn_id="google_cloud_airflow_pubsub",
messages=[m1, m2, m3],
dag=dag,
)
61 changes: 61 additions & 0 deletions dags/example_slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from airflow import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from utils.tags import Tag

from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.operators.bash import BashOperator

from utils.slack import if_task_fails_alert_slack

"""
If getting "channel_not_found" errors, you need to open the slack channel settings, navigate to Integrations,
and add "Airflow-bot" to the Apps section.
"""
tags = [Tag.ImpactTier.tier_3]

"""
default_args_1 = {
"owner": "[email protected]",
"email": ["[email protected]"],
"depends_on_past": False,
"start_date": datetime(2022, 3, 23),
"email_on_failure": True,
"email_on_retry": True,
"retries": 2,
"retry_delay": timedelta(minutes=30),
}
dag1 = DAG("example_slack", default_args=default_args_1, schedule_interval="@daily", tags=tags,)
# The following example shows how to simply post to a slack channel
simple_slack_example = SlackAPIPostOperator(
task_id="post_hello",
token=Variable.get("slack_secret_token"),
text="hello world!",
channel="#airflow-alerts",
dag=dag1,
)
"""

# This example shows how to configure a dag's default args callback so it alerts slack on failures using an
# imported utils method
default_args_2 = {
"owner": "[email protected]",
"email": ["[email protected]"],
"depends_on_past": False,
"start_date": datetime(2022, 3, 23),
"email_on_failure": True,
"email_on_retry": True,
"retries": 0,
"retry_delay": timedelta(minutes=30),
# NOTE: on_failure_callback doesn't trigger until all retries are exhausted
"on_failure_callback": if_task_fails_alert_slack,
}

dag2 = DAG("example_slack", default_args=default_args_2, schedule_interval="@daily", tags=tags,)

task_with_failed_slack_alerts = BashOperator(
task_id='fail_task',
bash_command='exit 1',
dag=dag2)
22 changes: 22 additions & 0 deletions dags/utils/slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from airflow.models import Variable
from airflow.providers.slack.operators.slack import SlackAPIPostOperator

SLACK_CHANNEL = "#airflow-alerts"

def if_task_fails_alert_slack(context):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=SLACK_CHANNEL,
token=Variable.get("slack_secret_token"),
text="""
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Date*: {ds}
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
ds=context.get('ds')
)
)
return failed_alert.execute(context=context)
2 changes: 2 additions & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ SQLAlchemy>=1.3.18
# Airflow 2 no longer installs http provider by default, until chardet becomes an optional dependency of requests
apache-airflow-providers-http
airflow-provider-fivetran
apache-airflow-providers-slack
# Upgrade google dataproc provider to fix beta client clusterConfig and mismatch issues
apache-airflow-providers-google==5.0.0
# 2.4.0 is broken for dataproc cluster create/delete
# 2.6.0 and 3.0.0 are newer but not compatible with apache-airflow-providers-google
# yet until maybe v7.0.0 bc 'google.cloud.dataproc_v1beta2' is deprecated
google-cloud-dataproc==2.5.0
xmltodict==0.12.0
google-cloud-pubsub==2.11.0
24 changes: 14 additions & 10 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ apache-airflow-providers-imap==2.0.0 # via apache-airflow
apache-airflow-providers-jdbc==2.0.0 # via apache-airflow
apache-airflow-providers-mysql==2.1.0 # via apache-airflow
apache-airflow-providers-postgres==2.0.0 # via apache-airflow
apache-airflow-providers-slack==4.2.3 # via -r requirements.in
apache-airflow-providers-sqlite==2.0.0 # via apache-airflow
apache-airflow[amazon,apache.hive,async,celery,cncf.kubernetes,crypto,datadog,github_enterprise,google_auth,jdbc,mysql,password,postgres,statsd]==2.1.4 # via -r requirements.in, airflow-provider-fivetran, apache-airflow-providers-amazon, apache-airflow-providers-apache-hive, apache-airflow-providers-celery, apache-airflow-providers-cncf-kubernetes, apache-airflow-providers-datadog, apache-airflow-providers-google, apache-airflow-providers-http, apache-airflow-providers-jdbc, apache-airflow-providers-mysql, apache-airflow-providers-postgres
apache-airflow[amazon,apache.hive,async,celery,cncf.kubernetes,crypto,datadog,github_enterprise,google_auth,jdbc,mysql,password,postgres,statsd]==2.1.4 # via -r requirements.in, airflow-provider-fivetran, apache-airflow-providers-amazon, apache-airflow-providers-apache-hive, apache-airflow-providers-celery, apache-airflow-providers-cncf-kubernetes, apache-airflow-providers-datadog, apache-airflow-providers-google, apache-airflow-providers-http, apache-airflow-providers-jdbc, apache-airflow-providers-mysql, apache-airflow-providers-postgres, apache-airflow-providers-slack
apispec[yaml]==3.3.2 # via flask-appbuilder
argcomplete==1.12.2 # via -r requirements.in, apache-airflow, nox
attrs==20.3.0 # via apache-airflow, cattrs, jsonschema, pytest
Expand All @@ -33,6 +34,7 @@ billiard==3.6.4.0 # via celery
blinker==1.4 # via apache-airflow
boto3==1.15.18 # via -r requirements.in, apache-airflow-providers-amazon, watchtower
botocore==1.18.18 # via -r requirements.in, boto3, s3transfer
cached-property==1.5.2 # via apache-airflow
cachetools==4.2.2 # via google-auth
cattrs==1.5.0 # via apache-airflow
celery==4.4.7 # via apache-airflow-providers-celery, flower
Expand Down Expand Up @@ -72,7 +74,7 @@ flower==0.9.7 # via apache-airflow-providers-celery
future==0.18.2 # via pyhive
gevent==21.1.2 # via apache-airflow
google-ads==13.0.0 # via apache-airflow-providers-google
google-api-core[grpc,grpcgcp]==1.31.0 # via apache-airflow-providers-google, google-ads, google-api-python-client, google-cloud-appengine-logging, google-cloud-automl, google-cloud-bigquery, google-cloud-bigquery-datatransfer, google-cloud-bigquery-storage, google-cloud-bigtable, google-cloud-container, google-cloud-core, google-cloud-datacatalog, google-cloud-dataproc, google-cloud-dlp, google-cloud-kms, google-cloud-language, google-cloud-logging, google-cloud-memcache, google-cloud-monitoring, google-cloud-os-login, google-cloud-pubsub, google-cloud-redis, google-cloud-secret-manager, google-cloud-spanner, google-cloud-speech, google-cloud-tasks, google-cloud-texttospeech, google-cloud-translate, google-cloud-videointelligence, google-cloud-vision, google-cloud-workflows
google-api-core[grpc,grpcgcp]==1.31.5 # via apache-airflow-providers-google, google-ads, google-api-python-client, google-cloud-appengine-logging, google-cloud-automl, google-cloud-bigquery, google-cloud-bigquery-datatransfer, google-cloud-bigquery-storage, google-cloud-bigtable, google-cloud-container, google-cloud-core, google-cloud-datacatalog, google-cloud-dataproc, google-cloud-dlp, google-cloud-kms, google-cloud-language, google-cloud-logging, google-cloud-memcache, google-cloud-monitoring, google-cloud-os-login, google-cloud-pubsub, google-cloud-redis, google-cloud-secret-manager, google-cloud-spanner, google-cloud-speech, google-cloud-tasks, google-cloud-texttospeech, google-cloud-translate, google-cloud-videointelligence, google-cloud-vision, google-cloud-workflows
google-api-python-client==1.12.8 # via apache-airflow-providers-google
google-auth-httplib2==0.1.0 # via apache-airflow-providers-google, google-api-python-client
google-auth-oauthlib==0.4.4 # via google-ads, pandas-gbq, pydata-google-auth
Expand All @@ -95,7 +97,7 @@ google-cloud-logging==2.5.0 # via apache-airflow-providers-google
google-cloud-memcache==1.0.0 # via apache-airflow-providers-google
google-cloud-monitoring==2.4.0 # via apache-airflow-providers-google
google-cloud-os-login==2.2.1 # via apache-airflow-providers-google
google-cloud-pubsub==2.6.1 # via apache-airflow-providers-google
google-cloud-pubsub==2.11.0 # via -r requirements.in, apache-airflow-providers-google
google-cloud-redis==2.2.0 # via apache-airflow-providers-google
google-cloud-secret-manager==1.0.0 # via apache-airflow-providers-google
google-cloud-spanner==1.19.1 # via apache-airflow-providers-google
Expand All @@ -109,12 +111,13 @@ google-cloud-vision==1.0.0 # via apache-airflow-providers-google
google-cloud-workflows==1.1.0 # via apache-airflow-providers-google
google-crc32c==1.1.2 # via google-resumable-media
google-resumable-media==1.3.1 # via google-cloud-bigquery, google-cloud-storage
googleapis-common-protos[grpc]==1.53.0 # via google-ads, google-api-core, google-cloud-audit-log, grpc-google-iam-v1
googleapis-common-protos[grpc]==1.53.0 # via google-ads, google-api-core, google-cloud-audit-log, grpc-google-iam-v1, grpcio-status
graphviz==0.16 # via apache-airflow
greenlet==1.1.0 # via apache-airflow, eventlet, gevent
grpc-google-iam-v1==0.12.3 # via google-cloud-bigtable, google-cloud-container, google-cloud-datacatalog, google-cloud-kms, google-cloud-pubsub, google-cloud-secret-manager, google-cloud-spanner, google-cloud-tasks
grpcio-gcp==0.2.2 # via apache-airflow-providers-google, google-api-core
grpcio==1.38.1 # via google-ads, google-api-core, google-cloud-bigquery, google-cloud-pubsub, googleapis-common-protos, grpc-google-iam-v1, grpcio-gcp
grpcio-status==1.45.0 # via google-cloud-pubsub
grpcio==1.45.0 # via google-ads, google-api-core, google-cloud-bigquery, google-cloud-pubsub, googleapis-common-protos, grpc-google-iam-v1, grpcio-gcp, grpcio-status
gunicorn==20.1.0 # via apache-airflow
h11==0.12.0 # via httpcore
hiredis==2.0.0 # via -r requirements.in
Expand All @@ -124,7 +127,7 @@ httplib2==0.19.1 # via google-api-python-client, google-auth-httplib2
httpx==0.19.0 # via apache-airflow, apache-airflow-providers-google
humanize==3.10.0 # via flower
idna==2.10 # via anyio, email-validator, requests, rfc3986
importlib-metadata==1.7.0 # via -r requirements.in, apache-airflow
importlib-metadata==1.7.0 # via -r requirements.in, apache-airflow, argcomplete, importlib-resources, jsonschema, kombu, nox, pluggy, pytest, virtualenv
importlib-resources==1.5.0 # via apache-airflow
inflection==0.5.1 # via apache-airflow
iniconfig==1.1.1 # via pytest
Expand All @@ -140,7 +143,7 @@ jsonschema==3.2.0 # via -r requirements.in, apache-airflow, flask-appbui
kombu==4.6.10 # via -r requirements.in, celery
kubernetes==11.0.0 # via apache-airflow-providers-cncf-kubernetes
lazy-object-proxy==1.4.3 # via apache-airflow
libcst==0.3.19 # via google-cloud-bigquery-storage, google-cloud-datacatalog, google-cloud-os-login, google-cloud-pubsub, google-cloud-workflows
libcst==0.3.19 # via google-cloud-bigquery-storage, google-cloud-datacatalog, google-cloud-os-login, google-cloud-workflows
lockfile==0.12.2 # via apache-airflow, python-daemon
mako==1.1.4 # via alembic
markdown==2.6.11 # via apache-airflow
Expand Down Expand Up @@ -169,7 +172,7 @@ pluggy==1.0.0 # via pytest
prison==0.1.3 # via flask-appbuilder
prometheus-client==0.8.0 # via flower
proto-plus==1.19.0 # via google-ads, google-cloud-appengine-logging, google-cloud-automl, google-cloud-bigquery, google-cloud-bigquery-datatransfer, google-cloud-bigquery-storage, google-cloud-datacatalog, google-cloud-dataproc, google-cloud-kms, google-cloud-logging, google-cloud-memcache, google-cloud-monitoring, google-cloud-os-login, google-cloud-pubsub, google-cloud-redis, google-cloud-tasks, google-cloud-workflows
protobuf==3.17.3 # via google-api-core, google-cloud-audit-log, google-cloud-bigquery, googleapis-common-protos, mysql-connector-python, proto-plus
protobuf==3.17.3 # via google-api-core, google-cloud-audit-log, google-cloud-bigquery, googleapis-common-protos, grpcio-status, mysql-connector-python, proto-plus
psutil==5.8.0 # via apache-airflow
psycopg2-binary==2.9.1 # via apache-airflow-providers-postgres
pure-sasl==0.6.2 # via thrift-sasl
Expand Down Expand Up @@ -207,6 +210,7 @@ sasl==0.3.1 # via pyhive
setproctitle==1.2.2 # via apache-airflow
shelljob==0.5.6 # via -r requirements.in
six==1.16.0 # via bcrypt, eventlet, flask-jwt-extended, google-api-core, google-api-python-client, google-auth, google-auth-httplib2, google-cloud-core, google-resumable-media, grpcio, isodate, jsonschema, kubernetes, openapi-schema-validator, openapi-spec-validator, prison, protobuf, pyopenssl, python-dateutil, retrying, sasl, sqlalchemy-utils, tenacity, thrift, thrift-sasl, virtualenv
slack-sdk==3.15.2 # via apache-airflow-providers-slack
sniffio==1.2.0 # via anyio, httpcore, httpx
sqlalchemy-jsonfield==1.0.0 # via apache-airflow
sqlalchemy-utils==0.37.8 # via flask-appbuilder
Expand All @@ -221,7 +225,7 @@ thrift-sasl==0.4.3 # via pyhive
thrift==0.13.0 # via apache-airflow-providers-apache-hive, hmsclient, pyhive, thrift-sasl
toml==0.10.2 # via pytest
tornado==5.1.1 # via flower
typing-extensions==3.10.0.0 # via libcst, typing-inspect
typing-extensions==3.10.0.0 # via anyio, apache-airflow, libcst, rich, typing-inspect
typing-inspect==0.7.1 # via libcst
unicodecsv==0.14.1 # via apache-airflow
uritemplate==3.0.1 # via google-api-python-client
Expand All @@ -233,7 +237,7 @@ websocket-client==1.1.0 # via -r requirements.in, kubernetes
werkzeug==1.0.1 # via -r requirements.in, apache-airflow, flask, flask-jwt-extended
wtforms==2.3.3 # via flask-admin, flask-wtf
xmltodict==0.12.0 # via -r requirements.in
zipp==3.5.0 # via importlib-metadata
zipp==3.5.0 # via importlib-metadata, importlib-resources
zope.event==4.5.0 # via gevent
zope.interface==5.4.0 # via gevent

Expand Down

0 comments on commit f19c6b8

Please sign in to comment.