Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] update evaluate to be concurrent #1345

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 89 additions & 27 deletions python/langsmith/evaluation/_arunner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""V2 Evaluation Interface."""

Check notice on line 1 in python/langsmith/evaluation/_arunner.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

........... WARNING: the benchmark result may be unstable * the standard deviation (118 ms) is 17% of the mean (692 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_5_000_run_trees: Mean +- std dev: 692 ms +- 118 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (213 ms) is 15% of the mean (1.42 sec) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_10_000_run_trees: Mean +- std dev: 1.42 sec +- 0.21 sec ........... WARNING: the benchmark result may be unstable * the standard deviation (175 ms) is 12% of the mean (1.43 sec) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_20_000_run_trees: Mean +- std dev: 1.43 sec +- 0.18 sec ........... dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 694 us +- 16 us ........... dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.4 ms +- 0.2 ms ........... dumps_class_nested_py_leaf_100x200: Mean +- std dev: 105 ms +- 2 ms ........... dumps_dataclass_nested_50x100: Mean +- std dev: 25.9 ms +- 0.4 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (17.4 ms) is 23% of the mean (75.3 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. dumps_pydantic_nested_50x100: Mean +- std dev: 75.3 ms +- 17.4 ms ........... dumps_pydanticv1_nested_50x100: Mean +- std dev: 203 ms +- 2 ms

Check notice on line 1 in python/langsmith/evaluation/_arunner.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------------+----------+------------------------+ | Benchmark | main | changes | +===============================================+==========+========================+ | dumps_pydanticv1_nested_50x100 | 220 ms | 203 ms: 1.08x faster | +-----------------------------------------------+----------+------------------------+ | create_5_000_run_trees | 731 ms | 692 ms: 1.06x faster | +-----------------------------------------------+----------+------------------------+ | create_10_000_run_trees | 1.41 sec | 1.42 sec: 1.00x slower | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_branch_and_leaf_200x400 | 690 us | 694 us: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_100x200 | 104 ms | 105 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_50x100 | 25.1 ms | 25.4 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_dataclass_nested_50x100 | 25.5 ms | 25.9 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | create_20_000_run_trees | 1.39 sec | 1.43 sec: 1.03x slower | +-----------------------------------------------+----------+------------------------+ | dumps_pydantic_nested_50x100 | 65.9 ms | 75.3 ms: 1.14x slower | +-----------------------------------------------+----------+------------------------+ | Geometric mean | (ref) | 1.01x slower | +-----------------------------------------------+----------+------------------------+

from __future__ import annotations

Expand Down Expand Up @@ -491,15 +491,24 @@
cache_path = None
with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
if is_async_target:
manager = await manager.awith_predictions(
cast(ATARGET_T, target), max_concurrency=max_concurrency
)
if evaluators:
manager = await manager.awith_evaluators(
evaluators, max_concurrency=max_concurrency
)
if summary_evaluators:
manager = await manager.awith_summary_evaluators(summary_evaluators)
if evaluators:
# Run predictions and evaluations in a single pipeline
manager = await manager.awith_predictions_and_evaluators(
cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
)
else:
manager = await manager.awith_predictions(
cast(ATARGET_T, target), max_concurrency=max_concurrency
)
if summary_evaluators:
manager = await manager.awith_summary_evaluators(summary_evaluators)
else:
if evaluators:
manager = await manager.awith_evaluators(
evaluators, max_concurrency=max_concurrency
)
if summary_evaluators:
manager = await manager.awith_summary_evaluators(summary_evaluators)
results = AsyncExperimentResults(manager)
if blocking:
await results.wait()
Expand Down Expand Up @@ -642,6 +651,61 @@
upload_results=self._upload_results,
)

async def awith_predictions_and_evaluators(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably do something similar to what we do in the sync version to avoid having to duplicate logic here (basically share a semaphor)

self,
target: ATARGET_T,
evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
/,
max_concurrency: Optional[int] = None,
) -> _AsyncExperimentManager:
"""Run predictions and evaluations in a single pipeline.

