Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Adjust logic for applying outcome for IF_ELSE_CONDITION tasks #173

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions brickflow/codegen/databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
Targets,
Workspace,
)
from brickflow.cli.projects import MultiProjectManager, get_brickflow_root
from brickflow.codegen import (
CodegenInterface,
DatabricksDefaultClusterTagKeys,
Expand Down Expand Up @@ -461,12 +462,30 @@ def adjust_file_path(self, file_path: str) -> str:
]
).replace("//", "/")

# Finds the start position of the project name in the given file path and calculates the cut position.
# - `file_path.find(self.project.name)`: Finds the start index of the project name in the file path.
# - `+ len(self.project.name) + 1`: Moves the start position to the character after the project name.
multi_project_manager = MultiProjectManager(
config_file_name=str(get_brickflow_root())
)
bf_project = multi_project_manager.get_project(self.project.name)

start_index_of_project_root = file_path.find(
bf_project.path_from_repo_root_to_project_root
)

if start_index_of_project_root < 0:
raise ValueError(
f"Error while adjusting file path. "
f"Project root not found in the file path: {file_path}."
)

# Finds the start position of the path_from_repo_root_to_project_root in the given file path
# and calculates the cut position.
# - `file_path.find`: finds the start index of the project root in the file path.
# - `+ len + 1`: Moves the start position to the character after the project root.
# - Adjusts the file path by appending the local bundle path to the cut file path.
cut_file_path = file_path[
file_path.find(self.project.name) + len(self.project.name) + 1 :
start_index_of_project_root
+ len(bf_project.path_from_repo_root_to_project_root)
+ 1 :
]
file_path = (
bundle_files_local_path + file_path
Expand Down
17 changes: 13 additions & 4 deletions brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,15 +822,24 @@ def parents(self) -> List[str]:
@property
def depends_on_names(self) -> Iterator[Dict[str, Optional[str]]]:
    """Yield one ``{task_name: outcome}`` mapping per upstream dependency.

    For each entry in ``self.depends_on`` the dependency's task name is
    resolved (callable dependencies use their ``__name__``; anything else
    is stringified). If the referenced task is an IF/ELSE condition task
    and this task declares ``if_else_outcome``, the outcome configured for
    that specific task name is attached; otherwise the outcome is ``None``.

    Yields:
        Dict[str, Optional[str]]: mapping of dependency task name to its
        required condition outcome (``"true"``/``"false"``) or ``None``.

    Raises:
        ValueError: if a dependency is an IF/ELSE condition task but no
        outcome is configured for it in ``if_else_outcome``.
    """
    for i in self.depends_on:
        # Callables (task functions) carry their name; fall back to str()
        # for plain string references.
        task_name = i.__name__ if callable(i) and hasattr(i, "__name__") else str(i)
        if (
            self.workflow.get_task(task_name).task_type
            == TaskType.IF_ELSE_CONDITION_TASK
            and self.if_else_outcome
        ):
            # Look up the outcome by the dependency's own name rather than
            # assuming a single global outcome applies to every condition task.
            outcome = self.if_else_outcome.get(task_name)
            if not outcome:
                raise ValueError(
                    f"Task {task_name} is an if else condition task and does not have an outcome"
                )
        else:
            outcome = None

        # task_name is already normalized above, so callables and plain
        # references yield the same shape.
        yield {task_name: outcome}

@property
def databricks_task_type_str(self) -> str:
Expand Down
51 changes: 51 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,45 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -443,6 +482,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
51 changes: 51 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,45 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -443,6 +482,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,51 @@ targets:
retry_on_timeout: null
task_key: spark_python_task_a
timeout_seconds: null
- depends_on:
- task_key: spark_python_task_a
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
- pypi:
package: "brickflows==0.1.0"
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended
timeout_seconds: null
- depends_on:
- outcome: "true"
task_key: condtion_task_test
- outcome: "false"
task_key: condition_task_test2
email_notifications: {}
existing_cluster_id: existing_cluster_id
libraries:
- pypi:
package: koheesio
repo: null
- pypi:
package: "brickflows==0.1.0"
repo: null
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
spark_python_task:
parameters: ["--param1", "World!", "all_tasks1", "test", "all_tasks3", "123"]
python_file: ./products/test-project/spark/python/src/run_task.py
source: GIT
task_key: spark_python_task_depended2
timeout_seconds: null
- depends_on:
- task_key: notebook_task_a
email_notifications: {}
Expand Down Expand Up @@ -552,6 +597,18 @@ targets:
right: "2"
task_key: "condtion_task_test"
timeout_seconds: null
- depends_on:
- task_key: "sample_sql_task_query"
email_notifications: {}
max_retries: null
min_retry_interval_millis: null
retry_on_timeout: null
condition_task:
left: "1"
op: "EQUAL_TO"
right: "1"
task_key: "condition_task_test2"
timeout_seconds: null
pipelines:
test_hello_world:
catalog: null
Expand Down
69 changes: 67 additions & 2 deletions tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@
"retry_on_timeout": null
"task_key": "condtion_task_test"
"timeout_seconds": null
- "condition_task":
"left": "1"
"op": "EQUAL_TO"
"right": "1"
"depends_on":
- "outcome": null
"task_key": "sample_sql_task_query"
"email_notifications": {}
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"task_key": "condition_task_test2"
"timeout_seconds": null
- "depends_on": []
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
Expand Down Expand Up @@ -236,23 +249,75 @@
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"libraries":
- "pypi":
- "pypi":
"package": "koheesio"
"repo": null
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"spark_python_task":
"parameters":
- "--param1"
- "World!"
- "all_tasks1"
- "test"
- "all_tasks3"
- "123"
"python_file": "/Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py"
"source": "WORKSPACE"
"task_key": "spark_python_task_a"
"timeout_seconds": null
- "depends_on":
- "outcome": null
"task_key": "spark_python_task_a"
- "outcome": "false"
"task_key": "condition_task_test2"
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"libraries":
- "pypi":
"package": "koheesio"
"repo": null
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"spark_python_task":
"parameters":
- "--param1"
- "World!"
- "all_tasks1"
- "test"
- "all_tasks3"
- "123"
"python_file": "/Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py"
"source": "WORKSPACE"
"task_key": "spark_python_task_depended"
"timeout_seconds": null
- "depends_on":
- "outcome": "true"
"task_key": "condtion_task_test"
- "outcome": "false"
"task_key": "condition_task_test2"
"email_notifications": {}
"existing_cluster_id": "existing_cluster_id"
"libraries":
- "pypi":
"package": "koheesio"
"repo": null
"max_retries": null
"min_retry_interval_millis": null
"retry_on_timeout": null
"spark_python_task":
"parameters":
- "--param1"
- "World!"
- "all_tasks1"
- "test"
- "all_tasks3"
- "123"
"task_key": "spark_python_task_a"
"python_file": "/Workspace/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files/spark/python/src/run_task.py"
"source": "WORKSPACE"
"task_key": "spark_python_task_depended2"
"timeout_seconds": null
- "depends_on": []
"email_notifications": {}
Expand Down
Loading
Loading