From 07ecfa86c89a2307f3ad5069600c89078cbc7784 Mon Sep 17 00:00:00 2001
From: Heyi Tang
Date: Tue, 27 Feb 2024 19:53:17 +0800
Subject: [PATCH] [Executor][Trace] Enable traces for LLM streaming (#2127)

# Description

Add the span for the LLM streaming scenario.

This pull request primarily focuses on improving the tracing capabilities in the `src/promptflow/promptflow/_core/tracer.py` file. The changes include importing additional modules, adding a check for the installed OpenAI version, modifying the `enrich_span_with_trace_type` function to return the output, adding a new function `traced_generator`, and updating the `wrapped` functions to use the modified `enrich_span_with_trace_type`.

Here are the most important changes:

Import and OpenAI Version Check:

* [`src/promptflow/promptflow/_core/tracer.py`](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8R13-R20): Imported `version` from `importlib.metadata` and `Link` from `opentelemetry.trace`. Added a check for whether the installed OpenAI version is legacy (0.x). [[1]](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8R13-R20) [[2]](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8R33-R36)

Function Modifications:

* [`src/promptflow/promptflow/_core/tracer.py`](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8L291-R345): In `enrich_span_with_trace_type`, removed the direct call to `enrich_span_with_output` and returned its result instead. Added a new function `traced_generator` that creates a new span covering the iteration of a returned generator and enriches that span with attributes.

Function Usage:

* [`src/promptflow/promptflow/_core/tracer.py`](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8L410-R452): In both `wrapped` functions, captured the value returned by `enrich_span_with_trace_type` as the output. [[1]](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8L410-R452) [[2]](diffhunk://#diff-8f8c2ae53e5ffd37a14e8a899119fbb2742486db8faab6df3fcf506e1b720ad8L459-R501)

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
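For illustration, the core pattern of this change is to hand the caller back a wrapped generator that yields the original items inside a new span linked to the original span, and to record the aggregated output only once the caller has finished iterating. Below is a minimal, self-contained sketch of that pattern using only the public OpenTelemetry API; it is not the promptflow implementation, and names such as `traced_stream` and `stream_numbers` are made up for the example:

```python
# Minimal sketch: trace a streamed (generator) output in a span linked to the original span.
# Only the public OpenTelemetry API is used; without an SDK TracerProvider configured the
# spans are no-ops, but the control flow is the same.
from typing import Iterator

from opentelemetry import trace
from opentelemetry.trace import Link

tracer = trace.get_tracer("streaming-trace-sketch")


def traced_stream(generator: Iterator[str], original_span: trace.Span) -> Iterator[str]:
    """Yield items from `generator` inside a new span linked to `original_span`."""
    link = Link(original_span.get_span_context())
    with tracer.start_as_current_span("Iterated(stream)", links=[link]) as span:
        items = []
        for item in generator:
            items.append(item)
            yield item
        # The full streamed output is only known after the consumer exhausts the stream.
        span.set_attribute("output", "".join(items))


def stream_numbers() -> Iterator[str]:
    yield from ("1", "2", "3")


with tracer.start_as_current_span("llm_call") as llm_span:
    # Hand the wrapped generator back to the caller; the "Iterated(...)" span
    # starts on first iteration and ends when the stream is fully consumed.
    output = traced_stream(stream_numbers(), llm_span)

for chunk in output:
    print(chunk)
```

In the actual change, `GeneratorProxy` plays the role of the item-collecting loop, and for non-legacy OpenAI clients the collected `ChatCompletionChunk` deltas are additionally merged into an `llm.generated_message` attribute.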
---------

Co-authored-by: Heyi
Co-authored-by: Lina Tang
---
 src/promptflow/promptflow/_core/tracer.py | 52 +++++++++++++++++++++--
 1 file changed, 49 insertions(+), 3 deletions(-)

diff --git a/src/promptflow/promptflow/_core/tracer.py b/src/promptflow/promptflow/_core/tracer.py
index e1c78c0fd76..c25de35c5f6 100644
--- a/src/promptflow/promptflow/_core/tracer.py
+++ b/src/promptflow/promptflow/_core/tracer.py
@@ -10,11 +10,17 @@
 from collections.abc import Iterator
 from contextvars import ContextVar
 from datetime import datetime
+from importlib.metadata import version
 from threading import Lock
 from typing import Callable, Dict, List, Optional
 
+
 import opentelemetry.trace as otel_trace
+
+from opentelemetry.trace import Link
 from opentelemetry.trace.status import StatusCode
+from opentelemetry.trace.span import NonRecordingSpan
+from opentelemetry.sdk.trace import ReadableSpan
 
 from promptflow._core.generator_proxy import GeneratorProxy, generate_from_proxy
 from promptflow._core.operation_context import OperationContext
@@ -26,6 +32,10 @@ from .._utils.utils import default_json_encoder
 from .thread_local_singleton import ThreadLocalSingleton
 
 
+
+IS_LEGACY_OPENAI = version("openai").startswith("0.")
+
+
 open_telemetry_tracer = otel_trace.get_tracer("promptflow")
 
 
@@ -288,19 +298,55 @@ def enrich_span_with_input(span, input):
 
 
 def enrich_span_with_trace_type(span, inputs, output, trace_type):
-    enrich_span_with_output(span, output)
     if trace_type == TraceType.LLM:
         token_collector.collect_openai_tokens(span, output)
     elif trace_type == TraceType.EMBEDDING:
         token_collector.collect_openai_tokens(span, output)
         enrich_span_with_embedding(span, inputs, output)
     enrich_span_with_openai_tokens(span, trace_type)
+    return enrich_span_with_output(span, output)
+
+
+def traced_generator(generator, original_span: ReadableSpan):
+    context = original_span.get_span_context()
+    link = Link(context)
+    # If start_trace is not called, the name of the original_span will be empty.
+    with open_telemetry_tracer.start_as_current_span(
+        f"Iterated({original_span.name})",
+        links=[link],
+    ) as span:
+        span.set_attributes(original_span.attributes)
+        generator_proxy = GeneratorProxy(generator)
+        yield from generator_proxy
+        generator_output = generator_proxy.items
+
+        # Enrich the LLM span for the OpenAI streaming message
+        # TODO: Enrich the LLM token count for the streaming scenario
+        if original_span.attributes["span_type"] == "LLM" and not IS_LEGACY_OPENAI:
+            from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
+            chunks = []
+            role = "assistant"
+            for item in generator_output:
+                if not isinstance(item, ChatCompletionChunk):
+                    continue
+                if item.choices and item.choices[0].delta.content:
+                    chunks.append(item.choices[0].delta.content)
+                    role = item.choices[0].delta.role or role
+            if chunks:
+                text = "".join(chunks)
+                message = {"content": text, "role": role}
+                span.set_attribute("llm.generated_message", serialize_attribute(message))
+        serialized_output = serialize_attribute(generator_output)
+        span.set_attribute("output", serialized_output)
 
 
 def enrich_span_with_output(span, output):
     try:
         serialized_output = serialize_attribute(output)
         span.set_attribute("output", serialized_output)
+        # If the output is a generator and the span is a valid (recording) span, trace the generator.
+        if isinstance(output, Iterator) and not isinstance(span, NonRecordingSpan):
+            output = traced_generator(output, span)
     except Exception as e:
         logging.warning(f"Failed to enrich span with output: {e}")
 
@@ -407,7 +453,7 @@ async def wrapped(*args, **kwargs):
                 Tracer.push(trace)
                 enrich_span_with_input(span, trace.inputs)
                 output = await func(*args, **kwargs)
-                enrich_span_with_trace_type(span, trace.inputs, output, trace_type)
+                output = enrich_span_with_trace_type(span, trace.inputs, output, trace_type)
                 span.set_status(StatusCode.OK)
                 output = Tracer.pop(output)
             except Exception as e:
@@ -456,7 +502,7 @@ def wrapped(*args, **kwargs):
                 Tracer.push(trace)
                 enrich_span_with_input(span, trace.inputs)
                 output = func(*args, **kwargs)
-                enrich_span_with_trace_type(span, trace.inputs, output, trace_type)
+                output = enrich_span_with_trace_type(span, trace.inputs, output, trace_type)
                 span.set_status(StatusCode.OK)
                 output = Tracer.pop(output)
             except Exception as e: