From 117dd3fdb523fd1f8f3698576c414610b2a27f4e Mon Sep 17 00:00:00 2001 From: nick863 <30440255+nick863@users.noreply.github.com> Date: Tue, 9 Jul 2024 14:46:43 -0700 Subject: [PATCH 1/6] Add start method, move write properties inside evals run --- .../promptflow/evals/evaluate/_eval_run.py | 134 +++++++++---- .../promptflow/evals/evaluate/_utils.py | 18 +- .../evals/e2etests/test_metrics_upload.py | 11 +- .../tests/evals/unittests/test_eval_run.py | 180 +++++++++++++++++- 4 files changed, 286 insertions(+), 57 deletions(-) diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py index 05fd89b53f2..ccdd1875e88 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py @@ -1,7 +1,9 @@ # --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- +# import contextlib import dataclasses +import enum import logging import os import posixpath @@ -21,6 +23,7 @@ from azure.ai.ml.entities._credentials import AccountKeyConfiguration from azure.ai.ml.entities._datastore.datastore import Datastore + LOGGER = logging.getLogger(__name__) @@ -50,6 +53,14 @@ def generate(run_name: Optional[str]) -> 'RunInfo': ) +class RunStatus(enum.Enum): + """Run states.""" + NOT_STARTED = 0 + STARTED = 1 + BROKEN = 2 + TERMINATED = 3 + + class Singleton(type): """Singleton class, which will be used as a metaclass.""" @@ -117,25 +128,18 @@ def __init__(self, self._workspace_name: str = workspace_name self._ml_client: Any = ml_client self._is_promptflow_run: bool = promptflow_run is not None - self._is_broken = False - if self._tracking_uri is None: - LOGGER.warning("tracking_uri was not provided, " - "The results will be saved locally, but will not be logged to Azure.") - self._url_base = None - self._is_broken = True - self.info = RunInfo.generate(run_name) - else: - self._url_base = urlparse(self._tracking_uri).netloc - if promptflow_run is not None: - self.info = RunInfo( - promptflow_run.name, - promptflow_run._experiment_name, - promptflow_run.name - ) - else: - self._is_broken = self._start_run(run_name) + self._run_name = run_name + self._promptflow_run = promptflow_run + self._status = RunStatus.NOT_STARTED + + @property + def status(self) -> RunStatus: + """ + Return the run status. - self._is_terminated = False + :return: The status of the run. + """ + return self._status def _get_scope(self) -> str: """ @@ -154,7 +158,29 @@ def _get_scope(self) -> str: self._workspace_name, ) - def _start_run(self, run_name: Optional[str]) -> bool: + def start_run(self) -> None: + """ + Start the run, or, if it is not applicable (for example, if tracking is not enabled), mark it as started. 
+ """ + self._status = RunStatus.STARTED + if self._tracking_uri is None: + LOGGER.warning("tracking_uri was not provided, " + "The results will be saved locally, but will not be logged to Azure.") + self._url_base = None + self._status = RunStatus.BROKEN + self.info = RunInfo.generate(self._run_name) + else: + self._url_base = urlparse(self._tracking_uri).netloc + if self._promptflow_run is not None: + self.info = RunInfo( + self._promptflow_run.name, + self._promptflow_run._experiment_name, + self._promptflow_run.name + ) + else: + self._status = self._start_run(self._run_name) + + def _start_run(self, run_name: Optional[str]) -> 'RunStatus': """ Make a request to start the mlflow run. If the run will not start, it will be @@ -181,41 +207,43 @@ def _start_run(self, run_name: Optional[str]) -> bool: self.info = RunInfo.generate(run_name) LOGGER.warning(f"The run failed to start: {response.status_code}: {response.text}." "The results will be saved locally, but will not be logged to Azure.") - return True + return RunStatus.BROKEN parsed_response = response.json() self.info = RunInfo( run_id=parsed_response['run']['info']['run_id'], experiment_id=parsed_response['run']['info']['experiment_id'], run_name=parsed_response['run']['info']['run_name'] ) - return False + return RunStatus.STARTED - def end_run(self, status: str) -> None: + def end_run(self, reason: str) -> None: """ Tetminate the run. - :param status: One of "FINISHED" "FAILED" and "KILLED" - :type status: str + :param reason: One of "FINISHED" "FAILED" and "KILLED" + :type reason: str :raises: ValueError if the run is not in ("FINISHED", "FAILED", "KILLED") """ + self._raise_not_started_nay_be() if self._is_promptflow_run: # This run is already finished, we just add artifacts/metrics to it. + self._status = RunStatus.TERMINATED Singleton.destroy(EvalRun) return - if status not in ("FINISHED", "FAILED", "KILLED"): + if reason not in ("FINISHED", "FAILED", "KILLED"): raise ValueError( - f"Incorrect terminal status {status}. " 'Valid statuses are "FINISHED", "FAILED" and "KILLED".' + f"Incorrect terminal status {reason}. " 'Valid statuses are "FINISHED", "FAILED" and "KILLED".' ) - if self._is_terminated: + if self._status == RunStatus.TERMINATED: LOGGER.warning("Unable to stop run because it was already terminated.") return - if self._is_broken: + if self._status == RunStatus.BROKEN: LOGGER.warning("Unable to stop run because the run failed to start.") return url = f"https://{self._url_base}/mlflow/v2.0" f"{self._get_scope()}/api/2.0/mlflow/runs/update" body = { "run_uuid": self.info.run_id, - "status": status, + "status": reason, "end_time": int(time.time() * 1000), "run_id": self.info.run_id, } @@ -223,7 +251,16 @@ def end_run(self, status: str) -> None: if response.status_code != 200: LOGGER.warning("Unable to terminate the run.") Singleton.destroy(EvalRun) - self._is_terminated = True + self._status = RunStatus.TERMINATED + + def __enter__(self): + """The Context Manager enter call.""" + self.start_run() + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + """The context manager exit call.""" + self.start_run() def get_run_history_uri(self) -> str: """ @@ -304,6 +341,17 @@ def _log_warning(self, failed_op: str, response: requests.Response) -> None: f"{response.text=}." ) + def _raise_not_started_nay_be(self) -> None: + """ + Raise value error if the run was not started. + + :raises: ValueError + """ + if self._status == RunStatus.NOT_STARTED: + raise ValueError( + "The run did not started. 
" + "Please start the run by calling start_run method.") + def log_artifact(self, artifact_folder: str, artifact_name: str = EVALUATION_ARTIFACT) -> None: """ The local implementation of mlflow-like artifact logging. @@ -314,7 +362,8 @@ def log_artifact(self, artifact_folder: str, artifact_name: str = EVALUATION_ART :param artifact_folder: The folder with artifacts to be uploaded. :type artifact_folder: str """ - if self._is_broken: + self._raise_not_started_nay_be() + if self._status == RunStatus.BROKEN: LOGGER.warning("Unable to log artifact because the run failed to start.") return # Check if artifact dirrectory is empty or does not exist. @@ -402,7 +451,8 @@ def log_metric(self, key: str, value: float) -> None: :param value: The valure to be logged. :type value: float """ - if self._is_broken: + self._raise_not_started_nay_be() + if self._status == RunStatus.BROKEN: LOGGER.warning("Unable to log metric because the run failed to start.") return body = { @@ -421,6 +471,26 @@ def log_metric(self, key: str, value: float) -> None: if response.status_code != 200: self._log_warning("save metrics", response) + def write_properties_to_run_history(self, properties: Dict[str, Any]) -> None: + """ + Write properties to the RunHistory service. + + :param properties: The properties to be written to run history. + :type properties: dict + """ + self._raise_not_started_nay_be() + if self._status == RunStatus.BROKEN: + LOGGER.warning("Unable to write properties because the run failed to start.") + return + # update host to run history and request PATCH API + response = self.request_with_retry( + url=self.get_run_history_uri(), + method="PATCH", + json_dict={"runId": self.info.run_id, "properties": properties}, + ) + if response.status_code != 200: + LOGGER.error("Fail writing properties '%s' to run history: %s", properties, response.text) + @staticmethod def get_instance(*args, **kwargs) -> "EvalRun": """ diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_utils.py b/src/promptflow-evals/promptflow/evals/evaluate/_utils.py index 5ff657707c0..da10373420b 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_utils.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_utils.py @@ -48,22 +48,6 @@ def load_jsonl(path): return [json.loads(line) for line in f.readlines()] -def _write_properties_to_run_history(properties: dict) -> None: - run = EvalRun.get_instance() - try: - # update host to run history and request PATCH API - response = run.request_with_retry( - url=run.get_run_history_uri(), - method="PATCH", - json_dict={"runId": run.info.run_id, "properties": properties}, - ) - if response.status_code != 200: - LOGGER.error("Fail writing properties '%s' to run history: %s", properties, response.text) - response.raise_for_status() - except AttributeError as e: - LOGGER.error("Fail writing properties '%s' to run history: %s", properties, e) - - def _azure_pf_client_and_triad(trace_destination): from promptflow.azure._cli._utils import _get_azure_pf_client @@ -115,7 +99,7 @@ def _log_metrics_and_instance_results( # adding these properties to avoid showing traces if a dummy run is created. # We are doing that only for the pure evaluation runs. 
if run is None: - _write_properties_to_run_history( + ev_run.write_properties_to_run_history( properties={ "_azureml.evaluation_run": "azure-ai-generative-parent", "_azureml.evaluate_artifacts": json.dumps([{"path": artifact_name, "type": "table"}]), diff --git a/src/promptflow-evals/tests/evals/e2etests/test_metrics_upload.py b/src/promptflow-evals/tests/evals/e2etests/test_metrics_upload.py index 24ca1dd743c..ea79fc97289 100644 --- a/src/promptflow-evals/tests/evals/e2etests/test_metrics_upload.py +++ b/src/promptflow-evals/tests/evals/e2etests/test_metrics_upload.py @@ -66,7 +66,7 @@ def _assert_no_errors_for_module(self, records, module_names): @pytest.mark.usefixtures("vcr_recording") def test_writing_to_run_history(self, setup_data, caplog): """Test logging data to RunHistory service.""" - logger = logging.getLogger(ev_utils.__name__) + logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. prefix will have "promptflow" logger # as a parent. This logger does not propagate the logs and cannot be # captured by caplog. Here we will skip this logger to capture logs. @@ -74,12 +74,13 @@ def test_writing_to_run_history(self, setup_data, caplog): # Just for sanity check let us make sure that the logging actually works mock_response = MagicMock() mock_response.status_code = 418 + ev_run = EvalRun.get_instance() with patch("promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry", return_value=mock_response): - ev_utils._write_properties_to_run_history({"test": 42}) + ev_run.write_properties_to_run_history({"test": 42}) assert any(lg_rec.levelno == logging.ERROR for lg_rec in caplog.records), "The error log was not captured!" caplog.clear() - ev_utils._write_properties_to_run_history({"test": 42}) - self._assert_no_errors_for_module(caplog.records, [ev_utils.__name__]) + ev_run.write_properties_to_run_history({"test": 42}) + self._assert_no_errors_for_module(caplog.records, [EvalRun.__module__]) @pytest.mark.usefixtures("vcr_recording") def test_logging_metrics(self, setup_data, caplog): @@ -159,7 +160,7 @@ def test_e2e_run_target_fn(self, caplog, project_scope, questions_answers_file, evaluators={"f1": f1_score_eval}, azure_ai_project=project_scope, ) - self._assert_no_errors_for_module(caplog.records, (ev_utils.__name__, EvalRun.__module__)) + self._assert_no_errors_for_module(caplog.records, (EvalRun.__name__, EvalRun.__module__)) @pytest.mark.skip(reason="Test runs individually but not when run with entire suite.") @pytest.mark.usefixtures("vcr_recording") diff --git a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py index bc5fe0b4208..ce596011655 100644 --- a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py +++ b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py @@ -10,7 +10,7 @@ import promptflow.evals.evaluate._utils as ev_utils from promptflow.azure._utils._token_cache import ArmTokenCache -from promptflow.evals.evaluate._eval_run import EvalRun, Singleton +from promptflow.evals.evaluate._eval_run import EvalRun, Singleton, RunStatus @pytest.fixture @@ -57,6 +57,7 @@ def test_end_raises(self, token_mock, setup_data, status, should_raise, caplog): workspace_name="mock", ml_client=MagicMock(), ) + run.start_run() if should_raise: with pytest.raises(ValueError) as cm: run.end_run(status) @@ -94,6 +95,7 @@ def test_run_logs_if_terminated(self, token_mock, setup_data, caplog): workspace_name="mock", ml_client=MagicMock(), ) + run.start_run() run.end_run("KILLED") 
run.end_run("KILLED") assert len(caplog.records) == 1 @@ -130,6 +132,7 @@ def test_end_logs_if_fails(self, token_mock, setup_data, caplog): workspace_name="mock", ml_client=MagicMock(), ) + run.start_run() run.end_run("FINISHED") assert len(caplog.records) == 1 assert "Unable to terminate the run." in caplog.records[0].message @@ -155,6 +158,7 @@ def test_start_run_fails(self, token_mock, setup_data, caplog): workspace_name="mock", ml_client=MagicMock(), ) + run.start_run() assert len(caplog.records) == 1 assert "500" in caplog.records[0].message assert mock_response_start.text in caplog.records[0].message @@ -214,6 +218,7 @@ def test_singleton(self, mock_session_cls, token_mock, setup_data, destroy_run, ml_client=MagicMock(), ) id1 = id(run) + run.start_run() if destroy_run: run.end_run("FINISHED") id2 = id( @@ -253,6 +258,7 @@ def test_run_name(self, mock_session_cls, token_mock, setup_data): workspace_name="mock", ml_client=MagicMock(), ) + run.start_run() assert run.info.run_id == mock_response.json.return_value['run']['info']['run_id'] assert run.info.experiment_id == mock_response.json.return_value[ 'run']['info']['experiment_id'] @@ -283,6 +289,7 @@ def test_run_with_name(self, mock_session_cls, token_mock, setup_data): workspace_name="mock", ml_client=MagicMock(), ) + run.start_run() assert run.info.run_id == mock_response.json.return_value['run']['info']['run_id'] assert run.info.experiment_id == mock_response.json.return_value[ 'run']['info']['experiment_id'] @@ -319,6 +326,7 @@ def test_get_urls(self, mock_session_cls, token_mock, setup_data): workspace_name="mock-ws-region", ml_client=MagicMock(), ) + run.start_run() assert run.get_run_history_uri() == ( "https://region.api.azureml.ms/history/v1.0/subscriptions" "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" @@ -384,6 +392,7 @@ def test_log_artifacts_logs_error(self, token_mock, setup_data, tmp_path, caplog workspace_name="mock-ws-region", ml_client=MagicMock(), ) + run.start_run() logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. prefix will have "promptflow" logger @@ -411,7 +420,15 @@ def test_log_artifacts_logs_error(self, token_mock, setup_data, tmp_path, caplog (True, False, "The run results file was not found, skipping artifacts upload.") ] ) - def test_wrong_artifact_path(self, token_mock, tmp_path, caplog, dir_exists, dir_empty, expected_error, setup_data): + def test_wrong_artifact_path( + self, + token_mock, + tmp_path, + caplog, + dir_exists, + dir_empty, + expected_error, + setup_data): """Test that if artifact path is empty, or dies not exist we are logging the error.""" mock_response = MagicMock() mock_response.status_code = 200 @@ -440,6 +457,7 @@ def test_wrong_artifact_path(self, token_mock, tmp_path, caplog, dir_exists, dir workspace_name="mock-ws-region", ml_client=MagicMock(), ) + run.start_run() logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. prefix will have "promptflow" logger # as a parent. This logger does not propagate the logs and cannot be @@ -470,7 +488,7 @@ def test_log_metrics_and_instance_results_logs_error(self, token_mock, caplog, s assert len(caplog.records) == 1 assert "Unable to log traces as trace destination was not defined." 
in caplog.records[0].message - def test_run_broken_if_no_tracking_uri(self, setup_data, caplog): + def test_run_broken_if_no_tracking_uri(self, token_mock, setup_data, caplog): """Test that if no tracking URI is provirded, the run is being marked as broken.""" logger = logging.getLogger(ev_utils.__name__) # All loggers, having promptflow. prefix will have "promptflow" logger @@ -485,9 +503,165 @@ def test_run_broken_if_no_tracking_uri(self, setup_data, caplog): workspace_name='mock', ml_client=MagicMock() ) + run.start_run() assert len(caplog.records) == 1 assert "The results will be saved locally, but will not be logged to Azure." in caplog.records[0].message with patch('promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry') as mock_request: run.log_artifact('mock_dir') run.log_metric('foo', 42) + run.write_properties_to_run_history({'foo': 'bar'}) mock_request.assert_not_called() + + @pytest.mark.parametrize('status_code,pf_run', [ + (401, False), + (200, False), + (401, True), + (200, True), + ]) + def test_lifecycle(self, token_mock, status_code, pf_run, setup_data): + """Test the run statuses throughout its life cycle.""" + pf_run_mock = None + if pf_run: + pf_run_mock = MagicMock() + pf_run_mock.name = 'mock_pf_run' + pf_run_mock._experiment_name = 'mock_pf_experiment' + mock_response = MagicMock() + mock_response.status_code = status_code + mock_response.json.return_value = { + "run": { + "info": { + "run_id": str(uuid4()), + "experiment_id": str(uuid4()), + "run_name": str(uuid4()) + } + } + } + mock_session = MagicMock() + mock_session.request.return_value = mock_response + with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): + run = EvalRun( + run_name="test", + tracking_uri=( + "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" + "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" + "/providers/Microsoft.MachineLearningServices" + "/workspaces/mock-ws-region" + ), + subscription_id="000000-0000-0000-0000-0000000", + group_name="mock-rg-region", + workspace_name="mock-ws-region", + ml_client=MagicMock(), + promptflow_run=pf_run_mock + ) + assert run.status == RunStatus.NOT_STARTED, f'Get {run.status}, expected {RunStatus.NOT_STARTED}' + run.start_run() + if status_code == 200 or pf_run: + assert run.status == RunStatus.STARTED, f'Get {run.status}, expected {RunStatus.STARTED}' + else: + assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' + run.end_run("FINISHED") + if status_code == 200 or pf_run: + assert run.status == RunStatus.TERMINATED, f'Get {run.status}, expected {RunStatus.TERMINATED}' + else: + assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' + + def test_local_lifecycle(self, token_mock, setup_data): + """Test that the local run have correct statuses.""" + run = EvalRun( + run_name=None, + tracking_uri=None, + subscription_id='mock', + group_name='mock', + workspace_name='mock', + ml_client=MagicMock() + ) + assert run.status == RunStatus.NOT_STARTED, f'Get {run.status}, expected {RunStatus.NOT_STARTED}' + run.start_run() + assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' + run.end_run("FINISHED") + assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' + + @pytest.mark.parametrize('status_code', [200, 401]) + def test_write_properties(self, token_mock, setup_data, caplog, status_code): + """Test writing properties to the evaluate run.""" + mock_start = 
MagicMock() + mock_start.status_code = 200 + mock_start.text = 'Mock error' + mock_start.json.return_value = { + "run": { + "info": { + "run_id": str(uuid4()), + "experiment_id": str(uuid4()), + "run_name": str(uuid4()) + } + } + } + mock_write = MagicMock() + mock_write.status_code = status_code + mock_write.text = 'Mock error' + mock_session = MagicMock() + mock_session.request.side_effect = [mock_start, mock_write] + with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): + run = EvalRun( + run_name="test", + tracking_uri=( + "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" + "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" + "/providers/Microsoft.MachineLearningServices" + "/workspaces/mock-ws-region" + ), + subscription_id="000000-0000-0000-0000-0000000", + group_name="mock-rg-region", + workspace_name="mock-ws-region", + ml_client=MagicMock(), + ) + run.start_run() + run.write_properties_to_run_history({'foo': 'bar'}) + if status_code != 200: + assert len(caplog.records) == 1 + assert 'Fail writing properties' in caplog.records[0].message + assert mock_write.text in caplog.records[0].message + else: + assert len(caplog.records) == 0 + + def test_write_properties_to_run_history_logs_error(self, token_mock, caplog, setup_data): + """Test that we are logging the error when there is no trace destination.""" + logger = logging.getLogger(EvalRun.__module__) + # All loggers, having promptflow. prefix will have "promptflow" logger + # as a parent. This logger does not propagate the logs and cannot be + # captured by caplog. Here we will skip this logger to capture logs. + logger.parent = logging.root + run = EvalRun( + run_name=None, + tracking_uri=None, + subscription_id='mock', + group_name='mock', + workspace_name='mock', + ml_client=MagicMock() + ) + run.start_run() + run.write_properties_to_run_history({'foo': 'bar'}) + assert len(caplog.records) == 2 + assert "Unable to write properties because the run failed to start." in caplog.records[1].message + + @pytest.mark.parametrize( + 'function_literal,args', + [ + ('write_properties_to_run_history', ({'foo': 'bar'})), + ('log_metric', ('foo', 42)), + ('log_artifact', ('mock_folder',)) + ] + ) + def test_raises_if_not_started(self, token_mock, setup_data, function_literal, args): + """Test that all public functions are raising exception if run is not started.""" + run = EvalRun( + run_name=None, + tracking_uri=None, + subscription_id='mock', + group_name='mock', + workspace_name='mock', + ml_client=MagicMock() + ) + with pytest.raises(ValueError) as cm: + getattr(run, function_literal)(*args) + assert "The run did not started." 
in cm.value.args[0] From 41fcc9a965c3794f9179fe66e0a2fa0075283826 Mon Sep 17 00:00:00 2001 From: nick863 <30440255+nick863@users.noreply.github.com> Date: Tue, 9 Jul 2024 16:35:31 -0700 Subject: [PATCH 2/6] Make eval run an context manager --- .../promptflow/evals/evaluate/_eval_run.py | 42 +- .../promptflow/evals/evaluate/_utils.py | 45 +- .../evals/e2etests/test_metrics_upload.py | 96 ++-- .../tests/evals/unittests/test_eval_run.py | 412 ++++++------------ 4 files changed, 217 insertions(+), 378 deletions(-) diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py index ccdd1875e88..85326f0ce28 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py @@ -1,7 +1,7 @@ # --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- -# import contextlib +import contextlib import dataclasses import enum import logging @@ -10,7 +10,7 @@ import requests import time import uuid -from typing import Any, Dict, Optional, Type +from typing import Any, Dict, Optional from urllib.parse import urlparse from azure.storage.blob import BlobServiceClient @@ -61,28 +61,7 @@ class RunStatus(enum.Enum): TERMINATED = 3 -class Singleton(type): - """Singleton class, which will be used as a metaclass.""" - - _instances = {} - - def __call__(cls, *args, **kwargs): - """Redefinition of call to return one instance per type.""" - if cls not in Singleton._instances: - Singleton._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) - return Singleton._instances[cls] - - @staticmethod - def destroy(cls: Type) -> None: - """ - Destroy the singleton instance. - - :param cls: The class to be destroyed. - """ - Singleton._instances.pop(cls, None) - - -class EvalRun(metaclass=Singleton): +class EvalRun(contextlib.AbstractContextManager): """ The simple singleton run class, used for accessing artifact store. @@ -162,6 +141,8 @@ def start_run(self) -> None: """ Start the run, or, if it is not applicable (for example, if tracking is not enabled), mark it as started. """ + if self._status != RunStatus.NOT_STARTED: + raise ValueError("The run has already started. Please end this run and to start another one.") self._status = RunStatus.STARTED if self._tracking_uri is None: LOGGER.warning("tracking_uri was not provided, " @@ -228,7 +209,6 @@ def end_run(self, reason: str) -> None: if self._is_promptflow_run: # This run is already finished, we just add artifacts/metrics to it. 
self._status = RunStatus.TERMINATED - Singleton.destroy(EvalRun) return if reason not in ("FINISHED", "FAILED", "KILLED"): raise ValueError( @@ -250,7 +230,6 @@ def end_run(self, reason: str) -> None: response = self.request_with_retry(url=url, method="POST", json_dict=body) if response.status_code != 200: LOGGER.warning("Unable to terminate the run.") - Singleton.destroy(EvalRun) self._status = RunStatus.TERMINATED def __enter__(self): @@ -260,7 +239,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, exc_tb): """The context manager exit call.""" - self.start_run() + self.end_run("FINISHED") def get_run_history_uri(self) -> str: """ @@ -490,12 +469,3 @@ def write_properties_to_run_history(self, properties: Dict[str, Any]) -> None: ) if response.status_code != 200: LOGGER.error("Fail writing properties '%s' to run history: %s", properties, response.text) - - @staticmethod - def get_instance(*args, **kwargs) -> "EvalRun": - """ - The convenience method to the the EvalRun instance. - - :return: The EvalRun instance. - """ - return EvalRun(*args, **kwargs) diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_utils.py b/src/promptflow-evals/promptflow/evals/evaluate/_utils.py index da10373420b..cc41848ed2a 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_utils.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_utils.py @@ -74,7 +74,7 @@ def _log_metrics_and_instance_results( # Adding line_number as index column this is needed by UI to form link to individual instance run instance_results["line_number"] = instance_results.index.values - ev_run = EvalRun( + with EvalRun( run_name=run.name if run is not None else evaluation_name, tracking_uri=tracking_uri, subscription_id=ws_triad.subscription_id, @@ -82,35 +82,34 @@ def _log_metrics_and_instance_results( workspace_name=ws_triad.workspace_name, ml_client=azure_pf_client.ml_client, promptflow_run=run, - ) + ) as ev_run: - artifact_name = EvalRun.EVALUATION_ARTIFACT if run else EvalRun.EVALUATION_ARTIFACT_DUMMY_RUN + artifact_name = EvalRun.EVALUATION_ARTIFACT if run else EvalRun.EVALUATION_ARTIFACT_DUMMY_RUN - with tempfile.TemporaryDirectory() as tmpdir: - tmp_path = os.path.join(tmpdir, artifact_name) + with tempfile.TemporaryDirectory() as tmpdir: + tmp_path = os.path.join(tmpdir, artifact_name) - with open(tmp_path, "w", encoding="utf-8") as f: - f.write(instance_results.to_json(orient="records", lines=True)) + with open(tmp_path, "w", encoding="utf-8") as f: + f.write(instance_results.to_json(orient="records", lines=True)) - ev_run.log_artifact(tmpdir, artifact_name) + ev_run.log_artifact(tmpdir, artifact_name) - # Using mlflow to create a dummy run since once created via PF show traces of dummy run in UI. - # Those traces can be confusing. - # adding these properties to avoid showing traces if a dummy run is created. - # We are doing that only for the pure evaluation runs. - if run is None: - ev_run.write_properties_to_run_history( - properties={ - "_azureml.evaluation_run": "azure-ai-generative-parent", - "_azureml.evaluate_artifacts": json.dumps([{"path": artifact_name, "type": "table"}]), - "isEvaluatorRun": "true", - } - ) + # Using mlflow to create a dummy run since once created via PF show traces of dummy run in UI. + # Those traces can be confusing. + # adding these properties to avoid showing traces if a dummy run is created. + # We are doing that only for the pure evaluation runs. 
+ if run is None: + ev_run.write_properties_to_run_history( + properties={ + "_azureml.evaluation_run": "azure-ai-generative-parent", + "_azureml.evaluate_artifacts": json.dumps([{"path": artifact_name, "type": "table"}]), + "isEvaluatorRun": "true", + } + ) - for metric_name, metric_value in metrics.items(): - ev_run.log_metric(metric_name, metric_value) + for metric_name, metric_value in metrics.items(): + ev_run.log_metric(metric_name, metric_value) - ev_run.end_run("FINISHED") evaluation_id = ev_run.info.run_name if run is not None else ev_run.info.run_id return _get_ai_studio_url(trace_destination=trace_destination, evaluation_id=evaluation_id) diff --git a/src/promptflow-evals/tests/evals/e2etests/test_metrics_upload.py b/src/promptflow-evals/tests/evals/e2etests/test_metrics_upload.py index ea79fc97289..232f80d36fc 100644 --- a/src/promptflow-evals/tests/evals/e2etests/test_metrics_upload.py +++ b/src/promptflow-evals/tests/evals/e2etests/test_metrics_upload.py @@ -33,18 +33,8 @@ def questions_file(): @pytest.fixture -def setup_data(azure_ml_client, project_scope): - tracking_uri = azure_ml_client.workspaces.get(project_scope["project_name"]).mlflow_tracking_uri - run = EvalRun( - run_name='test', - tracking_uri=tracking_uri, - subscription_id=project_scope["subscription_id"], - group_name=project_scope["resource_group_name"], - workspace_name=project_scope["project_name"], - ml_client=azure_ml_client - ) - yield - run.end_run("FINISHED") +def tracking_uri(azure_ml_client, project_scope): + return azure_ml_client.workspaces.get(project_scope["project_name"]).mlflow_tracking_uri @pytest.mark.usefixtures("model_config", "recording_injection", "project_scope") @@ -64,7 +54,7 @@ def _assert_no_errors_for_module(self, records, module_names): assert not error_messages, "\n".join(error_messages) @pytest.mark.usefixtures("vcr_recording") - def test_writing_to_run_history(self, setup_data, caplog): + def test_writing_to_run_history(self, caplog, project_scope, azure_ml_client, tracking_uri): """Test logging data to RunHistory service.""" logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. prefix will have "promptflow" logger @@ -74,55 +64,77 @@ def test_writing_to_run_history(self, setup_data, caplog): # Just for sanity check let us make sure that the logging actually works mock_response = MagicMock() mock_response.status_code = 418 - ev_run = EvalRun.get_instance() - with patch("promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry", return_value=mock_response): + with EvalRun( + run_name='test', + tracking_uri=tracking_uri, + subscription_id=project_scope["subscription_id"], + group_name=project_scope["resource_group_name"], + workspace_name=project_scope["project_name"], + ml_client=azure_ml_client + ) as ev_run: + with patch("promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry", return_value=mock_response): + ev_run.write_properties_to_run_history({"test": 42}) + assert any( + lg_rec.levelno == logging.ERROR for lg_rec in caplog.records), "The error log was not captured!" + caplog.clear() ev_run.write_properties_to_run_history({"test": 42}) - assert any(lg_rec.levelno == logging.ERROR for lg_rec in caplog.records), "The error log was not captured!" 
- caplog.clear() - ev_run.write_properties_to_run_history({"test": 42}) self._assert_no_errors_for_module(caplog.records, [EvalRun.__module__]) @pytest.mark.usefixtures("vcr_recording") - def test_logging_metrics(self, setup_data, caplog): + def test_logging_metrics(self, caplog, project_scope, azure_ml_client, tracking_uri): """Test logging metrics.""" logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. prefix will have "promptflow" logger # as a parent. This logger does not propagate the logs and cannot be # captured by caplog. Here we will skip this logger to capture logs. logger.parent = logging.root - ev_run = EvalRun.get_instance() - mock_response = MagicMock() - mock_response.status_code = 418 - with patch("promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry", return_value=mock_response): + with EvalRun( + run_name='test', + tracking_uri=tracking_uri, + subscription_id=project_scope["subscription_id"], + group_name=project_scope["resource_group_name"], + workspace_name=project_scope["project_name"], + ml_client=azure_ml_client + ) as ev_run: + mock_response = MagicMock() + mock_response.status_code = 418 + with patch("promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry", return_value=mock_response): + ev_run.log_metric("f1", 0.54) + assert any( + lg_rec.levelno == logging.WARNING for lg_rec in caplog.records), "The error log was not captured!" + caplog.clear() ev_run.log_metric("f1", 0.54) - assert any( - lg_rec.levelno == logging.WARNING for lg_rec in caplog.records), "The error log was not captured!" - caplog.clear() - ev_run.log_metric("f1", 0.54) self._assert_no_errors_for_module(caplog.records, EvalRun.__module__) @pytest.mark.usefixtures("vcr_recording") - def test_log_artifact(self, setup_data, caplog, tmp_path): + def test_log_artifact(self, project_scope, azure_ml_client, tracking_uri, caplog, tmp_path): """Test uploading artifact to the service.""" logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. prefix will have "promptflow" logger # as a parent. This logger does not propagate the logs and cannot be # captured by caplog. Here we will skip this logger to capture logs. 
logger.parent = logging.root - ev_run = EvalRun.get_instance() - mock_response = MagicMock() - mock_response.status_code = 418 - with open(os.path.join(tmp_path, EvalRun.EVALUATION_ARTIFACT), 'w') as fp: - json.dump({'f1': 0.5}, fp) - os.makedirs(os.path.join(tmp_path, 'internal_dir'), exist_ok=True) - with open(os.path.join(tmp_path, 'internal_dir', 'test.json'), 'w') as fp: - json.dump({'internal_f1': 0.6}, fp) - with patch('promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry', return_value=mock_response): + with EvalRun( + run_name='test', + tracking_uri=tracking_uri, + subscription_id=project_scope["subscription_id"], + group_name=project_scope["resource_group_name"], + workspace_name=project_scope["project_name"], + ml_client=azure_ml_client + ) as ev_run: + mock_response = MagicMock() + mock_response.status_code = 418 + with open(os.path.join(tmp_path, EvalRun.EVALUATION_ARTIFACT), 'w') as fp: + json.dump({'f1': 0.5}, fp) + os.makedirs(os.path.join(tmp_path, 'internal_dir'), exist_ok=True) + with open(os.path.join(tmp_path, 'internal_dir', 'test.json'), 'w') as fp: + json.dump({'internal_f1': 0.6}, fp) + with patch('promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry', return_value=mock_response): + ev_run.log_artifact(tmp_path) + assert any( + lg_rec.levelno == logging.WARNING for lg_rec in caplog.records), "The error log was not captured!" + caplog.clear() ev_run.log_artifact(tmp_path) - assert any( - lg_rec.levelno == logging.WARNING for lg_rec in caplog.records), "The error log was not captured!" - caplog.clear() - ev_run.log_artifact(tmp_path) self._assert_no_errors_for_module(caplog.records, EvalRun.__module__) @pytest.mark.skip(reason="Test runs individually but not when run with entire suite.") @@ -160,7 +172,7 @@ def test_e2e_run_target_fn(self, caplog, project_scope, questions_answers_file, evaluators={"f1": f1_score_eval}, azure_ai_project=project_scope, ) - self._assert_no_errors_for_module(caplog.records, (EvalRun.__name__, EvalRun.__module__)) + self._assert_no_errors_for_module(caplog.records, (ev_utils.__name__, EvalRun.__module__)) @pytest.mark.skip(reason="Test runs individually but not when run with entire suite.") @pytest.mark.usefixtures("vcr_recording") diff --git a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py index ce596011655..9543a85f21e 100644 --- a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py +++ b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py @@ -10,14 +10,7 @@ import promptflow.evals.evaluate._utils as ev_utils from promptflow.azure._utils._token_cache import ArmTokenCache -from promptflow.evals.evaluate._eval_run import EvalRun, Singleton, RunStatus - - -@pytest.fixture -def setup_data(): - """Make sure, we will destroy the EvalRun instance as it is singleton.""" - yield - Singleton._instances.clear() +from promptflow.evals.evaluate._eval_run import EvalRun, RunStatus def generate_mock_token(): @@ -30,57 +23,59 @@ def generate_mock_token(): class TestEvalRun: """Unit tests for the eval-run object.""" + def _get_mock_create_resonse(self, status=200): + """Return the mock create request""" + mock_response = MagicMock() + mock_response.status_code = status + if status != 200: + mock_response.text = "Mock error" + else: + mock_response.json.return_value = { + "run": { + "info": { + "run_id": str(uuid4()), + "experiment_id": str(uuid4()), + "run_name": str(uuid4()) + } + } + } + return mock_response + + def 
_get_mock_end_response(self, status=200): + """Get the mock end run response.""" + mock_response = MagicMock() + mock_response.status_code = status + mock_response.text = 'Everything good' if status == 200 else 'Everything bad' + return mock_response + @pytest.mark.parametrize( "status,should_raise", [("KILLED", False), ("WRONG_STATUS", True), ("FINISHED", False), ("FAILED", False)] ) - def test_end_raises(self, token_mock, setup_data, status, should_raise, caplog): + def test_end_raises(self, token_mock, status, should_raise, caplog): """Test that end run raises exception if incorrect status is set.""" mock_session = MagicMock() - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - } - mock_session.request.return_value = mock_response + mock_session.request.return_value = self._get_mock_create_resonse() with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): - run = EvalRun( + with EvalRun( run_name=None, tracking_uri="www.microsoft.com", subscription_id="mock", group_name="mock", workspace_name="mock", ml_client=MagicMock(), - ) - run.start_run() - if should_raise: - with pytest.raises(ValueError) as cm: + ) as run: + if should_raise: + with pytest.raises(ValueError) as cm: + run.end_run(status) + assert status in cm.value.args[0] + else: run.end_run(status) - assert status in cm.value.args[0] - else: - run.end_run(status) - assert len(caplog.records) == 0 + assert len(caplog.records) == 0 - def test_run_logs_if_terminated(self, token_mock, setup_data, caplog): + def test_run_logs_if_terminated(self, token_mock, caplog): """Test that run warn user if we are trying to terminate it twice.""" mock_session = MagicMock() - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - } - mock_session.request.return_value = mock_response + mock_session.request.return_value = self._get_mock_create_resonse() with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. prefix will have "promptflow" logger @@ -101,43 +96,30 @@ def test_run_logs_if_terminated(self, token_mock, setup_data, caplog): assert len(caplog.records) == 1 assert "Unable to stop run because it was already terminated." in caplog.records[0].message - def test_end_logs_if_fails(self, token_mock, setup_data, caplog): + def test_end_logs_if_fails(self, token_mock, caplog): """Test that if the terminal status setting was failed, it is logged.""" mock_session = MagicMock() - mock_response_start = MagicMock() - mock_response_start.status_code = 200 - mock_response_start.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - } - mock_response_end = MagicMock() - mock_response_end.status_code = 500 - mock_session.request.side_effect = [mock_response_start, mock_response_end] + mock_session.request.side_effect = [self._get_mock_create_resonse(), + self._get_mock_end_response(500)] with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. 
prefix will have "promptflow" logger # as a parent. This logger does not propagate the logs and cannot be # captured by caplog. Here we will skip this logger to capture logs. logger.parent = logging.root - run = EvalRun( + with EvalRun( run_name=None, tracking_uri="www.microsoft.com", subscription_id="mock", group_name="mock", workspace_name="mock", ml_client=MagicMock(), - ) - run.start_run() - run.end_run("FINISHED") + ): + pass assert len(caplog.records) == 1 assert "Unable to terminate the run." in caplog.records[0].message - def test_start_run_fails(self, token_mock, setup_data, caplog): + def test_start_run_fails(self, token_mock, caplog): """Test that there are log messges if run was not started.""" mock_session = MagicMock() mock_response_start = MagicMock() @@ -180,116 +162,44 @@ def test_start_run_fails(self, token_mock, setup_data, caplog): assert "Unable to stop run because the run failed to start." in caplog.records[0].message caplog.clear() - @pytest.mark.parametrize("destroy_run,runs_are_the_same", [(False, True), (True, False)]) - @patch("promptflow.evals.evaluate._eval_run.requests.Session") - def test_singleton(self, mock_session_cls, token_mock, setup_data, destroy_run, runs_are_the_same): - """Test that the EvalRun is actually a singleton.""" - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.side_effect = [ - { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - }, - { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - }, - ] - mock_session = MagicMock() - mock_session.request.return_value = mock_response - mock_session_cls.return_value = mock_session - run = EvalRun( - run_name="run", - tracking_uri="www.microsoft.com", - subscription_id="mock", - group_name="mock", - workspace_name="mock", - ml_client=MagicMock(), - ) - id1 = id(run) - run.start_run() - if destroy_run: - run.end_run("FINISHED") - id2 = id( - EvalRun( - run_name="run", - tracking_uri="www.microsoft.com", - subscription_id="mock", - group_name="mock", - workspace_name="mock", - ml_client=MagicMock(), - ) - ) - assert (id1 == id2) == runs_are_the_same - @patch("promptflow.evals.evaluate._eval_run.requests.Session") - def test_run_name(self, mock_session_cls, token_mock, setup_data): + def test_run_name(self, mock_session_cls, token_mock): """Test that the run name is the same as ID if name is not given.""" - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - } mock_session = MagicMock() + mock_response = self._get_mock_create_resonse() mock_session.request.return_value = mock_response mock_session_cls.return_value = mock_session - run = EvalRun( + with EvalRun( run_name=None, tracking_uri="www.microsoft.com", subscription_id="mock", group_name="mock", workspace_name="mock", ml_client=MagicMock(), - ) - run.start_run() + ) as run: + pass assert run.info.run_id == mock_response.json.return_value['run']['info']['run_id'] assert run.info.experiment_id == mock_response.json.return_value[ 'run']['info']['experiment_id'] assert run.info.run_name == mock_response.json.return_value['run']['info']["run_name"] @patch("promptflow.evals.evaluate._eval_run.requests.Session") - def test_run_with_name(self, mock_session_cls, token_mock, setup_data): + def test_run_with_name(self, 
mock_session_cls, token_mock): """Test that the run name is not the same as id if it is given.""" - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": 'test' - } - } - } + mock_response = self._get_mock_create_resonse() + mock_response.json.return_value['run']['info']['run_name'] = 'test' mock_session = MagicMock() mock_session.request.return_value = mock_response mock_session_cls.return_value = mock_session - run = EvalRun( + with EvalRun( run_name="test", tracking_uri="www.microsoft.com", subscription_id="mock", group_name="mock", workspace_name="mock", ml_client=MagicMock(), - ) - run.start_run() + ) as run: + pass assert run.info.run_id == mock_response.json.return_value['run']['info']['run_id'] assert run.info.experiment_id == mock_response.json.return_value[ 'run']['info']['experiment_id'] @@ -297,23 +207,13 @@ def test_run_with_name(self, mock_session_cls, token_mock, setup_data): assert run.info.run_name != run.info.run_id @patch("promptflow.evals.evaluate._eval_run.requests.Session") - def test_get_urls(self, mock_session_cls, token_mock, setup_data): + def test_get_urls(self, mock_session_cls, token_mock): """Test getting url-s from eval run.""" - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - } + mock_response = self._get_mock_create_resonse() mock_session = MagicMock() mock_session.request.return_value = mock_response mock_session_cls.return_value = mock_session - run = EvalRun( + with EvalRun( run_name="test", tracking_uri=( "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" @@ -325,8 +225,8 @@ def test_get_urls(self, mock_session_cls, token_mock, setup_data): group_name="mock-rg-region", workspace_name="mock-ws-region", ml_client=MagicMock(), - ) - run.start_run() + ) as run: + pass assert run.get_run_history_uri() == ( "https://region.api.azureml.ms/history/v1.0/subscriptions" "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" @@ -356,30 +256,27 @@ def test_get_urls(self, mock_session_cls, token_mock, setup_data): ('log_metric', 'save metrics') ] ) - def test_log_artifacts_logs_error(self, token_mock, setup_data, tmp_path, caplog, log_function, expected_str): + def test_log_artifacts_logs_error(self, token_mock, tmp_path, caplog, log_function, expected_str): """Test that the error is logged.""" mock_session = MagicMock() - mock_create_response = MagicMock() - mock_create_response.status_code = 200 - mock_create_response.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - } mock_response = MagicMock() mock_response.status_code = 404 mock_response.text = "Mock not found error." - if log_function == "log_artifact": with open(os.path.join(tmp_path, "test.json"), "w") as fp: json.dump({"f1": 0.5}, fp) - mock_session.request.side_effect = [mock_create_response, mock_response] + mock_session.request.side_effect = [ + self._get_mock_create_resonse(), + mock_response, + self._get_mock_end_response() + ] with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): - run = EvalRun( + logger = logging.getLogger(EvalRun.__module__) + # All loggers, having promptflow. prefix will have "promptflow" logger + # as a parent. 
This logger does not propagate the logs and cannot be + # captured by caplog. Here we will skip this logger to capture logs. + logger.parent = logging.root + with EvalRun( run_name="test", tracking_uri=( "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" @@ -391,23 +288,16 @@ def test_log_artifacts_logs_error(self, token_mock, setup_data, tmp_path, caplog group_name="mock-rg-region", workspace_name="mock-ws-region", ml_client=MagicMock(), - ) - run.start_run() - - logger = logging.getLogger(EvalRun.__module__) - # All loggers, having promptflow. prefix will have "promptflow" logger - # as a parent. This logger does not propagate the logs and cannot be - # captured by caplog. Here we will skip this logger to capture logs. - logger.parent = logging.root - fn = getattr(run, log_function) - if log_function == 'log_artifact': - with open(os.path.join(tmp_path, EvalRun.EVALUATION_ARTIFACT), 'w') as fp: - fp.write('42') - kwargs = {'artifact_folder': tmp_path} - else: - kwargs = {'key': 'f1', 'value': 0.5} - with patch('promptflow.evals.evaluate._eval_run.BlobServiceClient', return_value=MagicMock()): - fn(**kwargs) + ) as run: + fn = getattr(run, log_function) + if log_function == 'log_artifact': + with open(os.path.join(tmp_path, EvalRun.EVALUATION_ARTIFACT), 'w') as fp: + fp.write('42') + kwargs = {'artifact_folder': tmp_path} + else: + kwargs = {'key': 'f1', 'value': 0.5} + with patch('promptflow.evals.evaluate._eval_run.BlobServiceClient', return_value=MagicMock()): + fn(**kwargs) assert len(caplog.records) == 1 assert mock_response.text in caplog.records[0].message assert "404" in caplog.records[0].message @@ -421,30 +311,19 @@ def test_log_artifacts_logs_error(self, token_mock, setup_data, tmp_path, caplog ] ) def test_wrong_artifact_path( - self, - token_mock, - tmp_path, - caplog, - dir_exists, - dir_empty, - expected_error, - setup_data): + self, + token_mock, + tmp_path, + caplog, + dir_exists, + dir_empty, + expected_error, + ): """Test that if artifact path is empty, or dies not exist we are logging the error.""" - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - } mock_session = MagicMock() - mock_session.request.return_value = mock_response + mock_session.request.return_value = self._get_mock_create_resonse() with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): - run = EvalRun( + with EvalRun( run_name="test", tracking_uri=( "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" @@ -456,22 +335,21 @@ def test_wrong_artifact_path( group_name="mock-rg-region", workspace_name="mock-ws-region", ml_client=MagicMock(), - ) - run.start_run() - logger = logging.getLogger(EvalRun.__module__) - # All loggers, having promptflow. prefix will have "promptflow" logger - # as a parent. This logger does not propagate the logs and cannot be - # captured by caplog. Here we will skip this logger to capture logs. - logger.parent = logging.root - artifact_folder = tmp_path if dir_exists else "wrong_path_567" - if not dir_empty: - with open(os.path.join(tmp_path, "test.txt"), 'w') as fp: - fp.write("42") - run.log_artifact(artifact_folder) + ) as run: + logger = logging.getLogger(EvalRun.__module__) + # All loggers, having promptflow. prefix will have "promptflow" logger + # as a parent. This logger does not propagate the logs and cannot be + # captured by caplog. 
Here we will skip this logger to capture logs. + logger.parent = logging.root + artifact_folder = tmp_path if dir_exists else "wrong_path_567" + if not dir_empty: + with open(os.path.join(tmp_path, "test.txt"), 'w') as fp: + fp.write("42") + run.log_artifact(artifact_folder) assert len(caplog.records) == 1 assert expected_error in caplog.records[0].message - def test_log_metrics_and_instance_results_logs_error(self, token_mock, caplog, setup_data): + def test_log_metrics_and_instance_results_logs_error(self, token_mock, caplog): """Test that we are logging the error when there is no trace destination.""" logger = logging.getLogger(ev_utils.__name__) # All loggers, having promptflow. prefix will have "promptflow" logger @@ -488,29 +366,28 @@ def test_log_metrics_and_instance_results_logs_error(self, token_mock, caplog, s assert len(caplog.records) == 1 assert "Unable to log traces as trace destination was not defined." in caplog.records[0].message - def test_run_broken_if_no_tracking_uri(self, token_mock, setup_data, caplog): + def test_run_broken_if_no_tracking_uri(self, token_mock, caplog): """Test that if no tracking URI is provirded, the run is being marked as broken.""" logger = logging.getLogger(ev_utils.__name__) # All loggers, having promptflow. prefix will have "promptflow" logger # as a parent. This logger does not propagate the logs and cannot be # captured by caplog. Here we will skip this logger to capture logs. logger.parent = logging.root - run = EvalRun( + with EvalRun( run_name=None, tracking_uri=None, subscription_id='mock', group_name='mock', workspace_name='mock', ml_client=MagicMock() - ) - run.start_run() - assert len(caplog.records) == 1 - assert "The results will be saved locally, but will not be logged to Azure." in caplog.records[0].message - with patch('promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry') as mock_request: - run.log_artifact('mock_dir') - run.log_metric('foo', 42) - run.write_properties_to_run_history({'foo': 'bar'}) - mock_request.assert_not_called() + ) as run: + assert len(caplog.records) == 1 + assert "The results will be saved locally, but will not be logged to Azure." 
in caplog.records[0].message + with patch('promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry') as mock_request: + run.log_artifact('mock_dir') + run.log_metric('foo', 42) + run.write_properties_to_run_history({'foo': 'bar'}) + mock_request.assert_not_called() @pytest.mark.parametrize('status_code,pf_run', [ (401, False), @@ -518,26 +395,15 @@ def test_run_broken_if_no_tracking_uri(self, token_mock, setup_data, caplog): (401, True), (200, True), ]) - def test_lifecycle(self, token_mock, status_code, pf_run, setup_data): + def test_lifecycle(self, token_mock, status_code, pf_run): """Test the run statuses throughout its life cycle.""" pf_run_mock = None if pf_run: pf_run_mock = MagicMock() pf_run_mock.name = 'mock_pf_run' pf_run_mock._experiment_name = 'mock_pf_experiment' - mock_response = MagicMock() - mock_response.status_code = status_code - mock_response.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - } mock_session = MagicMock() - mock_session.request.return_value = mock_response + mock_session.request.return_value = self._get_mock_create_resonse(status_code) with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): run = EvalRun( run_name="test", @@ -565,7 +431,7 @@ def test_lifecycle(self, token_mock, status_code, pf_run, setup_data): else: assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' - def test_local_lifecycle(self, token_mock, setup_data): + def test_local_lifecycle(self, token_mock): """Test that the local run have correct statuses.""" run = EvalRun( run_name=None, @@ -582,27 +448,19 @@ def test_local_lifecycle(self, token_mock, setup_data): assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' @pytest.mark.parametrize('status_code', [200, 401]) - def test_write_properties(self, token_mock, setup_data, caplog, status_code): + def test_write_properties(self, token_mock, caplog, status_code): """Test writing properties to the evaluate run.""" - mock_start = MagicMock() - mock_start.status_code = 200 - mock_start.text = 'Mock error' - mock_start.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } - } mock_write = MagicMock() mock_write.status_code = status_code mock_write.text = 'Mock error' mock_session = MagicMock() - mock_session.request.side_effect = [mock_start, mock_write] + mock_session.request.side_effect = [ + self._get_mock_create_resonse(), + mock_write, + self._get_mock_end_response() + ] with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): - run = EvalRun( + with EvalRun( run_name="test", tracking_uri=( "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" @@ -614,9 +472,8 @@ def test_write_properties(self, token_mock, setup_data, caplog, status_code): group_name="mock-rg-region", workspace_name="mock-ws-region", ml_client=MagicMock(), - ) - run.start_run() - run.write_properties_to_run_history({'foo': 'bar'}) + ) as run: + run.write_properties_to_run_history({'foo': 'bar'}) if status_code != 200: assert len(caplog.records) == 1 assert 'Fail writing properties' in caplog.records[0].message @@ -624,25 +481,26 @@ def test_write_properties(self, token_mock, setup_data, caplog, status_code): else: assert len(caplog.records) == 0 - def test_write_properties_to_run_history_logs_error(self, token_mock, caplog, setup_data): + def 
test_write_properties_to_run_history_logs_error(self, token_mock, caplog): """Test that we are logging the error when there is no trace destination.""" logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. prefix will have "promptflow" logger # as a parent. This logger does not propagate the logs and cannot be # captured by caplog. Here we will skip this logger to capture logs. logger.parent = logging.root - run = EvalRun( + with EvalRun( run_name=None, tracking_uri=None, subscription_id='mock', group_name='mock', workspace_name='mock', ml_client=MagicMock() - ) - run.start_run() - run.write_properties_to_run_history({'foo': 'bar'}) - assert len(caplog.records) == 2 + ) as run: + run.write_properties_to_run_history({'foo': 'bar'}) + assert len(caplog.records) == 3 + assert "tracking_uri was not provided," in caplog.records[0].message assert "Unable to write properties because the run failed to start." in caplog.records[1].message + assert "Unable to stop run because the run failed to start." in caplog.records[2].message @pytest.mark.parametrize( 'function_literal,args', @@ -652,7 +510,7 @@ def test_write_properties_to_run_history_logs_error(self, token_mock, caplog, se ('log_artifact', ('mock_folder',)) ] ) - def test_raises_if_not_started(self, token_mock, setup_data, function_literal, args): + def test_raises_if_not_started(self, token_mock, function_literal, args): """Test that all public functions are raising exception if run is not started.""" run = EvalRun( run_name=None, From ece405418b0e58b4b2b959b98f828b8b1723c1cf Mon Sep 17 00:00:00 2001 From: nick863 <30440255+nick863@users.noreply.github.com> Date: Wed, 10 Jul 2024 15:14:53 -0700 Subject: [PATCH 3/6] Fix --- .../promptflow/evals/evaluate/_eval_run.py | 74 ++++++++----------- .../tests/evals/unittests/test_eval_run.py | 24 +++--- 2 files changed, 44 insertions(+), 54 deletions(-) diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py index 85326f0ce28..6be29127896 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py @@ -137,7 +137,7 @@ def _get_scope(self) -> str: self._workspace_name, ) - def start_run(self) -> None: + def _start_run(self) -> None: """ Start the run, or, if it is not applicable (for example, if tracking is not enabled), mark it as started. """ @@ -159,45 +159,35 @@ def start_run(self) -> None: self._promptflow_run.name ) else: - self._status = self._start_run(self._run_name) - - def _start_run(self, run_name: Optional[str]) -> 'RunStatus': - """ - Make a request to start the mlflow run. If the run will not start, it will be - - marked as broken and the logging will be switched off. - :param run_name: The display name for the run. - :type run_name: Optional[str] - :returns: True if the run has started and False otherwise. - """ - url = f"https://{self._url_base}/mlflow/v2.0" f"{self._get_scope()}/api/2.0/mlflow/runs/create" - body = { - "experiment_id": "0", - "user_id": "promptflow-evals", - "start_time": int(time.time() * 1000), - "tags": [{"key": "mlflow.user", "value": "promptflow-evals"}], - } - if run_name: - body["run_name"] = run_name - response = self.request_with_retry( - url=url, - method='POST', - json_dict=body - ) - if response.status_code != 200: - self.info = RunInfo.generate(run_name) - LOGGER.warning(f"The run failed to start: {response.status_code}: {response.text}." 
- "The results will be saved locally, but will not be logged to Azure.") - return RunStatus.BROKEN - parsed_response = response.json() - self.info = RunInfo( - run_id=parsed_response['run']['info']['run_id'], - experiment_id=parsed_response['run']['info']['experiment_id'], - run_name=parsed_response['run']['info']['run_name'] - ) - return RunStatus.STARTED - - def end_run(self, reason: str) -> None: + url = f"https://{self._url_base}/mlflow/v2.0" f"{self._get_scope()}/api/2.0/mlflow/runs/create" + body = { + "experiment_id": "0", + "user_id": "promptflow-evals", + "start_time": int(time.time() * 1000), + "tags": [{"key": "mlflow.user", "value": "promptflow-evals"}], + } + if self._run_name: + body["run_name"] = self._run_name + response = self.request_with_retry( + url=url, + method='POST', + json_dict=body + ) + if response.status_code != 200: + self.info = RunInfo.generate(self._run_name) + LOGGER.warning(f"The run failed to start: {response.status_code}: {response.text}." + "The results will be saved locally, but will not be logged to Azure.") + self._status = RunStatus.BROKEN + else: + parsed_response = response.json() + self.info = RunInfo( + run_id=parsed_response['run']['info']['run_id'], + experiment_id=parsed_response['run']['info']['experiment_id'], + run_name=parsed_response['run']['info']['run_name'] + ) + self._status = RunStatus.STARTED + + def _end_run(self, reason: str) -> None: """ Tetminate the run. @@ -234,12 +224,12 @@ def end_run(self, reason: str) -> None: def __enter__(self): """The Context Manager enter call.""" - self.start_run() + self._start_run() return self def __exit__(self, exc_type, exc_value, exc_tb): """The context manager exit call.""" - self.end_run("FINISHED") + self._end_run("FINISHED") def get_run_history_uri(self) -> str: """ diff --git a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py index 9543a85f21e..af479b839b8 100644 --- a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py +++ b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py @@ -66,10 +66,10 @@ def test_end_raises(self, token_mock, status, should_raise, caplog): ) as run: if should_raise: with pytest.raises(ValueError) as cm: - run.end_run(status) + run._end_run(status) assert status in cm.value.args[0] else: - run.end_run(status) + run._end_run(status) assert len(caplog.records) == 0 def test_run_logs_if_terminated(self, token_mock, caplog): @@ -90,9 +90,9 @@ def test_run_logs_if_terminated(self, token_mock, caplog): workspace_name="mock", ml_client=MagicMock(), ) - run.start_run() - run.end_run("KILLED") - run.end_run("KILLED") + run._start_run() + run._end_run("KILLED") + run._end_run("KILLED") assert len(caplog.records) == 1 assert "Unable to stop run because it was already terminated." in caplog.records[0].message @@ -140,7 +140,7 @@ def test_start_run_fails(self, token_mock, caplog): workspace_name="mock", ml_client=MagicMock(), ) - run.start_run() + run._start_run() assert len(caplog.records) == 1 assert "500" in caplog.records[0].message assert mock_response_start.text in caplog.records[0].message @@ -157,7 +157,7 @@ def test_start_run_fails(self, token_mock, caplog): assert "Unable to log metric because the run failed to start." in caplog.records[0].message caplog.clear() # End run - run.end_run("FINISHED") + run._end_run("FINISHED") assert len(caplog.records) == 1 assert "Unable to stop run because the run failed to start." 
in caplog.records[0].message caplog.clear() @@ -420,12 +420,12 @@ def test_lifecycle(self, token_mock, status_code, pf_run): promptflow_run=pf_run_mock ) assert run.status == RunStatus.NOT_STARTED, f'Get {run.status}, expected {RunStatus.NOT_STARTED}' - run.start_run() + run._start_run() if status_code == 200 or pf_run: assert run.status == RunStatus.STARTED, f'Get {run.status}, expected {RunStatus.STARTED}' else: assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' - run.end_run("FINISHED") + run._end_run("FINISHED") if status_code == 200 or pf_run: assert run.status == RunStatus.TERMINATED, f'Get {run.status}, expected {RunStatus.TERMINATED}' else: @@ -442,9 +442,9 @@ def test_local_lifecycle(self, token_mock): ml_client=MagicMock() ) assert run.status == RunStatus.NOT_STARTED, f'Get {run.status}, expected {RunStatus.NOT_STARTED}' - run.start_run() + run._start_run() assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' - run.end_run("FINISHED") + run._end_run("FINISHED") assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' @pytest.mark.parametrize('status_code', [200, 401]) @@ -510,7 +510,7 @@ def test_write_properties_to_run_history_logs_error(self, token_mock, caplog): ('log_artifact', ('mock_folder',)) ] ) - def test_raises_if_not_started(self, token_mock, function_literal, args): + def test_raises_if_not_started(self, token_mock, caplog, function_literal, args): """Test that all public functions are raising exception if run is not started.""" run = EvalRun( run_name=None, From ad4c34e57ab8ace43e5cb79c412edf8bd2fc073d Mon Sep 17 00:00:00 2001 From: nick863 <30440255+nick863@users.noreply.github.com> Date: Thu, 11 Jul 2024 11:45:31 -0700 Subject: [PATCH 4/6] Fix --- .../promptflow/evals/evaluate/_eval_run.py | 36 +++---- .../tests/evals/unittests/test_eval_run.py | 102 ++++++------------ 2 files changed, 52 insertions(+), 86 deletions(-) diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py index 56a268dafc4..5dd72594133 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py @@ -196,7 +196,8 @@ def _end_run(self, reason: str) -> None: :type reason: str :raises: ValueError if the run is not in ("FINISHED", "FAILED", "KILLED") """ - self._raise_not_started_nay_be() + if not self._check_state_and_log('stop run'): + return if self._is_promptflow_run: # This run is already finished, we just add artifacts/metrics to it. self._status = RunStatus.TERMINATED @@ -208,9 +209,6 @@ def _end_run(self, reason: str) -> None: if self._status == RunStatus.TERMINATED: LOGGER.warning("Unable to stop run because it was already terminated.") return - if self._status == RunStatus.BROKEN: - LOGGER.warning("Unable to stop run because the run failed to start.") - return url = f"https://{self._url_base}/mlflow/v2.0" f"{self._get_scope()}/api/2.0/mlflow/runs/update" body = { "run_uuid": self.info.run_id, @@ -311,16 +309,24 @@ def _log_warning(self, failed_op: str, response: requests.Response) -> None: f"{response.text=}." ) - def _raise_not_started_nay_be(self) -> None: + def _check_state_and_log(self, action : str) -> bool: """ - Raise value error if the run was not started. + Check that the run is in the correct state and log worning if it is not. - :raises: ValueError + :param action: Action, whcih caused this check. 
For example if it is "log artifact", + the log message will start "Unable to log artifact." + :type action: str + :return: boolean saying if run is in the correct state. """ if self._status == RunStatus.NOT_STARTED: - raise ValueError( - "The run did not started. " + LOGGER.warning( + f"Unable to {action}. The run did not started. " "Please start the run by calling start_run method.") + return False + if self._status == RunStatus.BROKEN: + LOGGER.warning(f"Unable to {action} because the run failed to start.") + return False + return True def log_artifact(self, artifact_folder: str, artifact_name: str = EVALUATION_ARTIFACT) -> None: """ @@ -332,9 +338,7 @@ def log_artifact(self, artifact_folder: str, artifact_name: str = EVALUATION_ART :param artifact_folder: The folder with artifacts to be uploaded. :type artifact_folder: str """ - self._raise_not_started_nay_be() - if self._status == RunStatus.BROKEN: - LOGGER.warning("Unable to log artifact because the run failed to start.") + if not self._check_state_and_log('log artifact'): return # Check if artifact dirrectory is empty or does not exist. if not os.path.isdir(artifact_folder): @@ -421,9 +425,7 @@ def log_metric(self, key: str, value: float) -> None: :param value: The valure to be logged. :type value: float """ - self._raise_not_started_nay_be() - if self._status == RunStatus.BROKEN: - LOGGER.warning("Unable to log metric because the run failed to start.") + if not self._check_state_and_log('log metric'): return body = { "run_uuid": self.info.run_id, @@ -448,9 +450,7 @@ def write_properties_to_run_history(self, properties: Dict[str, Any]) -> None: :param properties: The properties to be written to run history. :type properties: dict """ - self._raise_not_started_nay_be() - if self._status == RunStatus.BROKEN: - LOGGER.warning("Unable to write properties because the run failed to start.") + if not self._check_state_and_log('write properties'): return # update host to run history and request PATCH API response = self.request_with_retry( diff --git a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py index af479b839b8..388a2bfdca2 100644 --- a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py +++ b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py @@ -23,6 +23,19 @@ def generate_mock_token(): class TestEvalRun: """Unit tests for the eval-run object.""" + _MOCK_CREDS = dict( + tracking_uri=( + "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" + "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" + "/providers/Microsoft.MachineLearningServices" + "/workspaces/mock-ws-region" + ), + subscription_id="000000-0000-0000-0000-0000000", + group_name="mock-rg-region", + workspace_name="mock-ws-region", + ml_client=MagicMock(), + ) + def _get_mock_create_resonse(self, status=200): """Return the mock create request""" mock_response = MagicMock() @@ -58,11 +71,7 @@ def test_end_raises(self, token_mock, status, should_raise, caplog): with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): with EvalRun( run_name=None, - tracking_uri="www.microsoft.com", - subscription_id="mock", - group_name="mock", - workspace_name="mock", - ml_client=MagicMock(), + **TestEvalRun._MOCK_CREDS ) as run: if should_raise: with pytest.raises(ValueError) as cm: @@ -215,16 +224,7 @@ def test_get_urls(self, mock_session_cls, token_mock): mock_session_cls.return_value = mock_session with EvalRun( run_name="test", - tracking_uri=( - 
"https://region.api.azureml.ms/mlflow/v2.0/subscriptions" - "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" - "/providers/Microsoft.MachineLearningServices" - "/workspaces/mock-ws-region" - ), - subscription_id="000000-0000-0000-0000-0000000", - group_name="mock-rg-region", - workspace_name="mock-ws-region", - ml_client=MagicMock(), + **TestEvalRun._MOCK_CREDS ) as run: pass assert run.get_run_history_uri() == ( @@ -278,16 +278,7 @@ def test_log_artifacts_logs_error(self, token_mock, tmp_path, caplog, log_functi logger.parent = logging.root with EvalRun( run_name="test", - tracking_uri=( - "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" - "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" - "/providers/Microsoft.MachineLearningServices" - "/workspaces/mock-ws-region" - ), - subscription_id="000000-0000-0000-0000-0000000", - group_name="mock-rg-region", - workspace_name="mock-ws-region", - ml_client=MagicMock(), + **TestEvalRun._MOCK_CREDS ) as run: fn = getattr(run, log_function) if log_function == 'log_artifact': @@ -325,16 +316,7 @@ def test_wrong_artifact_path( with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): with EvalRun( run_name="test", - tracking_uri=( - "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" - "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" - "/providers/Microsoft.MachineLearningServices" - "/workspaces/mock-ws-region" - ), - subscription_id="000000-0000-0000-0000-0000000", - group_name="mock-rg-region", - workspace_name="mock-ws-region", - ml_client=MagicMock(), + **TestEvalRun._MOCK_CREDS ) as run: logger = logging.getLogger(EvalRun.__module__) # All loggers, having promptflow. prefix will have "promptflow" logger @@ -407,16 +389,7 @@ def test_lifecycle(self, token_mock, status_code, pf_run): with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): run = EvalRun( run_name="test", - tracking_uri=( - "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" - "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" - "/providers/Microsoft.MachineLearningServices" - "/workspaces/mock-ws-region" - ), - subscription_id="000000-0000-0000-0000-0000000", - group_name="mock-rg-region", - workspace_name="mock-ws-region", - ml_client=MagicMock(), + **TestEvalRun._MOCK_CREDS, promptflow_run=pf_run_mock ) assert run.status == RunStatus.NOT_STARTED, f'Get {run.status}, expected {RunStatus.NOT_STARTED}' @@ -462,16 +435,7 @@ def test_write_properties(self, token_mock, caplog, status_code): with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): with EvalRun( run_name="test", - tracking_uri=( - "https://region.api.azureml.ms/mlflow/v2.0/subscriptions" - "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region" - "/providers/Microsoft.MachineLearningServices" - "/workspaces/mock-ws-region" - ), - subscription_id="000000-0000-0000-0000-0000000", - group_name="mock-rg-region", - workspace_name="mock-ws-region", - ml_client=MagicMock(), + **TestEvalRun._MOCK_CREDS ) as run: run.write_properties_to_run_history({'foo': 'bar'}) if status_code != 200: @@ -503,23 +467,25 @@ def test_write_properties_to_run_history_logs_error(self, token_mock, caplog): assert "Unable to stop run because the run failed to start." 
in caplog.records[2].message
 
     @pytest.mark.parametrize(
-        'function_literal,args',
+        'function_literal,args,expected_action',
         [
-            ('write_properties_to_run_history', ({'foo': 'bar'})),
-            ('log_metric', ('foo', 42)),
-            ('log_artifact', ('mock_folder',))
+            ('write_properties_to_run_history', ({'foo': 'bar'}), 'write properties'),
+            ('log_metric', ('foo', 42), 'log metric'),
+            ('log_artifact', ('mock_folder',), 'log artifact')
         ]
     )
-    def test_raises_if_not_started(self, token_mock, caplog, function_literal, args):
-        """Test that all public functions are raising exception if run is not started."""
+    def test_logs_if_not_started(self, token_mock, caplog, function_literal, args, expected_action):
+        """Test that public functions log a warning if the run is not started."""
+        logger = logging.getLogger(ev_utils.__name__)
+        # All loggers, having promptflow. prefix will have "promptflow" logger
+        # as a parent. This logger does not propagate the logs and cannot be
+        # captured by caplog. Here we will skip this logger to capture logs.
+        logger.parent = logging.root
         run = EvalRun(
             run_name=None,
-            tracking_uri=None,
-            subscription_id='mock',
-            group_name='mock',
-            workspace_name='mock',
-            ml_client=MagicMock()
+            **TestEvalRun._MOCK_CREDS
         )
-        with pytest.raises(ValueError) as cm:
-            getattr(run, function_literal)(*args)
-        assert "The run did not started." in cm.value.args[0]
+        getattr(run, function_literal)(*args)
+        assert len(caplog.records) == 1
+        assert expected_action in caplog.records[0].message, caplog.records[0].message
+        assert "The run did not started." in caplog.records[0].message, caplog.records[0].message
 
From 1dcd9094967a1b050f28813dba91a1db45bcdc59 Mon Sep 17 00:00:00 2001
From: nick863 <30440255+nick863@users.noreply.github.com>
Date: Thu, 11 Jul 2024 11:55:03 -0700
Subject: [PATCH 5/6] Fix typo

---
 src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py
index 5dd72594133..d8be8f22559 100644
--- a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py
+++ b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py
@@ -313,7 +313,7 @@ def _check_state_and_log(self, action : str) -> bool:
         """
         Check that the run is in the correct state and log worning if it is not.
 
-        :param action: Action, whcih caused this check. For example if it is "log artifact",
+        :param action: Action, which caused this check. For example if it is "log artifact",
            the log message will start "Unable to log artifact."
         :type action: str
         :return: boolean saying if run is in the correct state.
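
At this point in the series the run is started and finished exclusively through the
context manager, and every logging call is gated on the run status. The snippet below
is an illustrative sketch only, not part of any diff above: the run name, subscription,
resource group, workspace, metric and property values are placeholders, and the
MagicMock ml_client mirrors the unit tests rather than a real Azure ML client.

from unittest.mock import MagicMock

from promptflow.evals.evaluate._eval_run import EvalRun, RunStatus

# With no tracking_uri the run is marked BROKEN: results stay local and the
# logging calls below are skipped with a warning instead of raising.
with EvalRun(
    run_name="example-eval",
    tracking_uri=None,
    subscription_id="00000000-0000-0000-0000-000000000000",
    group_name="example-rg",
    workspace_name="example-ws",
    ml_client=MagicMock(),
) as run:
    assert run.status == RunStatus.BROKEN
    run.log_metric("f1_score", 0.87)
    run.write_properties_to_run_history({"foo": "bar"})
# __exit__ ends the run with "FINISHED"; a run that never started stays BROKEN,
# while a successfully started run becomes TERMINATED.
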
From 7ee1ea264edd632be4ff9b0f61b696b4915ce15a Mon Sep 17 00:00:00 2001 From: nick863 <30440255+nick863@users.noreply.github.com> Date: Thu, 11 Jul 2024 14:12:02 -0700 Subject: [PATCH 6/6] Fix --- .../promptflow/evals/evaluate/_eval_run.py | 44 +++++++++++-------- .../tests/evals/unittests/test_eval_run.py | 35 ++++++++++++--- 2 files changed, 54 insertions(+), 25 deletions(-) diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py index d8be8f22559..6142747def4 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py @@ -10,7 +10,7 @@ import requests import time import uuid -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Set from urllib.parse import urlparse from requests.adapters import HTTPAdapter @@ -142,8 +142,9 @@ def _start_run(self) -> None: """ Start the run, or, if it is not applicable (for example, if tracking is not enabled), mark it as started. """ - if self._status != RunStatus.NOT_STARTED: - raise ValueError("The run has already started. Please end this run and to start another one.") + self._check_state_and_log('start run', + {v for v in RunStatus if v != RunStatus.NOT_STARTED}, + True) self._status = RunStatus.STARTED if self._tracking_uri is None: LOGGER.warning("tracking_uri was not provided, " @@ -196,7 +197,9 @@ def _end_run(self, reason: str) -> None: :type reason: str :raises: ValueError if the run is not in ("FINISHED", "FAILED", "KILLED") """ - if not self._check_state_and_log('stop run'): + if not self._check_state_and_log('stop run', + {RunStatus.BROKEN, RunStatus.NOT_STARTED, RunStatus.TERMINATED}, + False): return if self._is_promptflow_run: # This run is already finished, we just add artifacts/metrics to it. @@ -206,9 +209,6 @@ def _end_run(self, reason: str) -> None: raise ValueError( f"Incorrect terminal status {reason}. " 'Valid statuses are "FINISHED", "FAILED" and "KILLED".' ) - if self._status == RunStatus.TERMINATED: - LOGGER.warning("Unable to stop run because it was already terminated.") - return url = f"https://{self._url_base}/mlflow/v2.0" f"{self._get_scope()}/api/2.0/mlflow/runs/update" body = { "run_uuid": self.info.run_id, @@ -309,22 +309,30 @@ def _log_warning(self, failed_op: str, response: requests.Response) -> None: f"{response.text=}." ) - def _check_state_and_log(self, action : str) -> bool: + def _check_state_and_log( + self, + action: str, + bad_states: Set[RunStatus], + should_raise: bool) -> bool: """ Check that the run is in the correct state and log worning if it is not. :param action: Action, which caused this check. For example if it is "log artifact", the log message will start "Unable to log artifact." :type action: str + :param bad_states: The states, considered invalid for given action. + :type bad_states: set + :param should_raise: Should we raise an error if the bad state has been encountered? + :type should_raise: bool + :raises: RuntimeError if should_raise is True and invalid state was encountered. :return: boolean saying if run is in the correct state. """ - if self._status == RunStatus.NOT_STARTED: - LOGGER.warning( - f"Unable to {action}. The run did not started. 
" - "Please start the run by calling start_run method.") - return False - if self._status == RunStatus.BROKEN: - LOGGER.warning(f"Unable to {action} because the run failed to start.") + if self._status in bad_states: + msg = f"Unable to {action} due to Run status={self._status}." + if should_raise: + raise RuntimeError(msg) + else: + LOGGER.warning(msg) return False return True @@ -338,7 +346,7 @@ def log_artifact(self, artifact_folder: str, artifact_name: str = EVALUATION_ART :param artifact_folder: The folder with artifacts to be uploaded. :type artifact_folder: str """ - if not self._check_state_and_log('log artifact'): + if not self._check_state_and_log('log artifact', {RunStatus.BROKEN, RunStatus.NOT_STARTED}, False): return # Check if artifact dirrectory is empty or does not exist. if not os.path.isdir(artifact_folder): @@ -425,7 +433,7 @@ def log_metric(self, key: str, value: float) -> None: :param value: The valure to be logged. :type value: float """ - if not self._check_state_and_log('log metric'): + if not self._check_state_and_log('log metric', {RunStatus.BROKEN, RunStatus.NOT_STARTED}, False): return body = { "run_uuid": self.info.run_id, @@ -450,7 +458,7 @@ def write_properties_to_run_history(self, properties: Dict[str, Any]) -> None: :param properties: The properties to be written to run history. :type properties: dict """ - if not self._check_state_and_log('write properties'): + if not self._check_state_and_log('write properties', {RunStatus.BROKEN, RunStatus.NOT_STARTED}, False): return # update host to run history and request PATCH API response = self.request_with_retry( diff --git a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py index 388a2bfdca2..22964bbf12e 100644 --- a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py +++ b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py @@ -103,7 +103,7 @@ def test_run_logs_if_terminated(self, token_mock, caplog): run._end_run("KILLED") run._end_run("KILLED") assert len(caplog.records) == 1 - assert "Unable to stop run because it was already terminated." in caplog.records[0].message + assert "Unable to stop run due to Run status=RunStatus.TERMINATED." in caplog.records[0].message def test_end_logs_if_fails(self, token_mock, caplog): """Test that if the terminal status setting was failed, it is logged.""" @@ -158,17 +158,17 @@ def test_start_run_fails(self, token_mock, caplog): # Log artifact run.log_artifact("test") assert len(caplog.records) == 1 - assert "Unable to log artifact because the run failed to start." in caplog.records[0].message + assert "Unable to log artifact due to Run status=RunStatus.BROKEN." in caplog.records[0].message caplog.clear() # Log metric run.log_metric("a", 42) assert len(caplog.records) == 1 - assert "Unable to log metric because the run failed to start." in caplog.records[0].message + assert "Unable to log metric due to Run status=RunStatus.BROKEN." in caplog.records[0].message caplog.clear() # End run run._end_run("FINISHED") assert len(caplog.records) == 1 - assert "Unable to stop run because the run failed to start." in caplog.records[0].message + assert "Unable to stop run due to Run status=RunStatus.BROKEN." 
in caplog.records[0].message caplog.clear() @patch("promptflow.evals.evaluate._eval_run.requests.Session") @@ -463,8 +463,8 @@ def test_write_properties_to_run_history_logs_error(self, token_mock, caplog): run.write_properties_to_run_history({'foo': 'bar'}) assert len(caplog.records) == 3 assert "tracking_uri was not provided," in caplog.records[0].message - assert "Unable to write properties because the run failed to start." in caplog.records[1].message - assert "Unable to stop run because the run failed to start." in caplog.records[2].message + assert "Unable to write properties due to Run status=RunStatus.BROKEN." in caplog.records[1].message + assert "Unable to stop run due to Run status=RunStatus.BROKEN." in caplog.records[2].message @pytest.mark.parametrize( 'function_literal,args,expected_action', @@ -488,4 +488,25 @@ def test_logs_if_not_started(self, token_mock, caplog, function_literal, args, e getattr(run, function_literal)(*args) assert len(caplog.records) == 1 assert expected_action in caplog.records[0].message, caplog.records[0].message - assert "The run did not started." in caplog.records[0].message, caplog.records[0].message + assert f"Unable to {expected_action} due to Run status=RunStatus.NOT_STARTED" in caplog.records[ + 0].message, caplog.records[0].message + + @pytest.mark.parametrize( + 'status', + [RunStatus.STARTED, RunStatus.BROKEN, RunStatus.TERMINATED] + ) + def test_starting_started_run(self, token_mock, status): + """Test exception if the run was already started""" + run = EvalRun( + run_name=None, + **TestEvalRun._MOCK_CREDS + ) + mock_session = MagicMock() + mock_session.request.return_value = self._get_mock_create_resonse(500 if status == RunStatus.BROKEN else 200) + with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): + run._start_run() + if status == RunStatus.TERMINATED: + run._end_run('FINISHED') + with pytest.raises(RuntimeError) as cm: + run._start_run() + assert f"Unable to start run due to Run status={status}" in cm.value.args[0], cm.value.args[0]
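
The end state of the series replaces the scattered per-method status checks with a
single helper. The following standalone sketch is not code from this patch (the class
name and logger are made up for illustration); it only shows the pattern that
_check_state_and_log converges on in PATCH 6: one shared precondition check that
receives the action name, the set of disallowed states, and a flag choosing between
raising and warning.

import enum
import logging
from typing import Set

LOGGER = logging.getLogger(__name__)


class RunStatus(enum.Enum):
    """Run states, mirroring the enum added earlier in the series."""
    NOT_STARTED = 0
    STARTED = 1
    BROKEN = 2
    TERMINATED = 3


class GatedRun:
    """Toy stand-in for EvalRun, reduced to the state-gating logic."""

    def __init__(self) -> None:
        self._status = RunStatus.NOT_STARTED

    def _check_state_and_log(self, action: str, bad_states: Set[RunStatus], should_raise: bool) -> bool:
        # Shared precondition check: complain once, in one place, when the run
        # is in a state where the requested action cannot be performed.
        if self._status in bad_states:
            msg = f"Unable to {action} due to Run status={self._status}."
            if should_raise:
                raise RuntimeError(msg)
            LOGGER.warning(msg)
            return False
        return True

    def start(self) -> None:
        # Starting twice, or starting a broken/terminated run, is treated as a
        # programming error and raises.
        self._check_state_and_log("start run", {s for s in RunStatus if s != RunStatus.NOT_STARTED}, True)
        self._status = RunStatus.STARTED

    def log_metric(self, key: str, value: float) -> None:
        # Logging before start, or after a failed start, is tolerated: it only
        # emits a warning and the call becomes a no-op.
        if not self._check_state_and_log("log metric", {RunStatus.NOT_STARTED, RunStatus.BROKEN}, False):
            return
        print(f"logged {key}={value}")


run = GatedRun()
run.log_metric("f1", 0.9)   # warning: Unable to log metric due to Run status=RunStatus.NOT_STARTED.
run.start()
run.log_metric("f1", 0.9)   # prints: logged f1=0.9
run.start()                 # raises RuntimeError: Unable to start run due to Run status=RunStatus.STARTED.
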