From 4af2c27656fd53e095385451d77800c83857697e Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Thu, 14 Nov 2024 03:14:48 +0000 Subject: [PATCH] Refactor DagRun tracing into `_trace_dagrun` helper method (#44008) related: https://github.com/apache/airflow/issues/43789 Changes: This commit - extracts the tracing logic from the `update_state` method in `DagRun` to a new helper method `_trace_dagrun`. - preserves the native types of some span attributes (e.g. int and bool) instead of converting them to str. OTel attribute values can be str, bool, int, float, Sequence[str], Sequence[bool], Sequence[int], or Sequence[float]. --- airflow/jobs/scheduler_job_runner.py | 1 - airflow/models/dagrun.py | 59 ++++++++++++++-------------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 65ef7c828cc0..7a82fb2c9b22 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -859,7 +859,6 @@ def process_executor_events( @classmethod def _set_span_attrs__process_executor_events(cls, span, state, ti): - # Use span.set_attributes span.set_attributes( { "category": "scheduler", diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 635cd73ccd8d..ccc4832f5fb6 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1020,35 +1020,7 @@ def recalculate(self) -> _UnfinishedStates: dagv.version if dagv else None, ) - with Trace.start_span_from_dagrun(dagrun=self) as span: - if self._state is DagRunState.FAILED: - span.set_attribute("error", True) - attributes = { - "category": "DAG runs", - "dag_id": str(self.dag_id), - "execution_date": str(self.execution_date), - "run_id": str(self.run_id), - "queued_at": str(self.queued_at), - "run_start_date": str(self.start_date), - "run_end_date": str(self.end_date), - "run_duration": str( - (self.end_date - self.start_date).total_seconds() - if self.start_date and self.end_date - else 0 - ), - "state": str(self._state), - "external_trigger": 
str(self.external_trigger), - "run_type": str(self.run_type), - "data_interval_start": str(self.data_interval_start), - "data_interval_end": str(self.data_interval_end), - "dag_version": str(dagv.version if dagv else None), - "conf": str(self.conf), - } - if span.is_recording(): - span.add_event(name="queued", timestamp=datetime_to_nano(self.queued_at)) - span.add_event(name="started", timestamp=datetime_to_nano(self.start_date)) - span.add_event(name="ended", timestamp=datetime_to_nano(self.end_date)) - span.set_attributes(attributes) + self._trace_dagrun(dagv) session.flush() @@ -1060,6 +1032,35 @@ def recalculate(self) -> _UnfinishedStates: return schedulable_tis, callback + def _trace_dagrun(self, dagv) -> None: + with Trace.start_span_from_dagrun(dagrun=self) as span: + if self._state == DagRunState.FAILED: + span.set_attribute("error", True) + attributes = { + "category": "DAG runs", + "dag_id": self.dag_id, + "execution_date": str(self.execution_date), + "run_id": self.run_id, + "queued_at": str(self.queued_at), + "run_start_date": str(self.start_date), + "run_end_date": str(self.end_date), + "run_duration": (self.end_date - self.start_date).total_seconds() + if self.start_date and self.end_date + else 0, + "state": str(self._state), + "external_trigger": self.external_trigger, + "run_type": str(self.run_type), + "data_interval_start": str(self.data_interval_start), + "data_interval_end": str(self.data_interval_end), + "dag_version": str(dagv.version if dagv else None), + "conf": str(self.conf), + } + if span.is_recording(): + span.add_event(name="queued", timestamp=datetime_to_nano(self.queued_at)) + span.add_event(name="started", timestamp=datetime_to_nano(self.start_date)) + span.add_event(name="ended", timestamp=datetime_to_nano(self.end_date)) + span.set_attributes(attributes) + @provide_session def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) -> TISchedulingDecision: tis = self.get_task_instances(session=session, 
state=State.task_states)