feat: Move utils, operators and glam_subdags out of dags directory (#…
mikaeld authored Oct 17, 2023
1 parent bda9a9d commit 63beb2f
Showing 55 changed files with 987 additions and 737 deletions.
60 changes: 60 additions & 0 deletions .circleci/config.yml
@@ -180,6 +180,51 @@ jobs:
gsutil rsync -d -r dataproc_bootstrap gs://moz-fx-data-prod-airflow-dataproc-artifacts/bootstrap
gsutil rsync -d -r jobs gs://moz-fx-data-prod-airflow-dataproc-artifacts/jobs
  sync-dags-repo:
    executor: docker/machine
    steps:
      - checkout
      - add_ssh_keys:
          fingerprints:
            - "e4:30:50:41:53:f0:d6:3a:bb:c9:38:54:2d:ca:56:41"
      - run:
          name: 🤖 Update DAGs repository
          command: |
            ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts
            git config --global user.email "[email protected]"
            git config --global user.name "CircleCI Job"
            git clone --branch main [email protected]:mozilla/telemetry-airflow-dags.git
            cd ~/project/telemetry-airflow-dags
            git submodule update --init --recursive --depth 1 telemetry-airflow
            cd ${CIRCLE_PROJECT_REPONAME}
            git pull origin dags
            cd ..
            git add ${CIRCLE_PROJECT_REPONAME}
            git commit --allow-empty -m "Automatic commit from ${CIRCLE_PROJECT_REPONAME} commit ${CIRCLE_SHA1:0:9} build ${CIRCLE_BUILD_NUM} [skip ci]"
            git push origin main

  sync-dags-branch:
    executor: docker/machine
    steps:
      - checkout
      - add_ssh_keys:
          fingerprints:
            - "66:9a:9f:55:2b:5c:f5:d6:ea:c5:c4:f9:7b:db:8d:c0"
      - run:
          name: 🤖 Update DAGs branch
          command: |
            ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts
            git config --global user.name "CircleCI job"
            git config --global user.email "[email protected]"
            git clone --branch dags [email protected]:mozilla/telemetry-airflow.git \
              dags-branch
            cd dags-branch/
            rm -rf dags/
            cp -r ~/project/dags dags
            git add .
            git commit -m "Automatic commit - DAGs from commit ${CIRCLE_SHA1:0:9} build ${CIRCLE_BUILD_NUM} [skip ci]" \
              && git push \
              || echo "Skipping push since it looks like there were no changes"

workflows:
  ci:
@@ -297,3 +342,18 @@ workflows:
              only: /.*/
            branches:
              only: main

      - sync-dags-branch:
          name: 🔃 Update DAGs branch
          filters:
            branches:
              only: main

      - sync-dags-repo:
          name: 🔃 Synchronize telemetry-airflow-dags repository
          filters:
            branches:
              only: main
          requires:
            - 🔃 Update DAGs branch

68 changes: 56 additions & 12 deletions README.md
@@ -25,21 +25,65 @@ Some links relevant to users and developers of WTMO:
## Writing DAGs
See Airflow's [Best Practices guide](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#best-practices) for help writing DAGs.

**⚠ Note: How to import DAGs and modules ⚠**
### **⚠ Warning: Do not import resources from the `dags` directory in DAG definition files ⚠**
As an example, if you have `dags/dag_a.py` and `dags/dag_b.py` and want to use a helper
function in both DAG definition files, define the helper function in the `utils` directory,
like so:

#### `utils/helper.py`
```python
def helper_function():
    return "Help"
```

#### `dags/dag_a.py`
```python
from airflow import DAG

from utils.helper import helper_function

with DAG("dag_a", ...):
    ...
```

**Modules should be imported from the project directory, such as `from dags.my_dag import load_data`
rather than `from my_dag import load_data`.**
#### `dags/dag_b.py`
```python
from airflow import DAG

from utils.helper import helper_function

with DAG("dag_b", ...):
    ...
```
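Helpers can also live in subpackages under `utils/`; for example, the GLAM subdag helpers now
live in `utils/glam_subdags` and are imported the same way (imports taken from `dags/glam.py`):

```python
from utils.glam_subdags.extract import extract_user_counts, extracts_subdag
from utils.glam_subdags.general import repeated_subdag
```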

WTMO deployments use git-sync sidecars to synchronize DAG files from multiple
repositories via [telemetry-airflow-dags](https://github.com/mozilla/telemetry-airflow-dags/),
which pulls each repository in as a git submodule. The git-sync sidecar pattern results in the
following directory structure once deployed:
```
airflow
├─ dags
│ └── repo
│ └── telemetry-airflow-dags
│ ├── <submodule repo_a>
│ │ └── dags
│ │ └── <dag files>
│ ├── <submodule repo_b>
│ │ └── dags
│ │ └── <dag files>
│ └── <submodule repo_c>
│ └── dags
│ └── <dag files>
├─ utils
│ └── ...
└─ plugins
└── ...
```
Hence, defining `helper_function()` in `dags/dag_a.py` and
importing the function in `dags/dag_b.py` as `from dags.dag_a import helper_function`
**will not work after deployment** because of the directory structure required by the
git-sync sidecars.
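
A minimal illustration of the difference (a sketch reusing the module names from the examples
above, not code from the repository):

```python
# Illustrative only - names come from the README examples above.

# Fails after deployment: DAG files end up under
# dags/repo/telemetry-airflow-dags/<repo>/dags/, so there is no importable `dags` package.
# from dags.dag_a import helper_function

# Works locally and after deployment: utils/ sits alongside dags/ in the deployed tree above.
from utils.helper import helper_function
```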

In Airflow, the `dags`, `config`, and `plugins` folders are [automatically added](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/modules_management.html#built-in-pythonpath-entries-in-airflow) to the
`PYTHONPATH` to ensure they can be imported and accessed by Airflow's execution environment.
However, this default configuration can cause problems when running the unit tests located
in the `tests` directory: because `PYTHONPATH` includes the `dags` directory but not the
project directory itself, the tests cannot import code from the `dags` directory, which limits
how effectively the DAGs can be tested within the project structure. It is also generally
expected that imports work from the project directory rather than from any of its
subdirectories. For this reason, `telemetry-airflow`'s Dockerfile adds the project directory
to `PYTHONPATH`.
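For example, a DAG-validity test along these lines (a minimal sketch with an illustrative file
name, not an existing test in this repository) relies on the project root being importable:

```python
# tests/test_dag_validity.py - illustrative sketch; assumes the project root is on
# PYTHONPATH, as the telemetry-airflow Dockerfile arranges.
from airflow.models import DagBag


def test_dags_import_without_errors():
    # Parse every file under dags/; a bad import such as `from dags.dag_a import ...`
    # would show up in import_errors and fail this assertion.
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    assert dag_bag.import_errors == {}
```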

## Prerequisites

1 change: 1 addition & 0 deletions dags/adm_export.py
@@ -3,6 +3,7 @@
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.sensors.external_task import ExternalTaskSensor

from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag
1 change: 1 addition & 0 deletions dags/bhr_collection.py
@@ -21,6 +21,7 @@
from airflow.operators.subdag import SubDagOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.sensors.external_task import ExternalTaskSensor

from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.dataproc import get_dataproc_parameters, moz_dataproc_pyspark_runner
25 changes: 15 additions & 10 deletions dags/burnham.py
@@ -6,13 +6,13 @@
import datetime
import json
import logging
import uuid
import time

import uuid
from dataclasses import dataclass

from airflow import DAG
from airflow.operators.python import PythonOperator

from operators.bq_sensor import BigQuerySQLSensorOperator
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
@@ -431,7 +431,8 @@ def burnham_run(
gke_namespace=DEFAULT_GKE_NAMESPACE,
**kwargs,
):
"""Create a new GKEPodOperator that runs the burnham Docker image.
"""
Create a new GKEPodOperator that runs the burnham Docker image.
:param str task_id: [Required] ID for the task
:param str burnham_test_run: [Required] UUID for the test run
@@ -482,7 +483,8 @@ def burnham_run(


def burnham_sensor(task_id, sql, gcp_conn_id=DEFAULT_GCP_CONN_ID, **kwargs):
"""Create a new BigQuerySQLSensorOperator that checks for burnham data.
"""
Create a new BigQuerySQLSensorOperator that checks for burnham data.
:param str task_id: [Required] ID for the task
:param str sql: [Required] SQL for the sensor
@@ -512,7 +514,8 @@ def burnham_bigquery_run(
gke_namespace=DEFAULT_GKE_NAMESPACE,
**kwargs,
):
"""Create a new GKEPodOperator that runs the burnham-bigquery Docker image.
"""
Create a new GKEPodOperator that runs the burnham-bigquery Docker image.
:param str task_id: [Required] ID for the task
:param str project_id: [Required] Project ID where target table lives
@@ -558,7 +561,8 @@ def burnham_bigquery_run(


def encode_test_scenarios(test_scenarios):
"""Encode the given test scenarios as a str.
"""
Encode the given test scenarios as a str.
:param List[Dict[str, object]] test_scenarios: [Required] ID for the task
:return: str
Expand All @@ -570,7 +574,8 @@ def encode_test_scenarios(test_scenarios):


def do_sleep(minutes):
"""Sleep for the given number of minutes.
"""
Sleep for the given number of minutes.
Writes out an update every minute to give some indication of aliveness.
"""
Expand All @@ -581,7 +586,8 @@ def do_sleep(minutes):


def sleep_task(minutes, task_id):
"""Return an operator that sleeps for a certain number of minutes.
"""
Return an operator that sleeps for a certain number of minutes.
:param int minutes: [Required] Number of minutes to sleep
:param string task_id: [Required] ID for the task
Expand All @@ -592,7 +598,7 @@ def sleep_task(minutes, task_id):
task_id=task_id,
depends_on_past=False,
python_callable=do_sleep,
op_kwargs=dict(minutes=minutes),
op_kwargs={"minutes": minutes},
)


Expand All @@ -605,7 +611,6 @@ def sleep_task(minutes, task_id):
doc_md=DOCS,
tags=tags,
) as dag:

# Generate a UUID for this test run
generate_burnham_test_run_uuid = PythonOperator(
task_id="generate_burnham_test_run_uuid",
1 change: 1 addition & 0 deletions dags/clean_gke_pods.py
@@ -1,6 +1,7 @@
from datetime import datetime, timedelta

from airflow import DAG

from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag

1 change: 1 addition & 0 deletions dags/crash_symbolication.py
@@ -11,6 +11,7 @@
from airflow.operators.subdag import SubDagOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.sensors.external_task import ExternalTaskSensor

from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.dataproc import get_dataproc_parameters, moz_dataproc_pyspark_runner
from utils.tags import Tag
5 changes: 2 additions & 3 deletions dags/data_monitoring.py
@@ -1,9 +1,9 @@
from datetime import datetime

from airflow import DAG
from operators.gcp_container_operator import GKEPodOperator

from dags.utils.tags import Tag
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag

DOCS = """\
This DAG is related to the data monitoring project; it is still under development.
@@ -57,7 +57,6 @@
tags=TAGS,
catchup=True,
) as dag:

for target_dataset in TARGET_DATASETS:
project_id, dataset, table = target_dataset.split(".")

1 change: 1 addition & 0 deletions dags/dim_active_users_aggregates.py
@@ -15,6 +15,7 @@
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor

from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag
2 changes: 1 addition & 1 deletion dags/experiment_auto_sizing.py
@@ -10,6 +10,7 @@

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag
Expand All @@ -34,7 +35,6 @@
doc_md=__doc__,
tags=tags,
) as dag:

# Built from repo https://github.com/mozilla/auto-sizing
auto_sizing_image = "gcr.io/moz-fx-data-experiments/auto_sizing:latest"

Expand Down
1 change: 1 addition & 0 deletions dags/firefox_public_data_report.py
@@ -10,6 +10,7 @@
from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskSensor

from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.dataproc import (
10 changes: 5 additions & 5 deletions dags/fivetran_acoustic.py
@@ -1,5 +1,5 @@
from datetime import datetime, timedelta
from typing import Any, Dict
from typing import Any

from airflow import DAG
from airflow.hooks.base import BaseHook
Expand All @@ -8,13 +8,13 @@
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor

from dags.utils.acoustic.acoustic_client import AcousticClient
from dags.utils.callbacks import retry_tasks_callback
from dags.utils.tags import Tag
from utils.acoustic.acoustic_client import AcousticClient
from utils.callbacks import retry_tasks_callback
from utils.tags import Tag


def _generate_acoustic_report(
conn_id: str, report_type: str, config: Dict[Any, Any], *args, **kwargs
conn_id: str, report_type: str, config: dict[Any, Any], *args, **kwargs
):
"""Retrieve Acoustic connection details from Airflow, instantiate AcousticClient and generate report."""

9 changes: 5 additions & 4 deletions dags/glam.py
@@ -14,12 +14,13 @@
from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
from glam_subdags.extract import extract_user_counts, extracts_subdag
from glam_subdags.general import repeated_subdag
from glam_subdags.generate_query import generate_and_run_desktop_query
from glam_subdags.histograms import histogram_aggregates_subdag

from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.glam_subdags.extract import extract_user_counts, extracts_subdag
from utils.glam_subdags.general import repeated_subdag
from utils.glam_subdags.generate_query import generate_and_run_desktop_query
from utils.glam_subdags.histograms import histogram_aggregates_subdag
from utils.tags import Tag

project_id = "moz-fx-data-shared-prod"
9 changes: 5 additions & 4 deletions dags/glam_dev.py
@@ -14,11 +14,12 @@

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from glam_subdags.general import repeated_subdag
from glam_subdags.generate_query import generate_and_run_desktop_query
from glam_subdags.histograms import histogram_aggregates_subdag
from glam_subdags.probe_hotlist import update_hotlist

from utils.gcp import bigquery_etl_query, gke_command
from utils.glam_subdags.general import repeated_subdag
from utils.glam_subdags.generate_query import generate_and_run_desktop_query
from utils.glam_subdags.histograms import histogram_aggregates_subdag
from utils.glam_subdags.probe_hotlist import update_hotlist
from utils.tags import Tag

prod_project_id = "moz-fx-data-shared-prod"