Skip to content

Commit

Permalink
Merge branch 'main' into feature/120-continuous-schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
pariksheet committed Aug 8, 2024
2 parents 1912224 + 790910c commit b716169
Show file tree
Hide file tree
Showing 27 changed files with 3,060 additions and 857 deletions.
2 changes: 2 additions & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Thanks to the contributors who helped on this project apart from the authors
* [Chanukya Konuganti](https://www.linkedin.com/in/chanukyakonuganti/)
* [Maxim Mityutko](https://www.linkedin.com/in/mityutko/)
* [Raju Gujjalapati](https://in.linkedin.com/in/raju-gujjalapati-470a88171)
* [Madhusudan Koukutla](https://www.linkedin.com/in/madhusudan-reddy/)
* [Surya Teja Jagatha](https://www.linkedin.com/in/surya-teja-jagatha/)

# Honorary Mentions
Thanks to the team below for invaluable insights and support throughout the initial release of this project
Expand Down
2 changes: 2 additions & 0 deletions brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,8 @@ def get_brickflow_libraries(enable_plugins: bool = False) -> List[TaskLibrary]:
PypiTaskLibrary("apache-airflow==2.7.3"),
PypiTaskLibrary("snowflake==0.6.0"),
PypiTaskLibrary("tableauserverclient==0.25"),
PypiTaskLibrary("boxsdk==3.9.2"),
PypiTaskLibrary("cerberus-python-client==2.5.4"),
MavenTaskLibrary("com.cronutils:cron-utils:9.2.0"),
]
else:
Expand Down
24 changes: 24 additions & 0 deletions brickflow/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,30 @@ def _add_task(
else:
ensure_plugins = ensure_brickflow_plugins

# NOTE: REMOTE WORKSPACE RUN JOB OVERRIDE
# This is a temporary override for the RunJobTask because Databricks does not natively support
# triggering a job run in a remote workspace. By default, the Databricks SDK derives the workspace URL
# from the runtime, and hence it is not required by the RunJobTask. The assumption is that if the `host`
# parameter is set, the user wants to trigger a remote job; in this case we set the task type to
# BRICKFLOW_TASK to enforce notebook-type execution and replace the original callable with RunJobInRemoteWorkspace.
if task_type == TaskType.RUN_JOB_TASK:
func = f()
if func.host:
from brickflow_plugins.databricks.run_job import RunJobInRemoteWorkspace

task_type = TaskType.BRICKFLOW_TASK

def run_job_func() -> Callable:
# Using parameter values from the original RunJobTask
return RunJobInRemoteWorkspace(
job_name=func.job_name,
databricks_host=func.host,
databricks_token=func.token,
).execute()

f = run_job_func
# NOTE: END REMOTE WORKSPACE RUN JOB OVERRIDE

self.tasks[task_id] = Task(
task_id=task_id,
task_func=f,
Expand Down
8 changes: 8 additions & 0 deletions brickflow_plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ def setup_logger():
SnowflakeOperator,
UcToSnowflakeOperator,
)
from brickflow_plugins.databricks.box_operator import (
BoxToVolumesOperator,
VolumesToBoxOperator,
BoxOperator,
)


def load_plugins(cache_bust: Optional[pluggy.PluginManager] = None) -> None:
Expand Down Expand Up @@ -75,6 +80,9 @@ def ensure_installation():
"UcToSnowflakeOperator",
"TableauRefreshDataSourceOperator",
"TableauRefreshWorkBookOperator",
"BoxToVolumesOperator",
"VolumesToBoxOperator",
"BoxOperator",
"load_plugins",
"ensure_installation",
]
Loading

0 comments on commit b716169

Please sign in to comment.