Skip to content

Commit

Permalink
Add timeout parameter to workflow class (#83)
Browse files Browse the repository at this point in the history
* Add timeout parameter to workflow class

* Add warning if task timeout exceeds workflow timeout

* Change timeout_seconds from 'float' to 'int'

---------

Co-authored-by: Brent Johnson <[email protected]>
  • Loading branch information
brent-johnson and Brent Johnson authored Jan 25, 2024
1 parent 09a2af9 commit 548b6cb
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 4 deletions.
4 changes: 2 additions & 2 deletions brickflow/bundles/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,7 @@ class Config:
...,
description='A unique name for the task. This field is used to refer to this task from other tasks.\nThis field is required and must be unique within its parent job.\nOn Update or Reset, this field is used to reference the tasks to be updated or reset.',
)
timeout_seconds: Optional[float] = Field(
timeout_seconds: Optional[int] = Field(
None,
description='An optional timeout applied to each run of this job task. A value of `0` means no timeout.',
)
Expand Down Expand Up @@ -1661,7 +1661,7 @@ class Config:
tasks: Optional[List[JobsTasks]] = Field(
None, description='A list of task specifications to be executed by this job.'
)
timeout_seconds: Optional[float] = Field(
timeout_seconds: Optional[int] = Field(
None,
description='An optional timeout applied to each run of this job. A value of `0` means no timeout.',
)
Expand Down
1 change: 1 addition & 0 deletions brickflow/codegen/databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ def proj_to_bundle(self) -> DatabricksAssetBundles:
permissions=self.workflow_obj_to_permissions(
workflow
), # will be none if not set
timeout_seconds=workflow.timeout_seconds,
email_notifications=workflow.email_notifications,
notification_settings=workflow.notification_settings,
webhook_notifications=workflow.webhook_notifications,
Expand Down
15 changes: 14 additions & 1 deletion brickflow/engine/workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import logging
import functools
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Dict, Union, Iterator, Any
Expand Down Expand Up @@ -117,8 +118,8 @@ class Workflow:
schedule_pause_status: str = "UNPAUSED"
default_cluster: Optional[Cluster] = None
clusters: List[Cluster] = field(default_factory=lambda: [])

health: Optional[List[JobsHealthRules]] = None
timeout_seconds: Optional[int] = None
default_task_settings: TaskSettings = TaskSettings()
email_notifications: Optional[WorkflowEmailNotifications] = None
webhook_notifications: Optional[WorkflowWebhookNotifications] = None
Expand Down Expand Up @@ -255,6 +256,13 @@ def pop_task(self, task_id: str) -> None:
def task_exists(self, task_id: str) -> bool:
return task_id in self.tasks

def log_timeout_warning(self, task_settings: TaskSettings) -> bool:
if task_settings is not None and self.timeout_seconds is not None:
if task_settings.timeout_seconds is not None:
if task_settings.timeout_seconds > self.timeout_seconds:
return True
return False

def _set_active_task(self, task_id: str) -> None:
self.active_task = task_id

Expand Down Expand Up @@ -301,6 +309,11 @@ def _add_task(
"Some how default cluster wasnt set please raise a github issue."
)

if self.log_timeout_warning(task_settings): # type: ignore
logging.warning(
"Task timeout_seconds should not exceed workflow timeout_seconds",
)

_libraries = libraries or [] + self.libraries
_depends_on = (
[depends_on]
Expand Down
1 change: 1 addition & 0 deletions docs/workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def task_function(*, test="var"):
14. Define a workflow task and associate it to the workflow
15. Define the schedule pause status. It is defaulted to "UNPAUSED"
16. Define health check condition that triggers duration warning threshold exceeded notifications
17. Define timeout_seconds check condition that triggers workflow failure if duration exceeds threshold

### Clusters

Expand Down
1 change: 1 addition & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ targets:
email_notifications: null
notification_settings: null
webhook_notifications: null
timeout_seconds: 42
health:
rules:
- metric: "RUN_DURATION_SECONDS"
Expand Down
1 change: 1 addition & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ targets:
email_notifications: null
notification_settings: null
webhook_notifications: null
timeout_seconds: 42
health:
rules:
- metric: "RUN_DURATION_SECONDS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ targets:
email_notifications: null
notification_settings: null
webhook_notifications: null
timeout_seconds: 42
health:
rules:
- metric: "RUN_DURATION_SECONDS"
Expand Down
1 change: 1 addition & 0 deletions tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ targets:
email_notifications: null
notification_settings: null
webhook_notifications: null
timeout_seconds: 42
health:
rules:
- metric: "RUN_DURATION_SECONDS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ targets:
email_notifications: null
notification_settings: null
webhook_notifications: null
timeout_seconds: 42
health:
rules:
- metric: "RUN_DURATION_SECONDS"
Expand Down
1 change: 1 addition & 0 deletions tests/codegen/sample_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
{"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200.0}
]
},
timeout_seconds=42,
)


Expand Down
1 change: 1 addition & 0 deletions tests/engine/sample_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
{"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200}
]
},
timeout_seconds=42,
)


Expand Down
21 changes: 20 additions & 1 deletion tests/engine/test_task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from collections import namedtuple
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -97,7 +98,6 @@ def test_custom_task_params(self):
"all_tasks3": "123",
"test": "var",
}

wf.common_task_parameters = {}
assert wf.get_task(task_function_nokwargs.__name__).custom_task_parameters == {}

Expand Down Expand Up @@ -323,6 +323,25 @@ def test_task_settings_merge(self):
"retry_on_timeout": default_bool,
}

def test_task_settings_small_timeout(self):
small_to = TaskSettings(
timeout_seconds=30,
)
print(small_to.timeout_seconds)
assert not wf.log_timeout_warning(small_to)

def test_task_settings_big_timeout_warning(self):
big_to = TaskSettings(
timeout_seconds=datetime.timedelta(hours=0.5).seconds,
)
print(big_to.timeout_seconds)
assert wf.log_timeout_warning(big_to)

def test_task_settings_no_timeout_warning(self):
no_to = TaskSettings()
print(no_to.timeout_seconds)
assert not wf.log_timeout_warning(no_to)

def test_task_libraries(self):
s3_path = "s3://somepath-in-s3"
repo = "somerepo"
Expand Down
3 changes: 3 additions & 0 deletions tests/engine/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ def test_health_settings(self):
]
}

def test_timeout_seconds(self):
assert wf.timeout_seconds == 42

def test_user(self):
principal = "[email protected]"
u = User(principal)
Expand Down

0 comments on commit 548b6cb

Please sign in to comment.