From 4d9f3606d006371792a44c9cbf35c5d13c69dad8 Mon Sep 17 00:00:00 2001 From: tesol2y090 Date: Tue, 17 Dec 2024 16:50:12 +0700 Subject: [PATCH 1/2] feat: delete rw-stream-sink dep --- Cargo.lock | 1 - core/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 31df58e8ec4..52caad58159 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2781,7 +2781,6 @@ dependencies = [ "quick-protobuf", "quickcheck-ext", "rand 0.8.5", - "rw-stream-sink", "serde", "smallvec", "thiserror 2.0.3", diff --git a/core/Cargo.toml b/core/Cargo.toml index 8ec0b0fc197..049c2b29e7d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -25,7 +25,6 @@ parking_lot = "0.12.3" pin-project = "1.1.5" quick-protobuf = "0.8" rand = "0.8" -rw-stream-sink = { workspace = true } serde = { version = "1", optional = true, features = ["derive"] } smallvec = "1.13.2" thiserror = { workspace = true } From fc8070368ec72a3b8aec6a81c3da1bcca8d4665d Mon Sep 17 00:00:00 2001 From: tesol2y090 Date: Tue, 17 Dec 2024 16:50:24 +0700 Subject: [PATCH 2/2] feat: add RwStreamSink inlined --- core/src/transport/memory.rs | 84 +++++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 7 deletions(-) diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 19197ddf714..617c10b20a8 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -21,21 +21,17 @@ use std::{ collections::{hash_map::Entry, VecDeque}, error, fmt, io, + io::Read, num::NonZeroU64, pin::Pin, + task::{Context, Poll}, }; use fnv::FnvHashMap; -use futures::{ - channel::mpsc, - future::Ready, - prelude::*, - task::{Context, Poll}, -}; +use futures::{channel::mpsc, future::Ready, prelude::*, ready}; use multiaddr::{Multiaddr, Protocol}; use once_cell::sync::Lazy; use parking_lot::Mutex; -use rw_stream_sink::RwStreamSink; use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent}; @@ -329,6 +325,80 @@ fn parse_memory_addr(a: &Multiaddr) -> Result { } } +#[pin_project::pin_project] +pub struct RwStreamSink { + #[pin] + inner: S, + current_item: Option::Ok>>, +} + +impl RwStreamSink { + /// Wraps around `inner`. + pub fn new(inner: S) -> Self { + RwStreamSink { + inner, + current_item: None, + } + } +} + +impl AsyncRead for RwStreamSink +where + S: TryStream, + ::Ok: AsRef<[u8]>, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let mut this = self.project(); + + // Grab the item to copy from. + let item_to_copy = loop { + if let Some(ref mut i) = this.current_item { + if i.position() < i.get_ref().as_ref().len() as u64 { + break i; + } + } + *this.current_item = Some(match ready!(this.inner.as_mut().try_poll_next(cx)) { + Some(Ok(i)) => std::io::Cursor::new(i), + Some(Err(e)) => return Poll::Ready(Err(e)), + None => return Poll::Ready(Ok(0)), // EOF + }); + }; + + // Copy it! + Poll::Ready(Ok(item_to_copy.read(buf)?)) + } +} + +impl AsyncWrite for RwStreamSink +where + S: TryStream + Sink<::Ok, Error = io::Error>, + ::Ok: for<'r> From<&'r [u8]>, +{ + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let mut this = self.project(); + ready!(this.inner.as_mut().poll_ready(cx)?); + let n = buf.len(); + if let Err(e) = this.inner.start_send(buf.into()) { + return Poll::Ready(Err(e)); + } + Poll::Ready(Ok(n)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + this.inner.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) + } +} + /// A channel represents an established, in-memory, logical connection between two endpoints. /// /// Implements `AsyncRead` and `AsyncWrite`.