python[minor]: Release 0.3.0 (#1439)
- turn on run compression by default
- make zstandard a required dep
- add pytest plugin
- proper concurrency handling in Client.evaluate/aevaluate so that no more than max_concurrency examples are evaluated at once
- support for filesystem access when uploading attachments
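
A minimal sketch of the two user-facing changes (the test body, target function, and dataset name are hypothetical; assumes a configured LangSmith API key):

```python
import pytest
from langsmith import Client, expect


# New pytest plugin: mark a test to log its results and feedback to LangSmith.
@pytest.mark.langsmith
def test_greeting_is_close():
    prediction = "Hello there!"
    # edit_distance may require the optional rapidfuzz dependency.
    expect.edit_distance(prediction=prediction, reference="Hello!").to_be_less_than(7)


def run_eval() -> None:
    # Client.evaluate now caps in-flight examples at max_concurrency.
    client = Client()
    client.evaluate(
        lambda inputs: {"answer": str(inputs).upper()},  # hypothetical target
        data="my-dataset",  # hypothetical dataset name
        max_concurrency=4,
    )
```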

-------------
Co-authored-by: Angus Jelinek <[email protected]>
Co-authored-by: Isaac Francisco <[email protected]>
Co-authored-by: Predrag Gruevski <[email protected]>
Co-authored-by: Erick Friis <[email protected]>
Co-authored-by: isaac hershenson <[email protected]>
Co-authored-by: Jacob Lee <[email protected]>
baskaryan authored Jan 22, 2025
1 parent 20cc074 commit dab4cdb
Showing 30 changed files with 4,286 additions and 1,987 deletions.
35 changes: 15 additions & 20 deletions python/docs/create_api_rst.py
@@ -1,5 +1,7 @@
"""Script for auto-generating api_reference.rst."""

from __future__ import annotations

import importlib
import inspect
import logging
@@ -82,6 +84,10 @@ class ModuleMembers(TypedDict):
 }


+def _document_func_or_class(name: str) -> bool:
+    return (not name.startswith("_")) or name in ("_Expect",)
+
+
 def _load_module_members(module_path: str, namespace: str) -> ModuleMembers:
     classes_: List[ClassInfo] = []
     functions: List[FunctionInfo] = []
@@ -108,24 +114,12 @@ def _load_module_members(module_path: str, namespace: str) -> ModuleMembers:
else "Pydantic" if issubclass(type_, BaseModel) else "Regular"
)
)
# if hasattr(type_, "__slots__"):
# for func_name, func_type in inspect.getmembers(type_):
# if inspect.isfunction(func_type):
# functions.append(
# FunctionInfo(
# name=func_name,
# qualified_name=f"{namespace}.{name}.{func_name}",
# is_public=not func_name.startswith("_"),
# is_deprecated=".. deprecated::"
# in (func_type.__doc__ or ""),
# )
# )
classes_.append(
ClassInfo(
name=name,
qualified_name=f"{namespace}.{name}",
kind=kind,
is_public=not name.startswith("_"),
is_public=_document_func_or_class(name),
is_deprecated=".. deprecated::" in (type_.__doc__ or ""),
)
)
@@ -134,7 +128,7 @@ def _load_module_members(module_path: str, namespace: str) -> ModuleMembers:
             FunctionInfo(
                 name=name,
                 qualified_name=f"{namespace}.{name}",
-                is_public=not name.startswith("_"),
+                is_public=_document_func_or_class(name),
                 is_deprecated=".. deprecated::" in (type_.__doc__ or ""),
             )
         )
@@ -156,9 +150,10 @@ def _load_package_modules(
         if file_path.name not in {
             "_runner.py",
             "_arunner.py",
             "_testing.py",
+            "_internal.py",
+            "_expect.py",
             "_openai.py",
-            "_expect.py",
         }:
             continue

@@ -200,6 +195,8 @@ def _load_package_modules(
"utils",
"anonymizer",
"wrappers",
"testing",
"_expect",
]


@@ -387,20 +384,18 @@ def _build_index(package_version: str) -> None:
 | [AsyncClient](async_client/langsmith.async_client.AsyncClient) | Asynchronous client for interacting with the LangSmith API. |
 | [traceable](run_helpers/langsmith.run_helpers.traceable) | Wrapper/decorator for tracing any function. |
 | [wrap_openai](wrappers/langsmith.wrappers._openai.wrap_openai) | Wrapper for OpenAI client, adds LangSmith tracing to all OpenAI calls. |
 | [evaluate](evaluation/langsmith.evaluation._runner.evaluate) | Evaluate an application on a dataset. |
 | [aevaluate](evaluation/langsmith.evaluation._arunner.aevaluate) | Asynchronously evaluate an application on a dataset. |
-| [unit](_testing/langsmith._testing.unit) | Create a LangSmith unit test. |
+| [@pytest.mark.langsmith](/testing/langsmith.testing._internal.test) | LangSmith pytest integration. |
 ```{{toctree}}
 :maxdepth: 2
 :hidden:
 client<client>
 async_client<async_client>
 evaluation<evaluation>
 run_helpers<run_helpers>
 wrappers<wrappers>
-_testing<_testing>
+testing<testing>
+_expect<_expect>
 ```
 """
1 change: 0 additions & 1 deletion python/docs/requirements.txt
@@ -10,4 +10,3 @@ sphinx-design
 sphinx-copybutton
 beautifulsoup4
 openai
--e python
6 changes: 3 additions & 3 deletions python/langsmith/__init__.py
@@ -5,7 +5,6 @@

 if TYPE_CHECKING:
     from langsmith._expect import expect
-    from langsmith._testing import test, unit
     from langsmith.async_client import AsyncClient
     from langsmith.client import Client
     from langsmith.evaluation import aevaluate, evaluate
@@ -18,6 +17,7 @@
         tracing_context,
     )
     from langsmith.run_trees import RunTree
+    from langsmith.testing._internal import test, unit
     from langsmith.utils import (
         ContextThreadPoolExecutor,
     )
@@ -63,7 +63,7 @@ def __getattr__(name: str) -> Any:
         return traceable

     elif name == "test":
-        from langsmith._testing import test
+        from langsmith.testing._internal import test

         return test

@@ -104,7 +104,7 @@ def __getattr__(name: str) -> Any:
         return get_current_run_tree

     elif name == "unit":
-        from langsmith._testing import unit
+        from langsmith.testing._internal import unit

         return unit
     elif name == "ContextThreadPoolExecutor":
70 changes: 37 additions & 33 deletions python/langsmith/_expect.py
@@ -1,46 +1,50 @@
"""Make approximate assertions as "expectations" on test results.
This module is designed to be used within test cases decorated with the `@test` decorator
This module is designed to be used within test cases decorated with the
`@pytest.mark.decorator` decorator
It allows you to log scores about a test case and optionally make assertions that log as
"expectation" feedback to LangSmith.
Example usage:
.. code-block:: python
from langsmith import expect, test
import pytest
from langsmith import expect
@test
def test_output_semantically_close():
response = oai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Say hello!"},
],
)
response_txt = response.choices[0].message.content
# Intended usage
expect.embedding_distance(
prediction=response_txt,
reference="Hello!",
).to_be_less_than(0.9)
# Score the test case
matcher = expect.edit_distance(
prediction=response_txt,
reference="Hello!",
)
# Apply an assertion and log 'expectation' feedback to LangSmith
matcher.to_be_less_than(1)
# You can also directly make assertions on values directly
expect.value(response_txt).to_contain("Hello!")
# Or using a custom check
expect.value(response_txt).against(lambda x: "Hello" in x)
@pytest.mark.langsmith
def test_output_semantically_close():
response = oai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Say hello!"},
],
)
response_txt = response.choices[0].message.content
# Intended usage
expect.embedding_distance(
prediction=response_txt,
reference="Hello!",
).to_be_less_than(0.9)
# Score the test case
matcher = expect.edit_distance(
prediction=response_txt,
reference="Hello!",
)
# Apply an assertion and log 'expectation' feedback to LangSmith
matcher.to_be_less_than(1)
# You can also directly make assertions on values directly
expect.value(response_txt).to_contain("Hello!")
# Or using a custom check
expect.value(response_txt).against(lambda x: "Hello" in x)
# You can even use this for basic metric logging within tests
# You can even use this for basic metric logging within tests
expect.score(0.8)
expect.score(0.7, key="similarity").to_be_greater_than(0.7)
expect.score(0.8)
expect.score(0.7, key="similarity").to_be_greater_than(0.7)
""" # noqa: E501

from __future__ import annotations
@@ -379,7 +383,7 @@ def value(self, value: Any) -> _Matcher:

     def score(
         self,
-        score: Union[float, int],
+        score: Union[float, int, bool],
         *,
         key: str = "score",
         source_run_id: Optional[ls_client.ID_TYPE] = None,
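
The widened `Union[float, int, bool]` means binary pass/fail feedback can be logged without casting; a minimal sketch (the test body is hypothetical):

```python
import pytest
from langsmith import expect


@pytest.mark.langsmith
def test_exact_match():
    prediction = "Hello!"
    # score() now accepts bool (this hunk), not just int/float.
    expect.score(prediction == "Hello!", key="exact_match")
```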
100 changes: 81 additions & 19 deletions python/langsmith/_internal/_background_thread.py
@@ -6,6 +6,7 @@
 import logging
 import sys
 import threading
+import time
 import weakref
 from multiprocessing import cpu_count
 from queue import Empty, Queue
@@ -20,6 +21,7 @@

from langsmith import schemas as ls_schemas
from langsmith import utils as ls_utils
from langsmith._internal._compressed_runs import CompressedRuns
from langsmith._internal._constants import (
_AUTO_SCALE_DOWN_NEMPTY_TRIGGER,
_AUTO_SCALE_UP_NTHREADS_LIMIT,
@@ -100,7 +102,8 @@ def _tracing_thread_drain_compressed_buffer(
 def _tracing_thread_drain_compressed_buffer(
     client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520
 ) -> Tuple[Optional[io.BytesIO], Optional[Tuple[int, int]]]:
-    assert client.compressed_runs is not None
+    if client.compressed_runs is None:
+        return None, None
     with client.compressed_runs.lock:
         client.compressed_runs.compressor_writer.flush()
         current_size = client.compressed_runs.buffer.tell()
@@ -214,6 +217,24 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
     scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"]
     use_multipart = batch_ingest_config.get("use_multipart_endpoint", False)

+    disable_compression = ls_utils.get_env_var("DISABLE_RUN_COMPRESSION")
+    if not ls_utils.is_truish(disable_compression) and use_multipart:
+        if not (client.info.instance_flags or {}).get(
+            "zstd_compression_enabled", False
+        ):
+            logger.warning(
+                "Run compression is not enabled. Please update to the latest "
+                "version of LangSmith. Falling back to regular multipart ingestion."
+            )
+        else:
+            client._futures = set()
+            client.compressed_runs = CompressedRuns()
+            client._data_available_event = threading.Event()
+            threading.Thread(
+                target=tracing_control_thread_func_compress_parallel,
+                args=(weakref.ref(client),),
+            ).start()
+
     sub_threads: List[threading.Thread] = []
     # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
     num_known_refs = 3
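
For reference, the now-default run compression can be opted out of via the environment variable checked above; a sketch, assuming `ls_utils.get_env_var` resolves the `LANGSMITH_`-prefixed form of the name:

```python
import os

# Assumption: get_env_var("DISABLE_RUN_COMPRESSION") reads the
# LANGSMITH_-prefixed variable. Set it before creating the Client so the
# background tracing thread falls back to uncompressed multipart ingestion.
os.environ["LANGSMITH_DISABLE_RUN_COMPRESSION"] = "true"

from langsmith import Client

client = Client()
```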
@@ -256,6 +277,7 @@ def keep_thread_active() -> bool:
             _tracing_thread_handle_batch(
                 client, tracing_queue, next_batch, use_multipart
             )
+
     # drain the queue on exit
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False


 def tracing_control_thread_func_compress_parallel(
-    client_ref: weakref.ref[Client],
+    client_ref: weakref.ref[Client], flush_interval: float = 0.5
 ) -> None:
     client = client_ref()
     if client is None:
         return

+    if (
+        client.compressed_runs is None
+        or client._data_available_event is None
+        or client._futures is None
+    ):
+        logger.error("Required compression attributes not initialized")
+        return
+
     batch_ingest_config = _ensure_ingest_config(client.info)
     size_limit: int = batch_ingest_config["size_limit"]
     size_limit_bytes = batch_ingest_config.get("size_limit_bytes", 20_971_520)
@@ -300,35 +330,67 @@ def keep_thread_active() -> bool:
         # for now, keep thread alive
         return True

+    last_flush_time = time.monotonic()
+
     while True:
         triggered = client._data_available_event.wait(timeout=0.05)
         if not keep_thread_active():
             break
-        if not triggered:
-            continue
-        client._data_available_event.clear()
-
-        data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
-            client, size_limit, size_limit_bytes
-        )
-        if data_stream is not None:
-            try:
-                future = HTTP_REQUEST_THREAD_POOL.submit(
-                    client._send_compressed_multipart_req,
-                    data_stream,
-                    compressed_runs_info,
-                )
-                client._futures.add(future)
-            except RuntimeError:
-                client._send_compressed_multipart_req(data_stream, compressed_runs_info)
+        # If data arrived, clear the event and attempt a drain
+        if triggered:
+            client._data_available_event.clear()
+
+            data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
+                client, size_limit, size_limit_bytes
+            )
+            # If we have data, submit the send request
+            if data_stream is not None:
+                try:
+                    future = HTTP_REQUEST_THREAD_POOL.submit(
+                        client._send_compressed_multipart_req,
+                        data_stream,
+                        compressed_runs_info,
+                    )
+                    client._futures.add(future)
+                except RuntimeError:
+                    client._send_compressed_multipart_req(
+                        data_stream,
+                        compressed_runs_info,
+                    )
+                last_flush_time = time.monotonic()
+
+        else:
+            if (time.monotonic() - last_flush_time) >= flush_interval:
+                data_stream, compressed_runs_info = (
+                    _tracing_thread_drain_compressed_buffer(
+                        client, size_limit=1, size_limit_bytes=1
+                    )
+                )
+                if data_stream is not None:
+                    try:
+                        cf.wait(
+                            [
+                                HTTP_REQUEST_THREAD_POOL.submit(
+                                    client._send_compressed_multipart_req,
+                                    data_stream,
+                                    compressed_runs_info,
+                                )
+                            ]
+                        )
+                    except RuntimeError:
+                        client._send_compressed_multipart_req(
+                            data_stream,
+                            compressed_runs_info,
+                        )
+                last_flush_time = time.monotonic()

-    # Drain the buffer on exit
+    # Drain the buffer on exit (final flush)
     try:
         final_data_stream, compressed_runs_info = (
             _tracing_thread_drain_compressed_buffer(
                 client, size_limit=1, size_limit_bytes=1
-            )  # Force final drain
+            )
         )
         if final_data_stream is not None:
             try:
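
The loop above pairs an event wait with a timed fallback, so buffered runs are flushed within `flush_interval` even when no new data arrives. A stand-alone sketch of the same pattern, with hypothetical names:

```python
import threading
import time
from typing import Callable


def drain_loop(
    data_available: threading.Event,
    stop: threading.Event,
    flush: Callable[[], None],
    flush_interval: float = 0.5,
) -> None:
    """Hypothetical distillation of the wait-or-timed-flush pattern above."""
    last_flush = time.monotonic()
    while not stop.is_set():
        if data_available.wait(timeout=0.05):
            data_available.clear()
            flush()  # eager flush: a producer signalled new data
            last_flush = time.monotonic()
        elif time.monotonic() - last_flush >= flush_interval:
            flush()  # timed flush: nothing arrived, push out stragglers
            last_flush = time.monotonic()
```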