From f00e2f6f0e2509ac5c4689421ad841a88a68d828 Mon Sep 17 00:00:00 2001
From: isaac hershenson
Date: Fri, 10 Jan 2025 15:27:16 -0800
Subject: [PATCH] edits

---
 python/langsmith/evaluation/_arunner.py       | 37 +++++++++--
 python/langsmith/evaluation/_runner.py        | 10 ++-
 python/tests/integration_tests/test_client.py | 64 +++++++++++++++++++
 3 files changed, 106 insertions(+), 5 deletions(-)

diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py
index ad4a60544..d39752969 100644
--- a/python/langsmith/evaluation/_arunner.py
+++ b/python/langsmith/evaluation/_arunner.py
@@ -543,6 +543,18 @@ class _AsyncExperimentManager(_ExperimentManagerMixin):
             results for the experiment.
         summary_results (Optional[Iterable[EvaluationResults]]): The aggregate
             results for the experiment.
+        num_repetitions (Optional[int], default=1): The number of repetitions for
+            the experiment.
+        include_attachments (Optional[bool], default=False): Whether to include
+            attachments. This is used when we pull the examples for the experiment.
+        reuse_attachments (Optional[bool], default=False): Whether to reuse attachments
+            from examples. This is True if we need to reuse attachments across multiple
+            target/evaluator functions.
+        upload_results (Optional[bool], default=True): Whether to upload results
+            to LangSmith.
+        attachment_raw_data_dict (Optional[dict]): A dictionary to store raw data
+            for attachments. Only used if we reuse attachments across multiple
+            target/evaluator functions.
     """
 
     def __init__(
@@ -582,7 +594,15 @@ def __init__(
         self._attachment_raw_data_dict = attachment_raw_data_dict
 
     def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example:
-        """Reset attachment readers for an example."""
+        """Reset attachment readers for an example.
+
+        This is only needed when an attachment is going to be used by more
+        than one callable (target + evaluators). In that case we keep a single
+        copy of the attachment data in self._attachment_raw_data_dict and
+        create readers from that data as needed. This way we don't keep
+        multiple copies of the same data in memory; we just create new
+        readers over the same underlying bytes.
+        """
         if not hasattr(example, "attachments") or not example.attachments:
             return example
 
@@ -761,6 +781,11 @@ async def awith_predictions_and_evaluators(
             self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
 
         async def process_examples():
+            """Create a single task per example.
+
+            That task is to run the target function and all the evaluators
+            sequentially.
+            """
             async for pred in self._apredict(
                 target,
                 max_concurrency=max_concurrency,
@@ -778,6 +803,9 @@ async def process_examples():
                 include_attachments=self._include_attachments,
             ):
                 yield result
+        # Run the per-example tasks with max_concurrency.
+        # This guarantees that max_concurrency is the upper limit
+        # on the number of target/evaluator calls that can run in parallel.
         experiment_results = aitertools.aiter_with_concurrency(
             max_concurrency,
             process_examples(),
@@ -1000,9 +1028,10 @@ async def _run_single_evaluator(evaluator):
                     exc_info=True,
                 )
 
-        all_results = await asyncio.gather(
-            *[_run_single_evaluator(evaluator) for evaluator in evaluators]
-        )
+        all_results = []
+        for evaluator in evaluators:
+            all_results.append(await _run_single_evaluator(evaluator))
+
         for result in all_results:
             if result is not None:
                 eval_results["results"].extend(result)
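
Note: the key behavioral idea in the _arunner.py changes above is that the target call and all evaluator calls for one example are bundled into a single task, and only the per-example tasks are run with bounded concurrency. The following is a minimal standalone sketch of that idea using plain asyncio; it is not the SDK implementation (which uses the internal aitertools.aiter_with_concurrency helper shown in the diff), and names such as run_example, target, evaluator, and MAX_CONCURRENCY are illustrative only.

import asyncio
import time

MAX_CONCURRENCY = 8  # illustrative stand-in for the max_concurrency argument


async def target(example: int) -> dict:
    await asyncio.sleep(0.1)  # pretend to call the target function
    return {"example": example}


async def evaluator(prediction: dict) -> dict:
    await asyncio.sleep(0.1)  # pretend to score the prediction
    return {"score": 1.0}


async def run_example(example: int, sem: asyncio.Semaphore) -> list:
    # One task per example: the target and *all* evaluators run inside the
    # same concurrency slot, so MAX_CONCURRENCY caps target + evaluator work.
    async with sem:
        prediction = await target(example)
        results = []
        for _ in range(3):  # evaluators run sequentially, as in the patch
            results.append(await evaluator(prediction))
        return results


async def main() -> None:
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    start = time.time()
    await asyncio.gather(*(run_example(i, sem) for i in range(10)))
    # 10 examples, 8 concurrent slots -> two waves of roughly (0.1 + 3 * 0.1) s each
    print(f"elapsed: {time.time() - start:.2f}s")


asyncio.run(main())
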
diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py
index 156067abe..96b73b008 100644
--- a/python/langsmith/evaluation/_runner.py
+++ b/python/langsmith/evaluation/_runner.py
@@ -1340,7 +1340,15 @@ def __init__(
     def _reset_example_attachment_readers(
         self, example: schemas.Example
     ) -> schemas.Example:
-        """Reset attachment readers for an example."""
+        """Reset attachment readers for an example.
+
+        This is only needed when an attachment is going to be used by more
+        than one callable (target + evaluators). In that case we keep a single
+        copy of the attachment data in self._attachment_raw_data_dict and
+        create readers from that data as needed. This way we don't keep
+        multiple copies of the same data in memory; we just create new
+        readers over the same underlying bytes.
+        """
         if not hasattr(example, "attachments") or not example.attachments:
             return example
 
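
Note: the docstring added above describes the reader-reset pattern in prose. Below is a minimal sketch of that pattern in isolation, assuming a simple dict keyed by attachment name; raw_data and make_reader are illustrative names, not the SDK's internals.

import io

# Single copy of each attachment's bytes, analogous to the patch's
# self._attachment_raw_data_dict.
raw_data = {"example-1.image1": b"fake image data 1"}


def make_reader(key: str) -> io.BytesIO:
    # Every callable (the target, then each evaluator) gets a fresh reader
    # over the same underlying bytes, so the data is never duplicated and
    # no consumer sees a half-read stream.
    return io.BytesIO(raw_data[key])


reader_for_target = make_reader("example-1.image1")
assert reader_for_target.read() == b"fake image data 1"

# The first reader is now exhausted; an evaluator simply gets its own reader.
reader_for_evaluator = make_reader("example-1.image1")
assert reader_for_evaluator.read() == b"fake image data 1"
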
diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py
index bd1c37ca8..a35e96489 100644
--- a/python/tests/integration_tests/test_client.py
+++ b/python/tests/integration_tests/test_client.py
@@ -1,5 +1,6 @@
 """LangSmith langchain_client Integration Tests."""
 
+import asyncio
 import datetime
 import io
 import logging
@@ -1997,3 +1998,66 @@ def test_update_examples_multipart(langchain_client: Client) -> None:
 
     # Clean up
     langchain_client.delete_dataset(dataset_id=dataset.id)
+
+
+async def test_aevaluate_max_concurrency(langchain_client: Client) -> None:
+    """Test that max concurrency works as expected."""
+    dataset_name = "__test_a_ton_of_feedback" + uuid4().hex[:4]
+    dataset = langchain_client.create_dataset(
+        dataset_name=dataset_name,
+        description="Test dataset for max concurrency",
+    )
+
+    examples = [
+        ExampleUploadWithAttachments(
+            inputs={"query": "What's in this image?"},
+            outputs={"answer": "A test image 1"},
+            attachments={
+                "image1": ("image/png", b"fake image data 1"),
+                "extra": ("text/plain", b"extra data"),
+            },
+        )
+        for _ in range(10)
+    ]
+
+    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples)
+
+    evaluators = []
+    for _ in range(100):
+
+        async def eval_func(inputs, outputs):
+            await asyncio.sleep(0.1)
+            return {"score": random.random()}
+
+        evaluators.append(eval_func)
+
+    async def target(inputs, attachments):
+        return {"foo": "bar"}
+
+    start_time = time.time()
+    await langchain_client.aevaluate(
+        target,
+        data=dataset_name,
+        evaluators=evaluators,
+        max_concurrency=8,
+    )
+
+    end_time = time.time()
+    # this should proceed in an 8-2 manner, taking around 20 seconds total
+    assert end_time - start_time < 30
+    assert end_time - start_time > 20
+
+    start_time = time.time()
+    await langchain_client.aevaluate(
+        target,
+        data=dataset_name,
+        evaluators=evaluators,
+        max_concurrency=4,
+    )
+
+    end_time = time.time()
+    # this should proceed in a 4-4-2 manner, taking around 30 seconds total
+    assert end_time - start_time < 40
+    assert end_time - start_time > 30
+
+    langchain_client.delete_dataset(dataset_id=dataset.id)
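
Note: the timing windows asserted in the test above follow from simple wave arithmetic, assuming the 0.1 s evaluator sleep dominates and target/network overhead is negligible:

    per-example work  ~= 100 evaluators x 0.1 s = 10 s (evaluators run sequentially)
    max_concurrency=8 -> 10 examples in waves of 8 + 2     -> 2 x 10 s ~= 20 s
    max_concurrency=4 -> 10 examples in waves of 4 + 4 + 2 -> 3 x 10 s ~= 30 s

The assertions give each run a 10-second cushion above the expected duration while still failing if the old behavior (evaluators not counted against max_concurrency) were to return.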