Commit 988bf5c: Apply feedback
pankajastro committed Oct 21, 2024
1 parent 6ca3e63
Showing 17 changed files with 333 additions and 2 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -129,4 +129,3 @@ webserver_config.py

# Astro
dev/include/dag_factory-*
-dev/dags/*
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -17,7 +17,7 @@ repos:
  - id: check-yaml
    args:
    - --unsafe
-   exclude: 'tests/fixtures/dag_factory.yml|examples/invalid.yaml|tests/fixtures/invalid_yaml.yml'
+   exclude: 'tests/fixtures/dag_factory.yml|dev/dags/invalid.yaml|tests/fixtures/invalid_yaml.yml'
  - id: debug-statements
  - id: end-of-file-fixer
  - id: mixed-line-ending
Empty file added dev/dags/customized/__init__.py
Empty file.
Empty file added dev/dags/customized/operators/__init__.py
Empty file.
23 changes: 23 additions & 0 deletions dev/dags/customized/operators/breakfast_operators.py
@@ -0,0 +1,23 @@
from airflow.models import BaseOperator


class MakeBreadOperator(BaseOperator):
    template_fields = ("bread_type",)

    def __init__(self, bread_type, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bread_type = bread_type

    def execute(self, context):
        print("Make {} bread".format(self.bread_type))


class MakeCoffeeOperator(BaseOperator):
    template_fields = ("coffee_type",)

    def __init__(self, coffee_type, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.coffee_type = coffee_type

    def execute(self, context):
        print("Make {} coffee".format(self.coffee_type))
7 changes: 7 additions & 0 deletions dev/dags/datasets/example_config_datasets.yml
@@ -0,0 +1,7 @@
datasets:
  - name: dataset_custom_1
    uri: s3://bucket-cjmm/raw/dataset_custom_1
  - name: dataset_custom_2
    uri: s3://bucket-cjmm/raw/dataset_custom_2
  - name: dataset_custom_3
    uri: s3://bucket-cjmm/raw/dataset_custom_3
17 changes: 17 additions & 0 deletions dev/dags/datasets/example_dag_datasets.py
@@ -0,0 +1,17 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "datasets/example_dag_datasets.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Remove any stale DAGs, then register the generated DAGs in globals()
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
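This loader pattern repeats for each example below: DagFactory reads the YAML config, clean_dags clears DAGs left over from earlier parses, and generate_dags registers each generated DAG in the module's globals() so Airflow's DAG discovery can pick it up. A minimal alternative sketch, assuming a dag-factory release that exposes a load_yaml_dags helper (that function and its signature are an assumption, not part of this commit):

import os
from pathlib import Path

from dagfactory import load_yaml_dags  # assumed helper, not shown in this commit

CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", "/usr/local/airflow/dags/"))

# Parses the YAML configs under the folder and registers the resulting
# DAGs in this module's globals() for the scheduler to discover.
load_yaml_dags(globals_dict=globals(), dags_folder=str(CONFIG_ROOT_DIR))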
54 changes: 54 additions & 0 deletions dev/dags/datasets/example_dag_datasets.yml
@@ -0,0 +1,54 @@
default:
  default_args:
    owner: "default_owner"
    start_date: '2023-07-14'
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"

example_simple_dataset_producer_dag:
  description: "Example DAG producing simple datasets"
  schedule_interval: "0 5 * * *"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      outlets: ['s3://bucket_example/raw/dataset1.json']
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
      outlets: ['s3://bucket_example/raw/dataset2.json']

example_simple_dataset_consumer_dag:
  description: "Example DAG consuming simple datasets"
  schedule: ['s3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json']
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"

example_custom_config_dataset_producer_dag:
  description: "Example DAG producing datasets from a custom config file"
  schedule_interval: "0 5 * * *"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      outlets:
        file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml
        datasets: ['dataset_custom_1', 'dataset_custom_2']

example_custom_config_dataset_consumer_dag:
  description: "Example DAG consuming datasets from a custom config file"
  schedule:
    file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml
    datasets: ['dataset_custom_1', 'dataset_custom_2']
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
17 changes: 17 additions & 0 deletions dev/dags/example_customize_operator.py
@@ -0,0 +1,17 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_customize_operator.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Remove any stale DAGs, then register the generated DAGs in globals()
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
46 changes: 46 additions & 0 deletions dev/dags/example_customize_operator.yml
@@ -0,0 +1,46 @@
default:
  default_args:
    owner: "default_owner"
    start_date: 2020-01-01
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "0 1 * * *"

example_breakfast:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "this is a customized operator dag"
  schedule_interval: "0 3 * * *"
  tasks:
    begin:
      operator: airflow.operators.dummy_operator.DummyOperator
    make_bread_1:
      operator: customized.operators.breakfast_operators.MakeBreadOperator
      bread_type: 'Sourdough'
      dependencies:
        - begin
    make_bread_2:
      operator: customized.operators.breakfast_operators.MakeBreadOperator
      bread_type: 'Multigrain'
      dependencies:
        - begin
    make_coffee_1:
      operator: customized.operators.breakfast_operators.MakeCoffeeOperator
      coffee_type: 'Black'
      dependencies:
        - begin
        - make_bread_1
        - make_bread_2
    end:
      operator: airflow.operators.dummy_operator.DummyOperator
      dependencies:
        - begin
        - make_bread_1
        - make_bread_2
        - make_coffee_1
17 changes: 17 additions & 0 deletions dev/dags/example_dag_factory.py
@@ -0,0 +1,17 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Remove any stale DAGs, then register the generated DAGs in globals()
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
93 changes: 93 additions & 0 deletions dev/dags/example_dag_factory.yml
@@ -0,0 +1,93 @@
default:
  default_args:
    owner: "default_owner"
    start_date: 2018-03-01
    end_date: 2018-03-05
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "0 1 * * *"
  on_success_callback_name: print_hello
  on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py
  on_failure_callback_name: print_hello
  on_failure_callback_file: $CONFIG_ROOT_DIR/print_hello.py

example_dag:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "this is an example dag"
  schedule_interval: "0 3 * * *"
  render_template_as_native_obj: True
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: print_hello
      python_callable_file: $CONFIG_ROOT_DIR/print_hello.py
      dependencies: [task_1]

example_dag2:
  default_args:
    timezone: Europe/Amsterdam
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 3"
      dependencies: [task_1]

example_dag3:
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 3"
      dependencies: [task_1]

example_dag4:
  description: "this dag uses task groups"
  task_groups:
    task_group_1:
      tooltip: "this is a task group"
      dependencies: [task_1]
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      task_group_name: task_group_1
    task_3:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: print_hello
      python_callable_file: $CONFIG_ROOT_DIR/print_hello.py
      task_group_name: task_group_1
      dependencies: [task_2]
    task_4:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      dependencies: [task_group_1]
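The task_groups block in example_dag4 maps onto Airflow's TaskGroup. A minimal hand-written sketch of the same shape, assuming Airflow 2.x; task_3 is swapped to a BashOperator here to keep the sketch self-contained, and ids and dates are illustrative:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG("task_group_sketch", start_date=datetime(2018, 3, 1), schedule=None):
    task_1 = BashOperator(task_id="task_1", bash_command="echo 1")
    # Tasks created inside the context manager are grouped under task_group_1.
    with TaskGroup("task_group_1", tooltip="this is a task group") as group:
        task_2 = BashOperator(task_id="task_2", bash_command="echo 2")
        task_3 = BashOperator(task_id="task_3", bash_command="echo 3")
        task_2 >> task_3
    task_4 = BashOperator(task_id="task_4", bash_command="echo 1")
    # Depending on the group as a whole mirrors dependencies: [task_group_1].
    task_1 >> group >> task_4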
16 changes: 16 additions & 0 deletions dev/dags/example_dynamic_task_mapping.py
@@ -0,0 +1,16 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dynamic_task_mapping.yml")
example_dag_factory = dagfactory.DagFactory(config_file)

# Remove any stale DAGs, then register the generated DAGs in globals()
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
23 changes: 23 additions & 0 deletions dev/dags/example_dynamic_task_mapping.yml
@@ -0,0 +1,23 @@
test_expand:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "test expand"
  schedule_interval: "0 3 * * *"
  default_view: "graph"
  tasks:
    request:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: example_task_mapping
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
    process:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: expand_task
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
      partial:
        op_kwargs:
          test_id: "test"
      expand:
        op_args:
          request.output
      dependencies: [request]
8 changes: 8 additions & 0 deletions dev/dags/expand_tasks.py
@@ -0,0 +1,8 @@
def example_task_mapping():
    return [[1], [2], [3]]


def expand_task(x, test_id):
    print(test_id)
    print(x)
    return [x]
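The partial and expand keys in example_dynamic_task_mapping.yml correspond to Airflow's dynamic task mapping API, applied to the two callables above. A minimal hand-written sketch, assuming Airflow 2.3+ and that expand_tasks.py is importable from the DAGs folder (the DAG id and date are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from expand_tasks import example_task_mapping, expand_task

with DAG("test_expand_sketch", start_date=datetime(2024, 1, 1), schedule=None):
    # "request" returns [[1], [2], [3]]; each inner list becomes the
    # op_args of one mapped "process" task instance.
    request = PythonOperator(task_id="request", python_callable=example_task_mapping)
    # partial() pins the static arguments; expand() fans out over the
    # XCom output of the upstream task at runtime.
    PythonOperator.partial(
        task_id="process",
        python_callable=expand_task,
        op_kwargs={"test_id": "test"},
    ).expand(op_args=request.output)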
9 changes: 9 additions & 0 deletions dev/dags/invalid.yaml
@@ -0,0 +1,9 @@
name: John Doe
age: 30
is_student: yes
address:
  street: 123 Main St
  city: New York
  postal_code 10001
  - phone: 555-1234
email: [email protected]
2 changes: 2 additions & 0 deletions dev/dags/print_hello.py
@@ -0,0 +1,2 @@
def print_hello():
    print("hello")
