From 2f6f72481941af1ee1de87a901e8603d609ebb03 Mon Sep 17 00:00:00 2001 From: Ming Gu Date: Thu, 8 Feb 2024 12:48:56 +0800 Subject: [PATCH] Add input/output/error to root level api_calls (#1995) # Description Add input/output to root level api_calls. Note that this is a short-term solution which uses inelegant way to generate flow level input/output/error. Long-term solution will be included in the next generation of trace. Normal case: ![image](https://github.com/microsoft/promptflow/assets/10575286/9c0d4b10-37fe-4dfb-befe-d3751d3978d8) Image case: ![image](https://github.com/microsoft/promptflow/assets/10575286/409b6f14-ef08-4fec-8ee6-c33515dd824a) Generator case: ![image](https://github.com/microsoft/promptflow/assets/10575286/f45eff5b-da18-4fe4-8b9b-3ac0e3e33ca8) # All Promptflow Contribution checklist: - [X] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [X] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [X] Title of the pull request is clear and informative. - [X] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [X] Pull request includes test coverage for the included changes. --- .../promptflow/_core/run_tracker.py | 20 ++++++++-- .../promptflow/_utils/run_tracker_utils.py | 23 +++++++++++ .../tests/executor/e2etests/test_traces.py | 9 ++--- .../unittests/_core/test_run_tracker.py | 5 +++ .../_utils/test_run_tracker_utils.py | 38 +++++++++++++++++++ 5 files changed, 87 insertions(+), 8 deletions(-) create mode 100644 src/promptflow/promptflow/_utils/run_tracker_utils.py create mode 100644 src/promptflow/tests/executor/unittests/_utils/test_run_tracker_utils.py diff --git a/src/promptflow/promptflow/_core/run_tracker.py b/src/promptflow/promptflow/_core/run_tracker.py index 7637369aa9e..7cadf496a52 100644 --- a/src/promptflow/promptflow/_core/run_tracker.py +++ b/src/promptflow/promptflow/_core/run_tracker.py @@ -17,6 +17,7 @@ from promptflow._utils.logger_utils import flow_logger from promptflow._utils.multimedia_utils import default_json_encoder from promptflow._utils.openai_metrics_calculator import OpenAIMetricsCalculator +from promptflow._utils.run_tracker_utils import _deep_copy_and_extract_items_from_generator_proxy from promptflow.contracts.run_info import FlowRunInfo, RunInfo, Status from promptflow.contracts.run_mode import RunMode from promptflow.contracts.tool import ConnectionType @@ -177,11 +178,21 @@ def _update_flow_run_info_with_node_runs(self, run_info: FlowRunInfo): # TODO: Refactor Tracer to support flow level tracing, # then we can remove the hard-coded root level api_calls here. # It has to be a list for UI backward compatibility. - # TODO: Add input, output, error to top level. Adding them would require - # the same technique of handingling image and generator in Tracer, - # which introduces duplicated logic. We should do it in the refactoring. start_timestamp = run_info.start_time.astimezone(timezone.utc).timestamp() if run_info.start_time else None end_timestamp = run_info.end_time.astimezone(timezone.utc).timestamp() if run_info.end_time else None + # This implementation deep copies the inputs and output of the flow run, and extracts items from GeneratorProxy. + # So that both image and generator will be supported. + # It's a short term solution, while the long term one will be implemented in the next generation of Tracer. + inputs = None + output = None + try: + inputs = _deep_copy_and_extract_items_from_generator_proxy(run_info.inputs) + output = _deep_copy_and_extract_items_from_generator_proxy(run_info.output) + except Exception as e: + flow_logger.warning( + f"Failed to serialize inputs or output for flow run because of {e}." + "The inputs and output field in api_calls will be None." + ) run_info.api_calls = [ { "name": "flow", @@ -191,6 +202,9 @@ def _update_flow_run_info_with_node_runs(self, run_info: FlowRunInfo): "end_time": end_timestamp, "children": self._collect_traces_from_nodes(run_id), "system_metrics": run_info.system_metrics, + "inputs": inputs, + "output": output, + "error": run_info.error, } ] diff --git a/src/promptflow/promptflow/_utils/run_tracker_utils.py b/src/promptflow/promptflow/_utils/run_tracker_utils.py new file mode 100644 index 00000000000..0a846555eef --- /dev/null +++ b/src/promptflow/promptflow/_utils/run_tracker_utils.py @@ -0,0 +1,23 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from copy import deepcopy + +from promptflow._core.generator_proxy import GeneratorProxy + + +def _deep_copy_and_extract_items_from_generator_proxy(value: object) -> object: + """Deep copy value, and if there is a GeneratorProxy, deepcopy the items from it. + + :param value: Any object. + :type value: Object + :return: Deep copied value. + :rtype: Object + """ + if isinstance(value, list): + return [_deep_copy_and_extract_items_from_generator_proxy(v) for v in value] + elif isinstance(value, dict): + return {k: _deep_copy_and_extract_items_from_generator_proxy(v) for k, v in value.items()} + elif isinstance(value, GeneratorProxy): + return deepcopy(value.items) + return deepcopy(value) diff --git a/src/promptflow/tests/executor/e2etests/test_traces.py b/src/promptflow/tests/executor/e2etests/test_traces.py index f3b97daac2e..d8dc891b06c 100644 --- a/src/promptflow/tests/executor/e2etests/test_traces.py +++ b/src/promptflow/tests/executor/e2etests/test_traces.py @@ -218,10 +218,9 @@ def test_flow_with_trace(self, flow_file, dev_connections): assert flow_trace["system_metrics"]["prompt_tokens"] == 0 assert flow_trace["system_metrics"]["completion_tokens"] == 0 assert flow_trace["system_metrics"]["total_tokens"] == 0 - # TODO: These assertions should be fixed after added these fields to the top level trace - assert "inputs" not in flow_trace - assert "output" not in flow_trace - assert "error" not in flow_trace + assert isinstance(flow_trace["inputs"], dict) + assert flow_trace["output"] == {"output": "Hello, User 1!"} + assert flow_trace["error"] is None if sys.platform != "darwin": assert flow_trace["end_time"] - flow_trace["start_time"] == pytest.approx(1.5, abs=0.3) assert flow_trace["system_metrics"]["duration"] == pytest.approx(1.5, abs=0.3) @@ -298,7 +297,7 @@ class TestOTelTracer: ("openai_completion_api_flow", get_comletion_input(False), 3), ("llm_tool", {"topic": "Hello", "stream": False}, 4), ("flow_with_async_llm_tasks", get_flow_sample_inputs("flow_with_async_llm_tasks"), 6), - ] + ], ) def test_otel_trace( self, diff --git a/src/promptflow/tests/executor/unittests/_core/test_run_tracker.py b/src/promptflow/tests/executor/unittests/_core/test_run_tracker.py index 3b9a935b036..06a106d4a9c 100644 --- a/src/promptflow/tests/executor/unittests/_core/test_run_tracker.py +++ b/src/promptflow/tests/executor/unittests/_core/test_run_tracker.py @@ -91,6 +91,11 @@ def test_run_tracker(self): assert run_info_flow.api_calls[0]["node_name"] == "flow" assert run_info_flow.api_calls[0]["type"] == "Flow" assert run_info_flow.api_calls[0]["system_metrics"]["total_tokens"] == 60 + assert run_info_flow.api_calls[0]["inputs"] == flow_input + assert run_info_flow.api_calls[0]["output"] is None + assert ( + "The output 'unserialized_value' for flow is incorrect." in run_info_flow.api_calls[0]["error"]["message"] + ) assert isinstance(run_info_flow.api_calls[0]["start_time"], float) assert isinstance(run_info_flow.api_calls[0]["end_time"], float) assert len(run_info_flow.api_calls[0]["children"]) == 4, "There should be 4 children under root." diff --git a/src/promptflow/tests/executor/unittests/_utils/test_run_tracker_utils.py b/src/promptflow/tests/executor/unittests/_utils/test_run_tracker_utils.py new file mode 100644 index 00000000000..a420b0daab8 --- /dev/null +++ b/src/promptflow/tests/executor/unittests/_utils/test_run_tracker_utils.py @@ -0,0 +1,38 @@ +import pytest + +from promptflow._core.generator_proxy import GeneratorProxy +from promptflow._utils.run_tracker_utils import _deep_copy_and_extract_items_from_generator_proxy + + +@pytest.mark.unittest +class TestDeepCopyAndExtract: + def test_deep_copy_simple_value(self): + value = 10 + result = _deep_copy_and_extract_items_from_generator_proxy(value) + assert value == result + + def test_deep_copy_list(self): + value = [1, 2, 3] + result = _deep_copy_and_extract_items_from_generator_proxy(value) + assert value == result + assert id(value) != id(result), "List should be deep copied" + + def test_deep_copy_dict(self): + value = {"a": 1, "b": 2} + result = _deep_copy_and_extract_items_from_generator_proxy(value) + assert value == result + assert id(value) != id(result), "Dict should be deep copied" + + def test_extract_generator_proxy_items(self): + generator_proxy_value = GeneratorProxy(None) + generator_proxy_value._items = [1, 2] + expected = [1, 2] + result = _deep_copy_and_extract_items_from_generator_proxy(generator_proxy_value) + assert expected == result + + def test_composite(self): + value = {"a": [1, 2, 3], "b": GeneratorProxy(None)} + value["b"]._items = [1, 2] + expected = {"a": [1, 2, 3], "b": [1, 2]} + result = _deep_copy_and_extract_items_from_generator_proxy(value) + assert expected == result