diff --git a/brickflow/__init__.py b/brickflow/__init__.py index 773ce6cb..1d5433fb 100644 --- a/brickflow/__init__.py +++ b/brickflow/__init__.py @@ -259,6 +259,7 @@ def get_bundles_project_env() -> str: TaskSettings, TaskResponse, BrickflowTriggerRule, + TaskRunCondition, BrickflowTaskEnvVars, StorageBasedTaskLibrary, JarTaskLibrary, @@ -301,6 +302,7 @@ def get_bundles_project_env() -> str: "TaskSettings", "TaskResponse", "BrickflowTriggerRule", + "TaskRunCondition", "BrickflowTaskEnvVars", "StorageBasedTaskLibrary", "JarTaskLibrary", diff --git a/brickflow/engine/task.py b/brickflow/engine/task.py index a94d7d53..8a299ffc 100644 --- a/brickflow/engine/task.py +++ b/brickflow/engine/task.py @@ -97,6 +97,15 @@ class TaskType(Enum): NOTEBOOK_TASK = "notebook_task" +class TaskRunCondition(Enum): + ALL_SUCCESS = "ALL_SUCCESS" + AT_LEAST_ONE_SUCCESS = "AT_LEAST_ONE_SUCCESS" + NONE_FAILED = "NONE_FAILED" + ALL_DONE = "ALL_DONE" + AT_LEAST_ONE_FAILED = "AT_LEAST_ONE_FAILED" + ALL_FAILED = "ALL_FAILED" + + @dataclass(frozen=True) class TaskLibrary: @staticmethod @@ -240,6 +249,7 @@ class TaskSettings: max_retries: Optional[int] = None min_retry_interval_millis: Optional[int] = None retry_on_timeout: Optional[bool] = None + run_if: Optional[TaskRunCondition] = None def merge(self, other: Optional["TaskSettings"]) -> "TaskSettings": # overrides top level values @@ -252,6 +262,7 @@ def merge(self, other: Optional["TaskSettings"]) -> "TaskSettings": other.max_retries or self.max_retries, other.min_retry_interval_millis or self.min_retry_interval_millis, other.retry_on_timeout or self.retry_on_timeout, + other.run_if or self.run_if, ) def to_tf_dict( @@ -280,6 +291,7 @@ def to_tf_dict( "max_retries": self.max_retries, "min_retry_interval_millis": self.min_retry_interval_millis, "retry_on_timeout": self.retry_on_timeout, + **({"run_if": self.run_if.value} if self.run_if else {}), } diff --git a/docs/tasks.md b/docs/tasks.md index 2304c721..8c2c97ea 100644 --- a/docs/tasks.md 
+++ b/docs/tasks.md @@ -253,6 +253,30 @@ def all_success_task(): 1. NONE_FAILED - use this if you want to trigger the task irrespective of the upstream tasks success or failure state 2. ALL_SUCCESS - use this if you want to trigger the task only if all the upstream tasks are all having success state + +### Tasks conditional run + +Add a condition for running a task based on the results of its parent tasks + +```python title="task_conditional_run" +from brickflow import Workflow, TaskRunCondition, TaskSettings +wf = Workflow(...) + +@wf.task( + task_settings=TaskSettings(run_if=TaskRunCondition.AT_LEAST_ONE_FAILED) +) +def none_failed_task(): + pass +``` + +This option determines whether the task is run once its dependencies have completed. Available options: +1. `ALL_SUCCESS`: All dependencies have executed and succeeded +2. `AT_LEAST_ONE_SUCCESS`: At least one dependency has succeeded +3. `NONE_FAILED`: None of the dependencies have failed and at least one was executed +4. `ALL_DONE`: All dependencies completed and at least one was executed +5. `AT_LEAST_ONE_FAILED`: At least one dependency failed +6. `ALL_FAILED`: All dependencies have failed + ### Airflow Operators We have adopted/extended certain airflow operators that might be needed to run as a task in databricks workflows. 
diff --git a/tests/codegen/expected_bundles/dev_bundle_monorepo.yml b/tests/codegen/expected_bundles/dev_bundle_monorepo.yml index f56dd142..3f3b2328 100644 --- a/tests/codegen/expected_bundles/dev_bundle_monorepo.yml +++ b/tests/codegen/expected_bundles/dev_bundle_monorepo.yml @@ -201,6 +201,36 @@ environments: retry_on_timeout: null task_key: task_function_4 timeout_seconds: null + - depends_on: + - task_key: task_function_4 + email_notifications: {} + existing_cluster_id: existing_cluster_id + libraries: [] + max_retries: null + min_retry_interval_millis: null + notebook_task: + base_parameters: + all_tasks1: test + all_tasks3: '123' + brickflow_env: dev + brickflow_internal_only_run_tasks: '' + brickflow_internal_task_name: '{{task_key}}' + brickflow_internal_workflow_name: test + brickflow_internal_workflow_prefix: '' + brickflow_internal_workflow_suffix: '' + brickflow_job_id: '{{job_id}}' + brickflow_parent_run_id: '{{parent_run_id}}' + brickflow_run_id: '{{run_id}}' + brickflow_start_date: '{{start_date}}' + brickflow_start_time: '{{start_time}}' + brickflow_task_key: '{{task_key}}' + brickflow_task_retry_count: '{{task_retry_count}}' + notebook_path: some/path/to/root/test_databricks_bundle.py + source: GIT + retry_on_timeout: null + run_if: AT_LEAST_ONE_FAILED + task_key: task_function_5 + timeout_seconds: 0.0 - depends_on: [] email_notifications: {} existing_cluster_id: existing_cluster_id diff --git a/tests/codegen/expected_bundles/dev_bundle_polyrepo.yml b/tests/codegen/expected_bundles/dev_bundle_polyrepo.yml index 8171e12c..d2335fc8 100644 --- a/tests/codegen/expected_bundles/dev_bundle_polyrepo.yml +++ b/tests/codegen/expected_bundles/dev_bundle_polyrepo.yml @@ -201,6 +201,36 @@ environments: retry_on_timeout: null task_key: task_function_4 timeout_seconds: null + - depends_on: + - task_key: task_function_4 + email_notifications: {} + existing_cluster_id: existing_cluster_id + libraries: [] + max_retries: null + min_retry_interval_millis: null + 
notebook_task: + base_parameters: + all_tasks1: test + all_tasks3: '123' + brickflow_env: dev + brickflow_internal_only_run_tasks: '' + brickflow_internal_task_name: '{{task_key}}' + brickflow_internal_workflow_name: test + brickflow_internal_workflow_prefix: '' + brickflow_internal_workflow_suffix: '' + brickflow_job_id: '{{job_id}}' + brickflow_parent_run_id: '{{parent_run_id}}' + brickflow_run_id: '{{run_id}}' + brickflow_start_date: '{{start_date}}' + brickflow_start_time: '{{start_time}}' + brickflow_task_key: '{{task_key}}' + brickflow_task_retry_count: '{{task_retry_count}}' + notebook_path: test_databricks_bundle.py + source: GIT + retry_on_timeout: null + run_if: AT_LEAST_ONE_FAILED + task_key: task_function_5 + timeout_seconds: 0.0 - depends_on: [] email_notifications: {} existing_cluster_id: existing_cluster_id diff --git a/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml b/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml index fac192de..e151fb4b 100644 --- a/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml +++ b/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml @@ -275,6 +275,39 @@ environments: retry_on_timeout: null task_key: task_function_4 timeout_seconds: null + - depends_on: + - task_key: task_function_4 + email_notifications: {} + existing_cluster_id: existing_cluster_id + libraries: + - pypi: + package: brickflows==0.1.0 + repo: null + max_retries: null + min_retry_interval_millis: null + notebook_task: + base_parameters: + all_tasks1: test + all_tasks3: '123' + brickflow_env: dev + brickflow_internal_only_run_tasks: '' + brickflow_internal_task_name: '{{task_key}}' + brickflow_internal_workflow_name: test + brickflow_internal_workflow_prefix: '' + brickflow_internal_workflow_suffix: '' + brickflow_job_id: '{{job_id}}' + brickflow_parent_run_id: '{{parent_run_id}}' + brickflow_run_id: '{{run_id}}' + brickflow_start_date: '{{start_date}}' + brickflow_start_time: 
'{{start_time}}' + brickflow_task_key: '{{task_key}}' + brickflow_task_retry_count: '{{task_retry_count}}' + notebook_path: test_databricks_bundle.py + source: GIT + retry_on_timeout: null + run_if: AT_LEAST_ONE_FAILED + task_key: task_function_5 + timeout_seconds: 0.0 - depends_on: [] email_notifications: {} existing_cluster_id: existing_cluster_id diff --git a/tests/codegen/expected_bundles/local_bundle.yml b/tests/codegen/expected_bundles/local_bundle.yml index 0cbeaebd..6d930d87 100644 --- a/tests/codegen/expected_bundles/local_bundle.yml +++ b/tests/codegen/expected_bundles/local_bundle.yml @@ -198,6 +198,36 @@ environments: retry_on_timeout: null task_key: task_function_4 timeout_seconds: null + - depends_on: + - task_key: task_function_4 + email_notifications: {} + existing_cluster_id: existing_cluster_id + libraries: [] + max_retries: null + min_retry_interval_millis: null + notebook_task: + base_parameters: + all_tasks1: test + all_tasks3: '123' + brickflow_env: local + brickflow_internal_only_run_tasks: '' + brickflow_internal_task_name: '{{task_key}}' + brickflow_internal_workflow_name: test + brickflow_internal_workflow_prefix: '' + brickflow_internal_workflow_suffix: '' + brickflow_job_id: '{{job_id}}' + brickflow_parent_run_id: '{{parent_run_id}}' + brickflow_run_id: '{{run_id}}' + brickflow_start_date: '{{start_date}}' + brickflow_start_time: '{{start_time}}' + brickflow_task_key: '{{task_key}}' + brickflow_task_retry_count: '{{task_retry_count}}' + notebook_path: test_databricks_bundle.py + source: WORKSPACE + retry_on_timeout: null + run_if: AT_LEAST_ONE_FAILED + task_key: task_function_5 + timeout_seconds: 0.0 - depends_on: [] email_notifications: {} existing_cluster_id: existing_cluster_id diff --git a/tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml b/tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml index e91e2279..92f469d6 100644 --- a/tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml +++ 
b/tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml @@ -198,6 +198,36 @@ environments: retry_on_timeout: null task_key: task_function_4 timeout_seconds: null + - depends_on: + - task_key: task_function_4 + email_notifications: {} + existing_cluster_id: existing_cluster_id + libraries: [] + max_retries: null + min_retry_interval_millis: null + notebook_task: + base_parameters: + all_tasks1: test + all_tasks3: '123' + brickflow_env: local + brickflow_internal_only_run_tasks: '' + brickflow_internal_task_name: '{{task_key}}' + brickflow_internal_workflow_name: test + brickflow_internal_workflow_prefix: '' + brickflow_internal_workflow_suffix: '' + brickflow_job_id: '{{job_id}}' + brickflow_parent_run_id: '{{parent_run_id}}' + brickflow_run_id: '{{run_id}}' + brickflow_start_date: '{{start_date}}' + brickflow_start_time: '{{start_time}}' + brickflow_task_key: '{{task_key}}' + brickflow_task_retry_count: '{{task_retry_count}}' + notebook_path: test_databricks_bundle.py + source: WORKSPACE + retry_on_timeout: null + run_if: AT_LEAST_ONE_FAILED + task_key: task_function_5 + timeout_seconds: 0.0 - depends_on: [] email_notifications: {} existing_cluster_id: existing_cluster_id diff --git a/tests/codegen/sample_workflow.py b/tests/codegen/sample_workflow.py index 48db7f42..124c4f22 100644 --- a/tests/codegen/sample_workflow.py +++ b/tests/codegen/sample_workflow.py @@ -5,6 +5,8 @@ TaskResponse, DLTPipeline, NotebookTask, + TaskSettings, + TaskRunCondition, ) from brickflow.engine.workflow import Workflow, WorkflowPermissions, User @@ -95,6 +97,14 @@ def task_function_4(): return "hello world" +@wf.task( + depends_on="task_function_4", + task_settings=TaskSettings(run_if=TaskRunCondition.AT_LEAST_ONE_FAILED), +) +def task_function_5(): + return "hello world" + + @wf.task( task_type=TaskType.CUSTOM_PYTHON_TASK, trigger_rule=BrickflowTriggerRule.NONE_FAILED,