Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[fix] Adjust logic for applying outcome for IF_ELSE_CONDITION tasks #173

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit. Hold Shift + click to select a range.
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,15 +822,24 @@ def parents(self) -> List[str]:
@property
def depends_on_names(self) -> Iterator[Dict[str, Optional[str]]]:
for i in self.depends_on:
if self.if_else_outcome:
outcome = list(self.if_else_outcome.values())[0]
task_name = i.__name__ if callable(i) and hasattr(i, "__name__") else str(i)
mikita-sakalouski marked this conversation as resolved.
Show resolved Hide resolved
if (
self.workflow.get_task(task_name).task_type
== TaskType.IF_ELSE_CONDITION_TASK
and self.if_else_outcome
):
outcome = self.if_else_outcome.get(task_name)
if not outcome:
raise ValueError(
f"Task {task_name} is an if else condition task and does not have an outcome"
)
else:
outcome = None

if callable(i) and hasattr(i, "__name__"):
yield {i.__name__: outcome}
yield {task_name: outcome}
else:
yield {str(i): outcome}
yield {task_name: outcome}

@property
def databricks_task_type_str(self) -> str:
Expand Down
51 changes: 51 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,45 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -443,6 +482,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
51 changes: 51 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,45 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -443,6 +482,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,51 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
- pypi:
package: "brickflows==0.1.0"
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
- pypi:
package: "brickflows==0.1.0"
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -552,6 +597,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
69 changes: 67 additions & 2 deletions tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@
"retry_on_timeout": null
"task_key": "condtion_task_test"
"timeout_seconds": null
- "condition_task":
"left": "1"
"op": "EQUAL_TO"
"right": "1"
"depends_on":
- "outcome": null
"task_key": "sample_sql_task_query"
"email_notifications": {}
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"task_key": "condition_task_test2"
"timeout_seconds": null
- "depends_on": []
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
Expand Down Expand Up @@ -236,23 +249,75 @@
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"libraries":
- "pypi":
- "pypi":
"package": "koheesio"
"repo": null
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"spark_python_task":
"parameters":
- "--param1"
- "World!"
- "all_tasks1"
- "test"
- "all_tasks3"
- "123"
"python_file": "/Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py"
"source": "WORKSPACE"
"task_key": "spark_python_task_a"
"timeout_seconds": null
- "depends_on":
- "outcome": null
"task_key": "spark_python_task_a"
- "outcome": "false"
"task_key": "condition_task_test2"
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"libraries":
- "pypi":
"package": "koheesio"
"repo": null
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"spark_python_task":
"parameters":
- "--param1"
- "World!"
- "all_tasks1"
- "test"
- "all_tasks3"
- "123"
"python_file": "/Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py"
"source": "WORKSPACE"
"task_key": "spark_python_task_depended"
"timeout_seconds": null
- "depends_on":
- "outcome": "true"
"task_key": "condtion_task_test"
- "outcome": "false"
"task_key": "condition_task_test2"
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"libraries":
- "pypi":
"package": "koheesio"
"repo": null
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"spark_python_task":
"parameters":
- "--param1"
- "World!"
- "all_tasks1"
- "test"
- "all_tasks3"
- "123"
"task_key": "spark_python_task_a"
"python_file": "/Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py"
"source": "WORKSPACE"
"task_key": "spark_python_task_depended2"
"timeout_seconds": null
- "depends_on": []
"email_notifications": {}
Expand Down
51 changes: 51 additions & 0 deletions tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,45 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
python_file: /Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
source: WORKSPACE
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
python_file: /Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
source: WORKSPACE
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -440,6 +479,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
Loading
Loading