diff --git a/src/promptflow/promptflow/_core/run_tracker.py b/src/promptflow/promptflow/_core/run_tracker.py
index 7637369aa9e..7cadf496a52 100644
--- a/src/promptflow/promptflow/_core/run_tracker.py
+++ b/src/promptflow/promptflow/_core/run_tracker.py
@@ -17,6 +17,7 @@
 from promptflow._utils.logger_utils import flow_logger
 from promptflow._utils.multimedia_utils import default_json_encoder
 from promptflow._utils.openai_metrics_calculator import OpenAIMetricsCalculator
+from promptflow._utils.run_tracker_utils import _deep_copy_and_extract_items_from_generator_proxy
 from promptflow.contracts.run_info import FlowRunInfo, RunInfo, Status
 from promptflow.contracts.run_mode import RunMode
 from promptflow.contracts.tool import ConnectionType
@@ -177,11 +178,21 @@ def _update_flow_run_info_with_node_runs(self, run_info: FlowRunInfo):
         # TODO: Refactor Tracer to support flow level tracing,
         # then we can remove the hard-coded root level api_calls here.
         # It has to be a list for UI backward compatibility.
-        # TODO: Add input, output, error to top level. Adding them would require
-        # the same technique of handingling image and generator in Tracer,
-        # which introduces duplicated logic. We should do it in the refactoring.
         start_timestamp = run_info.start_time.astimezone(timezone.utc).timestamp() if run_info.start_time else None
         end_timestamp = run_info.end_time.astimezone(timezone.utc).timestamp() if run_info.end_time else None
+        # This implementation deep copies the inputs and output of the flow run and extracts items from
+        # GeneratorProxy, so that both images and generators are supported.
+        # It's a short-term solution; the long-term one will be implemented in the next generation of Tracer.
+        inputs = None
+        output = None
+        try:
+            inputs = _deep_copy_and_extract_items_from_generator_proxy(run_info.inputs)
+            output = _deep_copy_and_extract_items_from_generator_proxy(run_info.output)
+        except Exception as e:
+            flow_logger.warning(
+                f"Failed to serialize inputs or output for flow run because of {e}. "
+                "The inputs and output fields in api_calls will be None."
+            )
         run_info.api_calls = [
             {
                 "name": "flow",
@@ -191,6 +202,9 @@ def _update_flow_run_info_with_node_runs(self, run_info: FlowRunInfo):
                 "end_time": end_timestamp,
                 "children": self._collect_traces_from_nodes(run_id),
                 "system_metrics": run_info.system_metrics,
+                "inputs": inputs,
+                "output": output,
+                "error": run_info.error,
             }
         ]
 
diff --git a/src/promptflow/promptflow/_utils/run_tracker_utils.py b/src/promptflow/promptflow/_utils/run_tracker_utils.py
new file mode 100644
index 00000000000..0a846555eef
--- /dev/null
+++ b/src/promptflow/promptflow/_utils/run_tracker_utils.py
@@ -0,0 +1,23 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from copy import deepcopy
+
+from promptflow._core.generator_proxy import GeneratorProxy
+
+
+def _deep_copy_and_extract_items_from_generator_proxy(value: object) -> object:
+    """Deep copy value, and if there is a GeneratorProxy, deepcopy the items from it.
+
+    :param value: Any object.
+    :type value: Object
+    :return: Deep copied value.
+    :rtype: Object
+    """
+    if isinstance(value, list):
+        return [_deep_copy_and_extract_items_from_generator_proxy(v) for v in value]
+    elif isinstance(value, dict):
+        return {k: _deep_copy_and_extract_items_from_generator_proxy(v) for k, v in value.items()}
+    elif isinstance(value, GeneratorProxy):
+        return deepcopy(value.items)
+    return deepcopy(value)
diff --git a/src/promptflow/tests/executor/e2etests/test_traces.py b/src/promptflow/tests/executor/e2etests/test_traces.py
index f3b97daac2e..d8dc891b06c 100644
--- a/src/promptflow/tests/executor/e2etests/test_traces.py
+++ b/src/promptflow/tests/executor/e2etests/test_traces.py
@@ -218,10 +218,9 @@ def test_flow_with_trace(self, flow_file, dev_connections):
         assert flow_trace["system_metrics"]["prompt_tokens"] == 0
         assert flow_trace["system_metrics"]["completion_tokens"] == 0
         assert flow_trace["system_metrics"]["total_tokens"] == 0
-        # TODO: These assertions should be fixed after added these fields to the top level trace
-        assert "inputs" not in flow_trace
-        assert "output" not in flow_trace
-        assert "error" not in flow_trace
+        assert isinstance(flow_trace["inputs"], dict)
+        assert flow_trace["output"] == {"output": "Hello, User 1!"}
+        assert flow_trace["error"] is None
         if sys.platform != "darwin":
             assert flow_trace["end_time"] - flow_trace["start_time"] == pytest.approx(1.5, abs=0.3)
             assert flow_trace["system_metrics"]["duration"] == pytest.approx(1.5, abs=0.3)
@@ -298,7 +297,7 @@ class TestOTelTracer:
             ("openai_completion_api_flow", get_comletion_input(False), 3),
             ("llm_tool", {"topic": "Hello", "stream": False}, 4),
             ("flow_with_async_llm_tasks", get_flow_sample_inputs("flow_with_async_llm_tasks"), 6),
-        ]
+        ],
     )
     def test_otel_trace(
         self,
diff --git a/src/promptflow/tests/executor/unittests/_core/test_run_tracker.py b/src/promptflow/tests/executor/unittests/_core/test_run_tracker.py
index 3b9a935b036..06a106d4a9c 100644
--- a/src/promptflow/tests/executor/unittests/_core/test_run_tracker.py
+++ b/src/promptflow/tests/executor/unittests/_core/test_run_tracker.py
@@ -91,6 +91,11 @@ def test_run_tracker(self):
         assert run_info_flow.api_calls[0]["node_name"] == "flow"
         assert run_info_flow.api_calls[0]["type"] == "Flow"
         assert run_info_flow.api_calls[0]["system_metrics"]["total_tokens"] == 60
+        assert run_info_flow.api_calls[0]["inputs"] == flow_input
+        assert run_info_flow.api_calls[0]["output"] is None
+        assert (
+            "The output 'unserialized_value' for flow is incorrect." in run_info_flow.api_calls[0]["error"]["message"]
+        )
         assert isinstance(run_info_flow.api_calls[0]["start_time"], float)
         assert isinstance(run_info_flow.api_calls[0]["end_time"], float)
         assert len(run_info_flow.api_calls[0]["children"]) == 4, "There should be 4 children under root."
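
For reference, a rough sketch of the root-level "flow" entry that _update_flow_run_info_with_node_runs now records in run_info.api_calls, with the new "inputs", "output", and "error" keys; the concrete values below are illustrative assumptions rather than output captured from the code:

flow_api_call = {
    "name": "flow",
    "node_name": "flow",
    "type": "Flow",
    "start_time": 1700000000.0,  # UTC timestamps as floats; values are illustrative
    "end_time": 1700000001.5,
    "children": [],              # node-level traces from _collect_traces_from_nodes
    "system_metrics": {"total_tokens": 60, "duration": 1.5},
    "inputs": {"user_id": 1},    # deep-copied flow inputs with GeneratorProxy items extracted
    "output": {"output": "Hello, User 1!"},
    "error": None,               # run_info.error; populated when the flow run failed
}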
diff --git a/src/promptflow/tests/executor/unittests/_utils/test_run_tracker_utils.py b/src/promptflow/tests/executor/unittests/_utils/test_run_tracker_utils.py
new file mode 100644
index 00000000000..a420b0daab8
--- /dev/null
+++ b/src/promptflow/tests/executor/unittests/_utils/test_run_tracker_utils.py
@@ -0,0 +1,38 @@
+import pytest
+
+from promptflow._core.generator_proxy import GeneratorProxy
+from promptflow._utils.run_tracker_utils import _deep_copy_and_extract_items_from_generator_proxy
+
+
+@pytest.mark.unittest
+class TestDeepCopyAndExtract:
+    def test_deep_copy_simple_value(self):
+        value = 10
+        result = _deep_copy_and_extract_items_from_generator_proxy(value)
+        assert value == result
+
+    def test_deep_copy_list(self):
+        value = [1, 2, 3]
+        result = _deep_copy_and_extract_items_from_generator_proxy(value)
+        assert value == result
+        assert id(value) != id(result), "List should be deep copied"
+
+    def test_deep_copy_dict(self):
+        value = {"a": 1, "b": 2}
+        result = _deep_copy_and_extract_items_from_generator_proxy(value)
+        assert value == result
+        assert id(value) != id(result), "Dict should be deep copied"
+
+    def test_extract_generator_proxy_items(self):
+        generator_proxy_value = GeneratorProxy(None)
+        generator_proxy_value._items = [1, 2]
+        expected = [1, 2]
+        result = _deep_copy_and_extract_items_from_generator_proxy(generator_proxy_value)
+        assert expected == result
+
+    def test_composite(self):
+        value = {"a": [1, 2, 3], "b": GeneratorProxy(None)}
+        value["b"]._items = [1, 2]
+        expected = {"a": [1, 2, 3], "b": [1, 2]}
+        result = _deep_copy_and_extract_items_from_generator_proxy(value)
+        assert expected == result
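
As a usage sketch mirroring the unit tests above: the helper deep copies plain values and containers, and replaces any GeneratorProxy with a deep copy of the items it has already yielded, which makes streamed outputs serializable; the sample payload below is assumed for illustration only:

from promptflow._core.generator_proxy import GeneratorProxy
from promptflow._utils.run_tracker_utils import _deep_copy_and_extract_items_from_generator_proxy

proxy = GeneratorProxy(None)            # as in the tests, no real generator is needed here
proxy._items = ["Hello, ", "User 1!"]   # pretend these chunks were already yielded

run_output = {"output": proxy, "status": "Completed"}
serializable = _deep_copy_and_extract_items_from_generator_proxy(run_output)
assert serializable == {"output": ["Hello, ", "User 1!"], "status": "Completed"}
assert serializable["output"] is not proxy._items  # extracted items are copied, not shared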