Skip to content

Commit

Permalink
fix querying for runs on a tick that emits backfill IDs as well as ru…
Browse files Browse the repository at this point in the history
…ns (#27108)

Fix an issue we observed where ticks that emitted backfills were
producing an error.

## How I Tested These Changes
BK (new test case)

## Changelog
Fixed an issue where the tick timeline wouldn't load for an automation
condition sensor that emitted a backfill.
  • Loading branch information
gibsondan authored Jan 14, 2025
1 parent ef7725f commit 28bbf98
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
from dagster._core.storage.dagster_run import DagsterRun, RunsFilter
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type
from dagster._core.utils import is_valid_run_id
from dagster._core.workspace.permissions import Permissions
from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info
from dagster._utils.yaml_utils import dump_run_config_yaml
Expand Down Expand Up @@ -284,7 +285,11 @@ def resolve_runs(self, graphene_info: ResolveInfo):
from dagster_graphql.schema.pipelines.pipeline import GrapheneRun

instance = graphene_info.context.instance
run_ids = self._tick.origin_run_ids or self._tick.run_ids
run_ids = self._tick.origin_run_ids or self._tick.run_ids or []

# filter out backfills
run_ids = [run_id for run_id in run_ids if is_valid_run_id(run_id)]

if not run_ids:
return []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from dagster._core.test_utils import SingleThreadPoolExecutor, freeze_time, wait_for_futures
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._core.utils import make_new_backfill_id
from dagster._core.workspace.context import WorkspaceRequestContext
from dagster._daemon import get_default_daemon_logger
from dagster._daemon.sensor import execute_sensor_iteration
Expand Down Expand Up @@ -478,6 +479,9 @@
tickId
status
timestamp
runs {
id
}
}
}
}
Expand Down Expand Up @@ -1409,6 +1413,12 @@ def test_sensor_ticks_filtered(graphql_context: WorkspaceRequestContext):
with freeze_time(now):
_create_tick(graphql_context) # create a success tick

success_tick_id = graphql_context.instance.get_ticks(
origin_id=sensor.get_remote_origin().get_id(),
selector_id=sensor.get_remote_origin().get_selector().get_id(),
limit=1,
)[0].tick_id

# create a started tick
started_tick_id, _ = graphql_context.instance.create_tick(
TickData(
Expand Down Expand Up @@ -1519,6 +1529,34 @@ def test_sensor_ticks_filtered(graphql_context: WorkspaceRequestContext):
assert selected_sensor
assert len(selected_sensor["sensorState"]["ticks"]) == 2

backfill_id = make_new_backfill_id()
graphql_context.instance.create_tick(
TickData(
instigator_origin_id=sensor.get_remote_origin().get_id(),
instigator_name=sensor_name,
instigator_type=InstigatorType.SENSOR,
status=TickStatus.SUCCESS,
timestamp=now.timestamp(),
selector_id=sensor.selector_id,
origin_run_ids=[backfill_id],
run_ids=[backfill_id],
)
)

result = execute_dagster_graphql(
graphql_context,
GET_TICKS_QUERY,
variables={
"sensorSelector": sensor_selector,
"statuses": ["SUCCESS"],
"tickId": str(success_tick_id),
},
)
assert len(result.data["sensorOrError"]["sensorState"]["ticks"]) == 2
assert any(
len(tick["runs"]) > 0 for tick in result.data["sensorOrError"]["sensorState"]["ticks"]
)


def _get_unloadable_sensor_origin(name):
working_directory = os.path.dirname(__file__)
Expand Down
8 changes: 8 additions & 0 deletions python_modules/dagster/dagster/_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ def make_new_run_id() -> str:
return str(uuid.uuid4())


def is_valid_run_id(run_id: str):
try:
uuid.UUID(run_id, version=4)
return True
except ValueError:
return False


def make_new_backfill_id() -> str:
return "".join(random.choice(string.ascii_lowercase) for x in range(BACKFILL_TAG_LENGTH))

Expand Down

0 comments on commit 28bbf98

Please sign in to comment.