Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(llmobs): propagate distributed headers via signal dispatching [backport #12089] #12134

Merged
merged 2 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,6 @@ def _start_service(self) -> None:
log.debug("Error starting evaluator runner")

def _stop_service(self) -> None:
# Remove listener hooks for span events
core.reset_listeners("trace.span_start", self._on_span_start)
core.reset_listeners("trace.span_finish", self._on_span_finish)

try:
self._evaluator_runner.stop()
# flush remaining evaluation spans & evaluations
Expand All @@ -296,6 +292,11 @@ def _stop_service(self) -> None:
except ServiceStatusError:
log.debug("Error stopping LLMObs writers")

# Remove listener hooks for span events
core.reset_listeners("trace.span_start", self._on_span_start)
core.reset_listeners("trace.span_finish", self._on_span_finish)
core.reset_listeners("http.span_inject", self._inject_llmobs_context)

forksafe.unregister(self._child_after_fork)

@classmethod
Expand Down Expand Up @@ -379,6 +380,7 @@ def enable(
# Register hooks for span events
core.on("trace.span_start", cls._instance._on_span_start)
core.on("trace.span_finish", cls._instance._on_span_finish)
core.on("http.span_inject", cls._instance._inject_llmobs_context)

atexit.register(cls.disable)
telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, True)
Expand Down Expand Up @@ -1163,6 +1165,11 @@ def submit_evaluation(

cls._instance._llmobs_eval_metric_writer.enqueue(evaluation_metric)

def _inject_llmobs_context(self, span_context: Context, request_headers: Dict[str, str]) -> None:
if self.enabled is False:
return
_inject_llmobs_parent_id(span_context)

@classmethod
def inject_distributed_headers(cls, request_headers: Dict[str, str], span: Optional[Span] = None) -> Dict[str, str]:
"""Injects the span's distributed context into the given request headers."""
Expand All @@ -1180,7 +1187,6 @@ def inject_distributed_headers(cls, request_headers: Dict[str, str], span: Optio
if span is None:
log.warning("No span provided and no currently active span found.")
return request_headers
_inject_llmobs_parent_id(span.context)
HTTPPropagator.inject(span.context, request_headers)
return request_headers

Expand Down
7 changes: 2 additions & 5 deletions ddtrace/propagation/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ddtrace._trace.span import _get_64_lowest_order_bits_as_int
from ddtrace._trace.span import _MetaDictType
from ddtrace.appsec._constants import APPSEC
from ddtrace.internal.core import dispatch
from ddtrace.settings.asm import config as asm_config

from ..constants import AUTO_KEEP
Expand Down Expand Up @@ -1052,6 +1053,7 @@ def parent_call():
:param dict headers: HTTP headers to extend with tracing attributes.
:param Span non_active_span: Only to be used if injecting a non-active span.
"""
dispatch("http.span_inject", (span_context, headers))
if not config._propagation_style_inject:
return
if non_active_span is not None and non_active_span.context is not span_context:
Expand Down Expand Up @@ -1089,11 +1091,6 @@ def parent_call():
for key in span_context._baggage:
headers[_HTTP_BAGGAGE_PREFIX + key] = span_context._baggage[key]

if config._llmobs_enabled:
from ddtrace.llmobs._utils import _inject_llmobs_parent_id

_inject_llmobs_parent_id(span_context)

if PROPAGATION_STYLE_DATADOG in config._propagation_style_inject:
_DatadogMultiHeader._inject(span_context, headers)
if PROPAGATION_STYLE_B3_MULTI in config._propagation_style_inject:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
LLM Observability: Resolves an issue where explicitly only using ``LLMObs.enable()`` to configure LLM Observability
without environment variables would not automatically propagate distributed tracing headers.
1 change: 0 additions & 1 deletion tests/llmobs/test_llmobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def test_service_enable_proxy_default():
assert llmobs_instance.tracer == dummy_tracer
assert isinstance(llmobs_instance._llmobs_span_writer._clients[0], LLMObsProxiedEventClient)
assert run_llmobs_trace_filter(dummy_tracer) is not None

llmobs_service.disable()


Expand Down
54 changes: 35 additions & 19 deletions tests/llmobs/test_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ def test_propagate_correct_llmobs_parent_id_simple(run_python_code_in_subprocess
"""
code = """
import json
import mock

from ddtrace import tracer
from ddtrace.ext import SpanTypes
from ddtrace.internal.utils.http import Response
from ddtrace.llmobs import LLMObs
from ddtrace.propagation.http import HTTPPropagator

with tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span:
with tracer.trace("Non-LLMObs span") as child_span:
headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)}
HTTPPropagator.inject(child_span.context, headers)
with mock.patch(
"ddtrace.internal.writer.HTTPWriter._send_payload", return_value=Response(status=200, body="{}"),
):
LLMObs.enable(ml_app="test-app", api_key="<not-a-real-key>", agentless_enabled=True)
with LLMObs.workflow("LLMObs span") as root_span:
with LLMObs._instance.tracer.trace("Non-LLMObs span") as child_span:
headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)}
HTTPPropagator.inject(child_span.context, headers)

print(json.dumps(headers))
"""
env = os.environ.copy()
env["DD_LLMOBS_ENABLED"] = "1"
env["DD_TRACE_ENABLED"] = "0"
stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env)
assert status == 0, (stdout, stderr)
Expand All @@ -93,21 +97,33 @@ def test_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess):
"""
code = """
import json
import mock

from ddtrace import tracer
from ddtrace.ext import SpanTypes
from ddtrace.internal.utils.http import Response
from ddtrace.llmobs import LLMObs
from ddtrace.propagation.http import HTTPPropagator

with tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span:
with tracer.trace("Non-LLMObs span") as child_span:
headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)}
HTTPPropagator.inject(child_span.context, headers)
with mock.patch(
"ddtrace.internal.writer.HTTPWriter._send_payload", return_value=Response(status=200, body="{}"),
):
from ddtrace import auto # simulate ddtrace-run startup to ensure env var configs also propagate
with LLMObs.workflow("LLMObs span") as root_span:
with LLMObs._instance.tracer.trace("Non-LLMObs span") as child_span:
headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)}
HTTPPropagator.inject(child_span.context, headers)

