Skip to content

Commit

Permalink
Feature support job schedule pause status (#42)
Browse files Browse the repository at this point in the history
* Adding new argument schedule_pause_status to the Workflow object

* Updating docs

* Updating tests
  • Loading branch information
asingamaneni authored Sep 16, 2023
1 parent 21ad028 commit 5a10093
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 0 deletions.
1 change: 1 addition & 0 deletions brickflow/codegen/databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ def workflow_obj_to_schedule(workflow: Workflow) -> Optional[JobsSchedule]:
return JobsSchedule(
quartz_cron_expression=workflow.schedule_quartz_expression,
timezone_id=workflow.timezone,
pause_status=workflow.schedule_pause_status,
)
return None

Expand Down
12 changes: 12 additions & 0 deletions brickflow/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
from brickflow.engine.utils import wraps_keyerror


class WorkflowConfigError(Exception):
    """Raised when a Workflow is configured with an unsupported value.

    Workflow.__post_init__ raises this when ``schedule_pause_status`` is not
    one of the allowed statuses.
    """


class NoWorkflowComputeError(Exception):
    """Error type for a workflow that has no compute available.

    NOTE(review): the raise site is not visible in this diff; presumably this
    is raised when a Workflow has no cluster configured -- confirm.
    """

Expand Down Expand Up @@ -109,6 +113,7 @@ class Workflow:
_name: str
schedule_quartz_expression: Optional[str] = None
timezone: str = "UTC"
schedule_pause_status: str = "UNPAUSED"
default_cluster: Optional[Cluster] = None
clusters: List[Cluster] = field(default_factory=lambda: [])
default_task_settings: TaskSettings = TaskSettings()
Expand Down Expand Up @@ -155,6 +160,13 @@ def __post_init__(self) -> None:
# the default cluster is set to the first cluster if it is not configured
self.default_cluster = self.clusters[0]

self.schedule_pause_status = self.schedule_pause_status.upper()
allowed_scheduled_pause_statuses = ["PAUSED", "UNPAUSED"]
if self.schedule_pause_status not in allowed_scheduled_pause_statuses:
raise WorkflowConfigError(
f"schedule_pause_status must be one of {allowed_scheduled_pause_statuses}"
)

# def __hash__(self) -> int:
# import json
#
Expand Down
2 changes: 2 additions & 0 deletions docs/workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ wf = Workflow( # (1)!
# Optional parameters below
schedule_quartz_expression="0 0/20 0 ? * * *", # (4)!
timezone="UTC", # (5)!
schedule_pause_status="PAUSED", # (15)!
default_task_settings=TaskSettings( # (6)!
email_notifications=EmailNotifications(
on_start=["email@example.com"],
Expand Down Expand Up @@ -65,6 +66,7 @@ def task_function(*, test="var"):
12. Suffix for the name of the workflow
13. Define the common task parameters that can be used in all the tasks
14. Define a workflow task and associate it to the workflow
15. Define the schedule pause status. Defaults to "UNPAUSED"

### Clusters

Expand Down
1 change: 1 addition & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ environments:
schedule:
quartz_cron_expression: '* * * * *'
timezone_id: UTC
pause_status: "UNPAUSED"
tags:
brickflow_project_name: test-project
brickflow_deployment_mode: Databricks Asset Bundles
Expand Down
1 change: 1 addition & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ environments:
schedule:
quartz_cron_expression: '* * * * *'
timezone_id: UTC
pause_status: "UNPAUSED"
tags:
brickflow_project_name: test-project
brickflow_deployment_mode: Databricks Asset Bundles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ environments:
schedule:
quartz_cron_expression: '* * * * *'
timezone_id: UTC
pause_status: "UNPAUSED"
tags:
brickflow_project_name: test-project
brickflow_deployment_mode: Databricks Asset Bundles
Expand Down
1 change: 1 addition & 0 deletions tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ environments:
schedule:
quartz_cron_expression: '* * * * *'
timezone_id: UTC
pause_status: "UNPAUSED"
tags:
brickflow_project_name: test-project
brickflow_deployment_mode: Databricks Asset Bundles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ environments:
schedule:
quartz_cron_expression: '* * * * *'
timezone_id: UTC
pause_status: "UNPAUSED"
tags:
brickflow_project_name: test-project
brickflow_deployment_mode: Databricks Asset Bundles
Expand Down
29 changes: 29 additions & 0 deletions tests/engine/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ServicePrincipal,
Workflow,
NoWorkflowComputeError,
WorkflowConfigError,
)
from tests.engine.sample_workflow import wf, task_function

Expand Down Expand Up @@ -226,3 +227,31 @@ def test_another_workflow(self):

assert len(wf1.graph.nodes) == 2
assert len(wf.graph.nodes) == 10

def test_schedule_run_status_workflow(self):
    """Check the default, normalization and validation of schedule_pause_status."""
    # When the argument is omitted the status is "UNPAUSED".
    this_wf = Workflow("test", clusters=[Cluster("name", "spark", "vm-node")])
    assert this_wf.schedule_pause_status == "UNPAUSED"

    # An explicit, already upper-case value is kept as-is.
    this_wf = Workflow(
        "test",
        clusters=[Cluster("name", "spark", "vm-node")],
        schedule_pause_status="PAUSED",
    )
    assert this_wf.schedule_pause_status == "PAUSED"

    # Lower-case input is upper-cased during construction.
    this_wf = Workflow(
        "test",
        clusters=[Cluster("name", "spark", "vm-node")],
        schedule_pause_status="paused",
    )
    assert this_wf.schedule_pause_status == "PAUSED"

    # Any other value is rejected with WorkflowConfigError.
    with pytest.raises(WorkflowConfigError) as excinfo:
        Workflow(
            "test",
            clusters=[Cluster("name", "spark", "vm-node")],
            schedule_pause_status="invalid",
        )
    # Kept outside the ``with`` block: statements placed after the raising
    # call inside the block would never execute.
    assert "schedule_pause_status must be one of ['PAUSED', 'UNPAUSED']" == str(
        excinfo.value
    )

0 comments on commit 5a10093

Please sign in to comment.