diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 2e624db82736..9c9f7c2153c2 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1106,6 +1106,17 @@ metrics: example: "\"scheduler,executor,dagrun,pool,triggerer,celery\" or \"^scheduler,^executor,heartbeat|timeout\"" default: "" + metrics_consistency_on: + description: | + Enables metrics consistency across all metrics loggers (ex: timer and timing metrics). + + .. warning:: + + It is enabled by default from Airflow 3. + version_added: 2.10.0 + type: string + example: ~ + default: "True" statsd_on: description: | Enables sending metrics to StatsD. diff --git a/airflow/metrics/datadog_logger.py b/airflow/metrics/datadog_logger.py index 156407977305..c7bcf1986d85 100644 --- a/airflow/metrics/datadog_logger.py +++ b/airflow/metrics/datadog_logger.py @@ -19,9 +19,11 @@ import datetime import logging +import warnings from typing import TYPE_CHECKING from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( AllowListValidator, @@ -40,6 +42,14 @@ log = logging.getLogger(__name__) +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) +if not metrics_consistency_on: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + class SafeDogStatsdLogger: """DogStatsd Logger.""" @@ -134,7 +144,10 @@ def timing( tags_list = [] if self.metrics_validator.test(stat): if isinstance(dt, datetime.timedelta): - dt = dt.total_seconds() + if metrics_consistency_on: + dt = dt.total_seconds() * 1000.0 + else: + dt = dt.total_seconds() return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list) return None diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 5dac960c169a..e8d0f54d7328 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -31,6 +31,7 @@ from opentelemetry.sdk.resources import SERVICE_NAME, Resource from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( OTEL_NAME_MAX_LENGTH, @@ -71,6 +72,14 @@ # Delimiter is placed between the universal metric prefix and the unique metric name. DEFAULT_METRIC_NAME_DELIMITER = "." +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) +if not metrics_consistency_on: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str: """Assembles the prefix, delimiter, and name and returns it as a string.""" @@ -274,7 +283,10 @@ def timing( """OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed.""" if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat): if isinstance(dt, datetime.timedelta): - dt = dt.total_seconds() + if metrics_consistency_on: + dt = dt.total_seconds() * 1000.0 + else: + dt = dt.total_seconds() self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags) def timer( diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py index c46942ce95f7..7eef7929e02d 100644 --- a/airflow/metrics/protocols.py +++ b/airflow/metrics/protocols.py @@ -19,12 +19,23 @@ import datetime import time +import warnings from typing import Union +from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.typing_compat import Protocol DeltaType = Union[int, float, datetime.timedelta] +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) +if not metrics_consistency_on: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + class TimerProtocol(Protocol): """Type protocol for StatsLogger.timer.""" @@ -116,6 +127,9 @@ def start(self) -> Timer: def stop(self, send: bool = True) -> None: """Stop the timer, and optionally send it to stats backend.""" if self._start_time is not None: - self.duration = time.perf_counter() - self._start_time + if metrics_consistency_on: + self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds. + else: + self.duration = time.perf_counter() - self._start_time if send and self.real_timer: self.real_timer.stop() diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 165f5c798730..5f82d84fe5c6 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -74,6 +74,7 @@ from airflow.exceptions import ( AirflowException, AirflowFailException, + AirflowProviderDeprecationWarning, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, @@ -168,6 +169,14 @@ PAST_DEPENDS_MET = "past_depends_met" +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) +if not metrics_consistency_on: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + class TaskReturnCode(Enum): """ @@ -2809,7 +2818,10 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: self.task_id, ) return - timing = timezone.utcnow() - self.queued_dttm + if metrics_consistency_on: + timing = timezone.utcnow() - self.queued_dttm + else: + timing = (timezone.utcnow() - self.queued_dttm).total_seconds() elif new_state == TaskInstanceState.QUEUED: metric_name = "scheduled_duration" if self.start_date is None: @@ -2822,7 +2834,10 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: self.task_id, ) return - timing = timezone.utcnow() - self.start_date + if metrics_consistency_on: + timing = timezone.utcnow() - self.start_date + else: + timing = (timezone.utcnow() - self.start_date).total_seconds() else: raise NotImplementedError("no metric emission setup for state %s", new_state) diff --git a/newsfragments/39908.significant.rst b/newsfragments/39908.significant.rst new file mode 100644 index 000000000000..bd4a2967ba4f --- /dev/null +++ b/newsfragments/39908.significant.rst @@ -0,0 +1 @@ +Publishing timer and timing metrics in seconds has been deprecated. In Airflow 3, ``metrics_consistency_on`` from ``metrics`` is enabled by default. You can disable this for backward compatibility. To publish all timer and timing metrics in milliseconds, ensure metrics consistency is enabled diff --git a/tests/_internals/forbidden_warnings.py b/tests/_internals/forbidden_warnings.py index c78e4b0333f7..324d2ff6f982 100644 --- a/tests/_internals/forbidden_warnings.py +++ b/tests/_internals/forbidden_warnings.py @@ -62,6 +62,11 @@ def pytest_itemcollected(self, item: pytest.Item): # Add marker at the beginning of the markers list. In this case, it does not conflict with # filterwarnings markers, which are set explicitly in the test suite. item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), append=False) + item.add_marker( + pytest.mark.filterwarnings( + "ignore:Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.:DeprecationWarning" + ) + ) @pytest.hookimpl(hookwrapper=True, trylast=True) def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int): diff --git a/tests/core/test_otel_logger.py b/tests/core/test_otel_logger.py index 6cba116f652b..d5697e585b45 100644 --- a/tests/core/test_otel_logger.py +++ b/tests/core/test_otel_logger.py @@ -25,6 +25,7 @@ from opentelemetry.metrics import MeterProvider from airflow.exceptions import InvalidStatsNameException +from airflow.metrics import otel_logger, protocols from airflow.metrics.otel_logger import ( OTEL_NAME_MAX_LENGTH, UP_DOWN_COUNTERS, @@ -234,12 +235,22 @@ def test_gauge_value_is_correct(self, name): assert self.map[full_name(name)].value == 1 - def test_timing_new_metric(self, name): - self.stats.timing(name, dt=123) + @pytest.mark.parametrize( + "metrics_consistency_on", + [True, False], + ) + def test_timing_new_metric(self, metrics_consistency_on, name): + import datetime + + otel_logger.metrics_consistency_on = metrics_consistency_on + + self.stats.timing(name, dt=datetime.timedelta(seconds=123)) self.meter.get_meter().create_observable_gauge.assert_called_once_with( name=full_name(name), callbacks=ANY ) + expected_value = 123000.0 if metrics_consistency_on else 123 + assert self.map[full_name(name)].value == expected_value def test_timing_new_metric_with_tags(self, name): tags = {"hello": "world"} @@ -265,13 +276,19 @@ def test_timing_existing_metric(self, name): # time.perf_count() is called once to get the starting timestamp and again # to get the end timestamp. timer() should return the difference as a float. + @pytest.mark.parametrize( + "metrics_consistency_on", + [True, False], + ) @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14]) - def test_timer_with_name_returns_float_and_stores_value(self, mock_time, name): + def test_timer_with_name_returns_float_and_stores_value(self, mock_time, metrics_consistency_on, name): + protocols.metrics_consistency_on = metrics_consistency_on with self.stats.timer(name) as timer: pass assert isinstance(timer.duration, float) - assert timer.duration == 3.14 + expected_duration = 3140.0 if metrics_consistency_on else 3.14 + assert timer.duration == expected_duration assert mock_time.call_count == 2 self.meter.get_meter().create_observable_gauge.assert_called_once_with( name=full_name(name), callbacks=ANY diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py index 902a0ed0037f..5127b95927a8 100644 --- a/tests/core/test_stats.py +++ b/tests/core/test_stats.py @@ -20,6 +20,7 @@ import importlib import logging import re +import time from unittest import mock from unittest.mock import Mock @@ -28,6 +29,7 @@ import airflow from airflow.exceptions import AirflowConfigException, InvalidStatsNameException, RemovedInAirflow3Warning +from airflow.metrics import datadog_logger, protocols from airflow.metrics.datadog_logger import SafeDogStatsdLogger from airflow.metrics.statsd_logger import SafeStatsdLogger from airflow.metrics.validators import ( @@ -224,24 +226,44 @@ def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self) metric="empty_key", sample_rate=1, tags=[], value=1 ) - def test_timer(self): - with self.dogstatsd.timer("empty_timer"): + @pytest.mark.parametrize( + "metrics_consistency_on", + [True, False], + ) + @mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0]) + def test_timer(self, time_mock, metrics_consistency_on): + protocols.metrics_consistency_on = metrics_consistency_on + + with self.dogstatsd.timer("empty_timer") as timer: pass self.dogstatsd_client.timed.assert_called_once_with("empty_timer", tags=[]) + expected_duration = 100.0 + if metrics_consistency_on: + expected_duration = 1000.0 * 100.0 + assert expected_duration == timer.duration + assert time_mock.call_count == 2 def test_empty_timer(self): with self.dogstatsd.timer(): pass self.dogstatsd_client.timed.assert_not_called() - def test_timing(self): + @pytest.mark.parametrize( + "metrics_consistency_on", + [True, False], + ) + def test_timing(self, metrics_consistency_on): import datetime + datadog_logger.metrics_consistency_on = metrics_consistency_on + self.dogstatsd.timing("empty_timer", 123) self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", value=123, tags=[]) self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123)) - self.dogstatsd_client.timing.assert_called_with(metric="empty_timer", value=123.0, tags=[]) + self.dogstatsd_client.timing.assert_called_with( + metric="empty_timer", value=123000.0 if metrics_consistency_on else 123.0, tags=[] + ) def test_gauge(self): self.dogstatsd.gauge("empty", 123)