diff --git a/.gitignore b/.gitignore index f5b9d92b..7e0ec846 100644 --- a/.gitignore +++ b/.gitignore @@ -129,4 +129,3 @@ webserver_config.py # Astro dev/include/dag_factory-* -dev/dags/* diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6c2ac29e..c899831c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,7 +17,7 @@ repos: - id: check-yaml args: - --unsafe - exclude: 'tests/fixtures/dag_factory.yml|examples/invalid.yaml|tests/fixtures/invalid_yaml.yml' + exclude: 'tests/fixtures/dag_factory.yml|dev/dags/invalid.yaml|tests/fixtures/invalid_yaml.yml' - id: debug-statements - id: end-of-file-fixer - id: mixed-line-ending diff --git a/dev/dags/customized/__init__.py b/dev/dags/customized/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dev/dags/customized/operators/__init__.py b/dev/dags/customized/operators/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dev/dags/customized/operators/breakfast_operators.py b/dev/dags/customized/operators/breakfast_operators.py new file mode 100644 index 00000000..d1da8589 --- /dev/null +++ b/dev/dags/customized/operators/breakfast_operators.py @@ -0,0 +1,23 @@ +from airflow.models import BaseOperator + + +class MakeBreadOperator(BaseOperator): + template_fields = ("bread_type",) + + def __init__(self, bread_type, *args, **kwargs): + super(MakeBreadOperator, self).__init__(*args, **kwargs) + self.bread_type = bread_type + + def execute(self, context): + print("Make {} bread".format(self.bread_type)) + + +class MakeCoffeeOperator(BaseOperator): + template_fields = ("coffee_type",) + + def __init__(self, coffee_type, *args, **kwargs): + super(MakeCoffeeOperator, self).__init__(*args, **kwargs) + self.coffee_type = coffee_type + + def execute(self, context): + print("Make {} coffee".format(self.coffee_type)) diff --git a/dev/dags/datasets/example_config_datasets.yml b/dev/dags/datasets/example_config_datasets.yml new file mode 100644 index 
00000000..6f5914f5 --- /dev/null +++ b/dev/dags/datasets/example_config_datasets.yml @@ -0,0 +1,7 @@ +datasets: + - name: dataset_custom_1 + uri: s3://bucket-cjmm/raw/dataset_custom_1 + - name: dataset_custom_2 + uri: s3://bucket-cjmm/raw/dataset_custom_2 + - name: dataset_custom_3 + uri: s3://bucket-cjmm/raw/dataset_custom_3 diff --git a/dev/dags/datasets/example_dag_datasets.py b/dev/dags/datasets/example_dag_datasets.py new file mode 100644 index 00000000..a228fdd9 --- /dev/null +++ b/dev/dags/datasets/example_dag_datasets.py @@ -0,0 +1,17 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "datasets/example_dag_datasets.yml") + +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/datasets/example_dag_datasets.yml b/dev/dags/datasets/example_dag_datasets.yml new file mode 100644 index 00000000..e9613ff5 --- /dev/null +++ b/dev/dags/datasets/example_dag_datasets.yml @@ -0,0 +1,54 @@ +default: + default_args: + owner: "default_owner" + start_date: '2023-07-14' + retries: 1 + retry_delay_sec: 300 + concurrency: 1 + max_active_runs: 1 + dagrun_timeout_sec: 600 + default_view: "tree" + orientation: "LR" + +example_simple_dataset_producer_dag: + description: "Example DAG producer simple datasets" + schedule_interval: "0 5 * * *" + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + outlets: ['s3://bucket_example/raw/dataset1.json'] + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + dependencies: [task_1] + outlets: ['s3://bucket_example/raw/dataset2.json'] + 
+example_simple_dataset_consumer_dag: + description: "Example DAG consumer simple datasets" + schedule: ['s3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json'] + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'consumer datasets'" + +example_custom_config_dataset_producer_dag: + description: "Example DAG producer custom config datasets" + schedule_interval: "0 5 * * *" + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + outlets: + file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml + datasets: ['dataset_custom_1', 'dataset_custom_2'] + +example_custom_config_dataset_consumer_dag: + description: "Example DAG consumer custom config datasets" + schedule: + file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml + datasets: ['dataset_custom_1', 'dataset_custom_2'] + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'consumer datasets'" diff --git a/dev/dags/example_customize_operator.py b/dev/dags/example_customize_operator.py new file mode 100644 index 00000000..1a7a07ff --- /dev/null +++ b/dev/dags/example_customize_operator.py @@ -0,0 +1,17 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_customize_operator.yml") + +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_customize_operator.yml b/dev/dags/example_customize_operator.yml new file mode 100644 index 00000000..91fb8506 --- /dev/null +++ b/dev/dags/example_customize_operator.yml @@ -0,0 +1,46 @@ 
+default: + default_args: + owner: "default_owner" + start_date: 2020-01-01 + retries: 1 + retry_delay_sec: 300 + concurrency: 1 + max_active_runs: 1 + dagrun_timeout_sec: 600 + default_view: "tree" + orientation: "LR" + schedule_interval: "0 1 * * *" + +example_breakfast: + default_args: + owner: "custom_owner" + start_date: 2 days + description: "this is a customized operator dag" + schedule_interval: "0 3 * * *" + tasks: + begin: + operator: airflow.operators.dummy_operator.DummyOperator + make_bread_1: + operator: customized.operators.breakfast_operators.MakeBreadOperator + bread_type: 'Sourdough' + dependencies: + - begin + make_bread_2: + operator: customized.operators.breakfast_operators.MakeBreadOperator + bread_type: 'Multigrain' + dependencies: + - begin + make_coffee_1: + operator: customized.operators.breakfast_operators.MakeCoffeeOperator + coffee_type: 'Black' + dependencies: + - begin + - make_bread_1 + - make_bread_2 + end: + operator: airflow.operators.dummy_operator.DummyOperator + dependencies: + - begin + - make_bread_1 + - make_bread_2 + - make_coffee_1 diff --git a/dev/dags/example_dag_factory.py b/dev/dags/example_dag_factory.py new file mode 100644 index 00000000..4b2ff2d7 --- /dev/null +++ b/dev/dags/example_dag_factory.py @@ -0,0 +1,17 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml") + +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_dag_factory.yml b/dev/dags/example_dag_factory.yml new file mode 100644 index 00000000..f1838628 --- /dev/null +++ b/dev/dags/example_dag_factory.yml @@ 
-0,0 +1,93 @@ +default: + default_args: + owner: "default_owner" + start_date: 2018-03-01 + end_date: 2018-03-05 + retries: 1 + retry_delay_sec: 300 + concurrency: 1 + max_active_runs: 1 + dagrun_timeout_sec: 600 + default_view: "tree" + orientation: "LR" + schedule_interval: "0 1 * * *" + on_success_callback_name: print_hello + on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py + on_failure_callback_name: print_hello + on_failure_callback_file: $CONFIG_ROOT_DIR/print_hello.py + +example_dag: + default_args: + owner: "custom_owner" + start_date: 2 days + description: "this is an example dag" + schedule_interval: "0 3 * * *" + render_template_as_native_obj: True + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + dependencies: [task_1] + task_3: + operator: airflow.operators.python_operator.PythonOperator + python_callable_name: print_hello + python_callable_file: $CONFIG_ROOT_DIR/print_hello.py + dependencies: [task_1] + +example_dag2: + default_args: + timezone: Europe/Amsterdam + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + dependencies: [task_1] + task_3: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 3" + dependencies: [task_1] + +example_dag3: + tasks: + task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + dependencies: [task_1] + task_3: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 3" + dependencies: [task_1] + +example_dag4: + description: "this dag uses task groups" + task_groups: + task_group_1: + tooltip: "this is a task group" + dependencies: [task_1] + tasks: + 
task_1: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + task_2: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 2" + task_group_name: task_group_1 + task_3: + operator: airflow.operators.python_operator.PythonOperator + python_callable_name: print_hello + python_callable_file: $CONFIG_ROOT_DIR/print_hello.py + task_group_name: task_group_1 + dependencies: [task_2] + task_4: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 1" + dependencies: [task_group_1] diff --git a/dev/dags/example_dynamic_task_mapping.py b/dev/dags/example_dynamic_task_mapping.py new file mode 100644 index 00000000..abfb0881 --- /dev/null +++ b/dev/dags/example_dynamic_task_mapping.py @@ -0,0 +1,16 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_dynamic_task_mapping.yml") +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_dynamic_task_mapping.yml b/dev/dags/example_dynamic_task_mapping.yml new file mode 100644 index 00000000..078e4f2d --- /dev/null +++ b/dev/dags/example_dynamic_task_mapping.yml @@ -0,0 +1,23 @@ +test_expand: + default_args: + owner: "custom_owner" + start_date: 2 days + description: "test expand" + schedule_interval: "0 3 * * *" + default_view: "graph" + tasks: + request: + operator: airflow.operators.python.PythonOperator + python_callable_name: example_task_mapping + python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py + process: + operator: airflow.operators.python_operator.PythonOperator + python_callable_name: expand_task + 
python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py + partial: + op_kwargs: + test_id: "test" + expand: + op_args: + request.output + dependencies: [request] diff --git a/dev/dags/expand_tasks.py b/dev/dags/expand_tasks.py new file mode 100644 index 00000000..8e4aa00a --- /dev/null +++ b/dev/dags/expand_tasks.py @@ -0,0 +1,8 @@ +def example_task_mapping(): + return [[1], [2], [3]] + + +def expand_task(x, test_id): + print(test_id) + print(x) + return [x] diff --git a/dev/dags/invalid.yaml b/dev/dags/invalid.yaml new file mode 100644 index 00000000..19859e50 --- /dev/null +++ b/dev/dags/invalid.yaml @@ -0,0 +1,9 @@ +name: John Doe +age: 30 +is_student: yes +address: + street: 123 Main St + city: New York + postal_code 10001 +- phone: 555-1234 +email: johndoe@example.com diff --git a/dev/dags/print_hello.py b/dev/dags/print_hello.py new file mode 100644 index 00000000..9673a4dc --- /dev/null +++ b/dev/dags/print_hello.py @@ -0,0 +1,2 @@ +def print_hello(): + print("hello")