From 696e4bf63d7e4536b7eccd83e05a2271682178fc Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Wed, 18 Dec 2024 18:46:26 -0800 Subject: [PATCH 01/10] draft --- python/langsmith/evaluation/_arunner.py | 116 ++++++++++++++---- python/langsmith/evaluation/evaluator.py | 11 +- python/langsmith/schemas.py | 8 +- python/tests/integration_tests/test_client.py | 21 ++-- 4 files changed, 115 insertions(+), 41 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 7b29241e6..2150b1fac 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -491,15 +491,24 @@ async def _aevaluate( cache_path = None with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]): if is_async_target: - manager = await manager.awith_predictions( - cast(ATARGET_T, target), max_concurrency=max_concurrency - ) - if evaluators: - manager = await manager.awith_evaluators( - evaluators, max_concurrency=max_concurrency - ) - if summary_evaluators: - manager = await manager.awith_summary_evaluators(summary_evaluators) + if evaluators: + # Run predictions and evaluations in a single pipeline + manager = await manager.awith_predictions_and_evaluators( + cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency + ) + else: + manager = await manager.awith_predictions( + cast(ATARGET_T, target), max_concurrency=max_concurrency + ) + if summary_evaluators: + manager = await manager.awith_summary_evaluators(summary_evaluators) + else: + if evaluators: + manager = await manager.awith_evaluators( + evaluators, max_concurrency=max_concurrency + ) + if summary_evaluators: + manager = await manager.awith_summary_evaluators(summary_evaluators) results = AsyncExperimentResults(manager) if blocking: await results.wait() @@ -642,6 +651,61 @@ async def astart(self) -> _AsyncExperimentManager: upload_results=self._upload_results, ) + async def awith_predictions_and_evaluators( + self, + target: ATARGET_T, + evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]], + /, + max_concurrency: Optional[int] = None, + ) -> _AsyncExperimentManager: + """Run predictions and evaluations in a single pipeline. + + This allows evaluators to process results as soon as they're available from + the target function, rather than waiting for all predictions to complete first. 
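+
+        Note that each example becomes a single task (the target call followed
+        by all of its evaluators), so ``max_concurrency`` bounds whole examples
+        in flight rather than predictions and evaluations separately.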
+ """ + evaluators = _resolve_evaluators(evaluators) + + if not hasattr(self, "_evaluator_executor"): + self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4) + + async def process_examples(): + async for pred in self._apredict( + target, + max_concurrency=max_concurrency, + include_attachments=_include_attachments(target), + ): + example, run = pred["example"], pred["run"] + result = self._arun_evaluators( + evaluators, + { + "run": run, + "example": example, + "evaluation_results": {"results": []}, + }, + executor=self._evaluator_executor, + ) + yield result + + experiment_results = aitertools.aiter_with_concurrency( + max_concurrency, + process_examples(), + _eager_consumption_timeout=0.001, + ) + + r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock()) + + return _AsyncExperimentManager( + (result["example"] async for result in r1), + experiment=self._experiment, + metadata=self._metadata, + client=self.client, + runs=(result["run"] async for result in r2), + evaluation_results=(result["evaluation_results"] async for result in r3), + summary_results=self._summary_results, + include_attachments=self._include_attachments, + upload_results=self._upload_results, + ) + async def awith_predictions( self, target: ATARGET_T, @@ -796,15 +860,20 @@ async def _arun_evaluators( run = current_results["run"] example = current_results["example"] eval_results = current_results["evaluation_results"] - for evaluator in evaluators: + lock = asyncio.Lock() + + async def _run_single_evaluator(evaluator): try: evaluator_response = await evaluator.aevaluate_run( run=run, example=example, ) - eval_results["results"].extend( - self.client._select_eval_results(evaluator_response) + selected_results = self.client._select_eval_results( + evaluator_response ) + async with lock: + eval_results["results"].extend(selected_results) + if self._upload_results: self.client._log_evaluation_feedback( evaluator_response, run=run, _executor=executor @@ -824,9 +893,11 @@ async def _arun_evaluators( for key in feedback_keys ] ) - eval_results["results"].extend( - self.client._select_eval_results(error_response) + selected_results = self.client._select_eval_results( + error_response ) + async with lock: + eval_results["results"].extend(selected_results) if self._upload_results: self.client._log_evaluation_feedback( error_response, run=run, _executor=executor @@ -839,15 +910,10 @@ async def _arun_evaluators( f" run {run.id}: {repr(e)}", exc_info=True, ) - logger.error( - f"Error running evaluator {repr(evaluator)} on" - f" run {run.id}: {repr(e)}", - exc_info=True, - ) - if example.attachments is not None: - for attachment in example.attachments: - reader = example.attachments[attachment]["reader"] - reader.seek(0) + + await asyncio.gather( + *[_run_single_evaluator(evaluator) for evaluator in evaluators] + ) return ExperimentResultRow( run=run, example=example, @@ -1064,10 +1130,6 @@ def _get_run(r: run_trees.RunTree) -> None: client=client, ), ) - if include_attachments and example.attachments is not None: - for attachment in example.attachments: - reader = example.attachments[attachment]["reader"] - reader.seek(0) except Exception as e: logger.error( f"Error running target function: {e}", exc_info=True, stacklevel=1 diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py index a1505699a..fc1f07685 100644 --- a/python/langsmith/evaluation/evaluator.py +++ b/python/langsmith/evaluation/evaluator.py @@ -4,6 +4,7 @@ import asyncio import inspect +import io import 
uuid from abc import abstractmethod from typing import ( @@ -666,7 +667,15 @@ async def awrapper( "example": example, "inputs": example.inputs if example else {}, "outputs": run.outputs or {}, - "attachments": example.attachments or {} if example else {}, + "attachments": { + name: { + "presigned_url": value["presigned_url"], + "reader": io.BytesIO(value["reader"].getvalue()), + } + for name, value in (example.attachments or {}).items() + } + if example + else {}, "reference_outputs": example.outputs or {} if example else {}, } args = (arg_map[arg] for arg in positional_args) diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py index acedaf177..a6ca393b6 100644 --- a/python/langsmith/schemas.py +++ b/python/langsmith/schemas.py @@ -76,14 +76,14 @@ def read(self, size: int = -1) -> bytes: """Read function.""" ... - def write(self, b: bytes) -> int: - """Write function.""" - ... - def seek(self, offset: int, whence: int = 0) -> int: """Seek function.""" ... + def getvalue(self) -> bytes: + """Get value function.""" + ... + class ExampleBase(BaseModel): """Example base model.""" diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 3bcd9d04c..8e3e5cd8d 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -1488,15 +1488,18 @@ async def test_aevaluate_with_attachments(langchain_client: Client) -> None: data_type=DataType.kv, ) - example = ExampleUploadWithAttachments( - inputs={"question": "What is shown in the image?"}, - outputs={"answer": "test image"}, - attachments={ - "image": ("image/png", b"fake image data for testing"), - }, - ) + examples = [ + ExampleUploadWithAttachments( + inputs={"question": "What is shown in the image?"}, + outputs={"answer": "test image"}, + attachments={ + "image": ("image/png", b"fake image data for testing"), + }, + ) + for i in range(10) + ] - langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example]) + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples) async def target( inputs: Dict[str, Any], attachments: Dict[str, Any] @@ -1542,7 +1545,7 @@ async def evaluator_2( max_concurrency=3, ) - assert len(results) == 2 + assert len(results) == 20 async for result in results: assert result["evaluation_results"]["results"][0].score == 1.0 assert result["evaluation_results"]["results"][1].score == 1.0 From a55898146cbdc90ae085787ca4141fab89b42c2a Mon Sep 17 00:00:00 2001 From: Bagatur Date: Mon, 23 Dec 2024 09:40:12 -0500 Subject: [PATCH 02/10] fmt --- python/langsmith/evaluation/evaluator.py | 18 ++++++++++-------- python/tests/integration_tests/test_client.py | 14 +++++++------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py index fc1f07685..3a11fd292 100644 --- a/python/langsmith/evaluation/evaluator.py +++ b/python/langsmith/evaluation/evaluator.py @@ -667,15 +667,17 @@ async def awrapper( "example": example, "inputs": example.inputs if example else {}, "outputs": run.outputs or {}, - "attachments": { - name: { - "presigned_url": value["presigned_url"], - "reader": io.BytesIO(value["reader"].getvalue()), + "attachments": ( + { + name: { + "presigned_url": value["presigned_url"], + "reader": io.BytesIO(value["reader"].getvalue()), + } + for name, value in (example.attachments or {}).items() } - for name, value in (example.attachments or {}).items() - } - if example - else 
{}, + if example + else {} + ), "reference_outputs": example.outputs or {} if example else {}, } args = (arg_map[arg] for arg in positional_args) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 8e3e5cd8d..28a38602b 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -1490,10 +1490,10 @@ async def test_aevaluate_with_attachments(langchain_client: Client) -> None: examples = [ ExampleUploadWithAttachments( - inputs={"question": "What is shown in the image?"}, + inputs={"question": "What is shown in the image?", "index": i}, outputs={"answer": "test image"}, attachments={ - "image": ("image/png", b"fake image data for testing"), + "image": ("text/plain", bytes(f"data: {i}", "utf-8")), }, ) for i in range(10) @@ -1508,16 +1508,16 @@ async def target( assert "image" in attachments assert "presigned_url" in attachments["image"] image_data = attachments["image"]["reader"] - assert image_data.read() == b"fake image data for testing" + assert image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") return {"answer": "test image"} async def evaluator_1( - outputs: dict, reference_outputs: dict, attachments: dict + inputs: dict, outputs: dict, reference_outputs: dict, attachments: dict ) -> Dict[str, Any]: assert "image" in attachments assert "presigned_url" in attachments["image"] image_data = attachments["image"]["reader"] - assert image_data.read() == b"fake image data for testing" + assert image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") return { "score": float( reference_outputs.get("answer") == outputs.get("answer") # type: ignore @@ -1525,12 +1525,12 @@ async def evaluator_1( } async def evaluator_2( - outputs: dict, reference_outputs: dict, attachments: dict + inputs: dict, outputs: dict, reference_outputs: dict, attachments: dict ) -> Dict[str, Any]: assert "image" in attachments assert "presigned_url" in attachments["image"] image_data = attachments["image"]["reader"] - assert image_data.read() == b"fake image data for testing" + assert image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") return { "score": float( reference_outputs.get("answer") == outputs.get("answer") # type: ignore From 117e8c6e5b4edbaf615fede31acaca56ad7c9409 Mon Sep 17 00:00:00 2001 From: Bagatur Date: Mon, 23 Dec 2024 09:43:42 -0500 Subject: [PATCH 03/10] fmt --- python/tests/integration_tests/test_client.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 28a38602b..90017872a 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -1513,29 +1513,19 @@ async def target( async def evaluator_1( inputs: dict, outputs: dict, reference_outputs: dict, attachments: dict - ) -> Dict[str, Any]: + ) -> bool: assert "image" in attachments assert "presigned_url" in attachments["image"] image_data = attachments["image"]["reader"] - assert image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") - return { - "score": float( - reference_outputs.get("answer") == outputs.get("answer") # type: ignore - ) - } + return image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") async def evaluator_2( inputs: dict, outputs: dict, reference_outputs: dict, attachments: dict - ) -> Dict[str, Any]: + ) -> bool: assert "image" in attachments assert "presigned_url" in 
attachments["image"] image_data = attachments["image"]["reader"] - assert image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") - return { - "score": float( - reference_outputs.get("answer") == outputs.get("answer") # type: ignore - ) - } + return image_data.read() == bytes(f"data: {inputs['index']}", "utf-8") results = await langchain_client.aevaluate( target, From 3c8424cdcf48102544edd47f451460c2c4438ba2 Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Mon, 23 Dec 2024 09:03:46 -0800 Subject: [PATCH 04/10] bagatur comments --- python/langsmith/evaluation/_arunner.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 2150b1fac..6e1122467 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -860,7 +860,6 @@ async def _arun_evaluators( run = current_results["run"] example = current_results["example"] eval_results = current_results["evaluation_results"] - lock = asyncio.Lock() async def _run_single_evaluator(evaluator): try: @@ -871,13 +870,12 @@ async def _run_single_evaluator(evaluator): selected_results = self.client._select_eval_results( evaluator_response ) - async with lock: - eval_results["results"].extend(selected_results) if self._upload_results: self.client._log_evaluation_feedback( evaluator_response, run=run, _executor=executor ) + return selected_results except Exception as e: try: feedback_keys = _extract_feedback_keys(evaluator) @@ -896,12 +894,11 @@ async def _run_single_evaluator(evaluator): selected_results = self.client._select_eval_results( error_response ) - async with lock: - eval_results["results"].extend(selected_results) if self._upload_results: self.client._log_evaluation_feedback( error_response, run=run, _executor=executor ) + return selected_results except Exception as e2: logger.debug(f"Error parsing feedback keys: {e2}") pass @@ -911,9 +908,12 @@ async def _run_single_evaluator(evaluator): exc_info=True, ) - await asyncio.gather( + all_results = await asyncio.gather( *[_run_single_evaluator(evaluator) for evaluator in evaluators] ) + for result in all_results: + if result is not None: + eval_results["results"].extend(result) return ExperimentResultRow( run=run, example=example, From 2bf063f028b8a94a0ffa6341cf9eee89fa182e9d Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Mon, 6 Jan 2025 13:44:58 -0800 Subject: [PATCH 05/10] fix test --- python/langsmith/evaluation/_arunner.py | 59 +++++++++- python/langsmith/evaluation/_runner.py | 103 ++++++++++-------- python/langsmith/evaluation/evaluator.py | 12 +- python/tests/integration_tests/test_client.py | 22 ++++ 4 files changed, 134 insertions(+), 62 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 6e1122467..af9dd0001 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -24,7 +24,7 @@ Union, cast, ) - +import io import langsmith from langsmith import run_helpers as rh from langsmith import run_trees, schemas @@ -47,7 +47,6 @@ _load_experiment, _load_tqdm, _load_traces, - _make_fresh_examples, _resolve_data, _resolve_evaluators, _resolve_experiment, @@ -480,7 +479,8 @@ async def _aevaluate( num_repetitions=num_repetitions, runs=runs, include_attachments=_include_attachments(target) - or _evaluators_include_attachments(evaluators), + or _evaluators_include_attachments(evaluators) > 0, + reuse_attachments = num_repetitions * 
(int(_include_attachments(target)) + _evaluators_include_attachments(evaluators)) > 1, upload_results=upload_results, ).astart() cache_dir = ls_utils.get_cache_dir(None) @@ -552,7 +552,9 @@ def __init__( description: Optional[str] = None, num_repetitions: int = 1, include_attachments: bool = False, + reuse_attachments: bool = False, upload_results: bool = True, + attachment_raw_data_dict: Optional[dict] = None, ): super().__init__( experiment=experiment, @@ -569,7 +571,16 @@ def __init__( self._summary_results = summary_results self._num_repetitions = num_repetitions self._include_attachments = include_attachments + self._reuse_attachments = reuse_attachments self._upload_results = upload_results + self._attachment_raw_data_dict = attachment_raw_data_dict + + def _make_fresh_examples(self, examples): + return ( + example + for example in examples + if str(example.id) not in self._attachment_raw_data_dict + ) async def aget_examples(self) -> AsyncIterator[schemas.Example]: if self._examples is None: @@ -578,11 +589,18 @@ async def aget_examples(self) -> AsyncIterator[schemas.Example]: client=self.client, include_attachments=self._include_attachments, ) + if self._reuse_attachments and self._attachment_raw_data_dict is None: + examples_copy, self._examples = aitertools.atee(self._examples) + self._attachment_raw_data_dict = { + str(e.id) + name: value['reader'].read() + async for e in examples_copy + for name, value in e.attachments.items() + } if self._num_repetitions > 1: examples_list = [example async for example in self._examples] self._examples = async_chain_from_iterable( [ - async_iter_from_list(_make_fresh_examples(examples_list)) + async_iter_from_list(self._make_fresh_examples(examples_list)) for _ in range(self._num_repetitions) ] ) @@ -648,7 +666,36 @@ async def astart(self) -> _AsyncExperimentManager: runs=self._runs, evaluation_results=self._evaluation_results, include_attachments=self._include_attachments, + reuse_attachments=self._reuse_attachments, upload_results=self._upload_results, + attachment_raw_data_dict=self._attachment_raw_data_dict + ) + + def _get_example_with_readers(self, example: schemas.Example) -> schemas.Example: + new_attachments = {} + for name, attachment in example.attachments.items(): + try: + reader = io.BytesIO(self._attachment_raw_data_dict[str(example.id) + name]) + new_attachments[name] = { + "presigned_url": attachment["presigned_url"], + "reader": reader, + } + except Exception: + new_attachments[name] = attachment + + return schemas.Example( + id=example.id, + created_at=example.created_at, + dataset_id=example.dataset_id, + inputs=example.inputs, + outputs=example.outputs, + metadata=example.metadata, + modified_at=example.modified_at, + runs=example.runs, + source_run_id=example.source_run_id, + attachments=new_attachments, + _host_url=example._host_url, + _tenant_id=example._tenant_id, ) async def awith_predictions_and_evaluators( @@ -804,7 +851,7 @@ async def predict_all(): # Yield the coroutine to be awaited later yield _aforward( fn, - example, + self._get_example_with_readers(example), self.experiment_name, self._metadata, self.client, @@ -865,7 +912,7 @@ async def _run_single_evaluator(evaluator): try: evaluator_response = await evaluator.aevaluate_run( run=run, - example=example, + example=self._get_example_with_readers(example), ) selected_results = self.client._select_eval_results( evaluator_response diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index e755fe423..2e52a7652 100644 --- 
a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -1067,7 +1067,7 @@ def _evaluate( runs=runs, # Create or resolve the experiment. include_attachments=_include_attachments(target) - or _evaluators_include_attachments(evaluators), + or _evaluators_include_attachments(evaluators) > 0, upload_results=upload_results, ).start() cache_dir = ls_utils.get_cache_dir(None) @@ -1317,7 +1317,9 @@ def __init__( description: Optional[str] = None, num_repetitions: int = 1, include_attachments: bool = False, + reuse_attachments: bool = False, upload_results: bool = True, + attachment_raw_data_dict: Optional[dict] = None, ): super().__init__( experiment=experiment, @@ -1332,7 +1334,48 @@ def __init__( self._summary_results = summary_results self._num_repetitions = num_repetitions self._include_attachments = include_attachments + self._reuse_attachments = reuse_attachments self._upload_results = upload_results + self._attachment_raw_data_dict = attachment_raw_data_dict + + def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example: + """Reset attachment readers for an example.""" + if not hasattr(example, "attachments") or not example.attachments: + return example + + new_attachments = {} + for name, attachment in example.attachments.items(): + try: + new_attachments[name] = { + "presigned_url": attachment["presigned_url"], + "reader": io.BytesIO(self._attachment_raw_data_dict[str(example.id) + name]), + } + except Exception: + new_attachments[name] = attachment + + # Create a new Example instance with the updated attachments + return schemas.Example( + id=example.id, + created_at=example.created_at, + dataset_id=example.dataset_id, + inputs=example.inputs, + outputs=example.outputs, + metadata=example.metadata, + modified_at=example.modified_at, + runs=example.runs, + source_run_id=example.source_run_id, + attachments=new_attachments, + _host_url=example._host_url, + _tenant_id=example._tenant_id, + ) + + + def _make_fresh_examples( + self, + _original_examples: List[schemas.Example], + ) -> List[schemas.Example]: + """Create fresh copies of examples with reset readers.""" + return [self._reset_example_attachments(example) for example in _original_examples] @property def examples(self) -> Iterable[schemas.Example]: @@ -1342,10 +1385,17 @@ def examples(self) -> Iterable[schemas.Example]: client=self.client, include_attachments=self._include_attachments, ) + if self._reuse_attachments and self._attachment_raw_data_dict is None: + examples_copy, self._examples = itertools.tee(self._examples) + self._attachment_raw_data_dict = { + str(e.id) + name: value['reader'].read() + for e in examples_copy + for name, value in e.attachments.items() + } if self._num_repetitions > 1: examples_list = list(self._examples) self._examples = itertools.chain.from_iterable( - _make_fresh_examples(examples_list) + self._make_fresh_examples(examples_list) for _ in range(self._num_repetitions) ) self._examples, examples_iter = itertools.tee(self._examples) @@ -1390,7 +1440,9 @@ def start(self) -> _ExperimentManager: runs=self._runs, evaluation_results=self._evaluation_results, include_attachments=self._include_attachments, + reuse_attachments=self._reuse_attachments, upload_results=self._upload_results, + attachment_raw_data_dict=self._attachment_raw_data_dict ) def with_predictions( @@ -1929,11 +1981,11 @@ def _ensure_traceable( def _evaluators_include_attachments( evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]], -) -> bool: +) -> int: if evaluators is 
None: - return False + return 0 - def evaluator_has_attachments(evaluator: Any) -> bool: + def evaluator_uses_attachments(evaluator: Any) -> bool: if not callable(evaluator): return False sig = inspect.signature(evaluator) @@ -1943,7 +1995,7 @@ def evaluator_has_attachments(evaluator: Any) -> bool: ] return any(p.name == "attachments" for p in positional_params) - return any(evaluator_has_attachments(e) for e in evaluators) + return sum(evaluator_uses_attachments(e) for e in evaluators) def _include_attachments( @@ -2232,42 +2284,3 @@ def _import_langchain_runnable() -> Optional[type]: def _is_langchain_runnable(o: Any) -> bool: return bool((Runnable := _import_langchain_runnable()) and isinstance(o, Runnable)) - - -def _reset_example_attachments(example: schemas.Example) -> schemas.Example: - """Reset attachment readers for an example.""" - if not hasattr(example, "attachments") or not example.attachments: - return example - - new_attachments = {} - for key, attachment in example.attachments.items(): - response = requests.get(attachment["presigned_url"], stream=True) - response.raise_for_status() - reader = io.BytesIO(response.content) - new_attachments[key] = { - "presigned_url": attachment["presigned_url"], - "reader": reader, - } - - # Create a new Example instance with the updated attachments - return schemas.Example( - id=example.id, - created_at=example.created_at, - dataset_id=example.dataset_id, - inputs=example.inputs, - outputs=example.outputs, - metadata=example.metadata, - modified_at=example.modified_at, - runs=example.runs, - source_run_id=example.source_run_id, - attachments=new_attachments, - _host_url=example._host_url, - _tenant_id=example._tenant_id, - ) - - -def _make_fresh_examples( - _original_examples: List[schemas.Example], -) -> List[schemas.Example]: - """Create fresh copies of examples with reset readers.""" - return [_reset_example_attachments(example) for example in _original_examples] diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py index 3a11fd292..931af0237 100644 --- a/python/langsmith/evaluation/evaluator.py +++ b/python/langsmith/evaluation/evaluator.py @@ -667,17 +667,7 @@ async def awrapper( "example": example, "inputs": example.inputs if example else {}, "outputs": run.outputs or {}, - "attachments": ( - { - name: { - "presigned_url": value["presigned_url"], - "reader": io.BytesIO(value["reader"].getvalue()), - } - for name, value in (example.attachments or {}).items() - } - if example - else {} - ), + "attachments": example.attachments or {}, "reference_outputs": example.outputs or {} if example else {}, } args = (arg_map[arg] for arg in positional_args) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 90017872a..24f822654 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -1319,6 +1319,7 @@ def evaluator_2( num_repetitions=2, ) + assert len(results) == 2 for result in results: assert result["evaluation_results"]["results"][0].score == 1.0 assert result["evaluation_results"]["results"][1].score == 1.0 @@ -1429,6 +1430,16 @@ def evaluator( for result in results: assert result["evaluation_results"]["results"][0].score == 1.0 + results = langchain_client.evaluate( + target, + data=dataset_name, + evaluators=[evaluator], + ) + + assert len(results) == 1 + for result in results: + assert result["evaluation_results"]["results"][0].score == 1.0 + 
langchain_client.delete_dataset(dataset_name=dataset_name) @@ -1540,6 +1551,17 @@ async def evaluator_2( assert result["evaluation_results"]["results"][0].score == 1.0 assert result["evaluation_results"]["results"][1].score == 1.0 + + results = await langchain_client.aevaluate( + target, + data=dataset_name, + evaluators=[], + num_repetitions=1, + max_concurrency=3, + ) + + assert len(results) == 10 + langchain_client.delete_dataset(dataset_name=dataset_name) From 982750c870f20ea9ac42aa6f0bd95c6b3728a6a4 Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Mon, 6 Jan 2025 13:58:27 -0800 Subject: [PATCH 06/10] fmt --- python/langsmith/evaluation/_arunner.py | 31 +++++++++++++------ python/langsmith/evaluation/_runner.py | 25 +++++++++------ python/langsmith/evaluation/evaluator.py | 3 +- python/tests/integration_tests/test_client.py | 1 - 4 files changed, 37 insertions(+), 23 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index af9dd0001..6822151a2 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -5,6 +5,7 @@ import asyncio import concurrent.futures as cf import datetime +import io import logging import pathlib import uuid @@ -24,7 +25,7 @@ Union, cast, ) -import io + import langsmith from langsmith import run_helpers as rh from langsmith import run_trees, schemas @@ -480,7 +481,12 @@ async def _aevaluate( runs=runs, include_attachments=_include_attachments(target) or _evaluators_include_attachments(evaluators) > 0, - reuse_attachments = num_repetitions * (int(_include_attachments(target)) + _evaluators_include_attachments(evaluators)) > 1, + reuse_attachments=num_repetitions + * ( + int(_include_attachments(target)) + + _evaluators_include_attachments(evaluators) + ) + > 1, upload_results=upload_results, ).astart() cache_dir = ls_utils.get_cache_dir(None) @@ -592,9 +598,9 @@ async def aget_examples(self) -> AsyncIterator[schemas.Example]: if self._reuse_attachments and self._attachment_raw_data_dict is None: examples_copy, self._examples = aitertools.atee(self._examples) self._attachment_raw_data_dict = { - str(e.id) + name: value['reader'].read() + str(e.id) + name: value["reader"].read() async for e in examples_copy - for name, value in e.attachments.items() + for name, value in (e.attachments or {}).items() } if self._num_repetitions > 1: examples_list = [example async for example in self._examples] @@ -668,19 +674,24 @@ async def astart(self) -> _AsyncExperimentManager: include_attachments=self._include_attachments, reuse_attachments=self._reuse_attachments, upload_results=self._upload_results, - attachment_raw_data_dict=self._attachment_raw_data_dict + attachment_raw_data_dict=self._attachment_raw_data_dict, ) def _get_example_with_readers(self, example: schemas.Example) -> schemas.Example: - new_attachments = {} - for name, attachment in example.attachments.items(): - try: - reader = io.BytesIO(self._attachment_raw_data_dict[str(example.id) + name]) + new_attachments: dict[str, schemas.AttachmentInfo] = {} + for name, attachment in (example.attachments or {}).items(): + if ( + self._attachment_raw_data_dict is not None + and str(example.id) + name in self._attachment_raw_data_dict + ): + reader = io.BytesIO( + self._attachment_raw_data_dict[str(example.id) + name] + ) new_attachments[name] = { "presigned_url": attachment["presigned_url"], "reader": reader, } - except Exception: + else: new_attachments[name] = attachment return schemas.Example( diff --git 
a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 2e52a7652..02be981af 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -37,7 +37,6 @@ cast, ) -import requests from typing_extensions import TypedDict, overload import langsmith @@ -1343,14 +1342,19 @@ def _reset_example_attachments(self, example: schemas.Example) -> schemas.Exampl if not hasattr(example, "attachments") or not example.attachments: return example - new_attachments = {} + new_attachments: dict[str, schemas.AttachmentInfo] = {} for name, attachment in example.attachments.items(): - try: + if ( + self._attachment_raw_data_dict is not None + and str(example.id) + name in self._attachment_raw_data_dict + ): new_attachments[name] = { "presigned_url": attachment["presigned_url"], - "reader": io.BytesIO(self._attachment_raw_data_dict[str(example.id) + name]), + "reader": io.BytesIO( + self._attachment_raw_data_dict[str(example.id) + name] + ), } - except Exception: + else: new_attachments[name] = attachment # Create a new Example instance with the updated attachments @@ -1369,13 +1373,14 @@ def _reset_example_attachments(self, example: schemas.Example) -> schemas.Exampl _tenant_id=example._tenant_id, ) - def _make_fresh_examples( self, _original_examples: List[schemas.Example], ) -> List[schemas.Example]: """Create fresh copies of examples with reset readers.""" - return [self._reset_example_attachments(example) for example in _original_examples] + return [ + self._reset_example_attachments(example) for example in _original_examples + ] @property def examples(self) -> Iterable[schemas.Example]: @@ -1388,9 +1393,9 @@ def examples(self) -> Iterable[schemas.Example]: if self._reuse_attachments and self._attachment_raw_data_dict is None: examples_copy, self._examples = itertools.tee(self._examples) self._attachment_raw_data_dict = { - str(e.id) + name: value['reader'].read() + str(e.id) + name: value["reader"].read() for e in examples_copy - for name, value in e.attachments.items() + for name, value in (e.attachments or {}).items() } if self._num_repetitions > 1: examples_list = list(self._examples) @@ -1442,7 +1447,7 @@ def start(self) -> _ExperimentManager: include_attachments=self._include_attachments, reuse_attachments=self._reuse_attachments, upload_results=self._upload_results, - attachment_raw_data_dict=self._attachment_raw_data_dict + attachment_raw_data_dict=self._attachment_raw_data_dict, ) def with_predictions( diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py index 931af0237..a1505699a 100644 --- a/python/langsmith/evaluation/evaluator.py +++ b/python/langsmith/evaluation/evaluator.py @@ -4,7 +4,6 @@ import asyncio import inspect -import io import uuid from abc import abstractmethod from typing import ( @@ -667,7 +666,7 @@ async def awrapper( "example": example, "inputs": example.inputs if example else {}, "outputs": run.outputs or {}, - "attachments": example.attachments or {}, + "attachments": example.attachments or {} if example else {}, "reference_outputs": example.outputs or {} if example else {}, } args = (arg_map[arg] for arg in positional_args) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 24f822654..bd1c37ca8 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -1551,7 +1551,6 @@ async def evaluator_2( assert result["evaluation_results"]["results"][0].score 
== 1.0 assert result["evaluation_results"]["results"][1].score == 1.0 - results = await langchain_client.aevaluate( target, data=dataset_name, From c9071a4aae51ea00b7bc558e8511c3ac863eaba1 Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Mon, 6 Jan 2025 14:01:10 -0800 Subject: [PATCH 07/10] fmt --- python/langsmith/evaluation/_arunner.py | 46 ++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 6822151a2..817becf82 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -581,13 +581,49 @@ def __init__( self._upload_results = upload_results self._attachment_raw_data_dict = attachment_raw_data_dict - def _make_fresh_examples(self, examples): - return ( - example - for example in examples - if str(example.id) not in self._attachment_raw_data_dict + def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example: + """Reset attachment readers for an example.""" + if not hasattr(example, "attachments") or not example.attachments: + return example + + new_attachments: dict[str, schemas.AttachmentInfo] = {} + for name, attachment in example.attachments.items(): + if ( + self._attachment_raw_data_dict is not None + and str(example.id) + name in self._attachment_raw_data_dict + ): + new_attachments[name] = { + "presigned_url": attachment["presigned_url"], + "reader": io.BytesIO( + self._attachment_raw_data_dict[str(example.id) + name] + ), + } + else: + new_attachments[name] = attachment + + # Create a new Example instance with the updated attachments + return schemas.Example( + id=example.id, + created_at=example.created_at, + dataset_id=example.dataset_id, + inputs=example.inputs, + outputs=example.outputs, + metadata=example.metadata, + modified_at=example.modified_at, + runs=example.runs, + source_run_id=example.source_run_id, + attachments=new_attachments, + _host_url=example._host_url, + _tenant_id=example._tenant_id, ) + def _make_fresh_examples( + self, + examples: List[schemas.Example], + ) -> List[schemas.Example]: + """Create fresh copies of examples with reset readers.""" + return [self._reset_example_attachments(example) for example in examples] + async def aget_examples(self) -> AsyncIterator[schemas.Example]: if self._examples is None: self._examples = _aresolve_data( From 41cd64fce5d47da7f65fa405913140687f629fd3 Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Fri, 10 Jan 2025 09:58:25 -0800 Subject: [PATCH 08/10] ankush comment --- python/langsmith/evaluation/_arunner.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 817becf82..ad4a60544 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -617,13 +617,6 @@ def _reset_example_attachments(self, example: schemas.Example) -> schemas.Exampl _tenant_id=example._tenant_id, ) - def _make_fresh_examples( - self, - examples: List[schemas.Example], - ) -> List[schemas.Example]: - """Create fresh copies of examples with reset readers.""" - return [self._reset_example_attachments(example) for example in examples] - async def aget_examples(self) -> AsyncIterator[schemas.Example]: if self._examples is None: self._examples = _aresolve_data( @@ -642,7 +635,12 @@ async def aget_examples(self) -> AsyncIterator[schemas.Example]: examples_list = [example async for example in self._examples] 
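+            # Materialize the examples once so each repetition below can
+            # rebuild fresh attachment readers from the cached raw bytes.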
self._examples = async_chain_from_iterable( [ - async_iter_from_list(self._make_fresh_examples(examples_list)) + async_iter_from_list( + [ + self._reset_example_attachments(example) + for example in examples_list + ] + ) for _ in range(self._num_repetitions) ] ) From 628c3c3dbf108d984c4522f701e2d9edf692e83d Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Fri, 10 Jan 2025 10:27:16 -0800 Subject: [PATCH 09/10] fmt --- python/langsmith/evaluation/_runner.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 02be981af..156067abe 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -1337,7 +1337,9 @@ def __init__( self._upload_results = upload_results self._attachment_raw_data_dict = attachment_raw_data_dict - def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example: + def _reset_example_attachment_readers( + self, example: schemas.Example + ) -> schemas.Example: """Reset attachment readers for an example.""" if not hasattr(example, "attachments") or not example.attachments: return example @@ -1373,15 +1375,6 @@ def _reset_example_attachments(self, example: schemas.Example) -> schemas.Exampl _tenant_id=example._tenant_id, ) - def _make_fresh_examples( - self, - _original_examples: List[schemas.Example], - ) -> List[schemas.Example]: - """Create fresh copies of examples with reset readers.""" - return [ - self._reset_example_attachments(example) for example in _original_examples - ] - @property def examples(self) -> Iterable[schemas.Example]: if self._examples is None: @@ -1400,7 +1393,10 @@ def examples(self) -> Iterable[schemas.Example]: if self._num_repetitions > 1: examples_list = list(self._examples) self._examples = itertools.chain.from_iterable( - self._make_fresh_examples(examples_list) + [ + self._reset_example_attachment_readers(example) + for example in examples_list + ] for _ in range(self._num_repetitions) ) self._examples, examples_iter = itertools.tee(self._examples) From f00e2f6f0e2509ac5c4689421ad841a88a68d828 Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Fri, 10 Jan 2025 15:27:16 -0800 Subject: [PATCH 10/10] edits --- python/langsmith/evaluation/_arunner.py | 37 +++++++++-- python/langsmith/evaluation/_runner.py | 10 ++- python/tests/integration_tests/test_client.py | 64 +++++++++++++++++++ 3 files changed, 106 insertions(+), 5 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index ad4a60544..d39752969 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -543,6 +543,18 @@ class _AsyncExperimentManager(_ExperimentManagerMixin): sresults for the experiment. summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results for the experiment. + num_repetitions (Optional[int], default=1): The number of repetitions for + the experiment. + include_attachments (Optional[bool], default=False): Whether to include + attachments. This is used for when we pull the examples for the experiment. + reuse_attachments (Optional[bool], default=False): Whether to reuse attachments + from examples. This is True if we need to reuse attachments across multiple + target/evaluator functions. + upload_results (Optional[bool], default=True): Whether to upload results + to Langsmith. + attachment_raw_data_dict (Optional[dict]): A dictionary to store raw data + for attachments. 
Only used if we reuse attachments across multiple + target/evaluator functions. """ def __init__( @@ -582,7 +594,15 @@ def __init__( self._attachment_raw_data_dict = attachment_raw_data_dict def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example: - """Reset attachment readers for an example.""" + """Reset attachment readers for an example. + + This is only in the case that an attachment is going to be used by more + than 1 callable (target + evaluators). In that case we keep a single copy + of the attachment data in self._attachment_raw_data_dict, and create + readers from that data. This makes it so that we don't have to keep + copies of the same data in memory, instead we can just create readers + from the same data. + """ if not hasattr(example, "attachments") or not example.attachments: return example @@ -761,6 +781,11 @@ async def awith_predictions_and_evaluators( self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4) async def process_examples(): + """Create a single task per example. + + That task is to run the target function and all the evaluators + sequentially. + """ async for pred in self._apredict( target, max_concurrency=max_concurrency, @@ -778,6 +803,9 @@ async def process_examples(): ) yield result + # Run the per-example tasks with max-concurrency + # This guarantees that max_concurrency is the upper limit + # for the number of target/evaluators that can be run in parallel experiment_results = aitertools.aiter_with_concurrency( max_concurrency, process_examples(), @@ -1000,9 +1028,10 @@ async def _run_single_evaluator(evaluator): exc_info=True, ) - all_results = await asyncio.gather( - *[_run_single_evaluator(evaluator) for evaluator in evaluators] - ) + all_results = [] + for evaluator in evaluators: + all_results.append(await _run_single_evaluator(evaluator)) + for result in all_results: if result is not None: eval_results["results"].extend(result) diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 156067abe..96b73b008 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -1340,7 +1340,15 @@ def __init__( def _reset_example_attachment_readers( self, example: schemas.Example ) -> schemas.Example: - """Reset attachment readers for an example.""" + """Reset attachment readers for an example. + + This is only in the case that an attachment is going to be used by more + than 1 callable (target + evaluators). In that case we keep a single copy + of the attachment data in self._attachment_raw_data_dict, and create + readers from that data. This makes it so that we don't have to keep + copies of the same data in memory, instead we can just create readers + from the same data. 
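+
+        Each callable gets an independent ``io.BytesIO`` reader over the shared
+        bytes, so one reader being consumed or seeked never affects another.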
+ """ if not hasattr(example, "attachments") or not example.attachments: return example diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index bd1c37ca8..a35e96489 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -1,5 +1,6 @@ """LangSmith langchain_client Integration Tests.""" +import asyncio import datetime import io import logging @@ -1997,3 +1998,66 @@ def test_update_examples_multipart(langchain_client: Client) -> None: # Clean up langchain_client.delete_dataset(dataset_id=dataset.id) + + +async def test_aevaluate_max_concurrency(langchain_client: Client) -> None: + """Test max concurrency works as expected.""" + dataset_name = "__test_a_ton_of_feedback" + uuid4().hex[:4] + dataset = langchain_client.create_dataset( + dataset_name=dataset_name, + description="Test dataset for max concurrency", + ) + + examples = [ + ExampleUploadWithAttachments( + inputs={"query": "What's in this image?"}, + outputs={"answer": "A test image 1"}, + attachments={ + "image1": ("image/png", b"fake image data 1"), + "extra": ("text/plain", b"extra data"), + }, + ) + for _ in range(10) + ] + + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples) + + evaluators = [] + for _ in range(100): + + async def eval_func(inputs, outputs): + await asyncio.sleep(0.1) + return {"score": random.random()} + + evaluators.append(eval_func) + + async def target(inputs, attachments): + return {"foo": "bar"} + + start_time = time.time() + await langchain_client.aevaluate( + target, + data=dataset_name, + evaluators=evaluators, + max_concurrency=8, + ) + + end_time = time.time() + # this should proceed in a 8-2 manner, taking around 20 seconds total + assert end_time - start_time < 30 + assert end_time - start_time > 20 + + start_time = time.time() + await langchain_client.aevaluate( + target, + data=dataset_name, + evaluators=evaluators, + max_concurrency=4, + ) + + end_time = time.time() + # this should proceed in a 4-4-2 manner, taking around 30 seconds total + assert end_time - start_time < 40 + assert end_time - start_time > 30 + + langchain_client.delete_dataset(dataset_id=dataset.id)
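For readers following the series, below are two minimal, self-contained sketches of the techniques these patches implement. All helper names (`cache_attachments`, `fresh_attachments`, `run_pipeline`) are illustrative stand-ins, not part of the langsmith SDK.

First, the attachment-reuse pattern from patches 05-10: read each attachment's raw bytes exactly once, then hand every consumer its own `io.BytesIO` reader over the shared bytes.

```python
import io

# Cache of raw attachment bytes, keyed by example id + attachment name
# (mirroring the patches' _attachment_raw_data_dict).
raw_data: dict = {}

def cache_attachments(example_id: str, attachments: dict) -> None:
    # Consume each attachment's reader once and keep the raw bytes.
    for name, att in attachments.items():
        raw_data[example_id + name] = att["reader"].read()

def fresh_attachments(example_id: str, attachments: dict) -> dict:
    # Rebuild the attachment dict with independent readers over the cached
    # bytes, so the target and evaluators never share a file position.
    return {
        name: {
            "presigned_url": att["presigned_url"],
            "reader": io.BytesIO(raw_data[example_id + name]),
        }
        for name, att in attachments.items()
    }

# Usage: cache once, then give the target and each evaluator a fresh copy.
attachments = {
    "image": {
        "presigned_url": "https://example.invalid",
        "reader": io.BytesIO(b"data: 0"),
    }
}
cache_attachments("ex-1", attachments)
for _ in range(3):  # e.g. one target plus two evaluators
    copy = fresh_attachments("ex-1", attachments)
    assert copy["image"]["reader"].read() == b"data: 0"
```

Second, the single-pipeline scheduling from patches 01 and 10: one task per example runs the target and then its evaluators sequentially, so the concurrency limit applies to whole examples end-to-end (the behavior the new `test_aevaluate_max_concurrency` asserts). A plain semaphore plays the role of `max_concurrency` in this sketch.

```python
import asyncio

async def run_pipeline(examples, target, evaluators, max_concurrency=8):
    sem = asyncio.Semaphore(max_concurrency)

    async def process(example):
        async with sem:
            run = await target(example)  # prediction
            results = []
            for evaluator in evaluators:  # evaluations follow immediately, per example
                results.append(await evaluator(run, example))
            return {"example": example, "run": run, "evaluation_results": results}

    return await asyncio.gather(*(process(e) for e in examples))

# Usage with trivial stand-ins for a target and an evaluator:
async def target(example):
    await asyncio.sleep(0.01)
    return {"answer": example["question"].upper()}

async def evaluator(run, example):
    return {"score": float(run["answer"] == example["question"].upper())}

rows = asyncio.run(run_pipeline([{"question": "hi"}], target, [evaluator]))
assert rows[0]["evaluation_results"][0]["score"] == 1.0
```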