Skip to content

Commit

Permalink
Align timers and timing metrics (ms) across all metrics loggers (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
dirrao authored Sep 11, 2024
1 parent 303db58 commit 08daffe
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 13 deletions.
11 changes: 11 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,17 @@ metrics:
example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
or \"^scheduler,^executor,heartbeat|timeout\""
default: ""
metrics_consistency_on:
description: |
Enables metrics consistency across all metrics loggers (ex: timer and timing metrics).
.. warning::
It is enabled by default from Airflow 3.
version_added: 2.10.0
type: string
example: ~
default: "True"
statsd_on:
description: |
Enables sending metrics to StatsD.
Expand Down
15 changes: 14 additions & 1 deletion airflow/metrics/datadog_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import datetime
import logging
import warnings
from typing import TYPE_CHECKING

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
AllowListValidator,
Expand All @@ -40,6 +42,14 @@

log = logging.getLogger(__name__)

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)


class SafeDogStatsdLogger:
"""DogStatsd Logger."""
Expand Down Expand Up @@ -134,7 +144,10 @@ def timing(
tags_list = []
if self.metrics_validator.test(stat):
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds()
if metrics_consistency_on:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
return None

Expand Down
14 changes: 13 additions & 1 deletion airflow/metrics/otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
OTEL_NAME_MAX_LENGTH,
Expand Down Expand Up @@ -71,6 +72,14 @@
# Delimiter is placed between the universal metric prefix and the unique metric name.
DEFAULT_METRIC_NAME_DELIMITER = "."

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)


def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str:
"""Assembles the prefix, delimiter, and name and returns it as a string."""
Expand Down Expand Up @@ -274,7 +283,10 @@ def timing(
"""OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed."""
if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds()
if metrics_consistency_on:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags)

def timer(
Expand Down
16 changes: 15 additions & 1 deletion airflow/metrics/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,23 @@

import datetime
import time
import warnings
from typing import Union

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.typing_compat import Protocol

DeltaType = Union[int, float, datetime.timedelta]

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)


class TimerProtocol(Protocol):
"""Type protocol for StatsLogger.timer."""
Expand Down Expand Up @@ -116,6 +127,9 @@ def start(self) -> Timer:
def stop(self, send: bool = True) -> None:
"""Stop the timer, and optionally send it to stats backend."""
if self._start_time is not None:
self.duration = time.perf_counter() - self._start_time
if metrics_consistency_on:
self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds.
else:
self.duration = time.perf_counter() - self._start_time
if send and self.real_timer:
self.real_timer.stop()
19 changes: 17 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from airflow.exceptions import (
AirflowException,
AirflowFailException,
AirflowProviderDeprecationWarning,
AirflowRescheduleException,
AirflowSensorTimeout,
AirflowSkipException,
Expand Down Expand Up @@ -168,6 +169,14 @@

PAST_DEPENDS_MET = "past_depends_met"

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)


