Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] update evaluate to be concurrent #1345

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 188 additions & 32 deletions python/langsmith/evaluation/_arunner.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""V2 Evaluation Interface."""

Check notice on line 1 in python/langsmith/evaluation/_arunner.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

........... create_5_000_run_trees: Mean +- std dev: 665 ms +- 57 ms ........... create_10_000_run_trees: Mean +- std dev: 1.32 sec +- 0.07 sec ........... create_20_000_run_trees: Mean +- std dev: 1.32 sec +- 0.09 sec ........... dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 712 us +- 12 us ........... dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.1 ms +- 0.2 ms ........... dumps_class_nested_py_leaf_100x200: Mean +- std dev: 105 ms +- 3 ms ........... dumps_dataclass_nested_50x100: Mean +- std dev: 25.6 ms +- 0.3 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (17.9 ms) is 24% of the mean (75.5 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. dumps_pydantic_nested_50x100: Mean +- std dev: 75.5 ms +- 17.9 ms ........... dumps_pydanticv1_nested_50x100: Mean +- std dev: 202 ms +- 3 ms

Check notice on line 1 in python/langsmith/evaluation/_arunner.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------------+----------+------------------------+ | Benchmark | main | changes | +===============================================+==========+========================+ | create_5_000_run_trees | 739 ms | 665 ms: 1.11x faster | +-----------------------------------------------+----------+------------------------+ | create_10_000_run_trees | 1.46 sec | 1.32 sec: 1.10x faster | +-----------------------------------------------+----------+------------------------+ | dumps_pydanticv1_nested_50x100 | 222 ms | 202 ms: 1.10x faster | +-----------------------------------------------+----------+------------------------+ | create_20_000_run_trees | 1.44 sec | 1.32 sec: 1.09x faster | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_50x100 | 25.2 ms | 25.1 ms: 1.00x faster | +-----------------------------------------------+----------+------------------------+ | dumps_dataclass_nested_50x100 | 25.4 ms | 25.6 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_100x200 | 104 ms | 105 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_branch_and_leaf_200x400 | 690 us | 712 us: 1.03x slower | +-----------------------------------------------+----------+------------------------+ | dumps_pydantic_nested_50x100 | 67.6 ms | 75.5 ms: 1.12x slower | +-----------------------------------------------+----------+------------------------+ | Geometric mean | (ref) | 1.03x faster | +-----------------------------------------------+----------+------------------------+

from __future__ import annotations

import asyncio
import concurrent.futures as cf
import datetime
import io
import logging
import pathlib
import uuid
Expand Down Expand Up @@ -47,7 +48,6 @@
_load_experiment,
_load_tqdm,
_load_traces,
_make_fresh_examples,
_resolve_data,
_resolve_evaluators,
_resolve_experiment,
Expand Down Expand Up @@ -480,7 +480,13 @@
num_repetitions=num_repetitions,
runs=runs,
include_attachments=_include_attachments(target)
or _evaluators_include_attachments(evaluators),
or _evaluators_include_attachments(evaluators) > 0,
reuse_attachments=num_repetitions
* (
int(_include_attachments(target))
+ _evaluators_include_attachments(evaluators)
)
> 1,
upload_results=upload_results,
).astart()
cache_dir = ls_utils.get_cache_dir(None)
Expand All @@ -491,15 +497,24 @@
cache_path = None
with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
if is_async_target:
manager = await manager.awith_predictions(
cast(ATARGET_T, target), max_concurrency=max_concurrency
)
if evaluators:
manager = await manager.awith_evaluators(
evaluators, max_concurrency=max_concurrency
)
if summary_evaluators:
manager = await manager.awith_summary_evaluators(summary_evaluators)
if evaluators:
# Run predictions and evaluations in a single pipeline
manager = await manager.awith_predictions_and_evaluators(
cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
)
else:
manager = await manager.awith_predictions(
cast(ATARGET_T, target), max_concurrency=max_concurrency
)
if summary_evaluators:
manager = await manager.awith_summary_evaluators(summary_evaluators)
else:
if evaluators:
manager = await manager.awith_evaluators(
evaluators, max_concurrency=max_concurrency
)
if summary_evaluators:
manager = await manager.awith_summary_evaluators(summary_evaluators)
results = AsyncExperimentResults(manager)
if blocking:
await results.wait()
Expand Down Expand Up @@ -543,7 +558,9 @@
description: Optional[str] = None,
num_repetitions: int = 1,
include_attachments: bool = False,
reuse_attachments: bool = False,
upload_results: bool = True,
attachment_raw_data_dict: Optional[dict] = None,
):
super().__init__(
experiment=experiment,
Expand All @@ -560,7 +577,52 @@
self._summary_results = summary_results
self._num_repetitions = num_repetitions
self._include_attachments = include_attachments
self._reuse_attachments = reuse_attachments
self._upload_results = upload_results
self._attachment_raw_data_dict = attachment_raw_data_dict

def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example:
"""Reset attachment readers for an example."""
if not hasattr(example, "attachments") or not example.attachments:
return example

new_attachments: dict[str, schemas.AttachmentInfo] = {}
for name, attachment in example.attachments.items():
if (
self._attachment_raw_data_dict is not None
and str(example.id) + name in self._attachment_raw_data_dict
):
new_attachments[name] = {
"presigned_url": attachment["presigned_url"],
"reader": io.BytesIO(
self._attachment_raw_data_dict[str(example.id) + name]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're sure this doesn't copy the bytes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you are correct: io.BytesIO copies the underlying bytes. This is wrong; I am working on a fix right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I am going to walk back my statement: based on testing, I don't think io.BytesIO copies the data.

),
}
else:
new_attachments[name] = attachment

# Create a new Example instance with the updated attachments
return schemas.Example(
id=example.id,
created_at=example.created_at,
dataset_id=example.dataset_id,
inputs=example.inputs,
outputs=example.outputs,
metadata=example.metadata,
modified_at=example.modified_at,
runs=example.runs,
source_run_id=example.source_run_id,
attachments=new_attachments,
_host_url=example._host_url,
_tenant_id=example._tenant_id,
)

def _make_fresh_examples(
isahers1 marked this conversation as resolved.
Show resolved Hide resolved
self,
examples: List[schemas.Example],
) -> List[schemas.Example]:
"""Create fresh copies of examples with reset readers."""
return [self._reset_example_attachments(example) for example in examples]

async def aget_examples(self) -> AsyncIterator[schemas.Example]:
if self._examples is None:
Expand All @@ -569,11 +631,18 @@
client=self.client,
include_attachments=self._include_attachments,
)
if self._reuse_attachments and self._attachment_raw_data_dict is None:
examples_copy, self._examples = aitertools.atee(self._examples)
self._attachment_raw_data_dict = {
str(e.id) + name: value["reader"].read()
async for e in examples_copy
for name, value in (e.attachments or {}).items()
}
if self._num_repetitions > 1:
examples_list = [example async for example in self._examples]
self._examples = async_chain_from_iterable(
[
async_iter_from_list(_make_fresh_examples(examples_list))
async_iter_from_list(self._make_fresh_examples(examples_list))
for _ in range(self._num_repetitions)
]
)
Expand Down Expand Up @@ -639,6 +708,95 @@
runs=self._runs,
evaluation_results=self._evaluation_results,
include_attachments=self._include_attachments,
reuse_attachments=self._reuse_attachments,
upload_results=self._upload_results,
attachment_raw_data_dict=self._attachment_raw_data_dict,
)

def _get_example_with_readers(self, example: schemas.Example) -> schemas.Example:
    """Build a copy of ``example`` with fresh readers for cached attachments.

    Every attachment whose key (``str(example.id) + name``) is present in
    ``self._attachment_raw_data_dict`` is replaced by a new entry that keeps
    the original ``presigned_url`` and wraps the cached bytes in a new
    ``io.BytesIO``. Attachments with no cache entry (or all of them when the
    cache is ``None``) are carried over unchanged.

    Args:
        example: The example to copy.

    Returns:
        A new ``schemas.Example`` with the same field values as ``example``
        apart from ``attachments``, which is the rebuilt mapping (an empty
        dict when ``example.attachments`` is ``None`` or empty).
    """
    raw_data = self._attachment_raw_data_dict
    refreshed: dict[str, schemas.AttachmentInfo] = {}
    for name, attachment in (example.attachments or {}).items():
        if raw_data is None:
            # Nothing was cached at all; reuse the attachment as-is.
            refreshed[name] = attachment
            continue
        cache_key = str(example.id) + name
        if cache_key not in raw_data:
            # This particular attachment was never cached; reuse it as-is.
            refreshed[name] = attachment
            continue
        refreshed[name] = {
            "presigned_url": attachment["presigned_url"],
            # A new BytesIO per call, so the returned reader starts at offset 0.
            "reader": io.BytesIO(raw_data[cache_key]),
        }

    # Rebuild the example field by field, swapping in the new attachments.
    example_fields = {
        "id": example.id,
        "created_at": example.created_at,
        "dataset_id": example.dataset_id,
        "inputs": example.inputs,
        "outputs": example.outputs,
        "metadata": example.metadata,
        "modified_at": example.modified_at,
        "runs": example.runs,
        "source_run_id": example.source_run_id,
        "attachments": refreshed,
        "_host_url": example._host_url,
        "_tenant_id": example._tenant_id,
    }
    return schemas.Example(**example_fields)

async def awith_predictions_and_evaluators(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably do something similar to what we do in the sync version to avoid having to duplicate logic here (basically share a semaphore).

self,
target: ATARGET_T,
evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
/,
max_concurrency: Optional[int] = None,
) -> _AsyncExperimentManager:
"""Run predictions and evaluations in a single pipeline.

