diff --git a/src/futures/bufread/generic/decoder.rs b/src/futures/bufread/generic/decoder.rs index 60b0f69a..9ed1e637 100644 --- a/src/futures/bufread/generic/decoder.rs +++ b/src/futures/bufread/generic/decoder.rs @@ -2,11 +2,11 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use std::io::Result; +use std::io::{IoSlice, Result}; use crate::{codec::Decode, util::PartialBuffer}; use futures_core::ready; -use futures_io::{AsyncBufRead, AsyncRead}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; use pin_project_lite::pin_project; #[derive(Debug)] @@ -19,7 +19,7 @@ enum State { pin_project! { #[derive(Debug)] - pub struct Decoder { + pub struct Decoder { #[pin] reader: R, decoder: D, @@ -28,16 +28,7 @@ pin_project! { } } -impl Decoder { - pub fn new(reader: R, decoder: D) -> Self { - Self { - reader, - decoder, - state: State::Decoding, - multiple_members: false, - } - } - +impl Decoder { pub fn get_ref(&self) -> &R { &self.reader } @@ -57,6 +48,17 @@ impl Decoder { pub fn multiple_members(&mut self, enabled: bool) { self.multiple_members = enabled; } +} + +impl Decoder { + pub fn new(reader: R, decoder: D) -> Self { + Self { + reader, + decoder, + state: State::Decoding, + multiple_members: false, + } + } fn do_poll_read( self: Pin<&mut Self>, @@ -158,3 +160,25 @@ impl AsyncRead for Decoder { } } } + +impl AsyncWrite for Decoder { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + self.get_pin_mut().poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_close(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + self.get_pin_mut().poll_write_vectored(cx, bufs) + } +} diff --git a/src/futures/bufread/generic/encoder.rs b/src/futures/bufread/generic/encoder.rs index b14c5490..3254aa7b 100644 --- a/src/futures/bufread/generic/encoder.rs +++ b/src/futures/bufread/generic/encoder.rs @@ -6,7 +6,7 @@ use std::io::Result; use crate::{codec::Encode, util::PartialBuffer}; use futures_core::ready; -use futures_io::{AsyncBufRead, AsyncRead}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice}; use pin_project_lite::pin_project; #[derive(Debug)] @@ -18,7 +18,7 @@ enum State { pin_project! { #[derive(Debug)] - pub struct Encoder { + pub struct Encoder { #[pin] reader: R, encoder: E, @@ -26,15 +26,7 @@ pin_project! { } } -impl Encoder { - pub fn new(reader: R, encoder: E) -> Self { - Self { - reader, - encoder, - state: State::Encoding, - } - } - +impl Encoder { pub fn get_ref(&self) -> &R { &self.reader } @@ -54,6 +46,16 @@ impl Encoder { pub fn into_inner(self) -> R { self.reader } +} + +impl Encoder { + pub fn new(reader: R, encoder: E) -> Self { + Self { + reader, + encoder, + state: State::Encoding, + } + } fn do_poll_read( self: Pin<&mut Self>, @@ -115,3 +117,25 @@ impl AsyncRead for Encoder { } } } + +impl AsyncWrite for Encoder { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + self.get_pin_mut().poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_close(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + self.get_pin_mut().poll_write_vectored(cx, bufs) + } +} diff --git a/src/futures/bufread/macros/decoder.rs b/src/futures/bufread/macros/decoder.rs index 6b9a1b3c..2d037f63 100644 --- a/src/futures/bufread/macros/decoder.rs +++ b/src/futures/bufread/macros/decoder.rs @@ -22,7 +22,9 @@ macro_rules! decoder { } $($($inherent_methods)*)* + } + impl<$inner> $name<$inner> { /// Configure multi-member/frame decoding, if enabled this will reset the decoder state /// when reaching the end of a compressed member/frame and expect either EOF or another /// compressed member/frame to follow it in the stream. @@ -72,6 +74,38 @@ macro_rules! decoder { } } + impl<$inner: futures_io::AsyncWrite> futures_io::AsyncWrite for $name<$inner> { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.get_pin_mut().poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_pin_mut().poll_flush(cx) + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_pin_mut().poll_flush(cx) + } + + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>] + ) -> std::task::Poll> { + self.get_pin_mut().poll_write_vectored(cx, bufs) + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync}; diff --git a/src/futures/bufread/macros/encoder.rs b/src/futures/bufread/macros/encoder.rs index 2d38f8a9..f6731eb9 100644 --- a/src/futures/bufread/macros/encoder.rs +++ b/src/futures/bufread/macros/encoder.rs @@ -62,6 +62,38 @@ macro_rules! encoder { } } + impl<$inner: futures_io::AsyncWrite> futures_io::AsyncWrite for $name<$inner> { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.project().inner.poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>] + ) -> std::task::Poll> { + self.project().inner.poll_write_vectored(cx, bufs) + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync}; diff --git a/src/futures/write/buf_writer.rs b/src/futures/write/buf_writer.rs index f1c0d866..f3948c6b 100644 --- a/src/futures/write/buf_writer.rs +++ b/src/futures/write/buf_writer.rs @@ -113,7 +113,9 @@ impl BufWriter { *this.written = 0; Poll::Ready(ret) } +} +impl BufWriter { /// Gets a reference to the underlying writer. pub fn get_ref(&self) -> &W { &self.inner diff --git a/src/futures/write/generic/decoder.rs b/src/futures/write/generic/decoder.rs index 2c925c5a..2e29a5eb 100644 --- a/src/futures/write/generic/decoder.rs +++ b/src/futures/write/generic/decoder.rs @@ -10,7 +10,7 @@ use crate::{ util::PartialBuffer, }; use futures_core::ready; -use futures_io::AsyncWrite; +use futures_io::{AsyncRead, AsyncWrite, IoSliceMut}; use pin_project_lite::pin_project; #[derive(Debug)] @@ -22,7 +22,7 @@ enum State { pin_project! { #[derive(Debug)] - pub struct Decoder { + pub struct Decoder { #[pin] writer: BufWriter, decoder: D, @@ -30,15 +30,7 @@ pin_project! { } } -impl Decoder { - pub fn new(writer: W, decoder: D) -> Self { - Self { - writer: BufWriter::new(writer), - decoder, - state: State::Decoding, - } - } - +impl Decoder { pub fn get_ref(&self) -> &W { self.writer.get_ref() } @@ -54,6 +46,16 @@ impl Decoder { pub fn into_inner(self) -> W { self.writer.into_inner() } +} + +impl Decoder { + pub fn new(writer: W, decoder: D) -> Self { + Self { + writer: BufWriter::new(writer), + decoder, + state: State::Decoding, + } + } fn do_poll_write( self: Pin<&mut Self>, @@ -182,3 +184,21 @@ impl AsyncWrite for Decoder { } } } + +impl AsyncRead for Decoder { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.get_pin_mut().poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + self.get_pin_mut().poll_read_vectored(cx, bufs) + } +} diff --git a/src/futures/write/generic/encoder.rs b/src/futures/write/generic/encoder.rs index 2b9d95e0..e4627893 100644 --- a/src/futures/write/generic/encoder.rs +++ b/src/futures/write/generic/encoder.rs @@ -10,7 +10,7 @@ use crate::{ util::PartialBuffer, }; use futures_core::ready; -use futures_io::AsyncWrite; +use futures_io::{AsyncRead, AsyncWrite, IoSliceMut}; use pin_project_lite::pin_project; #[derive(Debug)] @@ -22,7 +22,7 @@ enum State { pin_project! { #[derive(Debug)] - pub struct Encoder { + pub struct Encoder { #[pin] writer: BufWriter, encoder: E, @@ -30,15 +30,7 @@ pin_project! { } } -impl Encoder { - pub fn new(writer: W, encoder: E) -> Self { - Self { - writer: BufWriter::new(writer), - encoder, - state: State::Encoding, - } - } - +impl Encoder { pub fn get_ref(&self) -> &W { self.writer.get_ref() } @@ -58,6 +50,16 @@ impl Encoder { pub fn into_inner(self) -> W { self.writer.into_inner() } +} + +impl Encoder { + pub fn new(writer: W, encoder: E) -> Self { + Self { + writer: BufWriter::new(writer), + encoder, + state: State::Encoding, + } + } fn do_poll_write( self: Pin<&mut Self>, @@ -179,3 +181,21 @@ impl AsyncWrite for Encoder { Poll::Ready(Ok(())) } } + +impl AsyncRead for Encoder { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.get_pin_mut().poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + self.get_pin_mut().poll_read_vectored(cx, bufs) + } +} diff --git a/src/futures/write/macros/decoder.rs b/src/futures/write/macros/decoder.rs index 496962ca..fed59431 100644 --- a/src/futures/write/macros/decoder.rs +++ b/src/futures/write/macros/decoder.rs @@ -12,17 +12,7 @@ macro_rules! decoder { } } - impl<$inner: futures_io::AsyncWrite> $name<$inner> { - /// Creates a new decoder which will take in compressed data and write it uncompressed - /// to the given stream. - pub fn new(read: $inner) -> $name<$inner> { - $name { - inner: crate::futures::write::Decoder::new(read, crate::codec::$name::new()), - } - } - - $($($inherent_methods)*)* - + impl<$inner> $name<$inner> { /// Acquires a reference to the underlying reader that this decoder is wrapping. pub fn get_ref(&self) -> &$inner { self.inner.get_ref() @@ -55,6 +45,18 @@ macro_rules! decoder { } } + impl<$inner: futures_io::AsyncWrite> $name<$inner> { + /// Creates a new decoder which will take in compressed data and write it uncompressed + /// to the given stream. + pub fn new(read: $inner) -> $name<$inner> { + $name { + inner: crate::futures::write::Decoder::new(read, crate::codec::$name::new()), + } + } + + $($($inherent_methods)*)* + } + impl<$inner: futures_io::AsyncWrite> futures_io::AsyncWrite for $name<$inner> { fn poll_write( self: std::pin::Pin<&mut Self>, @@ -79,6 +81,24 @@ macro_rules! decoder { } } + impl<$inner: futures_io::AsyncRead> futures_io::AsyncRead for $name<$inner> { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8] + ) -> std::task::Poll> { + self.get_pin_mut().poll_read(cx, buf) + } + + fn poll_read_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &mut [futures_io::IoSliceMut<'_>] + ) -> std::task::Poll> { + self.get_pin_mut().poll_read_vectored(cx, bufs) + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync}; diff --git a/src/futures/write/macros/encoder.rs b/src/futures/write/macros/encoder.rs index 82cf75ba..405016a2 100644 --- a/src/futures/write/macros/encoder.rs +++ b/src/futures/write/macros/encoder.rs @@ -19,7 +19,9 @@ macro_rules! encoder { /// $($inherent_methods)* )* + } + impl<$inner> $name<$inner> { /// Acquires a reference to the underlying writer that this encoder is wrapping. pub fn get_ref(&self) -> &$inner { self.inner.get_ref() @@ -76,6 +78,24 @@ macro_rules! encoder { } } + impl<$inner: futures_io::AsyncRead> futures_io::AsyncRead for $name<$inner> { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8] + ) -> std::task::Poll> { + self.get_pin_mut().poll_read(cx, buf) + } + + fn poll_read_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &mut [futures_io::IoSliceMut<'_>] + ) -> std::task::Poll> { + self.get_pin_mut().poll_read_vectored(cx, bufs) + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync}; diff --git a/src/tokio/bufread/generic/decoder.rs b/src/tokio/bufread/generic/decoder.rs index c5c16d18..abf0cfa5 100644 --- a/src/tokio/bufread/generic/decoder.rs +++ b/src/tokio/bufread/generic/decoder.rs @@ -2,12 +2,12 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use std::io::Result; +use std::io::{IoSlice, Result}; use crate::{codec::Decode, util::PartialBuffer}; use futures_core::ready; use pin_project_lite::pin_project; -use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; #[derive(Debug)] enum State { @@ -19,7 +19,7 @@ enum State { pin_project! { #[derive(Debug)] - pub struct Decoder { + pub struct Decoder { #[pin] reader: R, decoder: D, @@ -28,16 +28,7 @@ pin_project! { } } -impl Decoder { - pub fn new(reader: R, decoder: D) -> Self { - Self { - reader, - decoder, - state: State::Decoding, - multiple_members: false, - } - } - +impl Decoder { pub fn get_ref(&self) -> &R { &self.reader } @@ -57,6 +48,17 @@ impl Decoder { pub fn multiple_members(&mut self, enabled: bool) { self.multiple_members = enabled; } +} + +impl Decoder { + pub fn new(reader: R, decoder: D) -> Self { + Self { + reader, + decoder, + state: State::Decoding, + multiple_members: false, + } + } fn do_poll_read( self: Pin<&mut Self>, @@ -162,3 +164,33 @@ impl AsyncRead for Decoder { } } } + +impl AsyncWrite for Decoder { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.get_pin_mut().poll_write(cx, buf) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut bufs: &[IoSlice<'_>], + ) -> Poll> { + self.get_pin_mut().poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.get_ref().is_write_vectored() + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_shutdown(cx) + } +} diff --git a/src/tokio/bufread/generic/encoder.rs b/src/tokio/bufread/generic/encoder.rs index 729e34b7..6cfb1e4a 100644 --- a/src/tokio/bufread/generic/encoder.rs +++ b/src/tokio/bufread/generic/encoder.rs @@ -2,12 +2,12 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use std::io::Result; +use std::io::{IoSlice, Result}; use crate::{codec::Encode, util::PartialBuffer}; use futures_core::ready; use pin_project_lite::pin_project; -use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; #[derive(Debug)] enum State { @@ -18,7 +18,7 @@ enum State { pin_project! { #[derive(Debug)] - pub struct Encoder { + pub struct Encoder { #[pin] reader: R, encoder: E, @@ -26,15 +26,7 @@ pin_project! { } } -impl Encoder { - pub fn new(reader: R, encoder: E) -> Self { - Self { - reader, - encoder, - state: State::Encoding, - } - } - +impl Encoder { pub fn get_ref(&self) -> &R { &self.reader } @@ -54,6 +46,16 @@ impl Encoder { pub fn into_inner(self) -> R { self.reader } +} + +impl Encoder { + pub fn new(reader: R, encoder: E) -> Self { + Self { + reader, + encoder, + state: State::Encoding, + } + } fn do_poll_read( self: Pin<&mut Self>, @@ -119,3 +121,33 @@ impl AsyncRead for Encoder { } } } + +impl AsyncWrite for Encoder { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.get_pin_mut().poll_write(cx, buf) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut bufs: &[IoSlice<'_>], + ) -> Poll> { + self.get_pin_mut().poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.get_ref().is_write_vectored() + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_shutdown(cx) + } +} diff --git a/src/tokio/bufread/macros/decoder.rs b/src/tokio/bufread/macros/decoder.rs index bd66951d..a3c36ac1 100644 --- a/src/tokio/bufread/macros/decoder.rs +++ b/src/tokio/bufread/macros/decoder.rs @@ -22,7 +22,9 @@ macro_rules! decoder { } $($($inherent_methods)*)* + } + impl<$inner> $name<$inner> { /// Configure multi-member/frame decoding, if enabled this will reset the decoder state /// when reaching the end of a compressed member/frame and expect either EOF or another /// compressed member/frame to follow it in the stream. @@ -72,6 +74,42 @@ macro_rules! decoder { } } + impl<$inner: tokio::io::AsyncWrite> tokio::io::AsyncWrite for $name<$inner> { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.get_pin_mut().poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_pin_mut().poll_flush(cx) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_pin_mut().poll_shutdown(cx) + } + + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll> { + self.get_pin_mut().poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.get_ref().is_write_vectored() + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync}; diff --git a/src/tokio/bufread/macros/encoder.rs b/src/tokio/bufread/macros/encoder.rs index 7c411d46..f43f0631 100644 --- a/src/tokio/bufread/macros/encoder.rs +++ b/src/tokio/bufread/macros/encoder.rs @@ -19,7 +19,9 @@ macro_rules! encoder { /// $($inherent_methods)* )* + } + impl<$inner> $name<$inner> { /// Acquires a reference to the underlying reader that this encoder is wrapping. pub fn get_ref(&self) -> &$inner { self.inner.get_ref() @@ -62,6 +64,42 @@ macro_rules! encoder { } } + impl<$inner: tokio::io::AsyncWrite> tokio::io::AsyncWrite for $name<$inner> { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.get_pin_mut().poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_pin_mut().poll_flush(cx) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_pin_mut().poll_shutdown(cx) + } + + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll> { + self.get_pin_mut().poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.get_ref().is_write_vectored() + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync}; diff --git a/src/tokio/write/buf_writer.rs b/src/tokio/write/buf_writer.rs index 3f76e81b..620ed859 100644 --- a/src/tokio/write/buf_writer.rs +++ b/src/tokio/write/buf_writer.rs @@ -113,7 +113,9 @@ impl BufWriter { *this.written = 0; Poll::Ready(ret) } +} +impl BufWriter { /// Gets a reference to the underlying writer. pub fn get_ref(&self) -> &W { &self.inner diff --git a/src/tokio/write/generic/decoder.rs b/src/tokio/write/generic/decoder.rs index 5ab34123..fa99a566 100644 --- a/src/tokio/write/generic/decoder.rs +++ b/src/tokio/write/generic/decoder.rs @@ -11,7 +11,7 @@ use crate::{ }; use futures_core::ready; use pin_project_lite::pin_project; -use tokio::io::AsyncWrite; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; #[derive(Debug)] enum State { @@ -22,7 +22,7 @@ enum State { pin_project! { #[derive(Debug)] - pub struct Decoder { + pub struct Decoder { #[pin] writer: BufWriter, decoder: D, @@ -30,15 +30,7 @@ pin_project! { } } -impl Decoder { - pub fn new(writer: W, decoder: D) -> Self { - Self { - writer: BufWriter::new(writer), - decoder, - state: State::Decoding, - } - } - +impl Decoder { pub fn get_ref(&self) -> &W { self.writer.get_ref() } @@ -54,6 +46,16 @@ impl Decoder { pub fn into_inner(self) -> W { self.writer.into_inner() } +} + +impl Decoder { + pub fn new(writer: W, decoder: D) -> Self { + Self { + writer: BufWriter::new(writer), + decoder, + state: State::Decoding, + } + } fn do_poll_write( self: Pin<&mut Self>, @@ -182,3 +184,13 @@ impl AsyncWrite for Decoder { } } } + +impl AsyncRead for Decoder { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.get_pin_mut().poll_read(cx, buf) + } +} diff --git a/src/tokio/write/generic/encoder.rs b/src/tokio/write/generic/encoder.rs index a722b872..5e5321d6 100644 --- a/src/tokio/write/generic/encoder.rs +++ b/src/tokio/write/generic/encoder.rs @@ -11,7 +11,7 @@ use crate::{ }; use futures_core::ready; use pin_project_lite::pin_project; -use tokio::io::AsyncWrite; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; #[derive(Debug)] enum State { @@ -22,7 +22,7 @@ enum State { pin_project! { #[derive(Debug)] - pub struct Encoder { + pub struct Encoder { #[pin] writer: BufWriter, encoder: E, @@ -30,15 +30,7 @@ pin_project! { } } -impl Encoder { - pub fn new(writer: W, encoder: E) -> Self { - Self { - writer: BufWriter::new(writer), - encoder, - state: State::Encoding, - } - } - +impl Encoder { pub fn get_ref(&self) -> &W { self.writer.get_ref() } @@ -58,6 +50,16 @@ impl Encoder { pub fn into_inner(self) -> W { self.writer.into_inner() } +} + +impl Encoder { + pub fn new(writer: W, encoder: E) -> Self { + Self { + writer: BufWriter::new(writer), + encoder, + state: State::Encoding, + } + } fn do_poll_write( self: Pin<&mut Self>, @@ -179,3 +181,13 @@ impl AsyncWrite for Encoder { Poll::Ready(Ok(())) } } + +impl AsyncRead for Encoder { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.get_pin_mut().poll_read(cx, buf) + } +} diff --git a/src/tokio/write/macros/decoder.rs b/src/tokio/write/macros/decoder.rs index 7a36eaa8..19e69779 100644 --- a/src/tokio/write/macros/decoder.rs +++ b/src/tokio/write/macros/decoder.rs @@ -22,7 +22,9 @@ macro_rules! decoder { } $($($inherent_methods)*)* + } + impl<$inner> $name<$inner> { /// Acquires a reference to the underlying reader that this decoder is wrapping. pub fn get_ref(&self) -> &$inner { self.inner.get_ref() @@ -79,6 +81,16 @@ macro_rules! decoder { } } + impl<$inner: tokio::io::AsyncRead> tokio::io::AsyncRead for $name<$inner> { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + self.get_pin_mut().poll_read(cx, buf) + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync}; diff --git a/src/tokio/write/macros/encoder.rs b/src/tokio/write/macros/encoder.rs index 39ab12d6..46dbf320 100644 --- a/src/tokio/write/macros/encoder.rs +++ b/src/tokio/write/macros/encoder.rs @@ -19,7 +19,9 @@ macro_rules! encoder { /// $($inherent_methods)* )* + } + impl<$inner> $name<$inner> { /// Acquires a reference to the underlying writer that this encoder is wrapping. pub fn get_ref(&self) -> &$inner { self.inner.get_ref() @@ -76,6 +78,16 @@ macro_rules! encoder { } } + impl<$inner: tokio::io::AsyncRead> tokio::io::AsyncRead for $name<$inner> { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + self.get_pin_mut().poll_read(cx, buf) + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync};