Skip to content

Commit

Permalink
Plumb through initial grpc support for the python SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
jleibs committed Nov 4, 2024
1 parent 045048f commit 5ca018e
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6793,10 +6793,14 @@ dependencies = [
"re_log",
"re_log_types",
"re_memory",
"re_remote_store_types",
"re_sdk",
"re_video",
"re_web_viewer_server",
"re_ws_comms",
"tokio",
"tokio-stream",
"tonic",
"uuid",
]

Expand Down
16 changes: 16 additions & 0 deletions rerun_py/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ extension-module = ["pyo3/extension-module"]
## You need to install [nasm](https://nasm.us/) to compile with this feature.
nasm = ["re_video/nasm"]

remote = [
"dep:re_remote_store_types",
"dep:re_ws_comms",
"dep:tokio",
"dep:tokio-stream",
"dep:tonic",
]

## Support serving a web viewer over HTTP with `serve()`.
##
## Enabling this adds quite a bit to the binary size,
Expand Down Expand Up @@ -71,6 +79,14 @@ pyo3 = { workspace = true, features = ["abi3-py38"] }
rand = { workspace = true, features = ["std", "std_rng"] }
uuid.workspace = true

# Deps for remote feature
re_remote_store_types = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
tokio-stream = { workspace = true, optional = true }
tonic = { workspace = true, default-features = false, features = [
"transport",
], optional = true }


[build-dependencies]
re_build_tools.workspace = true
Expand Down
1 change: 1 addition & 0 deletions rerun_py/rerun_sdk/rerun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
dataframe as dataframe,
experimental as experimental,
notebook as notebook,
remote as remote,
)
from ._baseclasses import (
ComponentColumn as ComponentColumn,
Expand Down
10 changes: 10 additions & 0 deletions rerun_py/rerun_sdk/rerun/remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import annotations

try:
from rerun_bindings import (
connect as connect,
)
except ImportError:

def connect(url: str) -> None:
raise NotImplementedError("Rerun SDK was built without the `remote` feature enabled.")
3 changes: 3 additions & 0 deletions rerun_py/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ mod arrow;
mod dataframe;
mod python_bridge;
mod video;

#[cfg(feature = "remote")]
mod remote;
3 changes: 3 additions & 0 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ fn rerun_bindings(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
// dataframes
crate::dataframe::register(m)?;

#[cfg(feature = "remote")]
crate::remote::register(m)?;

Ok(())
}

Expand Down
85 changes: 85 additions & 0 deletions rerun_py/src/remote.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#![allow(unsafe_op_in_unsafe_fn)]
use arrow2::io::ipc::write::Record;
// False positive due to #[pyfunction] macro
use pyo3::{exceptions::PyRuntimeError, prelude::*, Bound, PyResult};
use re_remote_store_types::v0::{storage_node_client::StorageNodeClient, ListRecordingsRequest};

/// Register the `rerun.remote` module.
pub(crate) fn register(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(connect, m)?)?;

Ok(())
}

async fn connect_async(addr: String) -> PyResult<StorageNodeClient<tonic::transport::Channel>> {
#[cfg(not(target_arch = "wasm32"))]
let tonic_client = tonic::transport::Endpoint::new(addr)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?
.connect()
.await
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

Ok(StorageNodeClient::new(tonic_client))
}

#[pyfunction]
pub fn connect(addr: String) -> PyResult<PyConnection> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

let client = runtime.block_on(connect_async(addr))?;

Ok(PyConnection { runtime, client })
}

/// A connection to a remote storage node.
#[pyclass(name = "Connection")]
pub struct PyConnection {
/// A tokio runtime for async operations. This connection will currently
/// block the Python interpreter while waiting for responses.
/// This runtime must be persisted for the lifetime of the connection.
runtime: tokio::runtime::Runtime,

/// The actual tonic connection.
client: StorageNodeClient<tonic::transport::Channel>,
}

#[pymethods]
impl PyConnection {
/// List all recordings registered with the node.
fn list_recordings(&mut self) -> PyResult<Vec<PyRecordingInfo>> {
self.runtime.block_on(async {
let request = ListRecordingsRequest {};

let resp = self
.client
.list_recordings(request)
.await
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

Ok(resp
.into_inner()
.recordings
.into_iter()
.map(|recording| PyRecordingInfo { info: recording })
.collect())
})
}
}

/// The info for a recording stored in the archive.
#[pyclass(name = "RecordingInfo")]
pub struct PyRecordingInfo {
info: re_remote_store_types::v0::RecordingInfo,
}

#[pymethods]
impl PyRecordingInfo {
fn __repr__(&self) -> String {
format!(
"Recording(id={})",
self.info.id.as_ref().map_or("Unknown", |id| id.id.as_str())
)
}
}

0 comments on commit 5ca018e

Please sign in to comment.