diff --git a/python/bench/tracing_client_via_pyo3.py b/python/bench/tracing_client_via_pyo3.py index cb1cd6b97..411b97f2c 100644 --- a/python/bench/tracing_client_via_pyo3.py +++ b/python/bench/tracing_client_via_pyo3.py @@ -75,7 +75,7 @@ def benchmark_run_creation(json_size, num_runs) -> None: if client._pyo3_client: # Wait for the queue to drain. - client._pyo3_client.drain() + del client else: client.tracing_queue.join() diff --git a/python/bench/tracing_rust_client_bench.py b/python/bench/tracing_rust_client_bench.py index 6c5203d98..3f846e5d9 100644 --- a/python/bench/tracing_rust_client_bench.py +++ b/python/bench/tracing_rust_client_bench.py @@ -49,7 +49,7 @@ def benchmark_run_creation(num_runs: int, json_size: int, samples: int = 1) -> D client.create_run(run) # wait for client queues to be empty - client.drain() + del client elapsed = time.perf_counter() - start print(f"runs complete: {elapsed:.3f}s") diff --git a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs index 77020bd74..6b3816f15 100644 --- a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs +++ b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs @@ -68,14 +68,19 @@ impl BlockingTracingClient { Python::allow_threads(slf.py(), || unpacked.client.submit_run_update(run.into_inner())) .map_err(|e| into_py_err(slf.py(), e)) } - - pub fn drain(slf: &Bound<'_, Self>) -> PyResult<()> { - let unpacked = slf.get(); - Python::allow_threads(slf.py(), || unpacked.client.drain()) - .map_err(|e| into_py_err(slf.py(), e)) - } } fn into_py_err(py: Python<'_>, e: langsmith_tracing_client::client::TracingClientError) -> PyErr { crate::errors::TracingClientError::new_err(format!("{e}").into_py(py)) } + +impl Drop for BlockingTracingClient { + fn drop(&mut self) { + if Arc::strong_count(&self.client) == 1 { + // This is the only copy of the client in Python, + // so let it drain its in-progress requests before proceeding. + // This runs when Python runs GC on the client, such as when the application is exiting. + self.client.drain().expect("draining failed"); + } + } +} diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs b/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs index bb23cd858..495c546d9 100644 --- a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs +++ b/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs @@ -56,13 +56,7 @@ impl RunProcessor { } self.drain_sender.send(()).expect("drain_sender should never fail"); - - // Put this thread to sleep, so we know the remaining `Drain` messages - // are almost certainly answered by other worker threads. - // - // HACK: This is very hacky! - // Drain should only be used for benchmarking. - std::thread::sleep(Duration::from_secs(120)); + break; } _ => { buffer.push(queued_run); diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs b/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs index f9f11e5a2..4861ffe3d 100644 --- a/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs +++ b/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs @@ -82,6 +82,15 @@ impl TracingClient { self.sender.send(queued_run).map_err(|_| TracingClientError::QueueFull) } + /// Complete all in-progress requests, then allow the worker threads to exit. + /// + /// Convenience function for the PyO3 bindings, which cannot use [`Self::shutdown`] + /// due to its by-value `self`. This means we cannot `.join()` the threads, + /// but the client is nevertheless unusable after this call. + /// + /// Sending further data after a [`Self::drain()`] call has unspecified behavior. + /// It will not cause *undefined behavior* in the programming language sense, + /// but it may e.g. cause errors, panics, or even silently fail, with no guarantees. pub fn drain(&self) -> Result<(), TracingClientError> { for _ in &self.handles { self.sender.send(QueuedRun::Drain).map_err(|_| TracingClientError::QueueFull)?; diff --git a/rust/crates/langsmith-tracing-client/src/client/run.rs b/rust/crates/langsmith-tracing-client/src/client/run.rs index 4750c1028..f1484e50e 100644 --- a/rust/crates/langsmith-tracing-client/src/client/run.rs +++ b/rust/crates/langsmith-tracing-client/src/client/run.rs @@ -135,6 +135,6 @@ pub(crate) enum QueuedRun { Update(RunUpdateExtended), #[expect(dead_code)] RunBytes(RunEventBytes), - Drain, + Drain, // Like `Shutdown`, but explicitly sends a message confirming draining is complete. Shutdown, }