[DRAFT] update evaluate to be concurrent #1345
base: main
Changes from all commits
696e4bf
a558981
117e8c6
3c8424c
2bf063f
982750c
c9071a4
41cd64f
628c3c3
f00e2f6
python/langsmith/evaluation/_arunner.py
@@ -1,10 +1,11 @@
 """V2 Evaluation Interface."""

 from __future__ import annotations

 import asyncio
 import concurrent.futures as cf
 import datetime
+import io
 import logging
 import pathlib
 import uuid
@@ -47,7 +48,6 @@
     _load_experiment,
     _load_tqdm,
     _load_traces,
-    _make_fresh_examples,
     _resolve_data,
     _resolve_evaluators,
     _resolve_experiment,
@@ -480,7 +480,13 @@
         num_repetitions=num_repetitions,
         runs=runs,
         include_attachments=_include_attachments(target)
-        or _evaluators_include_attachments(evaluators),
+        or _evaluators_include_attachments(evaluators) > 0,
+        reuse_attachments=num_repetitions
+        * (
+            int(_include_attachments(target))
+            + _evaluators_include_attachments(evaluators)
+        )
+        > 1,
         upload_results=upload_results,
     ).astart()
     cache_dir = ls_utils.get_cache_dir(None)
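A quick check of the new `reuse_attachments` arithmetic: attachments need to be reusable exactly when more than one consumer (the target, plus however many evaluators take attachments, across all repetitions) will read them. A minimal sketch with hypothetical values, assuming `_include_attachments` returns a bool and `_evaluators_include_attachments` returns a count, as the `> 0` comparison implies:

```python
# Hypothetical inputs, for illustration only.
num_repetitions = 2
target_takes_attachments = True  # stand-in for _include_attachments(target)
evaluator_count = 1              # stand-in for _evaluators_include_attachments(evaluators)

consumers_per_repetition = int(target_takes_attachments) + evaluator_count
reuse_attachments = num_repetitions * consumers_per_repetition > 1  # 2 * 2 > 1 -> True

# One repetition, one consumer: each attachment is read exactly once,
# so there is nothing to reuse.
assert not (1 * (1 + 0) > 1)
```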
@@ -491,15 +497,24 @@
         cache_path = None
     with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
         if is_async_target:
-            manager = await manager.awith_predictions(
-                cast(ATARGET_T, target), max_concurrency=max_concurrency
-            )
-        if evaluators:
-            manager = await manager.awith_evaluators(
-                evaluators, max_concurrency=max_concurrency
-            )
-        if summary_evaluators:
-            manager = await manager.awith_summary_evaluators(summary_evaluators)
+            if evaluators:
+                # Run predictions and evaluations in a single pipeline
+                manager = await manager.awith_predictions_and_evaluators(
+                    cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
+                )
+            else:
+                manager = await manager.awith_predictions(
+                    cast(ATARGET_T, target), max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
+        else:
+            if evaluators:
+                manager = await manager.awith_evaluators(
+                    evaluators, max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
     results = AsyncExperimentResults(manager)
     if blocking:
         await results.wait()
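Stripped of the cast and nesting, the dispatch above reduces to the following shape (an illustrative condensation, not the actual code):

```python
if is_async_target and evaluators:
    # Fused path: evaluations start as soon as each prediction finishes.
    manager = await manager.awith_predictions_and_evaluators(
        target, evaluators, max_concurrency=max_concurrency
    )
elif is_async_target:
    manager = await manager.awith_predictions(target, max_concurrency=max_concurrency)
elif evaluators:
    manager = await manager.awith_evaluators(evaluators, max_concurrency=max_concurrency)
if summary_evaluators:
    manager = await manager.awith_summary_evaluators(summary_evaluators)
```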
@@ -528,6 +543,18 @@
             results for the experiment.
         summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results
             for the experiment.
+        num_repetitions (Optional[int], default=1): The number of repetitions for
+            the experiment.
+        include_attachments (Optional[bool], default=False): Whether to include
+            attachments. This is used when we pull the examples for the experiment.
+        reuse_attachments (Optional[bool], default=False): Whether to reuse attachments
+            from examples. This is True if we need to reuse attachments across multiple
+            target/evaluator functions.
+        upload_results (Optional[bool], default=True): Whether to upload results
+            to LangSmith.
+        attachment_raw_data_dict (Optional[dict]): A dictionary storing raw data
+            for attachments. Only used if we reuse attachments across multiple
+            target/evaluator functions.
     """

     def __init__(
@@ -543,7 +570,9 @@
         description: Optional[str] = None,
         num_repetitions: int = 1,
         include_attachments: bool = False,
+        reuse_attachments: bool = False,
         upload_results: bool = True,
+        attachment_raw_data_dict: Optional[dict] = None,
     ):
         super().__init__(
             experiment=experiment,
@@ -560,7 +589,53 @@
         self._summary_results = summary_results
         self._num_repetitions = num_repetitions
         self._include_attachments = include_attachments
+        self._reuse_attachments = reuse_attachments
         self._upload_results = upload_results
+        self._attachment_raw_data_dict = attachment_raw_data_dict
+
+    def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example:
+        """Reset attachment readers for an example.
+
+        This is only needed when an attachment will be used by more than one
+        callable (target + evaluators). In that case we keep a single copy of
+        the attachment data in self._attachment_raw_data_dict and create fresh
+        readers over that data, so we never hold duplicate copies of the same
+        bytes in memory.
+        """
+        if not hasattr(example, "attachments") or not example.attachments:
+            return example
+
+        new_attachments: dict[str, schemas.AttachmentInfo] = {}
+        for name, attachment in example.attachments.items():
+            if (
+                self._attachment_raw_data_dict is not None
+                and str(example.id) + name in self._attachment_raw_data_dict
+            ):
+                new_attachments[name] = {
+                    "presigned_url": attachment["presigned_url"],
+                    "reader": io.BytesIO(
+                        self._attachment_raw_data_dict[str(example.id) + name]
+                    ),
+                }
+            else:
+                new_attachments[name] = attachment
+
+        # Create a new Example instance with the updated attachments
+        return schemas.Example(
+            id=example.id,
+            created_at=example.created_at,
+            dataset_id=example.dataset_id,
+            inputs=example.inputs,
+            outputs=example.outputs,
+            metadata=example.metadata,
+            modified_at=example.modified_at,
+            runs=example.runs,
+            source_run_id=example.source_run_id,
+            attachments=new_attachments,
+            _host_url=example._host_url,
+            _tenant_id=example._tenant_id,
+        )

     async def aget_examples(self) -> AsyncIterator[schemas.Example]:
         if self._examples is None:
@@ -569,11 +644,23 @@
                 client=self.client,
                 include_attachments=self._include_attachments,
             )
+            if self._reuse_attachments and self._attachment_raw_data_dict is None:
+                examples_copy, self._examples = aitertools.atee(self._examples)
+                self._attachment_raw_data_dict = {
+                    str(e.id) + name: value["reader"].read()
+                    async for e in examples_copy
+                    for name, value in (e.attachments or {}).items()
+                }
             if self._num_repetitions > 1:
                 examples_list = [example async for example in self._examples]
                 self._examples = async_chain_from_iterable(
                     [
-                        async_iter_from_list(_make_fresh_examples(examples_list))
+                        async_iter_from_list(
+                            [
+                                self._reset_example_attachments(example)
+                                for example in examples_list
+                            ]
+                        )
                         for _ in range(self._num_repetitions)
                     ]
                 )
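The cache key is `str(example.id) + name`, and each consumer later gets a fresh `io.BytesIO` over the cached bytes. Readers built this way have independent cursors, which is what makes the sharing safe. A small illustration with made-up data (names are hypothetical):

```python
import io

# Made-up cache entries, keyed the same way as _attachment_raw_data_dict.
example_id = "0b6f1e1e-example-uuid"
raw = {example_id + "audio": b"RIFF....WAVEfmt "}

reader_a = io.BytesIO(raw[example_id + "audio"])  # e.g. for the target
reader_b = io.BytesIO(raw[example_id + "audio"])  # e.g. for an evaluator

assert reader_a.read(4) == b"RIFF"
assert reader_b.tell() == 0  # advancing one reader never moves the other
```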
@@ -639,6 +726,103 @@
             runs=self._runs,
             evaluation_results=self._evaluation_results,
             include_attachments=self._include_attachments,
+            reuse_attachments=self._reuse_attachments,
             upload_results=self._upload_results,
+            attachment_raw_data_dict=self._attachment_raw_data_dict,
         )

+    def _get_example_with_readers(self, example: schemas.Example) -> schemas.Example:
+        new_attachments: dict[str, schemas.AttachmentInfo] = {}
+        for name, attachment in (example.attachments or {}).items():
+            if (
+                self._attachment_raw_data_dict is not None
+                and str(example.id) + name in self._attachment_raw_data_dict
+            ):
+                reader = io.BytesIO(
+                    self._attachment_raw_data_dict[str(example.id) + name]
+                )
+                new_attachments[name] = {
+                    "presigned_url": attachment["presigned_url"],
+                    "reader": reader,
+                }
+            else:
+                new_attachments[name] = attachment
+
+        return schemas.Example(
+            id=example.id,
+            created_at=example.created_at,
+            dataset_id=example.dataset_id,
+            inputs=example.inputs,
+            outputs=example.outputs,
+            metadata=example.metadata,
+            modified_at=example.modified_at,
+            runs=example.runs,
+            source_run_id=example.source_run_id,
+            attachments=new_attachments,
+            _host_url=example._host_url,
+            _tenant_id=example._tenant_id,
+        )
+
+    async def awith_predictions_and_evaluators(

> Review comment: We could probably do something similar to what we do in the sync version to avoid having to duplicate logic here (basically share a semaphore).

+        self,
+        target: ATARGET_T,
+        evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
+        /,
+        max_concurrency: Optional[int] = None,
+    ) -> _AsyncExperimentManager:
+        """Run predictions and evaluations in a single pipeline.
+
+        This allows evaluators to process results as soon as they're available from
+        the target function, rather than waiting for all predictions to complete first.
+        """
+        evaluators = _resolve_evaluators(evaluators)
+
+        if not hasattr(self, "_evaluator_executor"):
+            self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)

> Review comment: ooc where's the 4 come from?
>
> Reply: I copied the value from […]

+
+        async def process_examples():
+            """Create a single task per example.
+
+            That task is to run the target function and all the evaluators
+            sequentially.
+            """
+            async for pred in self._apredict(
+                target,
+                max_concurrency=max_concurrency,
+                include_attachments=_include_attachments(target),
+            ):
+                example, run = pred["example"], pred["run"]
+                result = self._arun_evaluators(
+                    evaluators,
+                    {
+                        "run": run,
+                        "example": example,
+                        "evaluation_results": {"results": []},
+                    },
+                    executor=self._evaluator_executor,
+                )
+                yield result
+
+        # Run the per-example tasks with max-concurrency
+        # This guarantees that max_concurrency is the upper limit
+        # for the number of target/evaluators that can be run in parallel
+        experiment_results = aitertools.aiter_with_concurrency(
+            max_concurrency,
+            process_examples(),
+            _eager_consumption_timeout=0.001,
+        )
+
+        r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
+
+        return _AsyncExperimentManager(
+            (result["example"] async for result in r1),
+            experiment=self._experiment,
+            metadata=self._metadata,
+            client=self.client,
+            runs=(result["run"] async for result in r2),
+            evaluation_results=(result["evaluation_results"] async for result in r3),
+            summary_results=self._summary_results,
+            include_attachments=self._include_attachments,
+            upload_results=self._upload_results,
+        )
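For intuition, the fused pipeline is essentially a per-example chain of target-then-evaluators running under one concurrency budget. A self-contained asyncio sketch of the same pattern, with generic stand-ins for the target and evaluators (this mirrors the intent, not the library's internals):

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def fused_pipeline(
    examples: AsyncIterator[dict],
    target: Callable[[dict], Awaitable[dict]],
    evaluators: list[Callable[[dict, dict], Awaitable[dict]]],
    max_concurrency: int = 8,
) -> list[dict]:
    """Run target + all evaluators for each example as a single unit of work."""
    sem = asyncio.Semaphore(max_concurrency)

    async def process(example: dict) -> dict:
        # One semaphore slot covers the whole chain, so max_concurrency bounds
        # the total number of in-flight target/evaluator pipelines.
        async with sem:
            run = await target(example)  # prediction
            results = [await ev(run, example) for ev in evaluators]  # evaluations
            return {"example": example, "run": run, "evaluation_results": results}

    tasks = [asyncio.create_task(process(e)) async for e in examples]
    return await asyncio.gather(*tasks)
```

Splitting the result stream three ways with `atee` then lets the new manager expose examples, runs, and evaluation results as separate async iterators without re-running anything.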
@@ -740,7 +924,7 @@
             # Yield the coroutine to be awaited later
             yield _aforward(
                 fn,
-                example,
+                self._get_example_with_readers(example),
                 self.experiment_name,
                 self._metadata,
                 self.client,
@@ -796,19 +980,22 @@
         run = current_results["run"]
         example = current_results["example"]
         eval_results = current_results["evaluation_results"]
-        for evaluator in evaluators:
+
+        async def _run_single_evaluator(evaluator):
             try:
                 evaluator_response = await evaluator.aevaluate_run(
                     run=run,
-                    example=example,
+                    example=self._get_example_with_readers(example),
                 )
-                eval_results["results"].extend(
-                    self.client._select_eval_results(evaluator_response)
+                selected_results = self.client._select_eval_results(
+                    evaluator_response
                 )
+
                 if self._upload_results:
                     self.client._log_evaluation_feedback(
                         evaluator_response, run=run, _executor=executor
                     )
+                return selected_results
             except Exception as e:
                 try:
                     feedback_keys = _extract_feedback_keys(evaluator)
@@ -824,13 +1011,14 @@
                             for key in feedback_keys
                         ]
                     )
-                    eval_results["results"].extend(
-                        self.client._select_eval_results(error_response)
+                    selected_results = self.client._select_eval_results(
+                        error_response
                     )
                     if self._upload_results:
                         self.client._log_evaluation_feedback(
                             error_response, run=run, _executor=executor
                         )
+                    return selected_results
                 except Exception as e2:
                     logger.debug(f"Error parsing feedback keys: {e2}")
                     pass
@@ -839,15 +1027,14 @@
                     f" run {run.id}: {repr(e)}",
                     exc_info=True,
                 )
-                logger.error(
-                    f"Error running evaluator {repr(evaluator)} on"
-                    f" run {run.id}: {repr(e)}",
-                    exc_info=True,
-                )
-        if example.attachments is not None:
-            for attachment in example.attachments:
-                reader = example.attachments[attachment]["reader"]
-                reader.seek(0)
+
+        all_results = []
+        for evaluator in evaluators:
+            all_results.append(await _run_single_evaluator(evaluator))
+
+        for result in all_results:
+            if result is not None:
+                eval_results["results"].extend(result)
         return ExperimentResultRow(
             run=run,
             example=example,
@@ -1064,10 +1251,6 @@
                     client=client,
                 ),
             )
-            if include_attachments and example.attachments is not None:
-                for attachment in example.attachments:
-                    reader = example.attachments[attachment]["reader"]
-                    reader.seek(0)
         except Exception as e:
             logger.error(
                 f"Error running target function: {e}", exc_info=True, stacklevel=1
> Review comment: you're sure this doesn't copy the bytes?
>
> Reply: No, you are correct. io.BytesIO copies the underlying bytes. This is wrong, I am working on a fix rn.
>
> Reply: ehh, actually I am going to walk back my statement. Based on testing, I don't think io.BytesIO copies the data.
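The walk-back matches CPython behavior: since Python 3.5, `io.BytesIO` uses a copy-on-write buffer when initialized from a bytes object, so construction alone does not duplicate the data; a copy is only made if the stream is written to. This is an interpreter detail rather than a language guarantee. A rough way to observe it (a sketch; exact numbers vary by interpreter and version):

```python
import io
import tracemalloc

payload = b"x" * (64 * 1024 * 1024)  # 64 MiB of attachment-like data

tracemalloc.start()
readers = [io.BytesIO(payload) for _ in range(10)]
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

# On CPython >= 3.5 the peak is far below the ~640 MiB that ten full copies
# would require, because each reader shares the original buffer until it is
# first mutated (copy-on-write).
print(f"peak traced allocations: {peak / 2**20:.1f} MiB")
```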