Skip to content

Commit

Permalink
Feature/runtime workflow tags (#149)
Browse files Browse the repository at this point in the history
* run time tags

* add support for runtime tags

* add deployed at as default tag

* better patch fixtures

* fix selected workflows

* update docs

* trim the tags before applying

* fix pylint

* fix pylint
  • Loading branch information
pariksheet authored Aug 26, 2024
1 parent 7eebb2c commit b2052df
Show file tree
Hide file tree
Showing 13 changed files with 95 additions and 7 deletions.
1 change: 1 addition & 0 deletions brickflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class BrickflowEnvVars(Enum):
BRICKFLOW_AUTO_ADD_LIBRARIES = "BRICKFLOW_AUTO_ADD_LIBRARIES"
BRICKFLOW_USE_PROJECT_NAME = "BRICKFLOW_USE_PROJECT_NAME" # for projects which injects project name to cli context
BRICKFLOW_PROJECT_PARAMS = "BRICKFLOW_PROJECT_PARAMS"
BRICKFLOW_PROJECT_TAGS = "BRICKFLOW_PROJECT_TAGS"


def env_chain(env_var: str, dbx_get_param: str, default: Optional[str] = None) -> str:
Expand Down
9 changes: 9 additions & 0 deletions brickflow/cli/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,15 @@ def apply_bundles_deployment_options(
help="""Provide the runtime key-value parameters, each key-value separated by space!
Example: bf projects deploy -p DEFAULT -e local -kv key1=value1 -kv key2=value2""",
),
"--tag": click.option(
"--tag",
"-t",
type=str,
multiple=True,
callback=bind_env_var(BrickflowEnvVars.BRICKFLOW_PROJECT_TAGS.value),
help="""Provide the runtime key-value tags, each key-value separated by space!
Example: bf projects deploy -p DEFAULT -e local -t t_key1=value1 -t t_key2=value2""",
),
}

def _apply_bundles_deployment_options(func: Callable) -> Callable[..., Any]:
Expand Down
1 change: 1 addition & 0 deletions brickflow/codegen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def synth(self) -> None:
class DatabricksDefaultClusterTagKeys(Enum):
ENVIRONMENT = "environment"
DEPLOYED_BY = "deployed_by"
DEPLOYED_AT = "deployed_at"
BRICKFLOW_PROJECT_NAME = "brickflow_project_name"
BRICKFLOW_DEPLOYMENT_MODE = "brickflow_deployment_mode"
DATABRICKS_TF_PROVIDER_VERSION = "databricks_tf_provider_version"
Expand Down
28 changes: 23 additions & 5 deletions brickflow/codegen/databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ def _get_default_tags(self, ci: CodegenInterface) -> Dict[str, str]:
return {
DatabricksDefaultClusterTagKeys.ENVIRONMENT.value: ctx.env,
DatabricksDefaultClusterTagKeys.DEPLOYED_BY.value: self._get_current_user_alphanumeric(),
DatabricksDefaultClusterTagKeys.DEPLOYED_AT.value: str(
ctx.get_current_timestamp()
),
DatabricksDefaultClusterTagKeys.BRICKFLOW_PROJECT_NAME.value: ci.project.name,
DatabricksDefaultClusterTagKeys.BRICKFLOW_DEPLOYMENT_MODE.value: "Databricks Asset Bundles",
DatabricksDefaultClusterTagKeys.BRICKFLOW_VERSION.value: get_brickflow_version(),
Expand All @@ -112,9 +115,21 @@ def _mutate_jobs(self, resource: Resources, ci: CodegenInterface) -> Resources:
# set correct names
job.name = self._rewrite_name(job.name)
# set tags
job.tags = {**self._get_default_tags(ci), **(job.tags or {})}
job.tags = {
**self._get_default_tags(ci),
**self._get_runtime_tags(),
**(job.tags or {}),
}
return resource

@staticmethod
def _get_runtime_tags() -> Dict[str, str]:
    """Parse runtime tags from the ``BRICKFLOW_PROJECT_TAGS`` environment variable.

    The variable holds comma separated ``key=value`` pairs, for example
    ``"tag1=value1, tag2=value2"``. Whitespace around keys and values is
    trimmed. Only the first ``=`` of each pair separates key from value, so
    values may themselves contain ``=``.

    Returns:
        Mapping of tag key to tag value; empty when the variable is unset
        or contains no entries.

    Raises:
        ValueError: If a non-blank entry has no ``=`` separator.
    """
    project_tags = os.environ.get(BrickflowEnvVars.BRICKFLOW_PROJECT_TAGS.value)
    if not project_tags:
        return {}

    tags: Dict[str, str] = {}
    for entry in project_tags.split(","):
        if not entry.strip():
            # Tolerate trailing/double commas instead of failing the deploy.
            continue
        # partition splits on the first "=" only, keeping any later "=" in the value.
        key, separator, value = entry.partition("=")
        if not separator:
            raise ValueError(
                f"Invalid runtime tag {entry.strip()!r}: expected the form key=value"
            )
        tags[key.strip()] = value.strip()
    return tags

def _mutate_pipelines(self, resource: Resources, ci: CodegenInterface) -> Resources:
if resource.pipelines is not None:
for pipeline in resource.pipelines.values():
Expand All @@ -125,6 +140,7 @@ def _mutate_pipelines(self, resource: Resources, ci: CodegenInterface) -> Resour
# set correct tags
cluster.custom_tags = {
**self._get_default_tags(ci),
**self._get_runtime_tags(),
**(cluster.custom_tags or {}),
}
return resource
Expand Down Expand Up @@ -699,10 +715,10 @@ def proj_to_bundle(self) -> DatabricksAssetBundles:
pipelines = {} # noqa

selected_workflows = (
os.getenv(BrickflowEnvVars.BRICKFLOW_DEPLOY_ONLY_WORKFLOWS.value, "").split(
","
)
if BrickflowEnvVars.BRICKFLOW_DEPLOY_ONLY_WORKFLOWS.value in os.environ
str(
os.getenv(BrickflowEnvVars.BRICKFLOW_DEPLOY_ONLY_WORKFLOWS.value)
).split(",")
if os.getenv(BrickflowEnvVars.BRICKFLOW_DEPLOY_ONLY_WORKFLOWS.value)
else []
)

Expand All @@ -719,12 +735,14 @@ def proj_to_bundle(self) -> DatabricksAssetBundles:
selected_workflows,
)
continue

