diff --git a/dags/helper_ops.py b/dags/helper_ops.py index f9922b22..f915d62c 100644 --- a/dags/helper_ops.py +++ b/dags/helper_ops.py @@ -131,7 +131,7 @@ def wait_op(dag, process): ) -def scale_up_cluster_op(dag, stage, key, initial_size, total_size, queue): +def scale_up_cluster_op(dag, stage, key, initial_size, total_size, queue, trigger_rule="one_success"): return PythonOperator( task_id='resize_{}_{}'.format(stage, total_size), python_callable=ramp_up_cluster, @@ -139,13 +139,13 @@ def scale_up_cluster_op(dag, stage, key, initial_size, total_size, queue): default_args=default_args, weight_rule=WeightRule.ABSOLUTE, priority_weight=1000, - trigger_rule="one_success", + trigger_rule=trigger_rule, queue=queue, dag=dag ) -def scale_down_cluster_op(dag, stage, key, size, queue): +def scale_down_cluster_op(dag, stage, key, size, queue, trigger_rule="all_success"): return PythonOperator( task_id='resize_{}_{}'.format(stage, size), python_callable=ramp_down_cluster, @@ -153,7 +153,7 @@ def scale_down_cluster_op(dag, stage, key, size, queue): default_args=default_args, weight_rule=WeightRule.ABSOLUTE, priority_weight=1000, - trigger_rule="all_done", + trigger_rule=trigger_rule, queue=queue, dag=dag )