Skip to content

Commit

Permalink
feat: implement IMAP COMPRESS
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Oct 14, 2024
1 parent 1954ce4 commit 1e29fb9
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 2 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ jobs:
- name: check tokio
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio

- name: check compress feature with tokio
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio,compress

- name: check compress feature with async-std
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std,compress

- name: check async-std examples
working-directory: examples
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std
Expand Down
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ is-it-maintained-open-issues = { repository = "async-email/async-imap" }

[features]
default = ["runtime-async-std"]
compress = ["async-compression"]

runtime-async-std = ["async-std"]
runtime-tokio = ["tokio"]
runtime-async-std = ["async-std", "async-compression?/futures-io"]
runtime-tokio = ["tokio", "async-compression?/tokio"]

[dependencies]
async-channel = "2.0.0"
async-compression = { version = "0.4.15", default-features = false, features = ["deflate"], optional = true }
async-std = { version = "1.8.0", default-features = false, features = ["std", "unstable"], optional = true }
base64 = "0.21"
bytes = "1"
Expand All @@ -35,6 +37,7 @@ imap-proto = "0.16.4"
log = "0.4.8"
nom = "7.0"
once_cell = "1.8.0"
pin-project = "1"
pin-utils = "0.1.0-alpha.4"
self_cell = "1.0.1"
stop-token = "0.7"
Expand Down
193 changes: 193 additions & 0 deletions src/extensions/compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
//! IMAP COMPRESS extension specified in [RFC4978](https://www.rfc-editor.org/rfc/rfc4978.html).

use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use pin_project::pin_project;

use crate::client::Session;
use crate::error::Result;
use crate::imap_stream::ImapStream;
use crate::types::IdGenerator;
use crate::Connection;

#[cfg(feature = "runtime-async-std")]
use async_std::io::{IoSlice, IoSliceMut, Read, Write};
#[cfg(feature = "runtime-async-std")]
use futures::io::BufReader;
#[cfg(feature = "runtime-tokio")]
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, BufReader, ReadBuf};

#[cfg(feature = "runtime-tokio")]
use async_compression::tokio::bufread::DeflateDecoder;
#[cfg(feature = "runtime-tokio")]
use async_compression::tokio::write::DeflateEncoder;

#[cfg(feature = "runtime-async-std")]
use async_compression::futures::bufread::DeflateDecoder;
#[cfg(feature = "runtime-async-std")]
use async_compression::futures::write::DeflateEncoder;

/// Network stream compressed with DEFLATE.
#[derive(Debug)]
#[pin_project]
pub struct DeflateStream<T: Read + Write + Unpin + fmt::Debug> {
#[pin]
inner: DeflateDecoder<BufReader<DeflateEncoder<T>>>,
}

impl<T: Read + Write + Unpin + fmt::Debug> DeflateStream<T> {
pub(crate) fn new(stream: T) -> Self {
let stream = DeflateEncoder::new(stream);
let stream = BufReader::new(stream);
let stream = DeflateDecoder::new(stream);
Self { inner: stream }
}

/// Gets a reference to the underlying stream.
pub fn get_ref(&self) -> &T {
self.inner.get_ref().get_ref().get_ref()
}

/// Gets a mutable reference to the underlying stream.
pub fn get_mut(&mut self) -> &mut T {
self.inner.get_mut().get_mut().get_mut()
}

/// Consumes `DeflateStream` and returns underlying stream.
pub fn into_inner(self) -> T {
self.inner.into_inner().into_inner().into_inner()
}
}

#[cfg(feature = "runtime-tokio")]
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
self.project().inner.poll_read(cx, buf)
}
}

#[cfg(feature = "runtime-async-std")]
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<async_std::io::Result<usize>> {
self.project().inner.poll_read(cx, buf)
}

fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<async_std::io::Result<usize>> {
self.project().inner.poll_read_vectored(cx, bufs)
}
}

#[cfg(feature = "runtime-tokio")]
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.project().inner.get_pin_mut().poll_write(cx, buf)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<()>> {
self.project().inner.poll_flush(cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<()>> {
self.project().inner.poll_shutdown(cx)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<std::io::Result<usize>> {
self.project().inner.poll_write_vectored(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}
}

#[cfg(feature = "runtime-async-std")]
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<async_std::io::Result<usize>> {
self.project().inner.as_mut().poll_write(cx, buf)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<async_std::io::Result<()>> {
self.project().inner.poll_flush(cx)
}

fn poll_close(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<async_std::io::Result<()>> {
self.project().inner.poll_close(cx)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<async_std::io::Result<usize>> {
self.project().inner.poll_write_vectored(cx, bufs)
}
}

impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
/// Runs `COMPRESS DEFLATE` command.
pub async fn compress<F, S>(self, f: F) -> Result<Session<S>>
where
S: Read + Write + Unpin + fmt::Debug,
F: FnOnce(DeflateStream<T>) -> S,
{
let Self {
mut conn,
unsolicited_responses_tx,
unsolicited_responses,
} = self;
conn.run_command_and_check_ok("COMPRESS DEFLATE", Some(unsolicited_responses_tx.clone()))
.await?;

let stream = conn.into_inner();
let deflate_stream = DeflateStream::new(stream);
let stream = ImapStream::new(f(deflate_stream));
let conn = Connection {
stream,
request_ids: IdGenerator::new(),
};
let session = Session {
conn,
unsolicited_responses_tx,
unsolicited_responses,
};
Ok(session)
}
}
3 changes: 3 additions & 0 deletions src/extensions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
//! Implementations of various IMAP extensions.
#[cfg(feature = "compress")]
pub mod compress;

pub mod idle;

pub mod quota;
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ mod imap_stream;
mod parse;
pub mod types;

#[cfg(feature = "compress")]
pub use crate::extensions::compress::DeflateStream;

pub use crate::authenticator::Authenticator;
pub use crate::client::*;

Expand Down

0 comments on commit 1e29fb9

Please sign in to comment.