Skip to content

Commit

Permalink
fix(llmobs): propagate distributed headers via signal dispatching [backport #12089] (#12134)
Browse files Browse the repository at this point in the history

Backports #12089 to 2.20.

Note that we had to manually cherry-pick from #12089 to account for
#11952 not being backported.

This PR changes our shared distributed tracing header injection method
to dispatch signals/events instead of relying on the global config
setting, which is only modifiable via env vars. This fixes distributed
tracing for users who rely solely on the `LLMObs.enable()` setup
config.

Programmatic `LLMObs.enable()/disable()` calls do not set the global
`config._llmobs_enabled` boolean setting, which is only controlled by
the `DD_LLMOBS_ENABLED` env var. This was problematic for users that
relied on manual `LLMObs.enable()` setup (i.e. no env vars) because our
distributed tracing injection code only checks the global config to
inject llmobs parent IDs into request headers. If users manually enabled
LLMObs without any env vars, then this would not be reflected in the
global config value and thus LLMObs parent IDs would never be injected
into the request headers.

We can't check directly whether LLMObs is enabled in the http injection
module because:
1. This would require us to import significant product-specific
LLMObs code into the shared http injector helper module, which would
impact non-LLMObs users' app performance
2. This would introduce circular imports, since LLMObs itself imports
the http injector logic to use in its own helpers

Instead of doing our check based on the global `config._llmobs_enabled`
setting, we now send a tracing event to our shared product listeners,
and register a corresponding `LLMObs._inject_llmobs_context()` hook to
be called for all inject() calls if LLMObs is enabled (we check the
LLMObs instance, not the global config setting value).

~One risk, and the reason I don't like changing global config settings,
is that doing so implies the setting is no longer global or tied to an
env var (I want to push for env var configuration where possible over
manual overriding/enabling). If a global enabled config can be toggled
indiscriminately then this could open a can of worms for
enabling/disabling logic in our LLMObs service, which isn't really
designed to be toggled on/off multiple times in the app's lifespan.
However, if some users cannot rely on env vars, then I don't see any
other solution that does not couple tracer internal code with LLMObs
code, which is not an option.~ (UPDATE: we avoided this issue by using
signal dispatching)

# Checklist

- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

# Reviewer Checklist 

- [x] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
Yun-Kim authored Jan 30, 2025
1 parent f6d814c commit bca45d4
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 54 deletions.
16 changes: 11 additions & 5 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,6 @@ def _start_service(self) -> None:
log.debug("Error starting evaluator runner")

def _stop_service(self) -> None:
# Remove listener hooks for span events
core.reset_listeners("trace.span_start", self._on_span_start)
core.reset_listeners("trace.span_finish", self._on_span_finish)

try:
self._evaluator_runner.stop()
# flush remaining evaluation spans & evaluations
Expand All @@ -296,6 +292,11 @@ def _stop_service(self) -> None:
except ServiceStatusError:
log.debug("Error stopping LLMObs writers")

# Remove listener hooks for span events
core.reset_listeners("trace.span_start", self._on_span_start)
core.reset_listeners("trace.span_finish", self._on_span_finish)
core.reset_listeners("http.span_inject", self._inject_llmobs_context)

forksafe.unregister(self._child_after_fork)

@classmethod
Expand Down Expand Up @@ -379,6 +380,7 @@ def enable(
# Register hooks for span events
core.on("trace.span_start", cls._instance._on_span_start)
core.on("trace.span_finish", cls._instance._on_span_finish)
core.on("http.span_inject", cls._instance._inject_llmobs_context)

atexit.register(cls.disable)
telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, True)
Expand Down Expand Up @@ -1163,6 +1165,11 @@ def submit_evaluation(

cls._instance._llmobs_eval_metric_writer.enqueue(evaluation_metric)

def _inject_llmobs_context(self, span_context: Context, request_headers: Dict[str, str]) -> None:
    """Pass ``span_context`` to ``_inject_llmobs_parent_id`` unless this service is disabled.

    :param span_context: The span context handed to ``_inject_llmobs_parent_id``.
    :param request_headers: Accepted as part of the signature but not read or
        modified by this method.
    """
    # Identity comparison: only ``enabled is False`` skips the injection.
    if self.enabled is not False:
        _inject_llmobs_parent_id(span_context)

@classmethod
def inject_distributed_headers(cls, request_headers: Dict[str, str], span: Optional[Span] = None) -> Dict[str, str]:
"""Injects the span's distributed context into the given request headers."""
Expand All @@ -1180,7 +1187,6 @@ def inject_distributed_headers(cls, request_headers: Dict[str, str], span: Optio
if span is None:
log.warning("No span provided and no currently active span found.")
return request_headers
_inject_llmobs_parent_id(span.context)
HTTPPropagator.inject(span.context, request_headers)
return request_headers

Expand Down
7 changes: 2 additions & 5 deletions ddtrace/propagation/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ddtrace._trace.span import _get_64_lowest_order_bits_as_int
from ddtrace._trace.span import _MetaDictType
from ddtrace.appsec._constants import APPSEC
from ddtrace.internal.core import dispatch
from ddtrace.settings.asm import config as asm_config

from ..constants import AUTO_KEEP
Expand Down Expand Up @@ -1052,6 +1053,7 @@ def parent_call():
:param dict headers: HTTP headers to extend with tracing attributes.
:param Span non_active_span: Only to be used if injecting a non-active span.
"""
dispatch("http.span_inject", (span_context, headers))
if not config._propagation_style_inject:
return
if non_active_span is not None and non_active_span.context is not span_context:
Expand Down Expand Up @@ -1089,11 +1091,6 @@ def parent_call():
for key in span_context._baggage:
headers[_HTTP_BAGGAGE_PREFIX + key] = span_context._baggage[key]

if config._llmobs_enabled:
from ddtrace.llmobs._utils import _inject_llmobs_parent_id

_inject_llmobs_parent_id(span_context)

if PROPAGATION_STYLE_DATADOG in config._propagation_style_inject:
_DatadogMultiHeader._inject(span_context, headers)
if PROPAGATION_STYLE_B3_MULTI in config._propagation_style_inject:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
LLM Observability: Resolves an issue where enabling LLM Observability solely through ``LLMObs.enable()``,
without any environment variables, would not automatically propagate distributed tracing headers.
1 change: 0 additions & 1 deletion tests/llmobs/test_llmobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def test_service_enable_proxy_default():
assert llmobs_instance.tracer == dummy_tracer
assert isinstance(llmobs_instance._llmobs_span_writer._clients[0], LLMObsProxiedEventClient)
assert run_llmobs_trace_filter(dummy_tracer) is not None

llmobs_service.disable()


Expand Down
54 changes: 35 additions & 19 deletions tests/llmobs/test_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ def test_propagate_correct_llmobs_parent_id_simple(run_python_code_in_subprocess
"""
code = """
import json
import mock
from ddtrace import tracer
from ddtrace.ext import SpanTypes
from ddtrace.internal.utils.http import Response
from ddtrace.llmobs import LLMObs
from ddtrace.propagation.http import HTTPPropagator
with tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span:
with tracer.trace("Non-LLMObs span") as child_span:
headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)}
HTTPPropagator.inject(child_span.context, headers)
with mock.patch(
"ddtrace.internal.writer.HTTPWriter._send_payload", return_value=Response(status=200, body="{}"),
):
LLMObs.enable(ml_app="test-app", api_key="<not-a-real-key>", agentless_enabled=True)
with LLMObs.workflow("LLMObs span") as root_span:
with LLMObs._instance.tracer.trace("Non-LLMObs span") as child_span:
headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)}
HTTPPropagator.inject(child_span.context, headers)
print(json.dumps(headers))
"""
env = os.environ.copy()
env["DD_LLMOBS_ENABLED"] = "1"
env["DD_TRACE_ENABLED"] = "0"
stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env)
assert status == 0, (stdout, stderr)
Expand All @@ -93,21 +97,33 @@ def test_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess):
"""
code = """
import json
import mock
from ddtrace import tracer
from ddtrace.ext import SpanTypes
from ddtrace.internal.utils.http import Response
from ddtrace.llmobs import LLMObs
from ddtrace.propagation.http import HTTPPropagator
with tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span:
with tracer.trace("Non-LLMObs span") as child_span:
headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)}
HTTPPropagator.inject(child_span.context, headers)
with mock.patch(
"ddtrace.internal.writer.HTTPWriter._send_payload", return_value=Response(status=200, body="{}"),
):
from ddtrace import auto # simulate ddtrace-run startup to ensure env var configs also propagate
with LLMObs.workflow("LLMObs span") as root_span:
with LLMObs._instance.tracer.trace("Non-LLMObs span") as child_span:
headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)}
HTTPPropagator.inject(child_span.context, headers)
print(json.dumps(headers))
"""
env = os.environ.copy()
env["DD_LLMOBS_ENABLED"] = "1"
env["DD_TRACE_ENABLED"] = "0"
env.update(
{
"DD_LLMOBS_ENABLED": "1",
"DD_TRACE_ENABLED": "0",
"DD_AGENTLESS_ENABLED": "1",
"DD_API_KEY": "<not-a-real-key>",
"DD_LLMOBS_ML_APP": "test-app",
}
)
stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env)
assert status == 0, (stdout, stderr)
assert stderr == b"", (stdout, stderr)
Expand All @@ -124,25 +140,25 @@ def test_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess):


def test_no_llmobs_parent_id_propagated_if_no_llmobs_spans(run_python_code_in_subprocess):
"""Test that the correct LLMObs parent ID (None) is extracted from the headers in a simple distributed scenario.
"""Test that the correct LLMObs parent ID ('undefined') is extracted from headers in a simple distributed scenario.
Service A (subprocess) has spans, but none are LLMObs spans.
Service B (outside subprocess) has a LLMObs span.
Service B's span should have no LLMObs parent ID as there are no LLMObs spans from service A.
"""
code = """
import json
from ddtrace import tracer
from ddtrace.llmobs import LLMObs
from ddtrace.propagation.http import HTTPPropagator
with tracer.trace("Non-LLMObs span") as root_span:
LLMObs.enable(ml_app="ml-app", agentless_enabled=True, api_key="<not-a-real-key>")
with LLMObs._instance.tracer.trace("Non-LLMObs span") as root_span:
headers = {}
HTTPPropagator.inject(root_span.context, headers)
print(json.dumps(headers))
"""
env = os.environ.copy()
env["DD_LLMOBS_ENABLED"] = "1"
env["DD_TRACE_ENABLED"] = "0"
stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env)
assert status == 0, (stdout, stderr)
Expand Down
24 changes: 0 additions & 24 deletions tests/tracer/test_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import os
import pickle

