Skip to content

Commit

Permalink
[fix] Adjust logic for applying outcome for IF_ELSE_CONDITION tasks (#173)
Browse files Browse the repository at this point in the history

* feat: enhance if-else condition handling in Task class
  • Loading branch information
mikita-sakalouski authored Oct 21, 2024
1 parent f133723 commit 039b74c
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 8 deletions.
18 changes: 12 additions & 6 deletions brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,15 +822,21 @@ def parents(self) -> List[str]:
@property
def depends_on_names(self) -> Iterator[Dict[str, Optional[str]]]:
    """Yield one ``{task_name: outcome}`` mapping per entry in ``self.depends_on``.

    The task name is the dependency's ``__name__`` when the dependency is a
    callable that has one, otherwise ``str(dependency)``.

    The outcome is looked up per dependency, keyed by that dependency's own
    task name, and only when the dependency is an if/else condition task and
    ``self.if_else_outcome`` is non-empty. Every other dependency is yielded
    with an outcome of ``None``.

    Yields:
        A single-entry dict mapping the dependency's task name to its
        required outcome, or to ``None`` when no outcome applies.

    Raises:
        ValueError: if the dependency is an if/else condition task,
            ``self.if_else_outcome`` is non-empty, and it holds no truthy
            entry for that task name.
    """
    for dependency in self.depends_on:
        task_name = (
            dependency.__name__
            if callable(dependency) and hasattr(dependency, "__name__")
            else str(dependency)
        )

        # Default for dependencies that are not if/else condition tasks.
        outcome = None
        if (
            self.workflow.get_task(task_name).task_type
            == TaskType.IF_ELSE_CONDITION_TASK
            and self.if_else_outcome
        ):
            # Look the outcome up by this dependency's name rather than taking
            # the first value in the mapping, so that each parent gets its own
            # outcome when a task has several dependencies.
            outcome = self.if_else_outcome.get(task_name)
            if not outcome:
                raise ValueError(
                    f"Task {task_name} is an if else condition task and does not have an outcome"
                )

        # Exactly one mapping per dependency.
        yield {task_name: outcome}

@property
def databricks_task_type_str(self) -> str:
Expand Down
51 changes: 51 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,45 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -443,6 +482,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
51 changes: 51 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,45 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -443,6 +482,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,51 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
- pypi:
package: "brickflows==0.1.0"
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
- pypi:
package: "brickflows==0.1.0"
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -552,6 +597,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
69 changes: 67 additions & 2 deletions tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@
"retry_on_timeout": null
"task_key": "condtion_task_test"
"timeout_seconds": null
- "condition_task":
"left": "1"
"op": "EQUAL_TO"
"right": "1"
"depends_on":
- "outcome": null
"task_key": "sample_sql_task_query"
"email_notifications": {}
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"task_key": "condition_task_test2"
"timeout_seconds": null
- "depends_on": []
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
Expand Down Expand Up @@ -236,23 +249,75 @@
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"libraries":
- "pypi":
- "pypi":
"package": "koheesio"
"repo": null
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"spark_python_task":
"parameters":
- "--param1"
- "World!"
- "all_tasks1"
- "test"
- "all_tasks3"
- "123"
"python_file": "/Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py"
"source": "WORKSPACE"
"task_key": "spark_python_task_a"
"timeout_seconds": null
- "depends_on":
- "outcome": null
"task_key": "spark_python_task_a"
- "outcome": "false"
"task_key": "condition_task_test2"
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"libraries":
- "pypi":
"package": "koheesio"
"repo": null
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"spark_python_task":
"parameters":
- "--param1"
- "World!"
- "all_tasks1"
- "test"
- "all_tasks3"
- "123"
"python_file": "/Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py"
"source": "WORKSPACE"
"task_key": "spark_python_task_depended"
"timeout_seconds": null
- "depends_on":
- "outcome": "true"
"task_key": "condtion_task_test"
- "outcome": "false"
"task_key": "condition_task_test2"
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"libraries":
- "pypi":
"package": "koheesio"
"repo": null
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"spark_python_task":
"parameters":
- "--param1"
- "World!"
- "all_tasks1"
- "test"
- "all_tasks3"
- "123"
"task_key": "spark_python_task_a"
"python_file": "/Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py"
"source": "WORKSPACE"
"task_key": "spark_python_task_depended2"
"timeout_seconds": null
- "depends_on": []
"email_notifications": {}
Expand Down
51 changes: 51 additions & 0 deletions tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,45 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
python_file: /Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
source: WORKSPACE
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
python_file: /Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
source: WORKSPACE
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -440,6 +479,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
Loading

0 comments on commit 039b74c

Please sign in to comment.