[Executor][Trace] Enable traces for LLM streaming (#2127)
# Description

Add a span for the LLM streaming scenario.

This pull request primarily focuses on improving the tracing
capabilities in the `src/promptflow/promptflow/_core/tracer.py` file.
The changes include importing additional modules, adding a mechanism to
check the OpenAI version, modifying the `enrich_span_with_trace_type`
function to return the output, adding a new function `traced_generator`,
and altering the `wrapped` functions to utilize the modified
`enrich_span_with_trace_type`.

Here are the most important changes:

Import and OpenAI Version Check:

*
[`src/promptflow/promptflow/_core/tracer.py`](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8R13-R20):
Imported the `version` module from `importlib.metadata` and `Link` from
`opentelemetry.trace`. Added a mechanism to check if the OpenAI version
is legacy.
[[1]](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8R13-R20)
[[2]](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8R33-R36)
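For illustration, a minimal sketch of the version gate this introduces (it mirrors the constant in the diff below; the gated import is only an example of how the flag gets used):

```python
from importlib.metadata import version

# openai 0.x and 1.x expose different streaming types, so the tracer only
# attempts to parse streamed chunks when the installed SDK is not legacy.
IS_LEGACY_OPENAI = version("openai").startswith("0.")

if not IS_LEGACY_OPENAI:
    # The v1.x SDK yields ChatCompletionChunk objects when streaming.
    from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
```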

Function Modifications:

*
[`src/promptflow/promptflow/_core/tracer.py`](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8L291-R345):
In the `enrich_span_with_trace_type` function, the early call to
`enrich_span_with_output` was moved to the end of the function and its
result is now returned. Added a new function `traced_generator` that
iterates the generator inside a new span linked to the original one and
enriches that span with the streamed output.
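A simplified sketch of that linked-span pattern (the name below is illustrative; it omits the `GeneratorProxy` and the OpenAI chunk handling that the actual `traced_generator` in this diff uses):

```python
import opentelemetry.trace as otel_trace
from opentelemetry.trace import Link

tracer = otel_trace.get_tracer("promptflow")


def iterate_in_linked_span(generator, original_span):
    """Re-yield items from `generator` inside a new span that is linked,
    not parented, to the span of the original LLM call."""
    link = Link(original_span.get_span_context())
    with tracer.start_as_current_span(
        f"Iterated({original_span.name})",
        links=[link],
    ) as span:
        items = []
        for item in generator:
            items.append(item)
            yield item  # streaming consumers still see each chunk as it arrives
        # The full output is only known once the consumer exhausts the stream,
        # so the span is enriched after iteration completes.
        span.set_attribute("output", str(items))
```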

Function Usage:

*
[`src/promptflow/promptflow/_core/tracer.py`](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8L410-R452):
In both `wrapped` functions, the call to `enrich_span_with_trace_type`
now rebinds `output` to its return value, so a streaming generator is
replaced by its traced wrapper.
[[1]](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8L410-R452)
[[2]](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8L459-R501)
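In other words, `enrich_span_with_trace_type` can now substitute the output: when the tool returned a generator (streaming), the enriched value is a traced wrapper rather than the original object, so the caller has to keep what the function returns. A minimal, self-contained sketch of that contract (names here are illustrative, not the actual promptflow API):

```python
from collections.abc import Iterator


def _traced(generator, span):
    # Stand-in for the real traced_generator: re-yield each item so the
    # caller still streams, while the span can record what flowed through.
    for item in generator:
        yield item


def enrich_output(span, output):
    # Streaming outputs are swapped for a traced wrapper; everything else
    # passes through unchanged.
    if isinstance(output, Iterator):
        return _traced(output, span)
    return output


# Call site inside the wrapped tool: `output` must be rebound, otherwise the
# traced wrapper is discarded and the streamed chunks never reach the span.
# output = enrich_output(span, output)
```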

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Heyi <[email protected]>
Co-authored-by: Lina Tang <[email protected]>
3 people authored Feb 27, 2024 · 1 parent 0d74315 · commit 07ecfa8

Showing 1 changed file with 49 additions and 3 deletions: src/promptflow/promptflow/_core/tracer.py
@@ -10,11 +10,17 @@
from collections.abc import Iterator
from contextvars import ContextVar
from datetime import datetime
from importlib.metadata import version
from threading import Lock
from typing import Callable, Dict, List, Optional


import opentelemetry.trace as otel_trace

from opentelemetry.trace import Link
from opentelemetry.trace.status import StatusCode
from opentelemetry.trace.span import NonRecordingSpan
from opentelemetry.sdk.trace import ReadableSpan

from promptflow._core.generator_proxy import GeneratorProxy, generate_from_proxy
from promptflow._core.operation_context import OperationContext
@@ -26,6 +32,10 @@
from .._utils.utils import default_json_encoder
from .thread_local_singleton import ThreadLocalSingleton


IS_LEGACY_OPENAI = version("openai").startswith("0.")


open_telemetry_tracer = otel_trace.get_tracer("promptflow")


@@ -288,19 +298,55 @@ def enrich_span_with_input(span, input):


def enrich_span_with_trace_type(span, inputs, output, trace_type):
-    enrich_span_with_output(span, output)
    if trace_type == TraceType.LLM:
        token_collector.collect_openai_tokens(span, output)
    elif trace_type == TraceType.EMBEDDING:
        token_collector.collect_openai_tokens(span, output)
        enrich_span_with_embedding(span, inputs, output)
    enrich_span_with_openai_tokens(span, trace_type)
+    return enrich_span_with_output(span, output)


def traced_generator(generator, original_span: ReadableSpan):
    context = original_span.get_span_context()
    link = Link(context)
    # If start_trace is not called, the name of the original_span will be empty.
    with open_telemetry_tracer.start_as_current_span(
        f"Iterated({original_span.name})",
        links=[link],
    ) as span:
        span.set_attributes(original_span.attributes)
        generator_proxy = GeneratorProxy(generator)
        yield from generator_proxy
        generator_output = generator_proxy.items

        # Enrich LLM span for OpenAI streaming message
        # TODO: Enrich LLM token count for streaming scenario
        if original_span.attributes["span_type"] == "LLM" and not IS_LEGACY_OPENAI:
            from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
            chunks = []
            role = "assistant"
            for item in generator_output:
                if not isinstance(item, ChatCompletionChunk):
                    continue
                if item.choices and item.choices[0].delta.content:
                    chunks.append(item.choices[0].delta.content)
                    role = item.choices[0].delta.role or role
            if chunks:
                text = "".join(chunks)
                message = {"content": text, "role": role}
                span.set_attribute("llm.generated_message", serialize_attribute(message))
        serialized_output = serialize_attribute(generator_output)
        span.set_attribute("output", serialized_output)


def enrich_span_with_output(span, output):
    try:
        serialized_output = serialize_attribute(output)
        span.set_attribute("output", serialized_output)
        # If the output is a generator, while the span is a valid span, we will trace the generator.
        if isinstance(output, Iterator) and not isinstance(span, NonRecordingSpan):
            output = traced_generator(output, span)
    except Exception as e:
        logging.warning(f"Failed to enrich span with output: {e}")

@@ -407,7 +453,7 @@ async def wrapped(*args, **kwargs):
            Tracer.push(trace)
            enrich_span_with_input(span, trace.inputs)
            output = await func(*args, **kwargs)
-            enrich_span_with_trace_type(span, trace.inputs, output, trace_type)
+            output = enrich_span_with_trace_type(span, trace.inputs, output, trace_type)
            span.set_status(StatusCode.OK)
            output = Tracer.pop(output)
        except Exception as e:
@@ -456,7 +502,7 @@ def wrapped(*args, **kwargs):
            Tracer.push(trace)
            enrich_span_with_input(span, trace.inputs)
            output = func(*args, **kwargs)
-            enrich_span_with_trace_type(span, trace.inputs, output, trace_type)
+            output = enrich_span_with_trace_type(span, trace.inputs, output, trace_type)
            span.set_status(StatusCode.OK)
            output = Tracer.pop(output)
        except Exception as e:
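For reference, a dependency-free sketch of the chunk-joining behavior that `traced_generator` applies to OpenAI v1.x streaming output (plain dicts stand in for the `delta` of each `ChatCompletionChunk`; the values are made up for illustration):

```python
# Each dict approximates chunk.choices[0].delta from a streamed chat completion.
streamed_deltas = [
    {"role": "assistant", "content": "Hel"},
    {"role": None, "content": "lo, "},
    {"role": None, "content": "world!"},
    {"role": None, "content": None},  # final empty delta
]

chunks, role = [], "assistant"
for delta in streamed_deltas:
    if delta["content"]:
        chunks.append(delta["content"])
        role = delta["role"] or role

# Joined message that would be recorded on the span as "llm.generated_message".
message = {"content": "".join(chunks), "role": role}
print(message)  # {'content': 'Hello, world!', 'role': 'assistant'}
```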
