Skip to content

Commit

Permalink
Pass through AsyncRead and AsyncWrite traits
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Oct 13, 2024
1 parent 9d9fe02 commit cc91be3
Show file tree
Hide file tree
Showing 18 changed files with 496 additions and 110 deletions.
50 changes: 37 additions & 13 deletions src/futures/bufread/generic/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use core::{
pin::Pin,
task::{Context, Poll},
};
use std::io::Result;
use std::io::{IoSlice, Result};

use crate::{codec::Decode, util::PartialBuffer};
use futures_core::ready;
use futures_io::{AsyncBufRead, AsyncRead};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use pin_project_lite::pin_project;

#[derive(Debug)]
Expand All @@ -19,7 +19,7 @@ enum State {

pin_project! {
#[derive(Debug)]
pub struct Decoder<R, D: Decode> {
pub struct Decoder<R, D> {
#[pin]
reader: R,
decoder: D,
Expand All @@ -28,16 +28,7 @@ pin_project! {
}
}

impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
pub fn new(reader: R, decoder: D) -> Self {
Self {
reader,
decoder,
state: State::Decoding,
multiple_members: false,
}
}

impl<R, D> Decoder<R, D> {
pub fn get_ref(&self) -> &R {
&self.reader
}
Expand All @@ -57,6 +48,17 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
pub fn multiple_members(&mut self, enabled: bool) {
self.multiple_members = enabled;
}
}

impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
pub fn new(reader: R, decoder: D) -> Self {
Self {
reader,
decoder,
state: State::Decoding,
multiple_members: false,
}
}

fn do_poll_read(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -158,3 +160,25 @@ impl<R: AsyncBufRead, D: Decode> AsyncRead for Decoder<R, D> {
}
}
}

impl<R: AsyncWrite, D> AsyncWrite for Decoder<R, D> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
self.get_pin_mut().poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.get_pin_mut().poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.get_pin_mut().poll_close(cx)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
self.get_pin_mut().poll_write_vectored(cx, bufs)
}
}
46 changes: 35 additions & 11 deletions src/futures/bufread/generic/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::io::Result;

use crate::{codec::Encode, util::PartialBuffer};
use futures_core::ready;
use futures_io::{AsyncBufRead, AsyncRead};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice};
use pin_project_lite::pin_project;

#[derive(Debug)]
Expand All @@ -18,23 +18,15 @@ enum State {

pin_project! {
#[derive(Debug)]
pub struct Encoder<R, E: Encode> {
pub struct Encoder<R, E> {
#[pin]
reader: R,
encoder: E,
state: State,
}
}

impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
pub fn new(reader: R, encoder: E) -> Self {
Self {
reader,
encoder,
state: State::Encoding,
}
}

impl<R, E> Encoder<R, E> {
pub fn get_ref(&self) -> &R {
&self.reader
}
Expand All @@ -54,6 +46,16 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
pub fn into_inner(self) -> R {
self.reader
}
}

impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
pub fn new(reader: R, encoder: E) -> Self {
Self {
reader,
encoder,
state: State::Encoding,
}
}

fn do_poll_read(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -115,3 +117,25 @@ impl<R: AsyncBufRead, E: Encode> AsyncRead for Encoder<R, E> {
}
}
}

impl<R: AsyncWrite, E> AsyncWrite for Encoder<R, E> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
self.get_pin_mut().poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.get_pin_mut().poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.get_pin_mut().poll_close(cx)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
self.get_pin_mut().poll_write_vectored(cx, bufs)
}
}
34 changes: 34 additions & 0 deletions src/futures/bufread/macros/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ macro_rules! decoder {
}

$($($inherent_methods)*)*
}

impl<$inner> $name<$inner> {
/// Configure multi-member/frame decoding, if enabled this will reset the decoder state
/// when reaching the end of a compressed member/frame and expect either EOF or another
/// compressed member/frame to follow it in the stream.
Expand Down Expand Up @@ -72,6 +74,38 @@ macro_rules! decoder {
}
}

impl<$inner: futures_io::AsyncWrite> futures_io::AsyncWrite for $name<$inner> {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
self.get_pin_mut().poll_write(cx, buf)
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}

fn poll_close(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}

fn poll_write_vectored(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bufs: &[std::io::IoSlice<'_>]
) -> std::task::Poll<std::io::Result<usize>> {
self.get_pin_mut().poll_write_vectored(cx, bufs)
}
}

const _: () = {
fn _assert() {
use crate::util::{_assert_send, _assert_sync};
Expand Down
32 changes: 32 additions & 0 deletions src/futures/bufread/macros/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,38 @@ macro_rules! encoder {
}
}

impl<$inner: futures_io::AsyncWrite> futures_io::AsyncWrite for $name<$inner> {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.project().inner.poll_flush(cx)
}

fn poll_close(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.project().inner.poll_flush(cx)
}

fn poll_write_vectored(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bufs: &[std::io::IoSlice<'_>]
) -> std::task::Poll<std::io::Result<usize>> {
self.project().inner.poll_write_vectored(cx, bufs)
}
}

const _: () = {
fn _assert() {
use crate::util::{_assert_send, _assert_sync};
Expand Down
2 changes: 2 additions & 0 deletions src/futures/write/buf_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ impl<W: AsyncWrite> BufWriter<W> {
*this.written = 0;
Poll::Ready(ret)
}
}

impl<W> BufWriter<W> {
/// Gets a reference to the underlying writer.
pub fn get_ref(&self) -> &W {
&self.inner
Expand Down
42 changes: 31 additions & 11 deletions src/futures/write/generic/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
util::PartialBuffer,
};
use futures_core::ready;
use futures_io::AsyncWrite;
use futures_io::{AsyncRead, AsyncWrite, IoSliceMut};
use pin_project_lite::pin_project;

#[derive(Debug)]
Expand All @@ -22,23 +22,15 @@ enum State {

pin_project! {
#[derive(Debug)]
pub struct Decoder<W, D: Decode> {
pub struct Decoder<W, D> {
#[pin]
writer: BufWriter<W>,
decoder: D,
state: State,
}
}

impl<W: AsyncWrite, D: Decode> Decoder<W, D> {
pub fn new(writer: W, decoder: D) -> Self {
Self {
writer: BufWriter::new(writer),
decoder,
state: State::Decoding,
}
}

impl<W, D> Decoder<W, D> {
pub fn get_ref(&self) -> &W {
self.writer.get_ref()
}
Expand All @@ -54,6 +46,16 @@ impl<W: AsyncWrite, D: Decode> Decoder<W, D> {
pub fn into_inner(self) -> W {
self.writer.into_inner()
}
}

impl<W: AsyncWrite, D: Decode> Decoder<W, D> {
pub fn new(writer: W, decoder: D) -> Self {
Self {
writer: BufWriter::new(writer),
decoder,
state: State::Decoding,
}
}

fn do_poll_write(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -182,3 +184,21 @@ impl<W: AsyncWrite, D: Decode> AsyncWrite for Decoder<W, D> {
}
}
}

impl<W: AsyncRead, D> AsyncRead for Decoder<W, D> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.get_pin_mut().poll_read(cx, buf)
}

fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
self.get_pin_mut().poll_read_vectored(cx, bufs)
}
}
Loading

0 comments on commit cc91be3

Please sign in to comment.