From 1e29fb981b4760535f8218ecb61a53d2af48d943 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 22 Sep 2024 20:27:15 +0000 Subject: [PATCH] feat: implement IMAP COMPRESS --- .github/workflows/ci.yml | 6 ++ Cargo.toml | 7 +- src/extensions/compress.rs | 193 +++++++++++++++++++++++++++++++++++++ src/extensions/mod.rs | 3 + src/lib.rs | 3 + 5 files changed, 210 insertions(+), 2 deletions(-) create mode 100644 src/extensions/compress.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ac24f85..5e520ce 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,12 @@ jobs: - name: check tokio run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio + - name: check compress feature with tokio + run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio,compress + + - name: check compress feature with async-std + run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std,compress + - name: check async-std examples working-directory: examples run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std diff --git a/Cargo.toml b/Cargo.toml index 592df25..25382e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,12 +20,14 @@ is-it-maintained-open-issues = { repository = "async-email/async-imap" } [features] default = ["runtime-async-std"] +compress = ["async-compression"] -runtime-async-std = ["async-std"] -runtime-tokio = ["tokio"] +runtime-async-std = ["async-std", "async-compression?/futures-io"] +runtime-tokio = ["tokio", "async-compression?/tokio"] [dependencies] async-channel = "2.0.0" +async-compression = { version = "0.4.15", default-features = false, features = ["deflate"], optional = true } async-std = { version = "1.8.0", default-features = false, features = ["std", "unstable"], optional = true } base64 = "0.21" bytes = "1" @@ -35,6 +37,7 @@ imap-proto = "0.16.4" log = "0.4.8" nom = "7.0" once_cell = "1.8.0" +pin-project = "1" pin-utils = "0.1.0-alpha.4" self_cell = "1.0.1" stop-token = "0.7" diff --git a/src/extensions/compress.rs b/src/extensions/compress.rs new file mode 100644 index 0000000..1dba026 --- /dev/null +++ b/src/extensions/compress.rs @@ -0,0 +1,193 @@ +//! IMAP COMPRESS extension specified in [RFC4978](https://www.rfc-editor.org/rfc/rfc4978.html). + +use std::fmt; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use pin_project::pin_project; + +use crate::client::Session; +use crate::error::Result; +use crate::imap_stream::ImapStream; +use crate::types::IdGenerator; +use crate::Connection; + +#[cfg(feature = "runtime-async-std")] +use async_std::io::{IoSlice, IoSliceMut, Read, Write}; +#[cfg(feature = "runtime-async-std")] +use futures::io::BufReader; +#[cfg(feature = "runtime-tokio")] +use tokio::io::{AsyncRead as Read, AsyncWrite as Write, BufReader, ReadBuf}; + +#[cfg(feature = "runtime-tokio")] +use async_compression::tokio::bufread::DeflateDecoder; +#[cfg(feature = "runtime-tokio")] +use async_compression::tokio::write::DeflateEncoder; + +#[cfg(feature = "runtime-async-std")] +use async_compression::futures::bufread::DeflateDecoder; +#[cfg(feature = "runtime-async-std")] +use async_compression::futures::write::DeflateEncoder; + +/// Network stream compressed with DEFLATE. +#[derive(Debug)] +#[pin_project] +pub struct DeflateStream { + #[pin] + inner: DeflateDecoder>>, +} + +impl DeflateStream { + pub(crate) fn new(stream: T) -> Self { + let stream = DeflateEncoder::new(stream); + let stream = BufReader::new(stream); + let stream = DeflateDecoder::new(stream); + Self { inner: stream } + } + + /// Gets a reference to the underlying stream. + pub fn get_ref(&self) -> &T { + self.inner.get_ref().get_ref().get_ref() + } + + /// Gets a mutable reference to the underlying stream. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut().get_mut().get_mut() + } + + /// Consumes `DeflateStream` and returns underlying stream. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner().into_inner() + } +} + +#[cfg(feature = "runtime-tokio")] +impl Read for DeflateStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +#[cfg(feature = "runtime-async-std")] +impl Read for DeflateStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + self.project().inner.poll_read_vectored(cx, bufs) + } +} + +#[cfg(feature = "runtime-tokio")] +impl Write for DeflateStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().inner.get_pin_mut().poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner.poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + self.project().inner.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} + +#[cfg(feature = "runtime-async-std")] +impl Write for DeflateStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().inner.as_mut().poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner.poll_close(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + self.project().inner.poll_write_vectored(cx, bufs) + } +} + +impl Session { + /// Runs `COMPRESS DEFLATE` command. + pub async fn compress(self, f: F) -> Result> + where + S: Read + Write + Unpin + fmt::Debug, + F: FnOnce(DeflateStream) -> S, + { + let Self { + mut conn, + unsolicited_responses_tx, + unsolicited_responses, + } = self; + conn.run_command_and_check_ok("COMPRESS DEFLATE", Some(unsolicited_responses_tx.clone())) + .await?; + + let stream = conn.into_inner(); + let deflate_stream = DeflateStream::new(stream); + let stream = ImapStream::new(f(deflate_stream)); + let conn = Connection { + stream, + request_ids: IdGenerator::new(), + }; + let session = Session { + conn, + unsolicited_responses_tx, + unsolicited_responses, + }; + Ok(session) + } +} diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs index 25fdaba..56aaa05 100644 --- a/src/extensions/mod.rs +++ b/src/extensions/mod.rs @@ -1,4 +1,7 @@ //! Implementations of various IMAP extensions. +#[cfg(feature = "compress")] +pub mod compress; + pub mod idle; pub mod quota; diff --git a/src/lib.rs b/src/lib.rs index 4f33ffa..b12c3f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -95,6 +95,9 @@ mod imap_stream; mod parse; pub mod types; +#[cfg(feature = "compress")] +pub use crate::extensions::compress::DeflateStream; + pub use crate::authenticator::Authenticator; pub use crate::client::*;