Skip to content

Commit

Permalink
Build DagFactory DAGs when there is an invalid YAML in the DAGs folder (
Browse files Browse the repository at this point in the history
#184)

Previously, when the Airflow DAGs folder contained an invalid YAML file, no DagFactory DAGs were loaded at all. This PR changes that behaviour: the path of each invalid YAML file is logged, and the valid DagFactory YAML-based DAGs are still rendered.

Co-authored-by: Tatiana Al-Chueyr <[email protected]>
  • Loading branch information
quydx and tatiana authored Oct 11, 2024
1 parent 0f853e4 commit 79cde23
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 11 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,5 @@ logs/
# VIM
*.sw[a-z]

# Airflow
examples/.airflowignore
11 changes: 7 additions & 4 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,12 @@ def load_yaml_dags(
suffix = [".yaml", ".yml"]
candidate_dag_files = []
for suf in suffix:
candidate_dag_files = chain(candidate_dag_files, Path(dags_folder).rglob(f"*{suf}"))

candidate_dag_files = list(chain(candidate_dag_files, Path(dags_folder).rglob(f"*{suf}")))
for config_file_path in candidate_dag_files:
config_file_abs_path = str(config_file_path.absolute())
DagFactory(config_file_abs_path).generate_dags(globals_dict)
logging.info("DAG loaded: %s", config_file_path)
logging.info("Loading %s", config_file_abs_path)
try:
DagFactory(config_file_abs_path).generate_dags(globals_dict)
logging.info("DAG loaded: %s", config_file_path)
except Exception: # pylint: disable=broad-except
logging.exception("Failed to load dag from %s", config_file_path)
9 changes: 9 additions & 0 deletions examples/invalid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: John Doe
age: 30
is_student: yes
address:
street: 123 Main St
city: New York
postal_code 10001
- phone: 555-1234
email: [email protected]
16 changes: 9 additions & 7 deletions tests/test_dagfactory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import datetime
import logging

import pytest
from airflow.models.variable import Variable
Expand Down Expand Up @@ -431,13 +432,14 @@ def test_set_callback_after_loading_config():
td.generate_dags(globals())


def test_load_yaml_dags_fail():
    """Expect ``load_yaml_dags`` to raise when given the invalid YAML fixture."""
    # ``suffix`` is set to the full fixture file name; presumably this narrows
    # the search to that single file -- confirm against ``load_yaml_dags``.
    loader_kwargs = {
        "globals_dict": globals(),
        "dags_folder": "tests/fixtures",
        "suffix": ["invalid_yaml.yml"],
    }
    with pytest.raises(Exception):
        load_yaml_dags(**loader_kwargs)
def test_load_invalid_yaml_logs_error(caplog):
    """Check that loading the invalid YAML fixture logs exactly one error message.

    ``load_yaml_dags`` is called without any ``pytest.raises`` guard, so the
    test also fails if the call raises.
    """
    expected_messages = ["Failed to load dag from tests/fixtures/invalid_yaml.yml"]

    # Capture at ERROR level so the comparison below is against ERROR-and-above
    # records only.
    caplog.set_level(logging.ERROR)
    loader_kwargs = {
        "globals_dict": globals(),
        "dags_folder": "tests/fixtures",
        "suffix": ["invalid_yaml.yml"],
    }
    load_yaml_dags(**loader_kwargs)

    assert caplog.messages == expected_messages


def test_load_yaml_dags_succeed():
Expand Down
28 changes: 28 additions & 0 deletions tests/test_example_dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations
from pathlib import Path

import airflow
from airflow.models.dagbag import DagBag
from packaging.version import Version


# Directory holding the example DAGs: the ``examples`` folder located two
# levels up from this test module.
EXAMPLE_DAGS_DIR = Path(__file__).parent.parent.joinpath("examples")
# Ignore file placed inside the examples directory.
AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR.joinpath(".airflowignore")
# Version of the installed Airflow package, parsed so it can be compared.
AIRFLOW_VERSION = Version(airflow.__version__)


# Keys are version strings; values are lists of example file names associated
# with that version.
MIN_VER_DAG_FILE_VER: dict[str, list[str]] = {
    "2.3": ["example_dynamic_task_mapping.py"],
}


def test_no_import_errors():
    """Check that the example DAGs load into a ``DagBag`` without import errors.

    The test first rewrites ``AIRFLOW_IGNORE_FILE`` with the example files
    whose entry in ``MIN_VER_DAG_FILE_VER`` is above the running Airflow
    version. It then loads ``EXAMPLE_DAGS_DIR`` and asserts that at least one
    DAG was found and that no import errors were recorded.
    """
    # "w+" truncates any existing ignore file, so every run starts from an
    # empty list.
    with open(AIRFLOW_IGNORE_FILE, "w+") as ignore_file:
        for required_version, dag_files in MIN_VER_DAG_FILE_VER.items():
            if not AIRFLOW_VERSION < Version(required_version):
                continue
            print(f"Adding {dag_files} to .airflowignore")
            ignore_file.writelines([f"{dag_file}\n" for dag_file in dag_files])

    # The ignore file is closed (and therefore flushed) before the DagBag is
    # built; presumably DagBag honours .airflowignore -- confirm against the
    # Airflow documentation.
    dag_bag = DagBag(EXAMPLE_DAGS_DIR, include_examples=False)
    assert dag_bag.dags
    assert not dag_bag.import_errors
assert not db.import_errors
6 changes: 6 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ deps =
markupsafe>=1.1.1,<2.1.0
setenv =
AIRFLOW__CORE__SQL_ALCHEMY_CONN = sqlite:////tmp/airflow.db
CONFIG_ROOT_DIR = {toxinidir}/examples
PYTHONPATH = {toxinidir}:{toxinidir}/examples:{env:PYTHONPATH}
commands =
airflow db init
pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml
Expand All @@ -34,6 +36,8 @@ deps =
markupsafe>=1.1.1,<2.1.0
setenv =
AIRFLOW__CORE__SQL_ALCHEMY_CONN = sqlite:////tmp/airflow.db
CONFIG_ROOT_DIR = {toxinidir}/examples
PYTHONPATH = {toxinidir}:{toxinidir}/examples:{env:PYTHONPATH}
commands =
airflow db init
pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml
Expand All @@ -48,6 +52,8 @@ deps =
markupsafe>=1.1.1,<2.1.0
setenv =
AIRFLOW__CORE__SQL_ALCHEMY_CONN = sqlite:////tmp/airflow.db
CONFIG_ROOT_DIR = {toxinidir}/examples
PYTHONPATH = {toxinidir}:{toxinidir}/examples:{env:PYTHONPATH}
commands =
airflow db init
pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml

0 comments on commit 79cde23

Please sign in to comment.