Skip to content

Commit

Permalink
edits
Browse files Browse the repository at this point in the history
  • Loading branch information
isahers1 committed Jan 10, 2025
1 parent 628c3c3 commit f00e2f6
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 5 deletions.
37 changes: 33 additions & 4 deletions python/langsmith/evaluation/_arunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,18 @@ class _AsyncExperimentManager(_ExperimentManagerMixin):
results for the experiment.
summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results
for the experiment.
num_repetitions (Optional[int], default=1): The number of repetitions for
the experiment.
include_attachments (Optional[bool], default=False): Whether to include
attachments. This is used for when we pull the examples for the experiment.
reuse_attachments (Optional[bool], default=False): Whether to reuse attachments
from examples. This is True if we need to reuse attachments across multiple
target/evaluator functions.
upload_results (Optional[bool], default=True): Whether to upload results
to LangSmith.
attachment_raw_data_dict (Optional[dict]): A dictionary to store raw data
for attachments. Only used if we reuse attachments across multiple
target/evaluator functions.
"""

def __init__(
Expand Down Expand Up @@ -582,7 +594,15 @@ def __init__(
self._attachment_raw_data_dict = attachment_raw_data_dict

def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example:
"""Reset attachment readers for an example."""
"""Reset attachment readers for an example.
This is only in the case that an attachment is going to be used by more
than 1 callable (target + evaluators). In that case we keep a single copy
of the attachment data in self._attachment_raw_data_dict, and create
readers from that data. This makes it so that we don't have to keep
copies of the same data in memory, instead we can just create readers
from the same data.
"""
if not hasattr(example, "attachments") or not example.attachments:
return example

Expand Down Expand Up @@ -761,6 +781,11 @@ async def awith_predictions_and_evaluators(
self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)

async def process_examples():
"""Create a single task per example.
That task is to run the target function and all the evaluators
sequentially.
"""
async for pred in self._apredict(
target,
max_concurrency=max_concurrency,
Expand All @@ -778,6 +803,9 @@ async def process_examples():
)
yield result

# Run the per-example tasks with max-concurrency
# This guarantees that max_concurrency is the upper limit
# for the number of target/evaluators that can be run in parallel
experiment_results = aitertools.aiter_with_concurrency(
max_concurrency,
process_examples(),
Expand Down Expand Up @@ -1000,9 +1028,10 @@ async def _run_single_evaluator(evaluator):
exc_info=True,
)

all_results = await asyncio.gather(
*[_run_single_evaluator(evaluator) for evaluator in evaluators]
)
all_results = []
for evaluator in evaluators:
all_results.append(await _run_single_evaluator(evaluator))

for result in all_results:
if result is not None:
eval_results["results"].extend(result)
Expand Down
10 changes: 9 additions & 1 deletion python/langsmith/evaluation/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,15 @@ def __init__(
def _reset_example_attachment_readers(
self, example: schemas.Example
) -> schemas.Example:
"""Reset attachment readers for an example."""
"""Reset attachment readers for an example.
This is only in the case that an attachment is going to be used by more
than 1 callable (target + evaluators). In that case we keep a single copy
of the attachment data in self._attachment_raw_data_dict, and create
readers from that data. This makes it so that we don't have to keep
copies of the same data in memory, instead we can just create readers
from the same data.
"""
if not hasattr(example, "attachments") or not example.attachments:
return example

Expand Down
64 changes: 64 additions & 0 deletions python/tests/integration_tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""LangSmith langchain_client Integration Tests."""

import asyncio
import datetime
import io
import logging
Expand Down Expand Up @@ -1997,3 +1998,66 @@ def test_update_examples_multipart(langchain_client: Client) -> None:

# Clean up
langchain_client.delete_dataset(dataset_id=dataset.id)


async def test_aevaluate_max_concurrency(langchain_client: Client) -> None:
    """Test that ``max_concurrency`` caps parallel target/evaluator execution.

    Uploads 10 examples and registers 100 evaluators that each sleep 0.1s,
    so every example takes ~10 seconds of sequential evaluator time. Running
    with different ``max_concurrency`` values should therefore produce
    predictably different wall-clock durations.
    """
    dataset_name = "__test_a_ton_of_feedback" + uuid4().hex[:4]
    dataset = langchain_client.create_dataset(
        dataset_name=dataset_name,
        description="Test dataset for max concurrency",
    )

    examples = [
        ExampleUploadWithAttachments(
            inputs={"query": "What's in this image?"},
            outputs={"answer": "A test image 1"},
            attachments={
                "image1": ("image/png", b"fake image data 1"),
                "extra": ("text/plain", b"extra data"),
            },
        )
        for _ in range(10)
    ]

    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples)

    # Ensure the dataset is removed even if a timing assertion fails,
    # so repeated CI runs don't accumulate orphaned test datasets.
    try:
        evaluators = []
        for _ in range(100):
            # Each evaluator sleeps 0.1s; 100 of them make one example
            # take ~10s of sequential evaluator work.
            async def eval_func(inputs, outputs):
                await asyncio.sleep(0.1)
                return {"score": random.random()}

            evaluators.append(eval_func)

        async def target(inputs, attachments):
            return {"foo": "bar"}

        start_time = time.time()
        await langchain_client.aevaluate(
            target,
            data=dataset_name,
            evaluators=evaluators,
            max_concurrency=8,
        )
        elapsed = time.time() - start_time
        # 10 examples at concurrency 8 proceed in an 8-then-2 pattern,
        # i.e. two waves of ~10s each => ~20 seconds total.
        assert 20 < elapsed < 30

        start_time = time.time()
        await langchain_client.aevaluate(
            target,
            data=dataset_name,
            evaluators=evaluators,
            max_concurrency=4,
        )
        elapsed = time.time() - start_time
        # 10 examples at concurrency 4 proceed in a 4-4-2 pattern,
        # i.e. three waves of ~10s each => ~30 seconds total.
        assert 30 < elapsed < 40
    finally:
        langchain_client.delete_dataset(dataset_id=dataset.id)

0 comments on commit f00e2f6

Please sign in to comment.