This allows evaluators to process results as soon as they're available from
the target function, rather than waiting for all predictions to complete first.
"""
evaluators = _resolve_evaluators(evaluators)

if not hasattr(self, "_evaluator_executor"):
self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, where does the 4 come from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied the value from _ascore; I'm not really sure beyond that.


async def process_examples():
async for pred in self._apredict(
target,
max_concurrency=max_concurrency,
include_attachments=_include_attachments(target),
):
example, run = pred["example"], pred["run"]
result = self._arun_evaluators(
evaluators,
{
"run": run,
"example": example,
"evaluation_results": {"results": []},
},
executor=self._evaluator_executor,
)
yield result

experiment_results = aitertools.aiter_with_concurrency(
max_concurrency,
process_examples(),
_eager_consumption_timeout=0.001,
)

r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())

return _AsyncExperimentManager(
(result["example"] async for result in r1),
experiment=self._experiment,
metadata=self._metadata,
client=self.client,
runs=(result["run"] async for result in r2),
evaluation_results=(result["evaluation_results"] async for result in r3),
summary_results=self._summary_results,
include_attachments=self._include_attachments,
upload_results=self._upload_results,
)

Expand Down Expand Up @@ -740,7 +898,7 @@
# Yield the coroutine to be awaited later
yield _aforward(
fn,
example,
self._get_example_with_readers(example),
self.experiment_name,
self._metadata,
self.client,
Expand Down Expand Up @@ -796,19 +954,22 @@
run = current_results["run"]
example = current_results["example"]
eval_results = current_results["evaluation_results"]
for evaluator in evaluators:

async def _run_single_evaluator(evaluator):
try:
evaluator_response = await evaluator.aevaluate_run(
run=run,
example=example,
example=self._get_example_with_readers(example),
)
eval_results["results"].extend(
self.client._select_eval_results(evaluator_response)
selected_results = self.client._select_eval_results(
evaluator_response
)

if self._upload_results:
self.client._log_evaluation_feedback(
evaluator_response, run=run, _executor=executor
)
return selected_results
except Exception as e:
try:
feedback_keys = _extract_feedback_keys(evaluator)
Expand All @@ -824,13 +985,14 @@
for key in feedback_keys
]
)
eval_results["results"].extend(
self.client._select_eval_results(error_response)
selected_results = self.client._select_eval_results(
error_response
)
if self._upload_results:
self.client._log_evaluation_feedback(
error_response, run=run, _executor=executor
)
return selected_results
except Exception as e2:
logger.debug(f"Error parsing feedback keys: {e2}")
pass
Expand All @@ -839,15 +1001,13 @@
f" run {run.id}: {repr(e)}",
exc_info=True,
)
logger.error(
f"Error running evaluator {repr(evaluator)} on"
f" run {run.id}: {repr(e)}",
exc_info=True,
)
if example.attachments is not None:
for attachment in example.attachments:
reader = example.attachments[attachment]["reader"]
reader.seek(0)

all_results = await asyncio.gather(
*[_run_single_evaluator(evaluator) for evaluator in evaluators]
)
for result in all_results:
if result is not None:
eval_results["results"].extend(result)
return ExperimentResultRow(
run=run,
example=example,
Expand Down Expand Up @@ -1064,10 +1224,6 @@
client=client,
),
)
if include_attachments and example.attachments is not None:
for attachment in example.attachments:
reader = example.attachments[attachment]["reader"]
reader.seek(0)
except Exception as e:
logger.error(
f"Error running target function: {e}", exc_info=True, stacklevel=1
Expand Down
Loading
Loading