Skip to content

Commit

Permalink
Fix duplication of Task tries in the UI (apache#43891) (apache#43950)
Browse files Browse the repository at this point in the history
It was observed that there are moments where the TI tries endpoint returns duplicate
TaskInstance. I have observed this to happen when the TI is in up_for_retry state.

When the TI is in up_for_retry state, we have already recorded the previous try
in TI history and the TI try_number has not incremented at this time, so we must
exclude this recorded TI from the taskinstance tries endpoint. We know the TI because
its state is in up_for_retry, so we filter TIs with up_for_retry state when querying
for the task instance tries.

Closes: apache#41765
(cherry picked from commit 4bc1257)
  • Loading branch information
ephraimbuddy authored Nov 13, 2024
1 parent 376773a commit 510b968
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
7 changes: 6 additions & 1 deletion airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,12 @@ def _query(orm_object):
)
return query

task_instances = session.scalars(_query(TIH)).all() + session.scalars(_query(TI)).all()
# Exclude TaskInstance with state UP_FOR_RETRY since they have been recorded in TaskInstanceHistory
tis = session.scalars(
_query(TI).where(or_(TI.state != TaskInstanceState.UP_FOR_RETRY, TI.state.is_(None)))
).all()

task_instances = session.scalars(_query(TIH)).all() + tis
return task_instance_history_collection_schema.dump(
TaskInstanceHistoryCollection(task_instances=task_instances, total_entries=len(task_instances))
)
Expand Down
17 changes: 17 additions & 0 deletions tests/api_connexion/endpoints/test_task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -3013,6 +3013,23 @@ def test_should_respond_200(self, session):
assert response.json["total_entries"] == 2 # The task instance and its history
assert len(response.json["task_instances"]) == 2

def test_ti_in_retry_state_not_returned(self, session):
self.create_task_instances(
session=session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True
)
ti = session.query(TaskInstance).one()
ti.state = State.UP_FOR_RETRY
session.merge(ti)
session.commit()

response = self.client.get(
"/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 200
assert response.json["total_entries"] == 1
assert len(response.json["task_instances"]) == 1

def test_mapped_task_should_respond_200(self, session):
tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}])
old_ti = tis[0]
Expand Down

0 comments on commit 510b968

Please sign in to comment.