Make evaluation run a context manager instead of a singleton. #3529

Merged · 9 commits · Jul 11, 2024
Changes from 3 commits
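
For reference, a minimal usage sketch of the pattern this PR moves to. The constructor keywords mirror the _utils.py diff further down; the placeholder variables and concrete values below are illustrative only and not part of the change.

from promptflow.evals.evaluate._eval_run import EvalRun

# Illustrative placeholders: in the real code path these come from the
# workspace triad and the azure PF client (see _utils.py below). With
# tracking_uri=None the run only logs locally and never calls Azure.
my_tracking_uri = None
my_ml_client = None

# Previously EvalRun was a singleton and callers had to terminate it
# explicitly with end_run("FINISHED"); now it is a context manager.
with EvalRun(
    run_name="my-evaluation",
    tracking_uri=my_tracking_uri,
    subscription_id="00000000-0000-0000-0000-000000000000",
    group_name="my-resource-group",
    workspace_name="my-workspace",
    ml_client=my_ml_client,
    promptflow_run=None,
) as ev_run:                            # __enter__ calls start_run()
    ev_run.log_metric("accuracy", 0.92)
# __exit__ calls end_run("FINISHED") automatically on exit.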
160 changes: 100 additions & 60 deletions src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py
@@ -1,14 +1,16 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import contextlib
import dataclasses
import enum
import logging
import os
import posixpath
import requests
import time
import uuid
from typing import Any, Dict, Optional, Type
from typing import Any, Dict, Optional
from urllib.parse import urlparse

from azure.storage.blob import BlobServiceClient
@@ -21,6 +23,7 @@
from azure.ai.ml.entities._credentials import AccountKeyConfiguration
from azure.ai.ml.entities._datastore.datastore import Datastore


LOGGER = logging.getLogger(__name__)


@@ -50,28 +53,15 @@ def generate(run_name: Optional[str]) -> 'RunInfo':
)


class Singleton(type):
"""Singleton class, which will be used as a metaclass."""

_instances = {}

def __call__(cls, *args, **kwargs):
"""Redefinition of call to return one instance per type."""
if cls not in Singleton._instances:
Singleton._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return Singleton._instances[cls]

@staticmethod
def destroy(cls: Type) -> None:
"""
Destroy the singleton instance.

:param cls: The class to be destroyed.
"""
Singleton._instances.pop(cls, None)
class RunStatus(enum.Enum):
"""Run states."""
NOT_STARTED = 0
STARTED = 1
BROKEN = 2
TERMINATED = 3


class EvalRun(metaclass=Singleton):
class EvalRun(contextlib.AbstractContextManager):
"""
The simple singleton run class, used for accessing artifact store.

@@ -117,25 +107,18 @@ def __init__(self,
self._workspace_name: str = workspace_name
self._ml_client: Any = ml_client
self._is_promptflow_run: bool = promptflow_run is not None
self._is_broken = False
if self._tracking_uri is None:
LOGGER.warning("tracking_uri was not provided, "
"The results will be saved locally, but will not be logged to Azure.")
self._url_base = None
self._is_broken = True
self.info = RunInfo.generate(run_name)
else:
self._url_base = urlparse(self._tracking_uri).netloc
if promptflow_run is not None:
self.info = RunInfo(
promptflow_run.name,
promptflow_run._experiment_name,
promptflow_run.name
)
else:
self._is_broken = self._start_run(run_name)
self._run_name = run_name
self._promptflow_run = promptflow_run
self._status = RunStatus.NOT_STARTED

self._is_terminated = False
@property
def status(self) -> RunStatus:
"""
Return the run status.

:return: The status of the run.
"""
return self._status

def _get_scope(self) -> str:
"""
@@ -154,7 +137,31 @@ def _get_scope(self) -> str:
self._workspace_name,
)

def _start_run(self, run_name: Optional[str]) -> bool:
def start_run(self) -> None:
"""
Start the run, or, if it is not applicable (for example, if tracking is not enabled), mark it as started.
"""
if self._status != RunStatus.NOT_STARTED:
raise ValueError("The run has already started. Please end this run and start another one.")
self._status = RunStatus.STARTED
if self._tracking_uri is None:
LOGGER.warning("tracking_uri was not provided, "
"The results will be saved locally, but will not be logged to Azure.")
self._url_base = None
self._status = RunStatus.BROKEN
self.info = RunInfo.generate(self._run_name)
else:
self._url_base = urlparse(self._tracking_uri).netloc
if self._promptflow_run is not None:
self.info = RunInfo(
self._promptflow_run.name,
self._promptflow_run._experiment_name,
self._promptflow_run.name
)
else:
self._status = self._start_run(self._run_name)

def _start_run(self, run_name: Optional[str]) -> 'RunStatus':
"""
Make a request to start the mlflow run. If the run will not start, it will be

@@ -181,49 +188,58 @@ def _start_run(self, run_name: Optional[str]) -> bool:
self.info = RunInfo.generate(run_name)
LOGGER.warning(f"The run failed to start: {response.status_code}: {response.text}."
"The results will be saved locally, but will not be logged to Azure.")
return True
return RunStatus.BROKEN
parsed_response = response.json()
self.info = RunInfo(
run_id=parsed_response['run']['info']['run_id'],
experiment_id=parsed_response['run']['info']['experiment_id'],
run_name=parsed_response['run']['info']['run_name']
)
return False
return RunStatus.STARTED

def end_run(self, status: str) -> None:
def end_run(self, reason: str) -> None:
"""
Terminate the run.

:param status: One of "FINISHED" "FAILED" and "KILLED"
:type status: str
:param reason: One of "FINISHED" "FAILED" and "KILLED"
:type reason: str
:raises: ValueError if the reason is not one of ("FINISHED", "FAILED", "KILLED")
"""
self._raise_not_started_nay_be()
if self._is_promptflow_run:
# This run is already finished, we just add artifacts/metrics to it.
Singleton.destroy(EvalRun)
self._status = RunStatus.TERMINATED
return
if status not in ("FINISHED", "FAILED", "KILLED"):
if reason not in ("FINISHED", "FAILED", "KILLED"):
raise ValueError(
f"Incorrect terminal status {status}. " 'Valid statuses are "FINISHED", "FAILED" and "KILLED".'
f"Incorrect terminal status {reason}. " 'Valid statuses are "FINISHED", "FAILED" and "KILLED".'
)
if self._is_terminated:
if self._status == RunStatus.TERMINATED:
LOGGER.warning("Unable to stop run because it was already terminated.")
return
if self._is_broken:
if self._status == RunStatus.BROKEN:
LOGGER.warning("Unable to stop run because the run failed to start.")
return
url = f"https://{self._url_base}/mlflow/v2.0" f"{self._get_scope()}/api/2.0/mlflow/runs/update"
body = {
"run_uuid": self.info.run_id,
"status": status,
"status": reason,
"end_time": int(time.time() * 1000),
"run_id": self.info.run_id,
}
response = self.request_with_retry(url=url, method="POST", json_dict=body)
if response.status_code != 200:
LOGGER.warning("Unable to terminate the run.")
Singleton.destroy(EvalRun)
self._is_terminated = True
self._status = RunStatus.TERMINATED

def __enter__(self):
"""The Context Manager enter call."""
self.start_run()
return self

def __exit__(self, exc_type, exc_value, exc_tb):
"""The context manager exit call."""
self.end_run("FINISHED")

def get_run_history_uri(self) -> str:
"""
@@ -304,6 +320,17 @@ def _log_warning(self, failed_op: str, response: requests.Response) -> None:
f"{response.text=}."
)

def _raise_not_started_nay_be(self) -> None:
"""
Raise a ValueError if the run has not been started.

:raises: ValueError
"""
if self._status == RunStatus.NOT_STARTED:
raise ValueError(
"The run has not started. "
"Please start the run by calling the start_run method.")

def log_artifact(self, artifact_folder: str, artifact_name: str = EVALUATION_ARTIFACT) -> None:
"""
The local implementation of mlflow-like artifact logging.
@@ -314,7 +341,8 @@ def log_artifact(self, artifact_folder: str, artifact_name: str = EVALUATION_ART
:param artifact_folder: The folder with artifacts to be uploaded.
:type artifact_folder: str
"""
if self._is_broken:
self._raise_not_started_nay_be()
if self._status == RunStatus.BROKEN:
LOGGER.warning("Unable to log artifact because the run failed to start.")
return
# Check if artifact directory is empty or does not exist.
@@ -402,7 +430,8 @@ def log_metric(self, key: str, value: float) -> None:
:param value: The value to be logged.
:type value: float
"""
if self._is_broken:
self._raise_not_started_nay_be()
if self._status == RunStatus.BROKEN:
LOGGER.warning("Unable to log metric because the run failed to start.")
return
body = {
@@ -421,11 +450,22 @@ def log_metric(self, key: str, value: float) -> None:
if response.status_code != 200:
self._log_warning("save metrics", response)

@staticmethod
def get_instance(*args, **kwargs) -> "EvalRun":
def write_properties_to_run_history(self, properties: Dict[str, Any]) -> None:
"""
The convenience method to get the EvalRun instance.
Write properties to the RunHistory service.

:return: The EvalRun instance.
:param properties: The properties to be written to run history.
:type properties: dict
"""
return EvalRun(*args, **kwargs)
self._raise_not_started_nay_be()
if self._status == RunStatus.BROKEN:
LOGGER.warning("Unable to write properties because the run failed to start.")
return
# update host to run history and request PATCH API
response = self.request_with_retry(
url=self.get_run_history_uri(),
method="PATCH",
json_dict={"runId": self.info.run_id, "properties": properties},
)
if response.status_code != 200:
LOGGER.error("Fail writing properties '%s' to run history: %s", properties, response.text)
61 changes: 22 additions & 39 deletions src/promptflow-evals/promptflow/evals/evaluate/_utils.py
@@ -48,22 +48,6 @@ def load_jsonl(path):
return [json.loads(line) for line in f.readlines()]


def _write_properties_to_run_history(properties: dict) -> None:
run = EvalRun.get_instance()
try:
# update host to run history and request PATCH API
response = run.request_with_retry(
url=run.get_run_history_uri(),
method="PATCH",
json_dict={"runId": run.info.run_id, "properties": properties},
)
if response.status_code != 200:
LOGGER.error("Fail writing properties '%s' to run history: %s", properties, response.text)
response.raise_for_status()
except AttributeError as e:
LOGGER.error("Fail writing properties '%s' to run history: %s", properties, e)


def _azure_pf_client_and_triad(trace_destination):
from promptflow.azure._cli._utils import _get_azure_pf_client

@@ -90,43 +74,42 @@ def _log_metrics_and_instance_results(
# Adding line_number as index column this is needed by UI to form link to individual instance run
instance_results["line_number"] = instance_results.index.values

ev_run = EvalRun(
with EvalRun(
run_name=run.name if run is not None else evaluation_name,
tracking_uri=tracking_uri,
subscription_id=ws_triad.subscription_id,
group_name=ws_triad.resource_group_name,
workspace_name=ws_triad.workspace_name,
ml_client=azure_pf_client.ml_client,
promptflow_run=run,
)
) as ev_run:

artifact_name = EvalRun.EVALUATION_ARTIFACT if run else EvalRun.EVALUATION_ARTIFACT_DUMMY_RUN
artifact_name = EvalRun.EVALUATION_ARTIFACT if run else EvalRun.EVALUATION_ARTIFACT_DUMMY_RUN

with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = os.path.join(tmpdir, artifact_name)
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = os.path.join(tmpdir, artifact_name)

with open(tmp_path, "w", encoding="utf-8") as f:
f.write(instance_results.to_json(orient="records", lines=True))
with open(tmp_path, "w", encoding="utf-8") as f:
f.write(instance_results.to_json(orient="records", lines=True))

ev_run.log_artifact(tmpdir, artifact_name)
ev_run.log_artifact(tmpdir, artifact_name)

# Using mlflow to create a dummy run since once created via PF show traces of dummy run in UI.
# Those traces can be confusing.
# adding these properties to avoid showing traces if a dummy run is created.
# We are doing that only for the pure evaluation runs.
if run is None:
_write_properties_to_run_history(
properties={
"_azureml.evaluation_run": "azure-ai-generative-parent",
"_azureml.evaluate_artifacts": json.dumps([{"path": artifact_name, "type": "table"}]),
"isEvaluatorRun": "true",
}
)
# Using mlflow to create a dummy run since once created via PF show traces of dummy run in UI.
# Those traces can be confusing.
# adding these properties to avoid showing traces if a dummy run is created.
# We are doing that only for the pure evaluation runs.
if run is None:
ev_run.write_properties_to_run_history(
properties={
"_azureml.evaluation_run": "azure-ai-generative-parent",
"_azureml.evaluate_artifacts": json.dumps([{"path": artifact_name, "type": "table"}]),
"isEvaluatorRun": "true",
}
)

for metric_name, metric_value in metrics.items():
ev_run.log_metric(metric_name, metric_value)
for metric_name, metric_value in metrics.items():
ev_run.log_metric(metric_name, metric_value)

ev_run.end_run("FINISHED")
evaluation_id = ev_run.info.run_name if run is not None else ev_run.info.run_id
return _get_ai_studio_url(trace_destination=trace_destination, evaluation_id=evaluation_id)
