Skip to content

Commit

Permalink
Drain the Rust client when its last copy in Python is dropped. (#1334)
Browse files Browse the repository at this point in the history
If the Python application wants to exit (or the client just gets garbage
collected), we want to finish sending any batched requests before
proceeding with the GC or app shutdown.

Normally, Python will make sure to shut down any Python-side threads
before allowing the program to exit. But it's completely unaware of
Rust's own threads, so it can't shut them down or wait for them. This is
why we need to explicitly make sure they complete their work and shut
down on their own. Rust runs a value's `Drop` implementation when the value
is dropped, and we shut down the background threads *only* when we're
certain that the sole remaining `Arc` copy of the Rust client is the one
we're holding (which is itself about to go out of scope).
  • Loading branch information
obi1kenobi authored Dec 12, 2024
1 parent 674b701 commit 7423b35
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 16 deletions.
2 changes: 1 addition & 1 deletion python/bench/tracing_client_via_pyo3.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def benchmark_run_creation(json_size, num_runs) -> None:

if client._pyo3_client:
# Wait for the queue to drain.
client._pyo3_client.drain()
del client
else:
client.tracing_queue.join()

Expand Down
2 changes: 1 addition & 1 deletion python/bench/tracing_rust_client_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def benchmark_run_creation(num_runs: int, json_size: int, samples: int = 1) -> D
client.create_run(run)

# wait for client queues to be empty
client.drain()
del client
elapsed = time.perf_counter() - start

print(f"runs complete: {elapsed:.3f}s")
Expand Down
17 changes: 11 additions & 6 deletions rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,19 @@ impl BlockingTracingClient {
Python::allow_threads(slf.py(), || unpacked.client.submit_run_update(run.into_inner()))
.map_err(|e| into_py_err(slf.py(), e))
}

/// Drains the wrapped tracing client.
///
/// The GIL is released for the duration of the underlying `client.drain()` call,
/// and any error it returns is converted into a Python exception via `into_py_err`.
pub fn drain(slf: &Bound<'_, Self>) -> PyResult<()> {
    let py = slf.py();
    let inner = slf.get();
    py.allow_threads(|| inner.client.drain()).map_err(|err| into_py_err(py, err))
}
}

/// Converts a Rust-side tracing client error into the Python-visible
/// `TracingClientError` exception, using the error's `Display` text as the message.
fn into_py_err(py: Python<'_>, e: langsmith_tracing_client::client::TracingClientError) -> PyErr {
    let message = e.to_string();
    crate::errors::TracingClientError::new_err(message.into_py(py))
}

impl Drop for BlockingTracingClient {
    /// Drains the underlying client when the last `Arc` copy of it is being dropped.
    ///
    /// This runs when Python runs GC on the client, such as when the application
    /// is exiting, so in-progress requests get a chance to complete first.
    ///
    /// A draining failure is reported on stderr rather than by panicking:
    /// a panic raised from `drop` while the thread is already unwinding aborts
    /// the process, and there is no caller here that could handle the error.
    fn drop(&mut self) {
        // Other live copies of the client may still be submitting work,
        // so only the holder of the final strong reference drains it.
        if Arc::strong_count(&self.client) != 1 {
            return;
        }

        // This is the only copy of the client in Python,
        // so let it drain its in-progress requests before proceeding.
        if let Err(e) = self.client.drain() {
            eprintln!("draining failed: {e}");
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,7 @@ impl RunProcessor {
}

self.drain_sender.send(()).expect("drain_sender should never fail");

// Put this thread to sleep, so we know the remaining `Drain` messages
// are almost certainly answered by other worker threads.
//
// HACK: This is very hacky!
// Drain should only be used for benchmarking.
std::thread::sleep(Duration::from_secs(120));
break;
}
_ => {
buffer.push(queued_run);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ impl TracingClient {
self.sender.send(queued_run).map_err(|_| TracingClientError::QueueFull)
}

/// Complete all in-progress requests, then allow the worker threads to exit.
///
/// Convenience function for the PyO3 bindings, which cannot use [`Self::shutdown`]
/// due to its by-value `self`. This means we cannot `.join()` the threads,
/// but the client is nevertheless unusable after this call.
///
/// Sending further data after a [`Self::drain()`] call has unspecified behavior.
/// It will not cause *undefined behavior* in the programming language sense,
/// but it may e.g. cause errors, panics, or even silently fail, with no guarantees.
pub fn drain(&self) -> Result<(), TracingClientError> {
for _ in &self.handles {
self.sender.send(QueuedRun::Drain).map_err(|_| TracingClientError::QueueFull)?;
Expand Down
2 changes: 1 addition & 1 deletion rust/crates/langsmith-tracing-client/src/client/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,6 @@ pub(crate) enum QueuedRun {
Update(RunUpdateExtended),
#[expect(dead_code)]
RunBytes(RunEventBytes),
Drain,
Drain, // Like `Shutdown`, but explicitly sends a message confirming draining is complete.
Shutdown,
}

0 comments on commit 7423b35

Please sign in to comment.