git_ref = self.project.git_reference or ""
ref_type = git_ref.split("/", maxsplit=1)[0]
ref_type = (
ref_type if ref_type.startswith("git_") else f"git_{ref_type}"
) # handle git_branch and git_tag
ref_value = "/".join(git_ref.split("/")[1:])

# only add git source if not local
git_conf = (
JobsGitSource(
Expand Down
5 changes: 5 additions & 0 deletions brickflow/context/context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import base64
import time

import binascii
import copy
import functools
Expand Down Expand Up @@ -175,6 +177,9 @@ def _configure(self) -> None:
def current_task(self) -> Optional[str]:
    """Return the value stored in ``_current_task`` (a task key, or ``None``)."""
    return self._current_task

def get_current_timestamp(self) -> int:
    """Return the current time as whole milliseconds since the Unix epoch."""
    seconds_since_epoch = time.time()
    # Scale to milliseconds, then truncate the fractional part.
    return int(seconds_since_epoch * 1000)

def _set_current_task(self, task_key: str) -> None:
self._current_task = task_key

Expand Down
21 changes: 21 additions & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,27 @@ ctx.get_project_parameter("key1") ## returns value1
ctx.get_project_parameter("key2") ## returns value2
```

# Runtime project tags

This allows for passing runtime tags to be applied to workflows.
After deployment, the tags will be applied to the workflows in the Databricks workspace.

### Environment Variable
The tags are passed as a comma-separated list when set through the environment variable.
The key-value pairs are separated by an equal sign.

```shell
export BRICKFLOW_PROJECT_TAGS="tag1=value1,tag2=value2"
bf projects deploy -p DEFAULT -e local
```

### CLI

When using the CLI, provide each runtime key-value tag with its own `--tag` (or `-t`) option.

```shell
bf projects deploy -p DEFAULT -e local --tag tag1=value1 --tag tag2=value2
```

# Workflow prefixing or suffixing

Expand Down
1 change: 1 addition & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ targets:
brickflow_project_name: test-project
brickflow_deployment_mode: Databricks Asset Bundles
deployed_by: test_user
deployed_at: "1704067200000"
environment: dev
test: test2
brickflow_version: 1.0.0
Expand Down
1 change: 1 addition & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ targets:
brickflow_project_name: test-project
brickflow_deployment_mode: Databricks Asset Bundles
deployed_by: test_user
deployed_at: "1704067200000"
environment: dev
test: test2
brickflow_version: 1.0.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ targets:
brickflow_deployment_mode: Databricks Asset Bundles
brickflow_project_name: test-project
brickflow_version: 1.0.0
deployed_at: "1704067200000"
deployed_by: test_user
environment: dev
tasks:
Expand Down Expand Up @@ -113,6 +114,7 @@ targets:
brickflow_project_name: test-project
brickflow_deployment_mode: Databricks Asset Bundles
deployed_by: test_user
deployed_at: "1704067200000"
brickflow_version: 1.0.0
environment: dev
test: test2
Expand Down
5 changes: 4 additions & 1 deletion tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
"brickflow_project_name": "test-project"
"brickflow_version": "1.0.0"
"deployed_by": "test_user"
"deployed_at": "1704067200000"
"environment": "local"
"tag1": "value1"
"tag2": "value2"
"test": "test2"
"tasks":
- "condition_task":
Expand Down Expand Up @@ -465,4 +468,4 @@
"file_path": "/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files"
"root_path": "/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local"
"state_path": "/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/state"
"workspace": {}
"workspace": {}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"brickflow_project_name": "test-project"
"brickflow_version": "1.0.0"
"deployed_by": "test_user"
"deployed_at": "1704067200000"
"environment": "local"
"test": "test2"
"tasks":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ targets:
brickflow_project_name: test-project
brickflow_deployment_mode: Databricks Asset Bundles
deployed_by: test_user
deployed_at: "1704067200000"
brickflow_version: 1.0.0
environment: local
test: test2
Expand Down
26 changes: 25 additions & 1 deletion tests/codegen/test_databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,17 @@ class TestBundleCodegen:
BrickflowEnvVars.BRICKFLOW_ENV.value: "local",
BrickflowEnvVars.BRICKFLOW_DEPLOYMENT_MODE.value: BrickflowDeployMode.BUNDLE.value,
BrickflowEnvVars.BRICKFLOW_PROJECT_PARAMS.value: "k1=v1,k2=v2",
BrickflowEnvVars.BRICKFLOW_PROJECT_TAGS.value: "tag1 = value1, tag2 =value2 ", # spaces will be trimmed
},
)
@patch("brickflow.engine.task.get_job_id", return_value=12345678901234.0)
@patch("subprocess.check_output")
@patch("brickflow.context.ctx.get_parameter")
@patch("importlib.metadata.version")
@patch(
"brickflow.context.ctx.get_current_timestamp",
MagicMock(return_value=1704067200000),
)
def test_generate_bundle_local(
self,
bf_version_mock: Mock,
Expand Down Expand Up @@ -137,6 +142,10 @@ def test_generate_bundle_local(
@patch("subprocess.check_output")
@patch("brickflow.context.ctx.get_parameter")
@patch("importlib.metadata.version")
@patch(
"brickflow.context.ctx.get_current_timestamp",
MagicMock(return_value=1704067200000),
)
def test_generate_bundle_local_prefix_suffix(
self,
bf_version_mock: Mock,
Expand Down Expand Up @@ -187,6 +196,10 @@ def test_generate_bundle_local_prefix_suffix(
@patch("subprocess.check_output")
@patch("brickflow.context.ctx.get_parameter")
@patch("importlib.metadata.version")
@patch(
"brickflow.context.ctx.get_current_timestamp",
MagicMock(return_value=1704067200000),
)
def test_generate_bundle_dev(
self,
bf_version_mock: Mock,
Expand All @@ -201,7 +214,6 @@ def test_generate_bundle_dev(
sub_proc_mock.return_value = git_ref_b
bf_version_mock.return_value = "1.0.0"
get_job_id_mock.return_value = 12345678901234.0

workspace_client = get_workspace_client_mock()

# get caller part breaks here
Expand Down Expand Up @@ -244,6 +256,10 @@ def test_generate_bundle_dev(
@patch("subprocess.check_output")
@patch("brickflow.context.ctx.get_parameter")
@patch("importlib.metadata.version")
@patch(
"brickflow.context.ctx.get_current_timestamp",
MagicMock(return_value=1704067200000),
)
def test_generate_bundle_dev_auto_add_libs(
self,
bf_version_mock: Mock,
Expand Down Expand Up @@ -313,6 +329,10 @@ def some_task():
@patch("subprocess.check_output")
@patch("brickflow.context.ctx.get_parameter")
@patch("importlib.metadata.version")
@patch(
"brickflow.context.ctx.get_current_timestamp",
MagicMock(return_value=1704067200000),
)
def test_generate_bundle_dev_monorepo(
self,
bf_version_mock: Mock,
Expand Down Expand Up @@ -467,6 +487,10 @@ def test_import_blocks(self):
@patch("subprocess.check_output")
@patch("brickflow.context.ctx.get_parameter")
@patch("importlib.metadata.version")
@patch(
"brickflow.context.ctx.get_current_timestamp",
MagicMock(return_value=1704067200000),
)
def test_schedule_continuous(
self,
bf_version_mock: Mock,
Expand Down

0 comments on commit b2052df

Please sign in to comment.