Skip to content

Commit

Permalink
add run_if param to TaskSettings (#53)
Browse files Browse the repository at this point in the history
* add run_if param to TaskSettings

* example added to docs + code fmt + TaskRunCondition added to init

---------

Co-authored-by: dgrigo <[email protected]>
  • Loading branch information
grigoriy835 and dgrigo authored Oct 25, 2023
1 parent 03cc0f2 commit b2af63d
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 0 deletions.
2 changes: 2 additions & 0 deletions brickflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def get_bundles_project_env() -> str:
TaskSettings,
TaskResponse,
BrickflowTriggerRule,
TaskRunCondition,
BrickflowTaskEnvVars,
StorageBasedTaskLibrary,
JarTaskLibrary,
Expand Down Expand Up @@ -301,6 +302,7 @@ def get_bundles_project_env() -> str:
"TaskSettings",
"TaskResponse",
"BrickflowTriggerRule",
"TaskRunCondition",
"BrickflowTaskEnvVars",
"StorageBasedTaskLibrary",
"JarTaskLibrary",
Expand Down
12 changes: 12 additions & 0 deletions brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ class TaskType(Enum):
NOTEBOOK_TASK = "notebook_task"


class TaskRunCondition(Enum):
    """Conditions under which a task runs once its dependencies have completed.

    Each member's value is the literal string written to the task's
    ``run_if`` setting.
    """

    # All dependencies have executed and succeeded.
    ALL_SUCCESS = "ALL_SUCCESS"
    # At least one dependency has succeeded.
    AT_LEAST_ONE_SUCCESS = "AT_LEAST_ONE_SUCCESS"
    # None of the dependencies have failed and at least one was executed.
    NONE_FAILED = "NONE_FAILED"
    # All dependencies completed and at least one was executed.
    ALL_DONE = "ALL_DONE"
    # At least one dependency failed.
    AT_LEAST_ONE_FAILED = "AT_LEAST_ONE_FAILED"
    # All dependencies have failed.
    ALL_FAILED = "ALL_FAILED"


@dataclass(frozen=True)
class TaskLibrary:
@staticmethod
Expand Down Expand Up @@ -240,6 +249,7 @@ class TaskSettings:
max_retries: Optional[int] = None
min_retry_interval_millis: Optional[int] = None
retry_on_timeout: Optional[bool] = None
run_if: Optional[TaskRunCondition] = None

def merge(self, other: Optional["TaskSettings"]) -> "TaskSettings":
# overrides top level values
Expand All @@ -252,6 +262,7 @@ def merge(self, other: Optional["TaskSettings"]) -> "TaskSettings":
other.max_retries or self.max_retries,
other.min_retry_interval_millis or self.min_retry_interval_millis,
other.retry_on_timeout or self.retry_on_timeout,
other.run_if or self.run_if,
)

def to_tf_dict(
Expand Down Expand Up @@ -280,6 +291,7 @@ def to_tf_dict(
"max_retries": self.max_retries,
"min_retry_interval_millis": self.min_retry_interval_millis,
"retry_on_timeout": self.retry_on_timeout,
**({"run_if": self.run_if.value} if self.run_if else {}),
}


Expand Down
24 changes: 24 additions & 0 deletions docs/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,30 @@ def all_success_task():
1. NONE_FAILED - use this if you want to trigger the task irrespective of the upstream tasks success or failure state
2. ALL_SUCCESS - use this if you want to trigger the task only if all the upstream tasks have succeeded


### Tasks conditional run

Add a condition that controls whether a task runs, based on the results of its parent tasks.

```python title="task_conditional_run"
from brickflow import Workflow, TaskRunCondition, TaskSettings
wf = Workflow(...)

@wf.task(
task_settings=TaskSettings(run_if=TaskRunCondition.AT_LEAST_ONE_FAILED)
)
def at_least_one_failed_task():
pass
```

This option determines whether the task runs once its dependencies have completed. Available options:
1. `ALL_SUCCESS`: All dependencies have executed and succeeded
2. `AT_LEAST_ONE_SUCCESS`: At least one dependency has succeeded
3. `NONE_FAILED`: None of the dependencies have failed and at least one was executed
4. `ALL_DONE`: All dependencies completed and at least one was executed
5. `AT_LEAST_ONE_FAILED`: At least one dependency failed
6. `ALL_FAILED`: All dependencies have failed

### Airflow Operators

We have adopted/extended certain airflow operators that might be needed to run as a task in databricks workflows.
Expand Down
30 changes: 30 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,36 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries: []
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: dev
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: some/path/to/root/test_databricks_bundle.py
source: GIT
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
30 changes: 30 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,36 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries: []
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: dev
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: test_databricks_bundle.py
source: GIT
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,39 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: brickflows==0.1.0
repo: null
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: dev
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: test_databricks_bundle.py
source: GIT
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
30 changes: 30 additions & 0 deletions tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,36 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries: []
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: local
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: test_databricks_bundle.py
source: WORKSPACE
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
30 changes: 30 additions & 0 deletions tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,36 @@ environments:
retry_on_timeout: null
task_key: task_function_4
timeout_seconds: null
- depends_on:
- task_key: task_function_4
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries: []
max_retries: null
min_retry_interval_millis: null
notebook_task:
base_parameters:
all_tasks1: test
all_tasks3: '123'
brickflow_env: local
brickflow_internal_only_run_tasks: ''
brickflow_internal_task_name: '{{task_key}}'
brickflow_internal_workflow_name: test
brickflow_internal_workflow_prefix: ''
brickflow_internal_workflow_suffix: ''
brickflow_job_id: '{{job_id}}'
brickflow_parent_run_id: '{{parent_run_id}}'
brickflow_run_id: '{{run_id}}'
brickflow_start_date: '{{start_date}}'
brickflow_start_time: '{{start_time}}'
brickflow_task_key: '{{task_key}}'
brickflow_task_retry_count: '{{task_retry_count}}'
notebook_path: test_databricks_bundle.py
source: WORKSPACE
retry_on_timeout: null
run_if: AT_LEAST_ONE_FAILED
task_key: task_function_5
timeout_seconds: 0.0
- depends_on: []
email_notifications: {}
existing_cluster_id: existing_cluster_id
Expand Down
10 changes: 10 additions & 0 deletions tests/codegen/sample_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
TaskResponse,
DLTPipeline,
NotebookTask,
TaskSettings,
TaskRunCondition,
)
from brickflow.engine.workflow import Workflow, WorkflowPermissions, User

Expand Down Expand Up @@ -95,6 +97,14 @@ def task_function_4():
return "hello world"


# Fixture task configured with run_if=AT_LEAST_ONE_FAILED and a dependency on
# task_function_4, so the generated bundle includes a task carrying `run_if`.
@wf.task(
    task_settings=TaskSettings(run_if=TaskRunCondition.AT_LEAST_ONE_FAILED),
    depends_on="task_function_4",
)
def task_function_5():
    return "hello world"


@wf.task(
task_type=TaskType.CUSTOM_PYTHON_TASK,
trigger_rule=BrickflowTriggerRule.NONE_FAILED,
Expand Down

0 comments on commit b2af63d

Please sign in to comment.