From d886b7b9819a92dd9d0903c8250837f87e225de9 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Tue, 2 Jul 2024 10:43:16 -0400 Subject: [PATCH 1/4] Update Rust --- flake.lock | 46 ++++++---------------------------------------- 1 file changed, 6 insertions(+), 40 deletions(-) diff --git a/flake.lock b/flake.lock index a8aa7d8..280d8e9 100644 --- a/flake.lock +++ b/flake.lock @@ -55,24 +55,6 @@ "type": "github" } }, - "flake-utils_2": { - "inputs": { - "systems": "systems_2" - }, - "locked": { - "lastModified": 1705309234, - "narHash": "sha256-uNRRNRKmJyCRC/8y1RqBkqWBLM034y4qN7EprSdmgyA=", - "owner": "numtide", - "repo": "flake-utils", - "rev": "1ef2e671c3b0c19053962c07dbda38332dcebf26", - "type": "github" - }, - "original": { - "owner": "numtide", - "repo": "flake-utils", - "type": "github" - } - }, "nixpkgs": { "locked": { "lastModified": 1711703276, @@ -91,11 +73,11 @@ }, "nixpkgs_2": { "locked": { - "lastModified": 1706487304, - "narHash": "sha256-LE8lVX28MV2jWJsidW13D2qrHU/RUUONendL2Q/WlJg=", + "lastModified": 1718428119, + "narHash": "sha256-WdWDpNaq6u1IPtxtYHHWpl5BmabtpmLnMAx0RdJ/vo8=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "90f456026d284c22b3e3497be980b2e47d0b28ac", + "rev": "e6cea36f83499eb4e9cd184c8a8e823296b50ad5", "type": "github" }, "original": { @@ -133,15 +115,14 @@ }, "rust-overlay": { "inputs": { - "flake-utils": "flake-utils_2", "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1716949111, - "narHash": "sha256-ms3aD3Z2jKd1dk8qd0D/N7C8vFxn6z6LQ1G7cvNTVJ8=", + "lastModified": 1719886738, + "narHash": "sha256-6eaaoJUkr4g9J/rMC4jhj3Gv8Sa62rvlpjFe3xZaSjM=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "2e7ccf572ce0f0547d4cf4426de4482936882d0e", + "rev": "db12d0c6ef002f16998723b5dd619fa7b8997086", "type": "github" }, "original": { @@ -164,21 +145,6 @@ "repo": "default", "type": "github" } - }, - "systems_2": { - "locked": { - "lastModified": 1681028828, - "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", - "owner": "nix-systems", - "repo": "default", - "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", - "type": "github" - }, - "original": { - "owner": "nix-systems", - "repo": "default", - "type": "github" - } } }, "root": "root", From 145962b5fd017b3b51cad01bf2aeaacc4b0d7ec0 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Tue, 2 Jul 2024 10:43:20 -0400 Subject: [PATCH 2/4] Permit handling of socket messages to/from client in separate tasks This allows a socket handler to spawn separate tasks to handle messages to and from the client. This satisfies a use case of the node dashboard server. There are two ways to handle this, neither of which worked before this change: * split the connection into a separate stream and sink, send one to each task. This requires messages to be sent into the sink by value, due to a subtle lifetime issue that is explained in the documentation. Thus, this change adds an implementation of Sink for Connection, to complement the existing Sink<&ToClient> impl * clone the connection and send one clone into each task. This change implements Clone for Connection. The Pin> is stateless and derived from the WebSocketConnection. Thus, we can "clone" it by recreating it from a clone of the connection. All the state is cloned with the connection itself. New documentation tests/examples are provided to demonstrate the two methods of accomplishing this use case and prove they compile. --- src/api.rs | 86 +++++++++++++++++++++++++++++++++++++++++++++++++-- src/app.rs | 2 +- src/socket.rs | 53 ++++++++++++++++++++++++++----- 3 files changed, 131 insertions(+), 10 deletions(-) diff --git a/src/api.rs b/src/api.rs index ab49955..a772a8b 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1080,7 +1080,89 @@ where /// }.boxed()); /// # } /// ``` - // + /// + /// In some cases, it may be desirable to handle messages to and from the client in separate + /// tasks. There are two ways of doing this: + /// + /// ## Split the connection into separate stream and sink + /// + /// ``` + /// use async_std::task::spawn; + /// use futures::{future::{join, FutureExt}, sink::SinkExt, stream::StreamExt}; + /// use tide_disco::{error::ServerError, socket::Connection, Api}; + /// # use vbs::version::StaticVersion; + /// + /// # fn ex(api: &mut Api<(), ServerError, StaticVersion<0, 1>>) { + /// api.socket("endpoint", |_req, mut conn: Connection>, _state| async move { + /// let (mut sink, mut stream) = conn.split(); + /// let recv = spawn(async move { + /// while let Some(Ok(msg)) = stream.next().await { + /// // Handle message from client. + /// } + /// }); + /// let send = spawn(async move { + /// loop { + /// let msg = // get message to send to client + /// # 0; + /// sink.send(msg).await; + /// } + /// }); + /// + /// join(send, recv).await; + /// Ok(()) + /// }.boxed()); + /// # } + /// ``` + /// + /// This approach requires messages to be sent to the client by value, consuming the message. + /// This is because, if we were to use the `Sync<&ToClient>` implementation for `Connection`, + /// the lifetime for `&ToClient` would be fixed after `split` is called, since the lifetime + /// appears in the return type, `SplitSink, &ToClient>`. Thus, this lifetime + /// outlives any scoped local variables created after the `split` call, such as `msg` in the + /// `loop`. + /// + /// If we want to use the message after sending it to the client, we would have to clone it, + /// which may be inefficient or impossible. Thus, there is another approach: + /// + /// ## Clone the connection + /// + /// ``` + /// use async_std::task::spawn; + /// use futures::{future::{join, FutureExt}, sink::SinkExt, stream::StreamExt}; + /// use tide_disco::{error::ServerError, socket::Connection, Api}; + /// # use vbs::version::StaticVersion; + /// + /// # fn ex(api: &mut Api<(), ServerError, StaticVersion<0, 1>>) { + /// api.socket("endpoint", |_req, mut conn: Connection>, _state| async move { + /// let recv = { + /// let mut conn = conn.clone(); + /// spawn(async move { + /// while let Some(Ok(msg)) = conn.next().await { + /// // Handle message from client. + /// } + /// }) + /// }; + /// let send = spawn(async move { + /// loop { + /// let msg = // get message to send to client + /// # 0; + /// conn.send(&msg).await; + /// // msg is still live at this point. + /// drop(msg); + /// } + /// }); + /// + /// join(send, recv).await; + /// Ok(()) + /// }.boxed()); + /// # } + /// ``` + /// + /// Depending on the exact situation, this method may end up being more verbose than the + /// previous example. But it allows us to retain the higher-ranked trait bound `conn: for<'a> + /// Sink<&'a ToClient>` instead of fixing the lifetime, which can prevent an unnecessary clone + /// in certain situations. + /// /// # Errors /// /// If the route `name` does not exist in the API specification, or if the route already has a @@ -1531,7 +1613,7 @@ mod test { .unwrap() .socket( "once", - |_req, mut conn: Connection<_, (), _, StaticVer01>, _state| { + |_req, mut conn: Connection, _state| { async move { conn.send("msg").boxed().await?; Ok(()) diff --git a/src/app.rs b/src/app.rs index 6443f91..c14ce4e 100644 --- a/src/app.rs +++ b/src/app.rs @@ -956,7 +956,7 @@ mod test { .unwrap() .socket( "socket_test", - |_req, mut conn: Connection<_, (), _, StaticVer01>, _state| { + |_req, mut conn: Connection, _state| { async move { conn.send("SOCKET").await.unwrap(); Ok(()) diff --git a/src/socket.rs b/src/socket.rs index dfd4269..13a4952 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -194,6 +194,28 @@ impl Sink<& } } +impl Sink + for Connection +{ + type Error = SocketError; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::<&ToClient>::poll_ready(self, cx) + } + + fn start_send(self: Pin<&mut Self>, item: ToClient) -> Result<(), Self::Error> { + self.start_send(&item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::<&ToClient>::poll_flush(self, cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::<&ToClient>::poll_close(self, cx) + } +} + impl Drop for Connection { @@ -233,19 +255,23 @@ impl unreachable!() }; Ok(Self { - sink: Box::pin(sink::unfold( - (conn.clone(), ty), - |(conn, accept), msg| async move { - conn.send(msg).await?; - Ok((conn, accept)) - }, - )), + sink: Self::sink(conn.clone()), conn, accept: ty, _phantom: Default::default(), }) } + /// Wrap a `WebSocketConnection` in a type that implements `Sink`. + fn sink( + conn: WebSocketConnection, + ) -> Pin>>> { + Box::pin(sink::unfold(conn, |conn, msg| async move { + conn.send(msg).await?; + Ok(conn) + })) + } + /// Project a `Pin<&mut Self>` to a pinned reference to the underlying connection. fn pinned_inner(self: Pin<&mut Self>) -> Pin<&mut WebSocketConnection> { // # Soundness @@ -269,6 +295,19 @@ impl } } +impl Clone + for Connection +{ + fn clone(&self) -> Self { + Self { + sink: Self::sink(self.conn.clone()), + conn: self.conn.clone(), + accept: self.accept, + _phantom: Default::default(), + } + } +} + pub(crate) type Handler = Box< dyn 'static + Send From c5258d30a6025278b13eb365bbaed6a26e8b142e Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Tue, 2 Jul 2024 10:53:42 -0400 Subject: [PATCH 3/4] Replace unsafe projection code with safe derive macro --- Cargo.lock | 1 + Cargo.toml | 1 + src/socket.rs | 53 ++++----------------------------------------------- 3 files changed, 6 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a99f1f2..b10e764 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3489,6 +3489,7 @@ dependencies = [ "markdown", "maud", "parking_lot", + "pin-project", "portpicker", "prometheus", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 048d3e1..90b0ce7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ libc = "0.2" markdown = "0.3" maud = { version = "0.26", features = ["tide"] } parking_lot = "0.12" +pin-project = "1.0" prometheus = "0.13" reqwest = { version = "0.12", features = ["json"] } routefinder = "0.5" diff --git a/src/socket.rs b/src/socket.rs index 13a4952..c685940 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -20,6 +20,7 @@ use futures::{ task::{Context, Poll}, FutureExt, Sink, SinkExt, Stream, StreamExt, TryFutureExt, }; +use pin_project::pin_project; use serde::{de::DeserializeOwned, Serialize}; use std::borrow::Cow; use std::fmt::{self, Display, Formatter}; @@ -133,7 +134,9 @@ enum MessageType { /// /// [Connection] implements [Stream], which can be used to receive `FromClient` messages from the /// client, and [Sink] which can be used to send `ToClient` messages to the client. +#[pin_project] pub struct Connection { + #[pin] conn: WebSocketConnection, // [Sink] wrapper around `conn` sink: Pin>>>, @@ -150,7 +153,7 @@ impl fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Get a `Pin<&mut WebSocketConnection>` for the underlying connection, so we can use the // `Stream` implementation of that field. - match self.pinned_inner().poll_next(cx) { + match self.project().conn.poll_next(cx) { Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))), Poll::Ready(Some(Ok(msg))) => Poll::Ready(Some(match msg { @@ -216,32 +219,6 @@ impl Sink } } -impl Drop - for Connection -{ - fn drop(&mut self) { - // This is the idiomatic way to implement [drop] for a type that uses pinning. Since [drop] - // is implicitly called with `&mut self` even on types that were pinned, we place any - // implementation inside [inner_drop], which takes `Pin<&mut Self>`, when the commpiler will - // be able to check that we do not do anything that we couldn't have done on a - // `Pin<&mut Self>`. - // - // The [drop] implementation for this type is trivial, and it would be safe to use the - // automatically generated [drop] implementation, but we nonetheless implement [drop] - // explicitly in the idiomatic fashion so that it is impossible to accidentally implement an - // unsafe version of [drop] for this type in the future. - - // `new_unchecked` is okay because we know this value is never used again after being - // dropped. - inner_drop(unsafe { Pin::new_unchecked(self) }); - fn inner_drop( - _this: Pin<&mut Connection>, - ) { - // Any logic goes here. - } - } -} - impl Connection { @@ -271,28 +248,6 @@ impl Ok(conn) })) } - - /// Project a `Pin<&mut Self>` to a pinned reference to the underlying connection. - fn pinned_inner(self: Pin<&mut Self>) -> Pin<&mut WebSocketConnection> { - // # Soundness - // - // This implements _structural pinning_ for [Connection]. This comes with some requirements - // to maintain safety, as described at - // https://doc.rust-lang.org/std/pin/index.html#pinning-is-structural-for-field: - // - // 1. The struct must only be [Unpin] if all the structural fields are [Unpin]. This is the - // default, and we don't explicitly implement [Unpin] for [Connection]. - // 2. The destructor of the struct must not move structural fields out of its argument. This - // is enforced by the compiler in our [Drop] implementation, which follows the idiom for - // safe [Drop] implementations for pinned structs. - // 3. You must make sure that you uphold the [Drop] guarantee: once your struct is pinned, - // the memory that contains the content is not overwritten or deallocated without calling - // the content’s destructors. This is also enforced by our [Drop] implementation. - // 4. You must not offer any other operations that could lead to data being moved out of the - // structural fields when your type is pinned. There are no operations on this type that - // move out of `conn`. - unsafe { self.map_unchecked_mut(|s| &mut s.conn) } - } } impl Clone From 88fc1c87b31f2d4b88974185945ad1bb8e7c3184 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Tue, 2 Jul 2024 11:02:16 -0400 Subject: [PATCH 4/4] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b10e764..cb9901e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3462,7 +3462,7 @@ dependencies = [ [[package]] name = "tide-disco" -version = "0.8.0" +version = "0.9.0" dependencies = [ "anyhow", "ark-serialize", diff --git a/Cargo.toml b/Cargo.toml index 90b0ce7..7c0bd23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tide-disco" -version = "0.8.0" +version = "0.9.0" edition = "2021" authors = ["Espresso Systems "] description = "Discoverability for Tide"