This allows evaluators to process results as soon as they're available from
the target function, rather than waiting for all predictions to complete first.
"""
evaluators = _resolve_evaluators(evaluators)

if not hasattr(self, "_evaluator_executor"):
self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooc where's the 4 come from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied the value from _ascore - not really sure beyond that


async def process_examples():
async for pred in self._apredict(
target,
max_concurrency=max_concurrency,
include_attachments=_include_attachments(target),
):
example, run = pred["example"], pred["run"]
result = self._arun_evaluators(
evaluators,
{
"run": run,
"example": example,
"evaluation_results": {"results": []},
},
executor=self._evaluator_executor,
)
yield result

experiment_results = aitertools.aiter_with_concurrency(
max_concurrency,
process_examples(),
_eager_consumption_timeout=0.001,
)

r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())

return _AsyncExperimentManager(
(result["example"] async for result in r1),
experiment=self._experiment,
metadata=self._metadata,
client=self.client,
runs=(result["run"] async for result in r2),
evaluation_results=(result["evaluation_results"] async for result in r3),
summary_results=self._summary_results,
include_attachments=self._include_attachments,
upload_results=self._upload_results,
)

async def awith_predictions(
self,
target: ATARGET_T,
Expand Down Expand Up @@ -796,15 +860,20 @@
run = current_results["run"]
example = current_results["example"]
eval_results = current_results["evaluation_results"]
for evaluator in evaluators:
lock = asyncio.Lock()

async def _run_single_evaluator(evaluator):
try:
evaluator_response = await evaluator.aevaluate_run(
run=run,
example=example,
)
eval_results["results"].extend(
self.client._select_eval_results(evaluator_response)
selected_results = self.client._select_eval_results(
evaluator_response
)
async with lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we just return the selected_results in _run_single_evaluator and construct the eval_results after the asycio.gather? to avoid needing to lock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be fixed, but someone should check I did it correctly

eval_results["results"].extend(selected_results)

if self._upload_results:
self.client._log_evaluation_feedback(
evaluator_response, run=run, _executor=executor
Expand All @@ -824,9 +893,11 @@
for key in feedback_keys
]
)
eval_results["results"].extend(
self.client._select_eval_results(error_response)
selected_results = self.client._select_eval_results(
error_response
)
async with lock:
eval_results["results"].extend(selected_results)
if self._upload_results:
self.client._log_evaluation_feedback(
error_response, run=run, _executor=executor
Expand All @@ -839,15 +910,10 @@
f" run {run.id}: {repr(e)}",
exc_info=True,
)
logger.error(
f"Error running evaluator {repr(evaluator)} on"
f" run {run.id}: {repr(e)}",
exc_info=True,
)
if example.attachments is not None:
for attachment in example.attachments:
reader = example.attachments[attachment]["reader"]
reader.seek(0)

await asyncio.gather(
*[_run_single_evaluator(evaluator) for evaluator in evaluators]
)
return ExperimentResultRow(
run=run,
example=example,
Expand Down Expand Up @@ -1064,10 +1130,6 @@
client=client,
),
)
if include_attachments and example.attachments is not None:
for attachment in example.attachments:
reader = example.attachments[attachment]["reader"]
reader.seek(0)
except Exception as e:
logger.error(
f"Error running target function: {e}", exc_info=True, stacklevel=1
Expand Down
11 changes: 10 additions & 1 deletion python/langsmith/evaluation/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import inspect
import io
import uuid
from abc import abstractmethod
from typing import (
Expand Down Expand Up @@ -666,7 +667,15 @@ async def awrapper(
"example": example,
"inputs": example.inputs if example else {},
"outputs": run.outputs or {},
"attachments": example.attachments or {} if example else {},
"attachments": {
name: {
"presigned_url": value["presigned_url"],
"reader": io.BytesIO(value["reader"].getvalue()),
}
for name, value in (example.attachments or {}).items()
}
if example
else {},
"reference_outputs": example.outputs or {} if example else {},
}
args = (arg_map[arg] for arg in positional_args)
Expand Down
8 changes: 4 additions & 4 deletions python/langsmith/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ def read(self, size: int = -1) -> bytes:
"""Read function."""
...

def write(self, b: bytes) -> int:
"""Write function."""
...

def seek(self, offset: int, whence: int = 0) -> int:
"""Seek function."""
...

def getvalue(self) -> bytes:
"""Get value function."""
...


class ExampleBase(BaseModel):
"""Example base model."""
Expand Down
21 changes: 12 additions & 9 deletions python/tests/integration_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1488,15 +1488,18 @@ async def test_aevaluate_with_attachments(langchain_client: Client) -> None:
data_type=DataType.kv,
)

example = ExampleUploadWithAttachments(
inputs={"question": "What is shown in the image?"},
outputs={"answer": "test image"},
attachments={
"image": ("image/png", b"fake image data for testing"),
},
)
examples = [
ExampleUploadWithAttachments(
inputs={"question": "What is shown in the image?"},
outputs={"answer": "test image"},
attachments={
"image": ("image/png", b"fake image data for testing"),
},
)
for i in range(10)
]

langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example])
langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples)

async def target(
inputs: Dict[str, Any], attachments: Dict[str, Any]
Expand Down Expand Up @@ -1542,7 +1545,7 @@ async def evaluator_2(
max_concurrency=3,
)

assert len(results) == 2
assert len(results) == 20
async for result in results:
assert result["evaluation_results"]["results"][0].score == 1.0
assert result["evaluation_results"]["results"][1].score == 1.0
Expand Down
Loading