diff --git a/.gitignore b/.gitignore index f4fbee9c..7e0ec846 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,6 @@ __pycache__/ .Python build/ bin/ -include/ develop-eggs/ dist/ downloads/ @@ -127,3 +126,6 @@ logs/ examples/.airflowignore airflow.cfg webserver_config.py + +# Astro +dev/include/dag_factory-* diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6c2ac29e..c899831c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,7 +17,7 @@ repos: - id: check-yaml args: - --unsafe - exclude: 'tests/fixtures/dag_factory.yml|examples/invalid.yaml|tests/fixtures/invalid_yaml.yml' + exclude: 'tests/fixtures/dag_factory.yml|dev/dags/invalid.yaml|tests/fixtures/invalid_yaml.yml' - id: debug-statements - id: end-of-file-fixer - id: mixed-line-ending diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index a3bd400c..00000000 --- a/Dockerfile +++ /dev/null @@ -1,51 +0,0 @@ -FROM python:3.8-slim - -ARG AIRFLOW_VERSION=2.8.0 -ARG PYTHON_VERSION=3.8 -ARG AIRFLOW_HOME=/usr/local/airflow -ENV SLUGIFY_USES_TEXT_UNIDECODE=yes -ENV CONFIG_ROOT_DIR=/usr/local/airflow/dags/ - -RUN set -ex \ - && buildDeps=' \ - freetds-dev \ - python3-dev \ - libkrb5-dev \ - libsasl2-dev \ - libssl-dev \ - libffi-dev \ - libpq-dev \ - git \ - ' \ - && apt-get update -yqq \ - && apt-get upgrade -yqq \ - && apt-get install -yqq --no-install-recommends \ - && apt-get install curl -y \ - $buildDeps \ - freetds-bin \ - build-essential \ - python3-pip \ - && useradd -ms /bin/bash -d ${AIRFLOW_HOME} airflow \ - && pip install -U pip setuptools wheel \ - && apt-get clean \ - && rm -rf \ - /var/lib/apt/lists/* \ - /tmp/* \ - /var/tmp/* \ - /usr/share/man \ - /usr/share/doc \ - /usr/share/doc-base - -ARG CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION/constraints-$PYTHON_VERSION.txt" -RUN curl -sSL $CONSTRAINT_URL -o /tmp/constraint.txt -# Workaround to remove PyYAML constraint that will work on both Linux and MacOS -RUN sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp -RUN mv /tmp/constraint.txt.tmp /tmp/constraint.txt - -RUN pip install apache-airflow[http]==${AIRFLOW_VERSION} --constraint /tmp/constraint.txt -ADD . / -RUN pip install -e . - -RUN chmod +x /scripts/entrypoint.sh - -ENTRYPOINT ["/scripts/entrypoint.sh"] diff --git a/Makefile b/Makefile index 20b5db51..38357cae 100644 --- a/Makefile +++ b/Makefile @@ -27,16 +27,20 @@ clean: ## Removes build and test artifacts @find . -name '*~' -exec rm -f {} + @find . -name '__pycache__' -exec rm -rf {} + -.PHONY: docker-build -docker-build: - @echo "==> Building docker image for local testing" - @docker build -t dag_factory:latest . + +.PHONY: build-whl +build-whl: ## Build installable whl file + python3 -m build --outdir dev/include/ + cd examples && ln -sf ../dev/dags/* . .PHONY: docker-run -docker-run: docker-build ## Runs local Airflow for testing - @docker run -d -e AIRFLOW__CORE__DAGS_FOLDER=/usr/local/airflow/dags -v $(PWD)/examples:/usr/local/airflow/dags -p 127.0.0.1:8080:8080 --name=dag_factory dag_factory:latest - @echo "==> Airflow is running at http://localhost:8080" +docker-run: build-whl ## Runs local Airflow for testing + @if ! lsof -i :8080 | grep LISTEN > /dev/null; then \ + cd dev && astro dev start; \ + else \ + cd dev && astro dev restart; \ + fi .PHONY: docker-stop docker-stop: ## Stop Docker container - @docker stop dag_factory; docker rm dag_factory + cd dev && astro dev stop diff --git a/dev/.astro/config.yaml b/dev/.astro/config.yaml new file mode 100644 index 00000000..259b0548 --- /dev/null +++ b/dev/.astro/config.yaml @@ -0,0 +1,2 @@ +project: + name: dev diff --git a/dev/.astro/dag_integrity_exceptions.txt b/dev/.astro/dag_integrity_exceptions.txt new file mode 100644 index 00000000..0d6bd898 --- /dev/null +++ b/dev/.astro/dag_integrity_exceptions.txt @@ -0,0 +1 @@ +# Add dag files to exempt from parse test below. ex: dags/ diff --git a/dev/.astro/test_dag_integrity_default.py b/dev/.astro/test_dag_integrity_default.py new file mode 100644 index 00000000..c287dc63 --- /dev/null +++ b/dev/.astro/test_dag_integrity_default.py @@ -0,0 +1,130 @@ +"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**""" + +import logging +import os +from contextlib import contextmanager + +import pytest +from airflow.hooks.base import BaseHook +from airflow.models import Connection, DagBag, Variable +from airflow.utils.db import initdb + +# init airflow database +initdb() + +# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables + + +# =========== MONKEYPATCH BaseHook.get_connection() =========== +def basehook_get_connection_monkeypatch(key: str, *args, **kwargs): + print(f"Attempted to fetch connection during parse returning an empty Connection object for {key}") + return Connection(key) + + +BaseHook.get_connection = basehook_get_connection_monkeypatch +# # =========== /MONKEYPATCH BASEHOOK.GET_CONNECTION() =========== + + +# =========== MONKEYPATCH OS.GETENV() =========== +def os_getenv_monkeypatch(key: str, *args, **kwargs): + default = None + if args: + default = args[0] # os.getenv should get at most 1 arg after the key + if kwargs: + default = kwargs.get("default", None) # and sometimes kwarg if people are using the sig + + env_value = os.environ.get(key, None) + + if env_value: + return env_value # if the env_value is set, return it + if key == "JENKINS_HOME" and default is None: # fix https://github.com/astronomer/astro-cli/issues/601 + return None + if default: + return default # otherwise return whatever default has been passed + return f"MOCKED_{key.upper()}_VALUE" # if absolutely nothing has been passed - return the mocked value + + +os.getenv = os_getenv_monkeypatch +# # =========== /MONKEYPATCH OS.GETENV() =========== + +# =========== MONKEYPATCH VARIABLE.GET() =========== + + +class magic_dict(dict): + def __init__(self, *args, **kwargs): + self.update(*args, **kwargs) + + def __getitem__(self, key): + return {}.get(key, "MOCKED_KEY_VALUE") + + +_no_default = object() # allow falsey defaults + + +def variable_get_monkeypatch(key: str, default_var=_no_default, deserialize_json=False): + print(f"Attempted to get Variable value during parse, returning a mocked value for {key}") + + if default_var is not _no_default: + return default_var + if deserialize_json: + return magic_dict() + return "NON_DEFAULT_MOCKED_VARIABLE_VALUE" + + +Variable.get = variable_get_monkeypatch +# # =========== /MONKEYPATCH VARIABLE.GET() =========== + + +@contextmanager +def suppress_logging(namespace): + """ + Suppress logging within a specific namespace to keep tests "clean" during build + """ + logger = logging.getLogger(namespace) + old_value = logger.disabled + logger.disabled = True + try: + yield + finally: + logger.disabled = old_value + + +def get_import_errors(): + """ + Generate a tuple for import errors in the dag bag, and include DAGs without errors. + """ + with suppress_logging("airflow"): + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path, os.environ.get("AIRFLOW_HOME")) + + # Initialize an empty list to store the tuples + result = [] + + # Iterate over the items in import_errors + for k, v in dag_bag.import_errors.items(): + result.append((strip_path_prefix(k), v.strip())) + + # Check if there are DAGs without errors + for file_path in dag_bag.dags: + # Check if the file_path is not in import_errors, meaning no errors + if file_path not in dag_bag.import_errors: + result.append((strip_path_prefix(file_path), "No import errors")) + + return result + + +@pytest.mark.parametrize("rel_path, rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]) +def test_file_imports(rel_path, rv): + """Test for import errors on a file""" + if os.path.exists(".astro/dag_integrity_exceptions.txt"): + with open(".astro/dag_integrity_exceptions.txt", "r") as f: + exceptions = f.readlines() + print(f"Exceptions: {exceptions}") + if (rv != "No import errors") and rel_path not in exceptions: + # If rv is not "No import errors," consider it a failed test + raise Exception(f"{rel_path} failed to import with message \n {rv}") + else: + # If rv is "No import errors," consider it a passed test + print(f"{rel_path} passed the import test") diff --git a/dev/.dockerignore b/dev/.dockerignore new file mode 100644 index 00000000..a334663b --- /dev/null +++ b/dev/.dockerignore @@ -0,0 +1,8 @@ +astro +.git +.env +airflow_settings.yaml +logs/ +.venv +airflow.db +airflow.cfg diff --git a/dev/.gitignore b/dev/.gitignore new file mode 100644 index 00000000..0e8bcca9 --- /dev/null +++ b/dev/.gitignore @@ -0,0 +1,11 @@ +.git +.env +.DS_Store +airflow_settings.yaml +__pycache__/ +astro +.venv +airflow-webserver.pid +webserver_config.py +airflow.cfg +airflow.db diff --git a/dev/Dockerfile b/dev/Dockerfile new file mode 100644 index 00000000..0db81d65 --- /dev/null +++ b/dev/Dockerfile @@ -0,0 +1,5 @@ +FROM quay.io/astronomer/astro-runtime:12.2.0 + +ENV CONFIG_ROOT_DIR=/usr/local/airflow/dags/ + +RUN pip install /usr/local/airflow/include/*.whl diff --git a/dev/README.md b/dev/README.md new file mode 100644 index 00000000..599d7034 --- /dev/null +++ b/dev/README.md @@ -0,0 +1,48 @@ +Overview +======== + +Welcome to Astronomer! This project was generated after you ran 'astro dev init' using the Astronomer CLI. This readme describes the contents of the project, as well as how to run Apache Airflow on your local machine. + +Project Contents +================ + +Your Astro project contains the following files and folders: + +- dags: This folder contains the Python files for your Airflow DAGs. By default, this directory includes one example DAG: + - `example_astronauts`: This DAG shows a simple ETL pipeline example that queries the list of astronauts currently in space from the Open Notify API and prints a statement for each astronaut. The DAG uses the TaskFlow API to define tasks in Python, and dynamic task mapping to dynamically print a statement for each astronaut. For more on how this DAG works, see our [Getting started tutorial](https://www.astronomer.io/docs/learn/get-started-with-airflow). +- Dockerfile: This file contains a versioned Astro Runtime Docker image that provides a differentiated Airflow experience. If you want to execute other commands or overrides at runtime, specify them here. +- include: This folder contains any additional files that you want to include as part of your project. It is empty by default. +- packages.txt: Install OS-level packages needed for your project by adding them to this file. It is empty by default. +- requirements.txt: Install Python packages needed for your project by adding them to this file. It is empty by default. +- plugins: Add custom or community plugins for your project to this file. It is empty by default. +- airflow_settings.yaml: Use this local-only file to specify Airflow Connections, Variables, and Pools instead of entering them in the Airflow UI as you develop DAGs in this project. + +Deploy Your Project Locally +=========================== + +1. Start Airflow on your local machine by running 'astro dev start'. + +This command will spin up 4 Docker containers on your machine, each for a different Airflow component: + +- Postgres: Airflow's Metadata Database +- Webserver: The Airflow component responsible for rendering the Airflow UI +- Scheduler: The Airflow component responsible for monitoring and triggering tasks +- Triggerer: The Airflow component responsible for triggering deferred tasks + +2. Verify that all 4 Docker containers were created by running 'docker ps'. + +Note: Running 'astro dev start' will start your project with the Airflow Webserver exposed at port 8080 and Postgres exposed at port 5432. If you already have either of those ports allocated, you can either [stop your existing Docker containers or change the port](https://www.astronomer.io/docs/astro/cli/troubleshoot-locally#ports-are-not-available-for-my-local-airflow-webserver). + +3. Access the Airflow UI for your local Airflow project. To do so, go to http://localhost:8080/ and log in with 'admin' for both your Username and Password. + +You should also be able to access your Postgres Database at 'localhost:5432/postgres'. + +Deploy Your Project to Astronomer +================================= + +If you have an Astronomer account, pushing code to a Deployment on Astronomer is simple. For deploying instructions, refer to Astronomer documentation: https://www.astronomer.io/docs/astro/deploy-code/ + +Contact +======= + +The Astronomer CLI is maintained with love by the Astronomer team. To report a bug or suggest a change, reach out to our support. diff --git a/examples/customized/__init__.py b/dev/dags/customized/__init__.py similarity index 100% rename from examples/customized/__init__.py rename to dev/dags/customized/__init__.py diff --git a/examples/customized/operators/__init__.py b/dev/dags/customized/operators/__init__.py similarity index 100% rename from examples/customized/operators/__init__.py rename to dev/dags/customized/operators/__init__.py diff --git a/examples/customized/operators/breakfast_operators.py b/dev/dags/customized/operators/breakfast_operators.py similarity index 100% rename from examples/customized/operators/breakfast_operators.py rename to dev/dags/customized/operators/breakfast_operators.py diff --git a/examples/datasets/example_config_datasets.yml b/dev/dags/datasets/example_config_datasets.yml similarity index 100% rename from examples/datasets/example_config_datasets.yml rename to dev/dags/datasets/example_config_datasets.yml diff --git a/examples/datasets/example_dag_datasets.py b/dev/dags/datasets/example_dag_datasets.py similarity index 100% rename from examples/datasets/example_dag_datasets.py rename to dev/dags/datasets/example_dag_datasets.py diff --git a/examples/datasets/example_dag_datasets.yml b/dev/dags/datasets/example_dag_datasets.yml similarity index 100% rename from examples/datasets/example_dag_datasets.yml rename to dev/dags/datasets/example_dag_datasets.yml diff --git a/dev/dags/example_customize_operator.py b/dev/dags/example_customize_operator.py new file mode 100644 index 00000000..1a7a07ff --- /dev/null +++ b/dev/dags/example_customize_operator.py @@ -0,0 +1,17 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_customize_operator.yml") + +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_customize_operator.yml b/dev/dags/example_customize_operator.yml new file mode 100644 index 00000000..91fb8506 --- /dev/null +++ b/dev/dags/example_customize_operator.yml @@ -0,0 +1,46 @@ +default: + default_args: + owner: "default_owner" + start_date: 2020-01-01 + retries: 1 + retry_delay_sec: 300 + concurrency: 1 + max_active_runs: 1 + dagrun_timeout_sec: 600 + default_view: "tree" + orientation: "LR" + schedule_interval: "0 1 * * *" + +example_breadfast: + default_args: + owner: "custom_owner" + start_date: 2 days + description: "this is an customized operator dag" + schedule_interval: "0 3 * * *" + tasks: + begin: + operator: airflow.operators.dummy_operator.DummyOperator + make_bread_1: + operator: customized.operators.breakfast_operators.MakeBreadOperator + bread_type: 'Sourdough' + dependencies: + - begin + make_bread_2: + operator: customized.operators.breakfast_operators.MakeBreadOperator + bread_type: 'Multigrain' + dependencies: + - begin + make_coffee_1: + operator: customized.operators.breakfast_operators.MakeCoffeeOperator + coffee_type: 'Black' + dependencies: + - begin + - make_bread_1 + - make_bread_2 + end: + operator: airflow.operators.dummy_operator.DummyOperator + dependencies: + - begin + - make_bread_1 + - make_bread_2 + - make_coffee_1 diff --git a/dev/dags/example_dag_factory.py b/dev/dags/example_dag_factory.py new file mode 100644 index 00000000..4b2ff2d7 --- /dev/null +++ b/dev/dags/example_dag_factory.py @@ -0,0 +1,17 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml") + +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_dag_factory.yml b/dev/dags/example_dag_factory.yml new file mode 100644 index 00000000..f1838628 --- /dev/null +++ b/dev/dags/example_dag_factory.yml @@ -0,0 +1,93 @@ +default: + default_args: + owner: "default_owner" + start_date: 2018-03-01 + end_date: 2018-03-05 + retries: 1 + retry_delay_sec: 300 + concurrency: 1 + max_active_runs: 1 + dagrun_timeout_sec: 600 + default_view: "tree" + orientation: "LR" + schedule_interval: "0 1 * * *" + on_success_callback_name: print_hello + on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py + on_failure_callback_name: print_hello + on_failure_callback_file: $CONFIG_ROOT_DIR/print_hello.py + +example_dag: + default_args: + owner: "custom_owner" + start_date: 2 days + description: "this is an example dag" + schedule_interval: "0 3 * * *" + render_template_as_native_obj: True + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + dependencies: [task_1] + task_3: + operator: airflow.operators.python_operator.PythonOperator + python_callable_name: print_hello + python_callable_file: $CONFIG_ROOT_DIR/print_hello.py + dependencies: [task_1] + +example_dag2: + default_args: + timezone: Europe/Amsterdam + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + dependencies: [task_1] + task_3: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 3" + dependencies: [task_1] + +example_dag3: + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + dependencies: [task_1] + task_3: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 3" + dependencies: [task_1] + +example_dag4: + description: "this dag uses task groups" + task_groups: + task_group_1: + tooltip: "this is a task group" + dependencies: [task_1] + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + task_group_name: task_group_1 + task_3: + operator: airflow.operators.python_operator.PythonOperator + python_callable_name: print_hello + python_callable_file: $CONFIG_ROOT_DIR/print_hello.py + task_group_name: task_group_1 + dependencies: [task_2] + task_4: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + dependencies: [task_group_1] diff --git a/dev/dags/example_dynamic_task_mapping.py b/dev/dags/example_dynamic_task_mapping.py new file mode 100644 index 00000000..abfb0881 --- /dev/null +++ b/dev/dags/example_dynamic_task_mapping.py @@ -0,0 +1,16 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_dynamic_task_mapping.yml") +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_dynamic_task_mapping.yml b/dev/dags/example_dynamic_task_mapping.yml new file mode 100644 index 00000000..078e4f2d --- /dev/null +++ b/dev/dags/example_dynamic_task_mapping.yml @@ -0,0 +1,23 @@ +test_expand: + default_args: + owner: "custom_owner" + start_date: 2 days + description: "test expand" + schedule_interval: "0 3 * * *" + default_view: "graph" + tasks: + request: + operator: airflow.operators.python.PythonOperator + python_callable_name: example_task_mapping + python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py + process: + operator: airflow.operators.python_operator.PythonOperator + python_callable_name: expand_task + python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py + partial: + op_kwargs: + test_id: "test" + expand: + op_args: + request.output + dependencies: [request] diff --git a/dev/dags/expand_tasks.py b/dev/dags/expand_tasks.py new file mode 100644 index 00000000..8e4aa00a --- /dev/null +++ b/dev/dags/expand_tasks.py @@ -0,0 +1,8 @@ +def example_task_mapping(): + return [[1], [2], [3]] + + +def expand_task(x, test_id): + print(test_id) + print(x) + return [x] diff --git a/dev/dags/invalid.yaml b/dev/dags/invalid.yaml new file mode 100644 index 00000000..19859e50 --- /dev/null +++ b/dev/dags/invalid.yaml @@ -0,0 +1,9 @@ +name: John Doe +age: 30 +is_student: yes +address: + street: 123 Main St + city: New York + postal_code 10001 +- phone: 555-1234 +email: johndoe@example.com diff --git a/dev/dags/print_hello.py b/dev/dags/print_hello.py new file mode 100644 index 00000000..9673a4dc --- /dev/null +++ b/dev/dags/print_hello.py @@ -0,0 +1,2 @@ +def print_hello(): + print("hello") diff --git a/dev/packages.txt b/dev/packages.txt new file mode 100644 index 00000000..e69de29b diff --git a/dev/requirements.txt b/dev/requirements.txt new file mode 100644 index 00000000..1bb359bb --- /dev/null +++ b/dev/requirements.txt @@ -0,0 +1 @@ +# Astro Runtime includes the following pre-installed providers packages: https://www.astronomer.io/docs/astro/runtime-image-architecture#provider-packages diff --git a/dev/tests/dags/test_dag_example.py b/dev/tests/dags/test_dag_example.py new file mode 100644 index 00000000..3336fbd2 --- /dev/null +++ b/dev/tests/dags/test_dag_example.py @@ -0,0 +1,74 @@ +"""Example DAGs test. This test ensures that all Dags have tags, retries set to two, and no import errors. This is an example pytest and may not be fit the context of your DAGs. Feel free to add and remove tests.""" + +import logging +import os +from contextlib import contextmanager + +import pytest +from airflow.models import DagBag + + +@contextmanager +def suppress_logging(namespace): + logger = logging.getLogger(namespace) + old_value = logger.disabled + logger.disabled = True + try: + yield + finally: + logger.disabled = old_value + + +def get_import_errors(): + """ + Generate a tuple for import errors in the dag bag + """ + with suppress_logging("airflow"): + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path, os.environ.get("AIRFLOW_HOME")) + + # prepend "(None,None)" to ensure that a test object is always created even if it's a no op. + return [(None, None)] + [(strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items()] + + +def get_dags(): + """ + Generate a tuple of dag_id, in the DagBag + """ + with suppress_logging("airflow"): + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path, os.environ.get("AIRFLOW_HOME")) + + return [(k, v, strip_path_prefix(v.fileloc)) for k, v in dag_bag.dags.items()] + + +@pytest.mark.parametrize("rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]) +def test_file_imports(rel_path, rv): + """Test for import errors on a file""" + if rel_path and rv: + raise Exception(f"{rel_path} failed to import with message \n {rv}") + + +APPROVED_TAGS = {} + + +@pytest.mark.parametrize("dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()]) +def test_dag_tags(dag_id, dag, fileloc): + """ + test if a DAG is tagged and if those TAGs are in the approved list + """ + assert dag.tags, f"{dag_id} in {fileloc} has no tags" + if APPROVED_TAGS: + assert not set(dag.tags) - APPROVED_TAGS + + +@pytest.mark.parametrize("dag_id,dag, fileloc", get_dags(), ids=[x[2] for x in get_dags()]) +def test_dag_retries(dag_id, dag, fileloc): + """ + test if a DAG has retries set + """ + assert dag.default_args.get("retries", None) >= 2, f"{dag_id} in {fileloc} must have task retries >= 2." diff --git a/examples/customized b/examples/customized new file mode 120000 index 00000000..c3f678a6 --- /dev/null +++ b/examples/customized @@ -0,0 +1 @@ +../dev/dags/customized \ No newline at end of file diff --git a/examples/datasets b/examples/datasets new file mode 120000 index 00000000..4aad6a7b --- /dev/null +++ b/examples/datasets @@ -0,0 +1 @@ +../dev/dags/datasets \ No newline at end of file diff --git a/examples/example_customize_operator.py b/examples/example_customize_operator.py deleted file mode 100644 index 1a7a07ff..00000000 --- a/examples/example_customize_operator.py +++ /dev/null @@ -1,17 +0,0 @@ -import os -from pathlib import Path - -# The following import is here so Airflow parses this file -# from airflow import DAG -import dagfactory - -DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" -CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) - -config_file = str(CONFIG_ROOT_DIR / "example_customize_operator.yml") - -example_dag_factory = dagfactory.DagFactory(config_file) - -# Creating task dependencies -example_dag_factory.clean_dags(globals()) -example_dag_factory.generate_dags(globals()) diff --git a/examples/example_customize_operator.py b/examples/example_customize_operator.py new file mode 120000 index 00000000..bd56f0d9 --- /dev/null +++ b/examples/example_customize_operator.py @@ -0,0 +1 @@ +../dev/dags/example_customize_operator.py \ No newline at end of file diff --git a/examples/example_customize_operator.yml b/examples/example_customize_operator.yml deleted file mode 100644 index 91fb8506..00000000 --- a/examples/example_customize_operator.yml +++ /dev/null @@ -1,46 +0,0 @@ -default: - default_args: - owner: "default_owner" - start_date: 2020-01-01 - retries: 1 - retry_delay_sec: 300 - concurrency: 1 - max_active_runs: 1 - dagrun_timeout_sec: 600 - default_view: "tree" - orientation: "LR" - schedule_interval: "0 1 * * *" - -example_breadfast: - default_args: - owner: "custom_owner" - start_date: 2 days - description: "this is an customized operator dag" - schedule_interval: "0 3 * * *" - tasks: - begin: - operator: airflow.operators.dummy_operator.DummyOperator - make_bread_1: - operator: customized.operators.breakfast_operators.MakeBreadOperator - bread_type: 'Sourdough' - dependencies: - - begin - make_bread_2: - operator: customized.operators.breakfast_operators.MakeBreadOperator - bread_type: 'Multigrain' - dependencies: - - begin - make_coffee_1: - operator: customized.operators.breakfast_operators.MakeCoffeeOperator - coffee_type: 'Black' - dependencies: - - begin - - make_bread_1 - - make_bread_2 - end: - operator: airflow.operators.dummy_operator.DummyOperator - dependencies: - - begin - - make_bread_1 - - make_bread_2 - - make_coffee_1 diff --git a/examples/example_customize_operator.yml b/examples/example_customize_operator.yml new file mode 120000 index 00000000..c9fc298e --- /dev/null +++ b/examples/example_customize_operator.yml @@ -0,0 +1 @@ +../dev/dags/example_customize_operator.yml \ No newline at end of file diff --git a/examples/example_dag_factory.py b/examples/example_dag_factory.py deleted file mode 100644 index 4b2ff2d7..00000000 --- a/examples/example_dag_factory.py +++ /dev/null @@ -1,17 +0,0 @@ -import os -from pathlib import Path - -# The following import is here so Airflow parses this file -# from airflow import DAG -import dagfactory - -DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" -CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) - -config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml") - -example_dag_factory = dagfactory.DagFactory(config_file) - -# Creating task dependencies -example_dag_factory.clean_dags(globals()) -example_dag_factory.generate_dags(globals()) diff --git a/examples/example_dag_factory.py b/examples/example_dag_factory.py new file mode 120000 index 00000000..b24b7d13 --- /dev/null +++ b/examples/example_dag_factory.py @@ -0,0 +1 @@ +../dev/dags/example_dag_factory.py \ No newline at end of file diff --git a/examples/example_dag_factory.yml b/examples/example_dag_factory.yml deleted file mode 100644 index f1838628..00000000 --- a/examples/example_dag_factory.yml +++ /dev/null @@ -1,93 +0,0 @@ -default: - default_args: - owner: "default_owner" - start_date: 2018-03-01 - end_date: 2018-03-05 - retries: 1 - retry_delay_sec: 300 - concurrency: 1 - max_active_runs: 1 - dagrun_timeout_sec: 600 - default_view: "tree" - orientation: "LR" - schedule_interval: "0 1 * * *" - on_success_callback_name: print_hello - on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py - on_failure_callback_name: print_hello - on_failure_callback_file: $CONFIG_ROOT_DIR/print_hello.py - -example_dag: - default_args: - owner: "custom_owner" - start_date: 2 days - description: "this is an example dag" - schedule_interval: "0 3 * * *" - render_template_as_native_obj: True - tasks: - task_1: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 1" - task_2: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 2" - dependencies: [task_1] - task_3: - operator: airflow.operators.python_operator.PythonOperator - python_callable_name: print_hello - python_callable_file: $CONFIG_ROOT_DIR/print_hello.py - dependencies: [task_1] - -example_dag2: - default_args: - timezone: Europe/Amsterdam - tasks: - task_1: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 1" - task_2: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 2" - dependencies: [task_1] - task_3: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 3" - dependencies: [task_1] - -example_dag3: - tasks: - task_1: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 1" - task_2: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 2" - dependencies: [task_1] - task_3: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 3" - dependencies: [task_1] - -example_dag4: - description: "this dag uses task groups" - task_groups: - task_group_1: - tooltip: "this is a task group" - dependencies: [task_1] - tasks: - task_1: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 1" - task_2: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 2" - task_group_name: task_group_1 - task_3: - operator: airflow.operators.python_operator.PythonOperator - python_callable_name: print_hello - python_callable_file: $CONFIG_ROOT_DIR/print_hello.py - task_group_name: task_group_1 - dependencies: [task_2] - task_4: - operator: airflow.operators.bash_operator.BashOperator - bash_command: "echo 1" - dependencies: [task_group_1] diff --git a/examples/example_dag_factory.yml b/examples/example_dag_factory.yml new file mode 120000 index 00000000..fbc87d2c --- /dev/null +++ b/examples/example_dag_factory.yml @@ -0,0 +1 @@ +../dev/dags/example_dag_factory.yml \ No newline at end of file diff --git a/examples/example_dynamic_task_mapping.py b/examples/example_dynamic_task_mapping.py deleted file mode 100644 index abfb0881..00000000 --- a/examples/example_dynamic_task_mapping.py +++ /dev/null @@ -1,16 +0,0 @@ -import os -from pathlib import Path - -# The following import is here so Airflow parses this file -# from airflow import DAG -import dagfactory - -DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" -CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) - -config_file = str(CONFIG_ROOT_DIR / "example_dynamic_task_mapping.yml") -example_dag_factory = dagfactory.DagFactory(config_file) - -# Creating task dependencies -example_dag_factory.clean_dags(globals()) -example_dag_factory.generate_dags(globals()) diff --git a/examples/example_dynamic_task_mapping.py b/examples/example_dynamic_task_mapping.py new file mode 120000 index 00000000..9f7228f8 --- /dev/null +++ b/examples/example_dynamic_task_mapping.py @@ -0,0 +1 @@ +../dev/dags/example_dynamic_task_mapping.py \ No newline at end of file diff --git a/examples/example_dynamic_task_mapping.yml b/examples/example_dynamic_task_mapping.yml deleted file mode 100644 index 078e4f2d..00000000 --- a/examples/example_dynamic_task_mapping.yml +++ /dev/null @@ -1,23 +0,0 @@ -test_expand: - default_args: - owner: "custom_owner" - start_date: 2 days - description: "test expand" - schedule_interval: "0 3 * * *" - default_view: "graph" - tasks: - request: - operator: airflow.operators.python.PythonOperator - python_callable_name: example_task_mapping - python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py - process: - operator: airflow.operators.python_operator.PythonOperator - python_callable_name: expand_task - python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py - partial: - op_kwargs: - test_id: "test" - expand: - op_args: - request.output - dependencies: [request] diff --git a/examples/example_dynamic_task_mapping.yml b/examples/example_dynamic_task_mapping.yml new file mode 120000 index 00000000..31d61e99 --- /dev/null +++ b/examples/example_dynamic_task_mapping.yml @@ -0,0 +1 @@ +../dev/dags/example_dynamic_task_mapping.yml \ No newline at end of file diff --git a/examples/expand_tasks.py b/examples/expand_tasks.py deleted file mode 100644 index 8e4aa00a..00000000 --- a/examples/expand_tasks.py +++ /dev/null @@ -1,8 +0,0 @@ -def example_task_mapping(): - return [[1], [2], [3]] - - -def expand_task(x, test_id): - print(test_id) - print(x) - return [x] diff --git a/examples/expand_tasks.py b/examples/expand_tasks.py new file mode 120000 index 00000000..da07d4d1 --- /dev/null +++ b/examples/expand_tasks.py @@ -0,0 +1 @@ +../dev/dags/expand_tasks.py \ No newline at end of file diff --git a/examples/invalid.yaml b/examples/invalid.yaml deleted file mode 100644 index 19859e50..00000000 --- a/examples/invalid.yaml +++ /dev/null @@ -1,9 +0,0 @@ -name: John Doe -age: 30 -is_student: yes -address: - street: 123 Main St - city: New York - postal_code 10001 -- phone: 555-1234 -email: johndoe@example.com diff --git a/examples/invalid.yaml b/examples/invalid.yaml new file mode 120000 index 00000000..f9bbaeaf --- /dev/null +++ b/examples/invalid.yaml @@ -0,0 +1 @@ +../dev/dags/invalid.yaml \ No newline at end of file diff --git a/examples/print_hello.py b/examples/print_hello.py deleted file mode 100644 index 9673a4dc..00000000 --- a/examples/print_hello.py +++ /dev/null @@ -1,2 +0,0 @@ -def print_hello(): - print("hello") diff --git a/examples/print_hello.py b/examples/print_hello.py new file mode 120000 index 00000000..b80800b6 --- /dev/null +++ b/examples/print_hello.py @@ -0,0 +1 @@ +../dev/dags/print_hello.py \ No newline at end of file diff --git a/scripts/entrypoint.sh b/scripts/entrypoint.sh deleted file mode 100644 index a2205879..00000000 --- a/scripts/entrypoint.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env bash - -export AIRFLOW__CORE__LOAD_EXAMPLES=False -airflow db init -# for airflow 2.0 -airflow users create --username admin \ - --firstname test \ - --lastname test \ - --role Admin \ - --email admin@example.org \ - -p admin - -airflow scheduler & exec airflow webserver