diff --git a/src/promptflow/promptflow/_core/flow_execution_context.py b/src/promptflow/promptflow/_core/flow_execution_context.py index 55d2e4d3755..3acf5c258a3 100644 --- a/src/promptflow/promptflow/_core/flow_execution_context.py +++ b/src/promptflow/promptflow/_core/flow_execution_context.py @@ -41,7 +41,6 @@ def __init__( run_id=None, flow_id=None, line_number=None, - variant_id=None, ): self._name = name self._run_tracker = run_tracker @@ -49,7 +48,6 @@ def __init__( self._run_id = run_id or str(uuid.uuid4()) self._flow_id = flow_id or self._run_id self._line_number = line_number - self._variant_id = variant_id def copy(self): return FlowExecutionContext( @@ -59,7 +57,6 @@ def copy(self): run_id=self._run_id, flow_id=self._flow_id, line_number=self._line_number, - variant_id=self._variant_id, ) def cancel_node_runs(self, msg): @@ -116,7 +113,6 @@ def _prepare_node_run(self, node: Node, f, kwargs={}): index=self._line_number, ) run_info.index = self._line_number - run_info.variant_id = self._variant_id self._run_tracker.set_inputs(node_run_id, {key: value for key, value in kwargs.items() if key != "self"}) return run_info @@ -211,7 +207,6 @@ def bypass_node(self, node: Node): parent_run_id=parent_run_id, run_id=node_run_id, index=self._line_number, - variant_id=self._variant_id, ) self._run_tracker.persist_node_run(run_info) diff --git a/src/promptflow/promptflow/_core/run_tracker.py b/src/promptflow/promptflow/_core/run_tracker.py index 6a6cfcac5fa..2d28a40bb47 100644 --- a/src/promptflow/promptflow/_core/run_tracker.py +++ b/src/promptflow/promptflow/_core/run_tracker.py @@ -81,7 +81,6 @@ def start_flow_run( parent_run_id="", inputs=None, index=None, - variant_id="", ) -> FlowRunInfo: """Create a flow run and save to run storage on demand.""" run_info = FlowRunInfo( @@ -99,7 +98,6 @@ def start_flow_run( start_time=datetime.utcnow(), end_time=None, index=index, - variant_id=variant_id, ) self.persist_flow_run(run_info) self._flow_runs[run_id] = run_info @@ -140,7 +138,6 @@ def bypass_node_run( parent_run_id, run_id, index, - variant_id, ): run_info = RunInfo( node=node, @@ -156,7 +153,6 @@ def bypass_node_run( end_time=datetime.utcnow(), result=None, index=index, - variant_id=variant_id, api_calls=[], ) self._node_runs[run_id] = run_info diff --git a/src/promptflow/promptflow/contracts/run_info.py b/src/promptflow/promptflow/contracts/run_info.py index d3522168f98..389b055cbb7 100644 --- a/src/promptflow/promptflow/contracts/run_info.py +++ b/src/promptflow/promptflow/contracts/run_info.py @@ -66,8 +66,6 @@ class RunInfo: :type index: Optional[int] :param api_calls: API calls made during the run :type api_calls: Optional[List[Dict[str, Any]]] - :param variant_id: Variant id of the run - :type variant_id: Optional[str] :param cached_run_id: Cached run id :type cached_run_id: Optional[str] :param cached_flow_run_id: Cached flow run id @@ -93,7 +91,6 @@ class RunInfo: end_time: datetime index: Optional[int] = None api_calls: Optional[List[Dict[str, Any]]] = None - variant_id: str = "" cached_run_id: str = None cached_flow_run_id: str = None logs: Optional[Dict[str, str]] = None @@ -117,7 +114,6 @@ def deserialize(data: dict) -> "RunInfo": end_time=parser.parse(data.get("end_time")).replace(tzinfo=None), index=data.get("index", None), api_calls=data.get("api_calls", None), - variant_id=data.get("variant_id", ""), cached_run_id=data.get("cached_run_id", None), cached_flow_run_id=data.get("cached_flow_run_id", None), logs=data.get("logs", None), @@ -161,8 +157,6 @@ class FlowRunInfo: :type index: Optional[int] :param api_calls: API calls made during the flow run :type api_calls: Optional[List[Dict[str, Any]]] - :param variant_id: Variant id of the flow run - :type variant_id: Optional[str] :param name: Name of the flow run :type name: Optional[str] :param description: Description of the flow run @@ -192,7 +186,6 @@ class FlowRunInfo: end_time: datetime index: Optional[int] = None api_calls: Optional[List[Dict[str, Any]]] = None - variant_id: str = "" name: str = "" description: str = "" tags: Optional[Mapping[str, str]] = None @@ -219,7 +212,6 @@ def deserialize(data: dict) -> "FlowRunInfo": end_time=parser.parse(data.get("end_time")).replace(tzinfo=None), index=data.get("index", None), api_calls=data.get("api_calls", None), - variant_id=data.get("variant_id", ""), name=data.get("name", ""), description=data.get("description", ""), tags=data.get("tags", None), diff --git a/src/promptflow/promptflow/executor/flow_executor.py b/src/promptflow/promptflow/executor/flow_executor.py index a8817855a34..76ae6d59d49 100644 --- a/src/promptflow/promptflow/executor/flow_executor.py +++ b/src/promptflow/promptflow/executor/flow_executor.py @@ -654,12 +654,12 @@ def exec(self, inputs: dict, node_concurrency=DEFAULT_CONCURRENCY_FLOW) -> dict: return result.output or {} def _exec_in_thread(self, args) -> LineResult: - inputs, run_id, line_number, variant_id, validate_inputs = args + inputs, run_id, line_number, validate_inputs = args thread_name = current_thread().name self._processing_idx[line_number] = thread_name self._run_tracker._activate_in_context() results = self._exec( - inputs, run_id=run_id, line_number=line_number, variant_id=variant_id, validate_inputs=validate_inputs + inputs, run_id=run_id, line_number=line_number, validate_inputs=validate_inputs ) self._run_tracker._deactivate_in_context() self._processing_idx.pop(line_number) @@ -671,7 +671,6 @@ def exec_line( inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, - variant_id: str = "", validate_inputs: bool = True, node_concurrency=DEFAULT_CONCURRENCY_FLOW, allow_generator_output: bool = False, @@ -685,8 +684,6 @@ def exec_line( :type index: Optional[int] :param run_id: The ID of the flow run. :type run_id: Optional[str] - :param variant_id: The ID of the variant to execute. - :type variant_id: str :param validate_inputs: Whether to validate the input values. :type validate_inputs: bool :param node_concurrency: The maximum number of nodes that can be executed concurrently. @@ -713,7 +710,6 @@ def exec_line( inputs, run_id=run_id, line_number=index, - variant_id=variant_id, validate_inputs=validate_inputs, allow_generator_output=allow_generator_output, ) @@ -727,7 +723,6 @@ async def exec_line_async( inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, - variant_id: str = "", validate_inputs: bool = True, node_concurrency=DEFAULT_CONCURRENCY_FLOW, allow_generator_output: bool = False, @@ -740,8 +735,6 @@ async def exec_line_async( :type index: Optional[int] :param run_id: The ID of the flow run. :type run_id: Optional[str] - :param variant_id: The ID of the variant to execute. - :type variant_id: str :param validate_inputs: Whether to validate the input values. :type validate_inputs: bool :param node_concurrency: The maximum number of nodes that can be executed concurrently. @@ -763,7 +756,6 @@ async def exec_line_async( inputs, run_id=run_id, line_number=index, - variant_id=variant_id, validate_inputs=validate_inputs, allow_generator_output=allow_generator_output, ) @@ -884,7 +876,6 @@ def _exec( inputs: Mapping[str, Any], run_id: Optional[str] = None, line_number: Optional[int] = None, - variant_id: str = "", validate_inputs: bool = False, allow_generator_output: bool = False, ) -> LineResult: @@ -917,7 +908,6 @@ def _exec( run_id=line_run_id, parent_run_id=run_id, index=line_number, - variant_id=variant_id, ) context = FlowExecutionContext( name=self._flow.name, @@ -926,7 +916,6 @@ def _exec( run_id=run_id, flow_id=self._flow_id, line_number=line_number, - variant_id=variant_id, ) output = {} aggregation_inputs = {} @@ -962,7 +951,6 @@ async def _exec_async( inputs: Mapping[str, Any], run_id: Optional[str] = None, line_number: Optional[int] = None, - variant_id: str = "", validate_inputs: bool = False, allow_generator_output: bool = False, ) -> LineResult: @@ -997,7 +985,6 @@ async def _exec_async( parent_run_id=run_id, inputs={k: inputs[k] for k in self._flow.inputs if k in inputs}, index=line_number, - variant_id=variant_id, ) context = FlowExecutionContext( name=self._flow.name, @@ -1006,7 +993,6 @@ async def _exec_async( run_id=run_id, flow_id=self._flow_id, line_number=line_number, - variant_id=variant_id, ) output = {} aggregation_inputs = {} diff --git a/src/promptflow/tests/executor/unittests/contracts/test_run_info.py b/src/promptflow/tests/executor/unittests/contracts/test_run_info.py index 9c5ab1dc231..c0faa6ea51d 100644 --- a/src/promptflow/tests/executor/unittests/contracts/test_run_info.py +++ b/src/promptflow/tests/executor/unittests/contracts/test_run_info.py @@ -62,7 +62,6 @@ def test_deserialize(self): "end_time": "2023-11-24T06:03:20.268858Z", "index": 0, "api_calls": None, - "variant_id": "", "cached_run_id": None, "cached_flow_run_id": None, "logs": None, @@ -120,7 +119,6 @@ def test_deserialize(self): "end_time": "2023-11-23T10:58:37.9590789Z", "index": 0, "api_calls": None, - "variant_id": "", "name": "", "description": "", "tags": None, diff --git a/src/promptflow/tests/executor/unittests/storage/test_queue_run_storage.py b/src/promptflow/tests/executor/unittests/storage/test_queue_run_storage.py index 5ca66a5e62d..17a0ef4bddc 100644 --- a/src/promptflow/tests/executor/unittests/storage/test_queue_run_storage.py +++ b/src/promptflow/tests/executor/unittests/storage/test_queue_run_storage.py @@ -25,7 +25,6 @@ def test_persist_node_run(self): end_time="end_time", index="index", api_calls="api_calls", - variant_id="variant_id", cached_run_id="cached_run_id", cached_flow_run_id="cached_flow_run_id", logs="logs", @@ -54,7 +53,6 @@ def test_persist_flow_run(self): end_time="end_time", index="index", api_calls="api_calls", - variant_id="variant_id", system_metrics="system_metrics", result="result", ) diff --git a/src/promptflow/tests/executor/unittests/storage/test_run_records.py b/src/promptflow/tests/executor/unittests/storage/test_run_records.py index cd299b9ec44..f1e2bcb6963 100644 --- a/src/promptflow/tests/executor/unittests/storage/test_run_records.py +++ b/src/promptflow/tests/executor/unittests/storage/test_run_records.py @@ -27,7 +27,6 @@ def test_line_record(): start_time=start_time, end_time=end_time, index=0, - variant_id=None, ) line_record = LineRunRecord.from_run_info(flow_run_info) assert line_record.line_number == 0 @@ -56,7 +55,6 @@ def test_line_serialize(): start_time=start_time, end_time=end_time, index=0, - variant_id=None, ) line_record = LineRunRecord.from_run_info(flow_run_info) result = line_record.serialize()