From c9165c32b7569f415f9b730e1046253f19481c05 Mon Sep 17 00:00:00 2001
From: Heyi Tang
Date: Fri, 8 Mar 2024 16:58:07 +0800
Subject: [PATCH] [Internal] Remove variant_id in all executor related logic.
 (#2274)

# Description

Remove the `variant_id` concept from all executor-related logic, since it is used only by legacy code paths.

This pull request removes the `variant_id` parameter from methods across multiple files under `src/promptflow/promptflow/`. The parameter was previously used when initializing, copying, and executing flow runs and node runs; it is no longer part of these code paths.

Removal of `variant_id` from `FlowExecutionContext`:

* [`src/promptflow/promptflow/_core/flow_execution_context.py`](diffhunk://#diff-8a45b6238b72974b62aa211aec63ef4cbeadfa8277f84525442c245a16ee4461L44-L52): Removed `variant_id` from the `__init__`, `copy`, `_prepare_node_run`, and `bypass_node` methods. [[1]](diffhunk://#diff-8a45b6238b72974b62aa211aec63ef4cbeadfa8277f84525442c245a16ee4461L44-L52) [[2]](diffhunk://#diff-8a45b6238b72974b62aa211aec63ef4cbeadfa8277f84525442c245a16ee4461L62) [[3]](diffhunk://#diff-8a45b6238b72974b62aa211aec63ef4cbeadfa8277f84525442c245a16ee4461L119) [[4]](diffhunk://#diff-8a45b6238b72974b62aa211aec63ef4cbeadfa8277f84525442c245a16ee4461L214)

Removal of `variant_id` from `RunTracker`:

* [`src/promptflow/promptflow/_core/run_tracker.py`](diffhunk://#diff-3ea41c85102f8318e1dc8568de0fb0de76ce92172e506df95e6f419808eabaf8L84): Removed `variant_id` from the `start_flow_run` and `bypass_node_run` methods. [[1]](diffhunk://#diff-3ea41c85102f8318e1dc8568de0fb0de76ce92172e506df95e6f419808eabaf8L84) [[2]](diffhunk://#diff-3ea41c85102f8318e1dc8568de0fb0de76ce92172e506df95e6f419808eabaf8L102) [[3]](diffhunk://#diff-3ea41c85102f8318e1dc8568de0fb0de76ce92172e506df95e6f419808eabaf8L143) [[4]](diffhunk://#diff-3ea41c85102f8318e1dc8568de0fb0de76ce92172e506df95e6f419808eabaf8L159)

Removal of `variant_id` from `FlowExecutor`:

* [`src/promptflow/promptflow/executor/flow_executor.py`](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L657-R662): Removed `variant_id` from the `_exec_in_thread`, `exec_line`, `exec_line_async`, `_exec`, and `_exec_async` methods. A sketch of the updated `exec_line` call appears after the checklist below. [[1]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L657-R662) [[2]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L674) [[3]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L688-L689) [[4]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L716) [[5]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L730) [[6]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L743-L744) [[7]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L766) [[8]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L887) [[9]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L920) [[10]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L929) [[11]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L965) [[12]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L1000) [[13]](diffhunk://#diff-faa6c81d614b7e41b18a42a93139d961d92afa9aa9dd0b72cb6b7176d7541e69L1009)

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
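For downstream callers, the practical effect is that the line-execution entry points no longer accept a `variant_id` argument. The snippet below is a minimal sketch of the updated `exec_line` call based on the signature in this diff; the `FlowExecutor.create(...)` setup, the flow path, the connections dict, the inputs, and the run id are illustrative placeholders rather than part of this change.

```python
from promptflow.executor.flow_executor import FlowExecutor

# Illustrative setup only: the flow path and empty connections dict are placeholders.
executor = FlowExecutor.create("path/to/flow.dag.yaml", connections={})

# exec_line no longer takes variant_id; only the remaining parameters
# (inputs, index, run_id, validate_inputs, ...) are passed.
line_result = executor.exec_line(
    inputs={"question": "What is Prompt flow?"},  # placeholder flow input
    index=0,
    run_id="example-run-id",
    validate_inputs=True,
)
print(line_result.output)
```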
---------

Co-authored-by: Heyi
---
 .../promptflow/_core/flow_execution_context.py |  5 -----
 src/promptflow/promptflow/_core/run_tracker.py  |  4 ----
 .../promptflow/contracts/run_info.py            |  8 --------
 .../promptflow/executor/flow_executor.py        | 18 ++----------------
 .../unittests/contracts/test_run_info.py        |  2 --
 .../storage/test_queue_run_storage.py           |  2 --
 .../unittests/storage/test_run_records.py       |  2 --
 7 files changed, 2 insertions(+), 39 deletions(-)

diff --git a/src/promptflow/promptflow/_core/flow_execution_context.py b/src/promptflow/promptflow/_core/flow_execution_context.py
index 55d2e4d3755..3acf5c258a3 100644
--- a/src/promptflow/promptflow/_core/flow_execution_context.py
+++ b/src/promptflow/promptflow/_core/flow_execution_context.py
@@ -41,7 +41,6 @@ def __init__(
         run_id=None,
         flow_id=None,
         line_number=None,
-        variant_id=None,
     ):
         self._name = name
         self._run_tracker = run_tracker
@@ -49,7 +48,6 @@ def __init__(
         self._run_id = run_id or str(uuid.uuid4())
         self._flow_id = flow_id or self._run_id
         self._line_number = line_number
-        self._variant_id = variant_id
 
     def copy(self):
         return FlowExecutionContext(
@@ -59,7 +57,6 @@ def copy(self):
             run_id=self._run_id,
             flow_id=self._flow_id,
             line_number=self._line_number,
-            variant_id=self._variant_id,
         )
 
     def cancel_node_runs(self, msg):
@@ -116,7 +113,6 @@ def _prepare_node_run(self, node: Node, f, kwargs={}):
             index=self._line_number,
         )
         run_info.index = self._line_number
-        run_info.variant_id = self._variant_id
         self._run_tracker.set_inputs(node_run_id, {key: value for key, value in kwargs.items() if key != "self"})
         return run_info
 
@@ -211,7 +207,6 @@ def bypass_node(self, node: Node):
             parent_run_id=parent_run_id,
             run_id=node_run_id,
             index=self._line_number,
-            variant_id=self._variant_id,
         )
         self._run_tracker.persist_node_run(run_info)
 
diff --git a/src/promptflow/promptflow/_core/run_tracker.py b/src/promptflow/promptflow/_core/run_tracker.py
index 6a6cfcac5fa..2d28a40bb47 100644
--- a/src/promptflow/promptflow/_core/run_tracker.py
+++ b/src/promptflow/promptflow/_core/run_tracker.py
@@ -81,7 +81,6 @@ def start_flow_run(
         parent_run_id="",
         inputs=None,
         index=None,
-        variant_id="",
     ) -> FlowRunInfo:
         """Create a flow run and save to run storage on demand."""
         run_info = FlowRunInfo(
@@ -99,7 +98,6 @@ def start_flow_run(
             start_time=datetime.utcnow(),
             end_time=None,
             index=index,
-            variant_id=variant_id,
         )
         self.persist_flow_run(run_info)
         self._flow_runs[run_id] = run_info
@@ -140,7 +138,6 @@ def bypass_node_run(
         parent_run_id,
         run_id,
         index,
-        variant_id,
     ):
         run_info = RunInfo(
             node=node,
@@ -156,7 +153,6 @@ def bypass_node_run(
             end_time=datetime.utcnow(),
             result=None,
             index=index,
-            variant_id=variant_id,
             api_calls=[],
         )
         self._node_runs[run_id] = run_info
diff --git a/src/promptflow/promptflow/contracts/run_info.py b/src/promptflow/promptflow/contracts/run_info.py
index d3522168f98..389b055cbb7 100644
--- a/src/promptflow/promptflow/contracts/run_info.py
+++ b/src/promptflow/promptflow/contracts/run_info.py
@@ -66,8 +66,6 @@ class RunInfo:
     :type index: Optional[int]
     :param api_calls: API calls made during the run
     :type api_calls: Optional[List[Dict[str, Any]]]
-    :param variant_id: Variant id of the run
-    :type variant_id: Optional[str]
     :param cached_run_id: Cached run id
     :type cached_run_id: Optional[str]
     :param cached_flow_run_id: Cached flow run id
@@ -93,7 +91,6 @@ class RunInfo:
     end_time: datetime
     index: Optional[int] = None
     api_calls: Optional[List[Dict[str, Any]]] = None
-    variant_id: str = ""
     cached_run_id: str = None
     cached_flow_run_id: str = None
     logs: Optional[Dict[str, str]] = None
@@ -117,7 +114,6 @@ def deserialize(data: dict) -> "RunInfo":
             end_time=parser.parse(data.get("end_time")).replace(tzinfo=None),
             index=data.get("index", None),
             api_calls=data.get("api_calls", None),
-            variant_id=data.get("variant_id", ""),
             cached_run_id=data.get("cached_run_id", None),
             cached_flow_run_id=data.get("cached_flow_run_id", None),
             logs=data.get("logs", None),
@@ -161,8 +157,6 @@ class FlowRunInfo:
     :type index: Optional[int]
     :param api_calls: API calls made during the flow run
     :type api_calls: Optional[List[Dict[str, Any]]]
-    :param variant_id: Variant id of the flow run
-    :type variant_id: Optional[str]
     :param name: Name of the flow run
     :type name: Optional[str]
     :param description: Description of the flow run
@@ -192,7 +186,6 @@ class FlowRunInfo:
     end_time: datetime
     index: Optional[int] = None
     api_calls: Optional[List[Dict[str, Any]]] = None
-    variant_id: str = ""
     name: str = ""
     description: str = ""
     tags: Optional[Mapping[str, str]] = None
@@ -219,7 +212,6 @@ def deserialize(data: dict) -> "FlowRunInfo":
             end_time=parser.parse(data.get("end_time")).replace(tzinfo=None),
             index=data.get("index", None),
             api_calls=data.get("api_calls", None),
-            variant_id=data.get("variant_id", ""),
             name=data.get("name", ""),
             description=data.get("description", ""),
             tags=data.get("tags", None),
diff --git a/src/promptflow/promptflow/executor/flow_executor.py b/src/promptflow/promptflow/executor/flow_executor.py
index a8817855a34..76ae6d59d49 100644
--- a/src/promptflow/promptflow/executor/flow_executor.py
+++ b/src/promptflow/promptflow/executor/flow_executor.py
@@ -654,12 +654,12 @@ def exec(self, inputs: dict, node_concurrency=DEFAULT_CONCURRENCY_FLOW) -> dict:
         return result.output or {}
 
     def _exec_in_thread(self, args) -> LineResult:
-        inputs, run_id, line_number, variant_id, validate_inputs = args
+        inputs, run_id, line_number, validate_inputs = args
         thread_name = current_thread().name
         self._processing_idx[line_number] = thread_name
         self._run_tracker._activate_in_context()
         results = self._exec(
-            inputs, run_id=run_id, line_number=line_number, variant_id=variant_id, validate_inputs=validate_inputs
+            inputs, run_id=run_id, line_number=line_number, validate_inputs=validate_inputs
         )
         self._run_tracker._deactivate_in_context()
         self._processing_idx.pop(line_number)
@@ -671,7 +671,6 @@ def exec_line(
         inputs: Mapping[str, Any],
         index: Optional[int] = None,
         run_id: Optional[str] = None,
-        variant_id: str = "",
         validate_inputs: bool = True,
         node_concurrency=DEFAULT_CONCURRENCY_FLOW,
         allow_generator_output: bool = False,
@@ -685,8 +684,6 @@ def exec_line(
         :type index: Optional[int]
         :param run_id: The ID of the flow run.
         :type run_id: Optional[str]
-        :param variant_id: The ID of the variant to execute.
-        :type variant_id: str
         :param validate_inputs: Whether to validate the input values.
         :type validate_inputs: bool
         :param node_concurrency: The maximum number of nodes that can be executed concurrently.
@@ -713,7 +710,6 @@ def exec_line(
             inputs,
             run_id=run_id,
             line_number=index,
-            variant_id=variant_id,
             validate_inputs=validate_inputs,
             allow_generator_output=allow_generator_output,
         )
@@ -727,7 +723,6 @@ async def exec_line_async(
         inputs: Mapping[str, Any],
         index: Optional[int] = None,
         run_id: Optional[str] = None,
-        variant_id: str = "",
         validate_inputs: bool = True,
         node_concurrency=DEFAULT_CONCURRENCY_FLOW,
         allow_generator_output: bool = False,
@@ -740,8 +735,6 @@ async def exec_line_async(
         :type index: Optional[int]
         :param run_id: The ID of the flow run.
         :type run_id: Optional[str]
-        :param variant_id: The ID of the variant to execute.
-        :type variant_id: str
         :param validate_inputs: Whether to validate the input values.
         :type validate_inputs: bool
         :param node_concurrency: The maximum number of nodes that can be executed concurrently.
@@ -763,7 +756,6 @@ async def exec_line_async(
             inputs,
             run_id=run_id,
             line_number=index,
-            variant_id=variant_id,
             validate_inputs=validate_inputs,
             allow_generator_output=allow_generator_output,
         )
@@ -884,7 +876,6 @@ def _exec(
         inputs: Mapping[str, Any],
         run_id: Optional[str] = None,
         line_number: Optional[int] = None,
-        variant_id: str = "",
         validate_inputs: bool = False,
         allow_generator_output: bool = False,
     ) -> LineResult:
@@ -917,7 +908,6 @@ def _exec(
             run_id=line_run_id,
             parent_run_id=run_id,
             index=line_number,
-            variant_id=variant_id,
         )
         context = FlowExecutionContext(
             name=self._flow.name,
@@ -926,7 +916,6 @@ def _exec(
             run_id=run_id,
             flow_id=self._flow_id,
             line_number=line_number,
-            variant_id=variant_id,
         )
         output = {}
         aggregation_inputs = {}
@@ -962,7 +951,6 @@ async def _exec_async(
         inputs: Mapping[str, Any],
         run_id: Optional[str] = None,
         line_number: Optional[int] = None,
-        variant_id: str = "",
         validate_inputs: bool = False,
         allow_generator_output: bool = False,
     ) -> LineResult:
@@ -997,7 +985,6 @@ async def _exec_async(
             parent_run_id=run_id,
             inputs={k: inputs[k] for k in self._flow.inputs if k in inputs},
             index=line_number,
-            variant_id=variant_id,
         )
         context = FlowExecutionContext(
             name=self._flow.name,
@@ -1006,7 +993,6 @@ async def _exec_async(
             run_id=run_id,
             flow_id=self._flow_id,
             line_number=line_number,
-            variant_id=variant_id,
         )
         output = {}
         aggregation_inputs = {}
diff --git a/src/promptflow/tests/executor/unittests/contracts/test_run_info.py b/src/promptflow/tests/executor/unittests/contracts/test_run_info.py
index 9c5ab1dc231..c0faa6ea51d 100644
--- a/src/promptflow/tests/executor/unittests/contracts/test_run_info.py
+++ b/src/promptflow/tests/executor/unittests/contracts/test_run_info.py
@@ -62,7 +62,6 @@ def test_deserialize(self):
             "end_time": "2023-11-24T06:03:20.268858Z",
             "index": 0,
             "api_calls": None,
-            "variant_id": "",
             "cached_run_id": None,
             "cached_flow_run_id": None,
             "logs": None,
@@ -120,7 +119,6 @@ def test_deserialize(self):
             "end_time": "2023-11-23T10:58:37.9590789Z",
             "index": 0,
             "api_calls": None,
-            "variant_id": "",
             "name": "",
             "description": "",
             "tags": None,
diff --git a/src/promptflow/tests/executor/unittests/storage/test_queue_run_storage.py b/src/promptflow/tests/executor/unittests/storage/test_queue_run_storage.py
index 5ca66a5e62d..17a0ef4bddc 100644
--- a/src/promptflow/tests/executor/unittests/storage/test_queue_run_storage.py
+++ b/src/promptflow/tests/executor/unittests/storage/test_queue_run_storage.py
@@ -25,7 +25,6 @@ def test_persist_node_run(self):
             end_time="end_time",
             index="index",
             api_calls="api_calls",
-            variant_id="variant_id",
             cached_run_id="cached_run_id",
             cached_flow_run_id="cached_flow_run_id",
             logs="logs",
@@ -54,7 +53,6 @@ def test_persist_flow_run(self):
             end_time="end_time",
             index="index",
             api_calls="api_calls",
-            variant_id="variant_id",
             system_metrics="system_metrics",
             result="result",
         )
diff --git a/src/promptflow/tests/executor/unittests/storage/test_run_records.py b/src/promptflow/tests/executor/unittests/storage/test_run_records.py
index cd299b9ec44..f1e2bcb6963 100644
--- a/src/promptflow/tests/executor/unittests/storage/test_run_records.py
+++ b/src/promptflow/tests/executor/unittests/storage/test_run_records.py
@@ -27,7 +27,6 @@ def test_line_record():
         start_time=start_time,
         end_time=end_time,
         index=0,
-        variant_id=None,
     )
     line_record = LineRunRecord.from_run_info(flow_run_info)
     assert line_record.line_number == 0
@@ -56,7 +55,6 @@ def test_line_serialize():
         start_time=start_time,
         end_time=end_time,
         index=0,
-        variant_id=None,
     )
     line_record = LineRunRecord.from_run_info(flow_run_info)
     result = line_record.serialize()