From b2052dff3db91545645097af88515b34c008a45d Mon Sep 17 00:00:00 2001 From: Pari Date: Mon, 26 Aug 2024 15:52:24 +0200 Subject: [PATCH] Feature/runtime workflow tags (#149) * run time tags * add support for runtime tags * add deployed at as default tag * better ptach fixtures * fix selected workflows * update docs * trim the tags before applying * fix pylint * fix pylint --- brickflow/__init__.py | 1 + brickflow/cli/projects.py | 9 ++++++ brickflow/codegen/__init__.py | 1 + brickflow/codegen/databricks_bundle.py | 28 +++++++++++++++---- brickflow/context/context.py | 5 ++++ docs/environment-variables.md | 21 ++++++++++++++ .../expected_bundles/dev_bundle_monorepo.yml | 1 + .../expected_bundles/dev_bundle_polyrepo.yml | 1 + .../dev_bundle_polyrepo_with_auto_libs.yml | 2 ++ .../codegen/expected_bundles/local_bundle.yml | 5 +++- .../local_bundle_continuous_schedule.yml | 1 + .../local_bundle_prefix_suffix.yml | 1 + tests/codegen/test_databricks_bundle.py | 26 ++++++++++++++++- 13 files changed, 95 insertions(+), 7 deletions(-) diff --git a/brickflow/__init__.py b/brickflow/__init__.py index 9d70d5f1..ff6f6227 100644 --- a/brickflow/__init__.py +++ b/brickflow/__init__.py @@ -74,6 +74,7 @@ class BrickflowEnvVars(Enum): BRICKFLOW_AUTO_ADD_LIBRARIES = "BRICKFLOW_AUTO_ADD_LIBRARIES" BRICKFLOW_USE_PROJECT_NAME = "BRICKFLOW_USE_PROJECT_NAME" # for projects which injects project name to cli context BRICKFLOW_PROJECT_PARAMS = "BRICKFLOW_PROJECT_PARAMS" + BRICKFLOW_PROJECT_TAGS = "BRICKFLOW_PROJECT_TAGS" def env_chain(env_var: str, dbx_get_param: str, default: Optional[str] = None) -> str: diff --git a/brickflow/cli/projects.py b/brickflow/cli/projects.py index 8a72fedb..1a60aa49 100644 --- a/brickflow/cli/projects.py +++ b/brickflow/cli/projects.py @@ -495,6 +495,15 @@ def apply_bundles_deployment_options( help="""Provide the runtime key-value parameters, each key-value separated by space! Example: bf projects deploy -p DEFAULT -e local -kv key1=value1 -kv key2=value2""", ), + "--tag": click.option( + "--tag", + "-t", + type=str, + multiple=True, + callback=bind_env_var(BrickflowEnvVars.BRICKFLOW_PROJECT_TAGS.value), + help="""Provide the runtime key-value tags, each key-value separated by space! + Example: bf projects deploy -p DEFAULT -e local -t t_key1=value1 -kv t_key2=value2""", + ), } def _apply_bundles_deployment_options(func: Callable) -> Callable[..., Any]: diff --git a/brickflow/codegen/__init__.py b/brickflow/codegen/__init__.py index 6ac8abfc..00b459c4 100644 --- a/brickflow/codegen/__init__.py +++ b/brickflow/codegen/__init__.py @@ -28,6 +28,7 @@ def synth(self) -> None: class DatabricksDefaultClusterTagKeys(Enum): ENVIRONMENT = "environment" DEPLOYED_BY = "deployed_by" + DEPLOYED_AT = "deployed_at" BRICKFLOW_PROJECT_NAME = "brickflow_project_name" BRICKFLOW_DEPLOYMENT_MODE = "brickflow_deployment_mode" DATABRICKS_TF_PROVIDER_VERSION = "databricks_tf_provider_version" diff --git a/brickflow/codegen/databricks_bundle.py b/brickflow/codegen/databricks_bundle.py index c14d1fe6..5e225342 100644 --- a/brickflow/codegen/databricks_bundle.py +++ b/brickflow/codegen/databricks_bundle.py @@ -95,6 +95,9 @@ def _get_default_tags(self, ci: CodegenInterface) -> Dict[str, str]: return { DatabricksDefaultClusterTagKeys.ENVIRONMENT.value: ctx.env, DatabricksDefaultClusterTagKeys.DEPLOYED_BY.value: self._get_current_user_alphanumeric(), + DatabricksDefaultClusterTagKeys.DEPLOYED_AT.value: str( + ctx.get_current_timestamp() + ), DatabricksDefaultClusterTagKeys.BRICKFLOW_PROJECT_NAME.value: ci.project.name, DatabricksDefaultClusterTagKeys.BRICKFLOW_DEPLOYMENT_MODE.value: "Databricks Asset Bundles", DatabricksDefaultClusterTagKeys.BRICKFLOW_VERSION.value: get_brickflow_version(), @@ -112,9 +115,21 @@ def _mutate_jobs(self, resource: Resources, ci: CodegenInterface) -> Resources: # set correct names job.name = self._rewrite_name(job.name) # set tags - job.tags = {**self._get_default_tags(ci), **(job.tags or {})} + job.tags = { + **self._get_default_tags(ci), + **self._get_runtime_tags(), + **(job.tags or {}), + } return resource + @staticmethod + def _get_runtime_tags() -> Dict[str, str]: + project_tags = os.environ.get(BrickflowEnvVars.BRICKFLOW_PROJECT_TAGS.value) + if project_tags: + tags = dict(tag.split("=") for tag in project_tags.split(",")) + return {k.strip(): v.strip() for (k, v) in tags.items()} + return {} + def _mutate_pipelines(self, resource: Resources, ci: CodegenInterface) -> Resources: if resource.pipelines is not None: for pipeline in resource.pipelines.values(): @@ -125,6 +140,7 @@ def _mutate_pipelines(self, resource: Resources, ci: CodegenInterface) -> Resour # set correct tags cluster.custom_tags = { **self._get_default_tags(ci), + **self._get_runtime_tags(), **(cluster.custom_tags or {}), } return resource @@ -699,10 +715,10 @@ def proj_to_bundle(self) -> DatabricksAssetBundles: pipelines = {} # noqa selected_workflows = ( - os.getenv(BrickflowEnvVars.BRICKFLOW_DEPLOY_ONLY_WORKFLOWS.value, "").split( - "," - ) - if BrickflowEnvVars.BRICKFLOW_DEPLOY_ONLY_WORKFLOWS.value in os.environ + str( + os.getenv(BrickflowEnvVars.BRICKFLOW_DEPLOY_ONLY_WORKFLOWS.value) + ).split(",") + if os.getenv(BrickflowEnvVars.BRICKFLOW_DEPLOY_ONLY_WORKFLOWS.value) else [] ) @@ -719,12 +735,14 @@ def proj_to_bundle(self) -> DatabricksAssetBundles: selected_workflows, ) continue + git_ref = self.project.git_reference or "" ref_type = git_ref.split("/", maxsplit=1)[0] ref_type = ( ref_type if ref_type.startswith("git_") else f"git_{ref_type}" ) # handle git_branch and git_tag ref_value = "/".join(git_ref.split("/")[1:]) + # only add git source if not local git_conf = ( JobsGitSource( diff --git a/brickflow/context/context.py b/brickflow/context/context.py index 75adccf5..b5d21745 100644 --- a/brickflow/context/context.py +++ b/brickflow/context/context.py @@ -1,4 +1,6 @@ import base64 +import time + import binascii import copy import functools @@ -175,6 +177,9 @@ def _configure(self) -> None: def current_task(self) -> Optional[str]: return self._current_task + def get_current_timestamp(self) -> int: + return int(time.time() * 1000) + def _set_current_task(self, task_key: str) -> None: self._current_task = task_key diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 2d481e71..6f681bc2 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -56,6 +56,27 @@ ctx.get_project_parameter("key1") ## returns value1 ctx.get_project_parameter("key2") ## returns value2 ``` +# Runtime project tags + +This allows for passing runtime tags to be applied to workflows. +After deployment, the tags will be applied to the workflows in the Databricks workspace. + +### Environment Variable +The tags are passed as a comma separated list when setting through environment variable. +The key-value pairs are separated by an equal sign. + +```shell +export BRICKFLOW_PROJECT_TAGS="tag1=value1,tag2=value2" +bf projects deploy -p DEFAULT -e local +``` + +### CLI + +Provide the runtime key-value tags, each key-value separated by space when using CLI. + +```shell +bf projects deploy -p DEFAULT -e local --tag tag1=value1 --tag tag2=value2 +``` # Workflow prefixing or suffixing diff --git a/tests/codegen/expected_bundles/dev_bundle_monorepo.yml b/tests/codegen/expected_bundles/dev_bundle_monorepo.yml index cde3f6d7..4616fb0e 100644 --- a/tests/codegen/expected_bundles/dev_bundle_monorepo.yml +++ b/tests/codegen/expected_bundles/dev_bundle_monorepo.yml @@ -44,6 +44,7 @@ targets: brickflow_project_name: test-project brickflow_deployment_mode: Databricks Asset Bundles deployed_by: test_user + deployed_at: "1704067200000" environment: dev test: test2 brickflow_version: 1.0.0 diff --git a/tests/codegen/expected_bundles/dev_bundle_polyrepo.yml b/tests/codegen/expected_bundles/dev_bundle_polyrepo.yml index 215e0d87..475cd09d 100644 --- a/tests/codegen/expected_bundles/dev_bundle_polyrepo.yml +++ b/tests/codegen/expected_bundles/dev_bundle_polyrepo.yml @@ -44,6 +44,7 @@ targets: brickflow_project_name: test-project brickflow_deployment_mode: Databricks Asset Bundles deployed_by: test_user + deployed_at: "1704067200000" environment: dev test: test2 brickflow_version: 1.0.0 diff --git a/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml b/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml index 05361499..a26d8100 100644 --- a/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml +++ b/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml @@ -22,6 +22,7 @@ targets: brickflow_deployment_mode: Databricks Asset Bundles brickflow_project_name: test-project brickflow_version: 1.0.0 + deployed_at: "1704067200000" deployed_by: test_user environment: dev tasks: @@ -113,6 +114,7 @@ targets: brickflow_project_name: test-project brickflow_deployment_mode: Databricks Asset Bundles deployed_by: test_user + deployed_at: "1704067200000" brickflow_version: 1.0.0 environment: dev test: test2 diff --git a/tests/codegen/expected_bundles/local_bundle.yml b/tests/codegen/expected_bundles/local_bundle.yml index e80ed638..931af1f5 100644 --- a/tests/codegen/expected_bundles/local_bundle.yml +++ b/tests/codegen/expected_bundles/local_bundle.yml @@ -37,7 +37,10 @@ "brickflow_project_name": "test-project" "brickflow_version": "1.0.0" "deployed_by": "test_user" + "deployed_at": "1704067200000" "environment": "local" + "tag1": "value1" + "tag2": "value2" "test": "test2" "tasks": - "condition_task": @@ -465,4 +468,4 @@ "file_path": "/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/files" "root_path": "/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local" "state_path": "/Users/${workspace.current_user.userName}/.brickflow_bundles/test-project/local/state" -"workspace": {} \ No newline at end of file +"workspace": {} diff --git a/tests/codegen/expected_bundles/local_bundle_continuous_schedule.yml b/tests/codegen/expected_bundles/local_bundle_continuous_schedule.yml index 07a86638..d6dd8b3c 100644 --- a/tests/codegen/expected_bundles/local_bundle_continuous_schedule.yml +++ b/tests/codegen/expected_bundles/local_bundle_continuous_schedule.yml @@ -35,6 +35,7 @@ "brickflow_project_name": "test-project" "brickflow_version": "1.0.0" "deployed_by": "test_user" + "deployed_at": "1704067200000" "environment": "local" "test": "test2" "tasks": diff --git a/tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml b/tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml index 000290c1..17c2284f 100644 --- a/tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml +++ b/tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml @@ -41,6 +41,7 @@ targets: brickflow_project_name: test-project brickflow_deployment_mode: Databricks Asset Bundles deployed_by: test_user + deployed_at: "1704067200000" brickflow_version: 1.0.0 environment: local test: test2 diff --git a/tests/codegen/test_databricks_bundle.py b/tests/codegen/test_databricks_bundle.py index a9eb33c6..e867e57e 100644 --- a/tests/codegen/test_databricks_bundle.py +++ b/tests/codegen/test_databricks_bundle.py @@ -79,12 +79,17 @@ class TestBundleCodegen: BrickflowEnvVars.BRICKFLOW_ENV.value: "local", BrickflowEnvVars.BRICKFLOW_DEPLOYMENT_MODE.value: BrickflowDeployMode.BUNDLE.value, BrickflowEnvVars.BRICKFLOW_PROJECT_PARAMS.value: "k1=v1,k2=v2", + BrickflowEnvVars.BRICKFLOW_PROJECT_TAGS.value: "tag1 = value1, tag2 =value2 ", # spaces will be trimmed }, ) @patch("brickflow.engine.task.get_job_id", return_value=12345678901234.0) @patch("subprocess.check_output") @patch("brickflow.context.ctx.get_parameter") @patch("importlib.metadata.version") + @patch( + "brickflow.context.ctx.get_current_timestamp", + MagicMock(return_value=1704067200000), + ) def test_generate_bundle_local( self, bf_version_mock: Mock, @@ -137,6 +142,10 @@ def test_generate_bundle_local( @patch("subprocess.check_output") @patch("brickflow.context.ctx.get_parameter") @patch("importlib.metadata.version") + @patch( + "brickflow.context.ctx.get_current_timestamp", + MagicMock(return_value=1704067200000), + ) def test_generate_bundle_local_prefix_suffix( self, bf_version_mock: Mock, @@ -187,6 +196,10 @@ def test_generate_bundle_local_prefix_suffix( @patch("subprocess.check_output") @patch("brickflow.context.ctx.get_parameter") @patch("importlib.metadata.version") + @patch( + "brickflow.context.ctx.get_current_timestamp", + MagicMock(return_value=1704067200000), + ) def test_generate_bundle_dev( self, bf_version_mock: Mock, @@ -201,7 +214,6 @@ def test_generate_bundle_dev( sub_proc_mock.return_value = git_ref_b bf_version_mock.return_value = "1.0.0" get_job_id_mock.return_value = 12345678901234.0 - workspace_client = get_workspace_client_mock() # get caller part breaks here @@ -244,6 +256,10 @@ def test_generate_bundle_dev( @patch("subprocess.check_output") @patch("brickflow.context.ctx.get_parameter") @patch("importlib.metadata.version") + @patch( + "brickflow.context.ctx.get_current_timestamp", + MagicMock(return_value=1704067200000), + ) def test_generate_bundle_dev_auto_add_libs( self, bf_version_mock: Mock, @@ -313,6 +329,10 @@ def some_task(): @patch("subprocess.check_output") @patch("brickflow.context.ctx.get_parameter") @patch("importlib.metadata.version") + @patch( + "brickflow.context.ctx.get_current_timestamp", + MagicMock(return_value=1704067200000), + ) def test_generate_bundle_dev_monorepo( self, bf_version_mock: Mock, @@ -467,6 +487,10 @@ def test_import_blocks(self): @patch("subprocess.check_output") @patch("brickflow.context.ctx.get_parameter") @patch("importlib.metadata.version") + @patch( + "brickflow.context.ctx.get_current_timestamp", + MagicMock(return_value=1704067200000), + ) def test_schedule_continuous( self, bf_version_mock: Mock,