print(json.dumps(headers))
"""
env = os.environ.copy()
env["DD_LLMOBS_ENABLED"] = "1"
env["DD_TRACE_ENABLED"] = "0"
env.update(
{
"DD_LLMOBS_ENABLED": "1",
"DD_TRACE_ENABLED": "0",
"DD_AGENTLESS_ENABLED": "1",
"DD_API_KEY": "<not-a-real-key>",
"DD_LLMOBS_ML_APP": "test-app",
}
)
stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env)
assert status == 0, (stdout, stderr)
assert stderr == b"", (stdout, stderr)
Expand All @@ -124,25 +140,25 @@ def test_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess):


def test_no_llmobs_parent_id_propagated_if_no_llmobs_spans(run_python_code_in_subprocess):
"""Test that the correct LLMObs parent ID (None) is extracted from the headers in a simple distributed scenario.
"""Test that the correct LLMObs parent ID ('undefined') is extracted from headers in a simple distributed scenario.
Service A (subprocess) has spans, but none are LLMObs spans.
Service B (outside subprocess) has a LLMObs span.
Service B's span should have no LLMObs parent ID as there are no LLMObs spans from service A.
"""
code = """
import json

from ddtrace import tracer
from ddtrace.llmobs import LLMObs
from ddtrace.propagation.http import HTTPPropagator

with tracer.trace("Non-LLMObs span") as root_span:
LLMObs.enable(ml_app="ml-app", agentless_enabled=True, api_key="<not-a-real-key>")
with LLMObs._instance.tracer.trace("Non-LLMObs span") as root_span:
headers = {}
HTTPPropagator.inject(root_span.context, headers)

print(json.dumps(headers))
"""
env = os.environ.copy()
env["DD_LLMOBS_ENABLED"] = "1"
env["DD_TRACE_ENABLED"] = "0"
stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env)
assert status == 0, (stdout, stderr)
Expand Down
24 changes: 0 additions & 24 deletions tests/tracer/test_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import os
import pickle

import mock
import pytest

import ddtrace
Expand Down Expand Up @@ -3387,29 +3386,6 @@ def test_DD_TRACE_PROPAGATION_STYLE_INJECT_overrides_DD_TRACE_PROPAGATION_STYLE(
assert result == expected_headers


def test_llmobs_enabled_injects_llmobs_parent_id():
with override_global_config(dict(_llmobs_enabled=True)):
with mock.patch("ddtrace.llmobs._utils._inject_llmobs_parent_id") as mock_llmobs_inject:
context = Context(trace_id=1, span_id=2)
HTTPPropagator.inject(context, {})
mock_llmobs_inject.assert_called_once_with(context)


def test_llmobs_disabled_does_not_inject_parent_id():
with override_global_config(dict(_llmobs_enabled=False)):
with mock.patch("ddtrace.llmobs._utils._inject_llmobs_parent_id") as mock_llmobs_inject:
context = Context(trace_id=1, span_id=2)
HTTPPropagator.inject(context, {})
mock_llmobs_inject.assert_not_called()


def test_llmobs_parent_id_not_injected_by_default():
with mock.patch("ddtrace.llmobs._utils._inject_llmobs_parent_id") as mock_llmobs_inject:
context = Context(trace_id=1, span_id=2)
HTTPPropagator.inject(context, {})
mock_llmobs_inject.assert_not_called()


@pytest.mark.parametrize(
"span_context,expected_headers",
[
Expand Down
Loading