class TaskReturnCode(Enum):
"""
Expand Down Expand Up @@ -2809,7 +2818,10 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
timing = timezone.utcnow() - self.queued_dttm
if metrics_consistency_on:
timing = timezone.utcnow() - self.queued_dttm
else:
timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
elif new_state == TaskInstanceState.QUEUED:
metric_name = "scheduled_duration"
if self.start_date is None:
Expand All @@ -2822,7 +2834,10 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
timing = timezone.utcnow() - self.start_date
if metrics_consistency_on:
timing = timezone.utcnow() - self.start_date
else:
timing = (timezone.utcnow() - self.start_date).total_seconds()
else:
raise NotImplementedError("no metric emission setup for state %s", new_state)

Expand Down
1 change: 1 addition & 0 deletions newsfragments/39908.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Publishing timer and timing metrics in seconds has been deprecated. In Airflow 3, ``metrics_consistency_on`` from ``metrics`` is enabled by default. You can disable this for backward compatibility. To publish all timer and timing metrics in milliseconds, ensure metrics consistency is enabled
5 changes: 5 additions & 0 deletions tests/_internals/forbidden_warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ def pytest_itemcollected(self, item: pytest.Item):
# Add marker at the beginning of the markers list. In this case, it does not conflict with
# filterwarnings markers, which are set explicitly in the test suite.
item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), append=False)
item.add_marker(
pytest.mark.filterwarnings(
"ignore:Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.:DeprecationWarning"
)
)

@pytest.hookimpl(hookwrapper=True, trylast=True)
def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int):
Expand Down
25 changes: 21 additions & 4 deletions tests/core/test_otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from opentelemetry.metrics import MeterProvider

from airflow.exceptions import InvalidStatsNameException
from airflow.metrics import otel_logger, protocols
from airflow.metrics.otel_logger import (
OTEL_NAME_MAX_LENGTH,
UP_DOWN_COUNTERS,
Expand Down Expand Up @@ -234,12 +235,22 @@ def test_gauge_value_is_correct(self, name):

assert self.map[full_name(name)].value == 1

def test_timing_new_metric(self, name):
self.stats.timing(name, dt=123)
@pytest.mark.parametrize(
"metrics_consistency_on",
[True, False],
)
def test_timing_new_metric(self, metrics_consistency_on, name):
import datetime

otel_logger.metrics_consistency_on = metrics_consistency_on

self.stats.timing(name, dt=datetime.timedelta(seconds=123))

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
expected_value = 123000.0 if metrics_consistency_on else 123
assert self.map[full_name(name)].value == expected_value

def test_timing_new_metric_with_tags(self, name):
tags = {"hello": "world"}
Expand All @@ -265,13 +276,19 @@ def test_timing_existing_metric(self, name):
# time.perf_count() is called once to get the starting timestamp and again
# to get the end timestamp. timer() should return the difference as a float.

@pytest.mark.parametrize(
"metrics_consistency_on",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_with_name_returns_float_and_stores_value(self, mock_time, name):
def test_timer_with_name_returns_float_and_stores_value(self, mock_time, metrics_consistency_on, name):
protocols.metrics_consistency_on = metrics_consistency_on
with self.stats.timer(name) as timer:
pass

assert isinstance(timer.duration, float)
assert timer.duration == 3.14
expected_duration = 3140.0 if metrics_consistency_on else 3.14
assert timer.duration == expected_duration
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
Expand Down
30 changes: 26 additions & 4 deletions tests/core/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import importlib
import logging
import re
import time
from unittest import mock
from unittest.mock import Mock

Expand All @@ -28,6 +29,7 @@

import airflow
from airflow.exceptions import AirflowConfigException, InvalidStatsNameException, RemovedInAirflow3Warning
from airflow.metrics import datadog_logger, protocols
from airflow.metrics.datadog_logger import SafeDogStatsdLogger
from airflow.metrics.statsd_logger import SafeStatsdLogger
from airflow.metrics.validators import (
Expand Down Expand Up @@ -224,24 +226,44 @@ def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self)
metric="empty_key", sample_rate=1, tags=[], value=1
)

def test_timer(self):
with self.dogstatsd.timer("empty_timer"):
@pytest.mark.parametrize(
"metrics_consistency_on",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0])
def test_timer(self, time_mock, metrics_consistency_on):
protocols.metrics_consistency_on = metrics_consistency_on

with self.dogstatsd.timer("empty_timer") as timer:
pass
self.dogstatsd_client.timed.assert_called_once_with("empty_timer", tags=[])
expected_duration = 100.0
if metrics_consistency_on:
expected_duration = 1000.0 * 100.0
assert expected_duration == timer.duration
assert time_mock.call_count == 2

def test_empty_timer(self):
with self.dogstatsd.timer():
pass
self.dogstatsd_client.timed.assert_not_called()

def test_timing(self):
@pytest.mark.parametrize(
"metrics_consistency_on",
[True, False],
)
def test_timing(self, metrics_consistency_on):
import datetime

datadog_logger.metrics_consistency_on = metrics_consistency_on

self.dogstatsd.timing("empty_timer", 123)
self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", value=123, tags=[])

self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123))
self.dogstatsd_client.timing.assert_called_with(metric="empty_timer", value=123.0, tags=[])
self.dogstatsd_client.timing.assert_called_with(
metric="empty_timer", value=123000.0 if metrics_consistency_on else 123.0, tags=[]
)

def test_gauge(self):
self.dogstatsd.gauge("empty", 123)
Expand Down

0 comments on commit 08daffe

Please sign in to comment.