Enable executor recording mode (#1997)
# Description


This PR is focused on enabling the recording injection mode for CI/tests
in the execution environment. The recording mode provides a mechanism to
record and replay end-to-end tests, enhancing the reliability and
efficiency of our testing process.

For detailed information on the recording mode, please refer to our
[documentation](https://github.com/microsoft/promptflow/blob/main/docs/dev/replay-e2e-test.md).

In this PR, we still keep the connection configuration in CI to preserve backward compatibility, while enabling the recording injection fixture to switch tests into recording mode.

## **Key Features of This PR:**

### **Generation of Test Records**
Generates records for execution tests when they are missing from the shelve database, so the tests can run under recording mode.

### **Multi-Process Compatibility**
Resolves issues with the recording injection mode in a multi-process environment, particularly for processes started via the spawn/forkserver start methods.
**_What is the issue with new processes (spawn/forkserver)?_**
The spawn/forkserver start methods do not replicate the resources/setup of the main process, including the recording setup, so recording stops working in child processes.

![image](https://github.com/microsoft/promptflow/assets/95913588/4cc8f68a-f61f-49ad-b2bd-6717f4d304bf)
**_How is this issue resolved?_**
There are multiple ways to pass the recording setup into a new process, such as an environment variable or a serializable object passed as an argument, but these would either require interface changes or be too complex to squeeze into a simple state object.
We chose to re-import the state inside the new process.

1) Create a new target method for the Process: this new target re-imports the needed state.

Example: for the Process target method `_process_wrapper`, define another wrapper method around it that injects the recording state before delegating, as shown in the screenshot and sketch below.
 

![image](https://github.com/microsoft/promptflow/assets/95913588/426827a0-3c1e-426d-864f-ec85ad611416)
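A minimal sketch of such a wrapper target (it mirrors the `_default_mock_process_wrapper` added to `src/promptflow/tests/executor/conftest.py` in this PR):

```python
from executor.record_utils import setup_recording

from promptflow.executor._line_execution_process_pool import _process_wrapper


def _default_mock_process_wrapper(*args, **kwargs):
    # Re-apply the recording setup inside the newly created process,
    # then delegate to the original process wrapper.
    setup_recording()
    _process_wrapper(*args, **kwargs)
```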

2) Define a customized Process class that uses the new targets above.

This new target method is enabled whenever a new process is spawned/forkservered (see the sketch after the screenshot below).

![image](https://github.com/microsoft/promptflow/assets/95913588/21756ecf-fbed-4d6e-a21a-79228988b407)
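A rough sketch of what such a customized Process class could look like, assuming it simply swaps the requested target for the recording-aware wrapper from the previous sketch before starting (the actual `MockSpawnProcess`/`MockForkServerProcess` classes live in `executor/process_utils.py` and may differ):

```python
import multiprocessing

from promptflow.executor._line_execution_process_pool import _process_wrapper


class MockSpawnProcess(multiprocessing.get_context("spawn").Process):
    """Hypothetical sketch: a spawn Process that injects the recording setup."""

    def __init__(self, group=None, target=None, *args, **kwargs):
        if target is _process_wrapper:
            # Replace the original target with the recording-aware wrapper
            # (_default_mock_process_wrapper from the sketch above), so the
            # recording state is re-imported inside the child process.
            target = _default_mock_process_wrapper
        super().__init__(group, target, *args, **kwargs)
```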

3) Override the `context.Process` or `multiprocessing.Process` class:

       multiprocessing.get_context("spawn").Process = MockSpawnProcess

   or

       multiprocessing.Process = MockSpawnProcess

We have implemented the above logic in code and integrated it as part of the recording injection fixture for testing.


**_So all the tests under executor/sdk intercept third-party calls like this by default in CI?_**

Yes, CI runs with `PROMPT_FLOW_TEST_MODE=replay` for pull requests (other triggers run live). All openai/aoai related calls are mocked to retrieve recorded results rather than making real API requests.
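The mode helpers `is_live`/`is_record`/`is_replay` used by the test fixtures come from `sdk_cli_test.recording_utilities`. As a hedged sketch only (the actual implementation and default value may differ), they resolve the mode from `PROMPT_FLOW_TEST_MODE` roughly like this:

```python
import os

# Hedged sketch; the real helpers live in sdk_cli_test.recording_utilities.
def _test_mode() -> str:
    return os.getenv("PROMPT_FLOW_TEST_MODE", "live").lower()


def is_live() -> bool:
    return _test_mode() == "live"


def is_record() -> bool:
    return _test_mode() == "record"


def is_replay() -> bool:
    return _test_mode() == "replay"
```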

**_Sometimes we need to customize the openai/aoai calls in tests. What should we do instead of using the default recording mock?_**

1) Create your own target with customized mocking of the openai/aoai call.
2) Override the default recording target via the context manager `override_process_target`, as sketched below.

Sample test: `test_executor_openai_telemetry`
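A hypothetical sketch of these two steps — the custom mock helper, the import location, and the keyword argument passed to `override_process_target` are illustrative assumptions, not the actual signature; see `test_executor_openai_telemetry` for the real usage:

```python
from executor.process_utils import override_process_target  # assumed module location

from promptflow.executor._line_execution_process_pool import _process_wrapper


def _process_wrapper_with_custom_openai_mock(*args, **kwargs):
    # 1) Your own target: apply customized openai/aoai mocking instead of the
    #    default recording mock, then delegate to the original wrapper.
    setup_my_openai_mocks()  # hypothetical helper for your custom mocking
    _process_wrapper(*args, **kwargs)


def test_my_custom_openai_behavior(dev_connections):
    # 2) Override the default recording target for processes created inside.
    with override_process_target(process_wrapper=_process_wrapper_with_custom_openai_mock):
        ...  # run the executor / batch engine with the custom mock in place
```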

**_In which scope is the recording mode enabled?_**
The recording mode is enabled per requirement. If a test involves a third-party service through a connection (openai/aoai etc.), it must enable recording mode; otherwise your PR will fail in the CI tests because the connection info is not configured.
To enable recording mode, just make sure the `recording_injection` fixture is required at your test class or method level.
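For example, the executor e2e tests in this PR opt in at class level:

```python
import pytest


@pytest.mark.usefixtures("dev_connections", "recording_injection")
@pytest.mark.e2etest
class TestExecutorActivate:
    ...
```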
 
_**Why not make recording mode a session-level fixture or a required fixture for all tests?**_
1) The recording setup is complicated; it might introduce unexpected behavior if abused.
2) Some tests, like **test_executor_openai_telemetry**, serve special test purposes. If recording were enabled for them, the customized mocking would not be easy to configure.

**_Note:_**
_The above logic resolves this issue for one layer of new process creation. If you have nested process creation, you need to repeat the above logic at each layer._







## **Todos and Ongoing Work [updated]:**

### **[Done] Metrics Mocks**
Currently, the openai_injector is skipped since recording mode does not involve actual openai/aoai requests. This omission leads to potential inaccuracies in metrics calculation, especially token usage, in recording mode. Future work will focus on integrating openai_injector.py into the recording mode.
--- already fixed by [[fundamental] Recording support metrics by crazygao · Pull Request #1762 · microsoft/promptflow (github.com)](#1762)

### **[Todo] Consolidation of Configuration**
Efforts are underway to consolidate the configuration settings for
recording injection in both the execution environment and the SDK,
aiming for a more unified and streamlined setup.

### **[Done] Record error info**
Record not only the regular tool execution result, but also the error response if an exception occurs.
----- fixed in this PR; sample test: test_executor_node_overrides

### **[Done] Test langchain**
Support langchain tests with an agent.

---- fixed by [[fundamental] Recording support metrics by crazygao ·
Pull Request #1762 · microsoft/promptflow
(github.com)](#1762)

### **[Todo] Create a CI pipeline without connections for monitoring**
We need a CI run without connections to identify tests that need the recording setup.

### **[Todo] Make a migration plan to adopt the non-connection CI as the main CI**
We need a plan to safely switch to the new record mode, and to make sure there is fallback logic in case the recording design has flaws that might hinder new feature delivery.

### **[Todo] Enable more APIs with recording mode**
For example, assistant APIs & tests.

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [x] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Philip Gao <[email protected]>
Co-authored-by: chjinche <[email protected]>
Co-authored-by: Cheng Liu <[email protected]>
Co-authored-by: Han Wang <[email protected]>
Co-authored-by: chenslucky <[email protected]>
Co-authored-by: cs_lucky <[email protected]>
Co-authored-by: Ying Chen <[email protected]>
Co-authored-by: Ying Chen <[email protected]>
Co-authored-by: chenyang <[email protected]>
Co-authored-by: Peiwen Gao <[email protected]>
11 people authored Mar 8, 2024
1 parent 807767e commit fdb286d
Showing 26 changed files with 594 additions and 217 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/promptflow-executor-e2e-test.yml
@@ -78,6 +78,8 @@ jobs:
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Set test mode
run: echo "PROMPT_FLOW_TEST_MODE=$(if [[ "${{ github.event_name }}" == "pull_request" ]]; then echo replay; else echo live; fi)" >> $GITHUB_ENV
- name: checkout
uses: actions/checkout@v4
with:
2 changes: 2 additions & 0 deletions .github/workflows/promptflow-executor-unit-test.yml
@@ -78,6 +78,8 @@ jobs:
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Set test mode
run: echo "PROMPT_FLOW_TEST_MODE=$(if [[ "${{ github.event_name }}" == "pull_request" ]]; then echo replay; else echo live; fi)" >> $GITHUB_ENV
- name: checkout
uses: actions/checkout@v4
with:
4 changes: 2 additions & 2 deletions examples/flows/evaluation/eval-summarization/data.jsonl
@@ -1,2 +1,2 @@
{"document": "this is a test document", "summary": "test document"}
{"document": "this is a test document2", "summary": "test document2"}
{"document": "this is a test document", "summary": "test document"}
{"document": "this is a test document2", "summary": "test document2"}
208 changes: 104 additions & 104 deletions examples/flows/evaluation/eval-summarization/flow.dag.yaml
@@ -1,104 +1,104 @@
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
environment:
python_requirements_txt: requirements.txt
inputs:
document:
type: string
summary:
type: string
outputs:
coherence:
type: double
reference: ${score_coherence.output}
consistency:
type: double
reference: ${score_consistency.output}
fluency:
type: double
reference: ${score_fluency.output}
relevance:
type: double
reference: ${score_relevance.output}
nodes:
- name: prompt_coherence
type: prompt
source:
type: code
path: prompts/coherence.jinja2
inputs:
Document: ${inputs.document}
Summary: ${inputs.summary}
- name: score_coherence
type: python
source:
type: code
path: geval.py
inputs:
connection: open_ai_connection
prompt_with_src_and_gen: ${prompt_coherence.output}
max_score: 5
deployment_name: gpt-4
- name: prompt_consistency
type: prompt
source:
type: code
path: prompts/consistency.jinja2
inputs:
Document: ${inputs.document}
Summary: ${inputs.summary}
- name: score_consistency
type: python
source:
type: code
path: geval.py
inputs:
connection: open_ai_connection
prompt_with_src_and_gen: ${prompt_consistency.output}
max_score: 5
deployment_name: gpt-4
- name: prompt_fluency
type: prompt
source:
type: code
path: prompts/fluency.jinja2
inputs:
Summary: ${inputs.summary}
- name: score_fluency
type: python
source:
type: code
path: geval.py
inputs:
connection: open_ai_connection
prompt_with_src_and_gen: ${prompt_fluency.output}
max_score: 3
deployment_name: gpt-4
- name: prompt_relevance
type: prompt
source:
type: code
path: prompts/relevance.jinja2
inputs:
Document: ${inputs.document}
Summary: ${inputs.summary}
- name: score_relevance
type: python
source:
type: code
path: geval.py
inputs:
connection: open_ai_connection
prompt_with_src_and_gen: ${prompt_relevance.output}
max_score: 5
deployment_name: gpt-4
- name: average_scores
type: python
source:
type: code
path: average_scores.py
inputs:
fluency_list: ${score_fluency.output}
consistency_list: ${score_consistency.output}
relevance_list: ${score_relevance.output}
coherence_list: ${score_coherence.output}
aggregation: true
88 changes: 85 additions & 3 deletions src/promptflow/tests/executor/conftest.py
@@ -1,17 +1,99 @@
import multiprocessing
from pathlib import Path

import pytest
from executor.process_utils import (
MockForkServerProcess,
MockSpawnProcess,
current_process_manager_var,
current_process_wrapper_var,
override_process_class,
)
from executor.record_utils import setup_recording
from fastapi.testclient import TestClient
from sdk_cli_test.recording_utilities import (
RecordStorage,
delete_count_lock_file,
is_live,
is_record,
is_replay,
recording_array_extend,
recording_array_reset,
)
from sdk_cli_test.recording_utilities.record_storage import is_recording_enabled

from promptflow.executor._line_execution_process_pool import _process_wrapper
from promptflow.executor._process_manager import create_spawned_fork_process_manager
from promptflow.executor._service.app import app
from promptflow.tracing._openai_injector import inject_openai_api

PROMPTFLOW_ROOT = Path(__file__) / "../../.."


@pytest.fixture
def recording_setup():
patches = setup_recording()
try:
yield
finally:
for patcher in patches:
patcher.stop()


def _default_mock_process_wrapper(*args, **kwargs):
# Default mock implementation of _process_wrapper in recording mode
setup_recording()
_process_wrapper(*args, **kwargs)


def _default_mock_create_spawned_fork_process_manager(*args, **kwargs):
# Default mock implementation of create_spawned_fork_process_manager in recording mode
setup_recording()
create_spawned_fork_process_manager(*args, **kwargs)


@pytest.fixture
def process_override():
# This fixture is used to override the Process class to ensure the recording mode works

# Step I: set process pool targets placeholder with customized targets
current_process_wrapper_var.set(_default_mock_process_wrapper)
current_process_manager_var.set(_default_mock_create_spawned_fork_process_manager)

# Step II: override the process pool class
process_class_dict = {"spawn": MockSpawnProcess, "forkserver": MockForkServerProcess}
original_process_class = override_process_class(process_class_dict)

try:
yield
finally:
for start_method, MockProcessClass in process_class_dict.items():
if start_method in multiprocessing.get_all_start_methods():
multiprocessing.get_context(start_method).Process = original_process_class[start_method]
if start_method == multiprocessing.get_start_method():
multiprocessing.Process = original_process_class[start_method]


@pytest.fixture
def recording_injection(recording_setup, process_override):
# This fixture is the main entry point to inject recording mode into the test
try:
yield (is_replay() or is_record(), recording_array_extend)
finally:
if is_replay() or is_record():
RecordStorage.get_instance().delete_lock_file()
if is_live():
delete_count_lock_file()
recording_array_reset()


@pytest.fixture(autouse=True, scope="session")
def inject_api_executor():
"""Inject OpenAI API during test session.
"""Inject OpenAI API during test session when recording not enabled
AOAI call in promptflow should involve trace logging and header injection. Inject
function to API call in test scenario."""
inject_openai_api()
if not is_recording_enabled():
inject_openai_api()


@pytest.fixture(autouse=True, scope="session")
2 changes: 1 addition & 1 deletion src/promptflow/tests/executor/e2etests/test_activate.py
@@ -33,7 +33,7 @@
]


@pytest.mark.usefixtures("dev_connections")
@pytest.mark.usefixtures("dev_connections", "recording_injection")
@pytest.mark.e2etest
class TestExecutorActivate:
@pytest.mark.parametrize("flow_folder", ACTIVATE_FLOW_TEST_CASES)
7 changes: 6 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_assistant.py
@@ -8,7 +8,7 @@
from ..utils import get_flow_folder, get_yaml_file


@pytest.mark.usefixtures("dev_connections")
@pytest.mark.usefixtures("dev_connections", "recording_injection")
@pytest.mark.e2etest
class TestAssistant:
@pytest.mark.parametrize(
@@ -22,6 +22,11 @@ def test_assistant_tool_with_connection(self, flow_folder, line_input, dev_conne
flow_result = executor.exec_line(line_input)
print(flow_result.output)
assert flow_result.run_info.status == Status.Completed
assert len(flow_result.output["answer"]["content"]) == 1
assert flow_result.output["answer"]["content"][0]["type"] == "text"
name = line_input["name"]
assert f"Thanks for your help, {name}!" == flow_result.output["answer"]["content"][0]["text"]["value"]
assert flow_result.output["thread_id"]

@pytest.mark.parametrize(
"flow_folder, line_input",
11 changes: 10 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_batch_engine.py
@@ -17,6 +17,8 @@
from promptflow.contracts.run_info import Status
from promptflow.executor._errors import InputNotFound

from ..conftest import setup_recording
from ..process_utils import MockForkServerProcess, MockSpawnProcess, override_process_class
from ..utils import (
MemoryRunStorage,
get_flow_expected_metrics,
@@ -63,6 +65,13 @@ def _run_batch_with_start_method(multiprocessing_start_method, flow_folder, inpu
batch_result, output_dir = submit_batch_run(
flow_folder, inputs_mapping, connections=dev_connections, return_output_dir=True
)
# This method is used as the process target in tests for the given start method.
# We need to make sure the necessary setup is in place so it gets passed along to the new process.
process_class_dict = {"spawn": MockSpawnProcess, "forkserver": MockForkServerProcess}
override_process_class(process_class_dict)

# recording injection again since this method is running in a new process
setup_recording()

assert isinstance(batch_result, BatchResult)
nlines = get_batch_inputs_line(flow_folder)
@@ -114,7 +123,7 @@ def __init__(self, name: str, output_path: Path):
self._run_source = None


@pytest.mark.usefixtures("use_secrets_config_file", "dev_connections")
@pytest.mark.usefixtures("use_secrets_config_file", "dev_connections", "recording_injection")
@pytest.mark.e2etest
class TestBatch:
def test_batch_storage(self, dev_connections):