import mock
import pytest

import ddtrace
Expand Down Expand Up @@ -3387,29 +3386,6 @@ def test_DD_TRACE_PROPAGATION_STYLE_INJECT_overrides_DD_TRACE_PROPAGATION_STYLE(
assert result == expected_headers


def test_llmobs_enabled_injects_llmobs_parent_id():
with override_global_config(dict(_llmobs_enabled=True)):
with mock.patch("ddtrace.llmobs._utils._inject_llmobs_parent_id") as mock_llmobs_inject:
context = Context(trace_id=1, span_id=2)
HTTPPropagator.inject(context, {})
mock_llmobs_inject.assert_called_once_with(context)


def test_llmobs_disabled_does_not_inject_parent_id():
with override_global_config(dict(_llmobs_enabled=False)):
with mock.patch("ddtrace.llmobs._utils._inject_llmobs_parent_id") as mock_llmobs_inject:
context = Context(trace_id=1, span_id=2)
HTTPPropagator.inject(context, {})
mock_llmobs_inject.assert_not_called()


def test_llmobs_parent_id_not_injected_by_default():
with mock.patch("ddtrace.llmobs._utils._inject_llmobs_parent_id") as mock_llmobs_inject:
context = Context(trace_id=1, span_id=2)
HTTPPropagator.inject(context, {})
mock_llmobs_inject.assert_not_called()


@pytest.mark.parametrize(
"span_context,expected_headers",
[
Expand Down

0 comments on commit bca45d4

Please sign in to comment.