Skip to content

Commit

Permalink
Merge branch 'ankush/debug-rust-io' of github.com:langchain-ai/langsm…
Browse files Browse the repository at this point in the history
…ith-sdk into ankush/debug-rust-io
  • Loading branch information
agola11 committed Dec 12, 2024
2 parents a22f99c + f376e33 commit 93558a1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
21 changes: 21 additions & 0 deletions python/langsmith/_internal/_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,27 @@ def serialize_run_dict(
attachments=attachments if attachments is not None else None,
)

def serialize_run_dict_for_compressed_ingest(
operation: Literal["post", "patch"], payload: dict
):
inputs = payload.pop("inputs", None)
outputs = payload.pop("outputs", None)
events = payload.pop("events", None)
attachments = payload.pop("attachments", None)
serialized = ...
extra = ...
return SerializedRunOperation(
operation=operation,
id=payload["id"],
trace_id=payload["trace_id"],
_none=_dumps_json(payload),
inputs=_dumps_json(inputs) if inputs is not None else None,
outputs=_dumps_json(outputs) if outputs is not None else None,
events=_dumps_json(events) if events is not None else None,
attachments=attachments if attachments is not None else None,
)



def combine_serialized_queue_operations(
ops: list[Union[SerializedRunOperation, SerializedFeedbackOperation]],
Expand Down
9 changes: 8 additions & 1 deletion python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,13 @@ def __init__(
if info is None or isinstance(info, ls_schemas.LangSmithInfo)
else ls_schemas.LangSmithInfo(**info)
)
self.compressed_multipart_buffer = ...
weakref.finalize(self, close_session, self.session)
atexit.register(close_session, session_)
# Initialize auto batching
if auto_batch_tracing:
if auto_batch_tracing and _compression_enabled:
...
elif auto_batch_tracing:
self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()

threading.Thread(
Expand Down Expand Up @@ -1295,6 +1298,10 @@ def create_run(
self.tracing_queue.put(
TracingQueueItem(run_create["dotted_order"], serialized_op)
)
elif os.environ["COMP"]:
# Do something different
# Use existing serialized_run_dict

else:
# Neither Rust nor Python batch ingestion is configured,
# fall back to the non-batch approach.
Expand Down

0 comments on commit 93558a1

Please sign in to comment.