From 45817aa06c7777fced00768df24c3440a3cd1df1 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Fri, 5 Jul 2024 07:20:11 +0200 Subject: [PATCH] feat: add "alloc" feature --- Cargo.toml | 30 ++++++++++++++++-------------- src/buffer.rs | 10 +++++----- src/buffer/array.rs | 2 +- src/buffer/vec.rs | 3 ++- src/error.rs | 2 +- src/lib.rs | 11 +++-------- src/queue.rs | 9 +++++---- src/synchronized.rs | 27 +++++++++++++++++++++++---- src/utils.rs | 2 +- src/write.rs | 5 ++--- src/write/vec.rs | 3 ++- 11 files changed, 61 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 45a7c76..976b52d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,29 +7,31 @@ description = "A buffering MPSC queue." homepage = "https://github.com/wyfo/swap-buffer-queue" readme = "README.md" keywords = [ - "atomic", - "lock-free", - "no-std", - "mpsc", - "async", + "atomic", + "lock-free", + "no-std", + "mpsc", + "async", ] categories = [ - "concurrency", - "data-structures", - "no-std", + "concurrency", + "data-structures", + "no-std", ] license = "MIT" repository = "https://github.com/wyfo/swap-buffer-queue" [features] -default = ["stream"] -std = [] -stream = ["std", "dep:futures"] +default = ["std"] +alloc = [] +std = ["alloc"] +stream = ["dep:futures-core", "dep:futures-util"] write = [] [dependencies] crossbeam-utils = { version = "0.8", default-features = false } -futures = { version = "0.3", optional = true } +futures-core = { version = "0.3", default-features = false, optional = true } +futures-util = { version = "0.3", default-features = false, optional = true } [dev-dependencies] tokio-test = "0.4" @@ -45,6 +47,6 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } [package.metadata.docs.rs] all-features = true rustdoc-args = [ - "--cfg", - "docsrs", + "--cfg", + "docsrs", ] diff --git a/src/buffer.rs b/src/buffer.rs index 115f26f..2ea8518 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -1,6 +1,6 @@ //! [`Buffer`] definition and simple implementations. -use std::{ +use core::{ fmt, iter::FusedIterator, marker::PhantomData, @@ -12,12 +12,11 @@ use std::{ use crate::queue::Queue; mod array; -#[cfg(feature = "std")] -#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +#[cfg(feature = "alloc")] mod vec; pub use array::ArrayBuffer; -#[cfg(feature = "std")] +#[cfg(feature = "alloc")] pub use vec::VecBuffer; /// [`Queue`] buffer. It is used together with [`InsertIntoBuffer`]. @@ -114,8 +113,9 @@ where // don't loop on iterator, because `ExactSizeIterator` is not a sufficient guarantee // for unsafe code for i in index..(index + self.0.len()) { + const ERROR: &str = "iterator exhausted before reaching its exact size"; // SAFETY: function contract encompass `CellBuffer::insert` one - unsafe { buffer.insert(i, self.0.next().unwrap()) }; + unsafe { buffer.insert(i, self.0.next().expect(ERROR)) }; } } } diff --git a/src/buffer/array.rs b/src/buffer/array.rs index a7d55fb..e8fc6ed 100644 --- a/src/buffer/array.rs +++ b/src/buffer/array.rs @@ -1,4 +1,4 @@ -use std::{cell::Cell, mem::MaybeUninit, ops::Range}; +use core::{cell::Cell, mem::MaybeUninit, ops::Range}; use crate::{ buffer::{Buffer, CellBuffer, Drain}, diff --git a/src/buffer/vec.rs b/src/buffer/vec.rs index 1e46903..f3a7c81 100644 --- a/src/buffer/vec.rs +++ b/src/buffer/vec.rs @@ -1,4 +1,5 @@ -use std::{cell::Cell, mem::MaybeUninit, ops::Range}; +use alloc::boxed::Box; +use core::{cell::Cell, mem::MaybeUninit, ops::Range}; use crate::buffer::{Buffer, CellBuffer, Drain, Resize}; diff --git a/src/error.rs b/src/error.rs index 3019a10..023f303 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,6 @@ //! Queue error types. -use std::fmt; +use core::fmt; /// Error returned by [`Queue::try_enqueue`](crate::Queue::try_enqueue). /// diff --git a/src/lib.rs b/src/lib.rs index b94951e..ef32440 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,7 @@ #![forbid(missing_docs)] #![forbid(unsafe_op_in_unsafe_fn)] #![forbid(clippy::undocumented_unsafe_blocks)] -#![cfg_attr(docsrs, feature(doc_cfg))] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] #![cfg_attr(not(feature = "std"), no_std)] //! # swap-buffer-queue //! A buffering MPSC queue. @@ -41,8 +41,8 @@ //! assert_eq!(slice.into_iter().collect::>(), vec![0, 1, 2, 3, 4]); //! ``` -#[cfg(not(feature = "std"))] -extern crate core as std; +#[cfg(feature = "alloc")] +extern crate alloc; pub mod buffer; pub mod error; @@ -50,19 +50,14 @@ mod loom; pub mod notify; mod queue; #[cfg(feature = "std")] -#[cfg_attr(docsrs, doc(cfg(feature = "std")))] mod synchronized; mod utils; #[cfg(feature = "write")] -#[cfg_attr(docsrs, doc(cfg(feature = "write")))] pub mod write; #[cfg(feature = "write")] -#[cfg_attr(docsrs, doc(cfg(feature = "write")))] #[cfg(feature = "std")] -#[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub mod write_vectored; pub use queue::Queue; #[cfg(feature = "std")] -#[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub use synchronized::{SynchronizedNotifier, SynchronizedQueue}; diff --git a/src/queue.rs b/src/queue.rs index 4914c3c..8a92802 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,4 +1,4 @@ -use std::{fmt, num::NonZeroUsize, ops::Range}; +use core::{fmt, num::NonZeroUsize, ops::Range}; use crossbeam_utils::CachePadded; @@ -281,6 +281,7 @@ where /// queue.try_enqueue([0]).unwrap(); /// assert_eq!(queue.len(), 1); /// ``` + #[inline] pub fn len(&self) -> usize { let enqueuing = EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Relaxed)); @@ -406,7 +407,7 @@ where // Spin in case of concurrent modifications, except when the buffer is full ofc. if enqueuing.remaining_capacity() != 0 { for _ in 0..1 << backoff { - std::hint::spin_loop(); + hint::spin_loop(); } if backoff < BACKOFF_LIMIT { backoff += 1; @@ -467,7 +468,7 @@ where .with_mut(|buf| unsafe { (*buf).slice(range.clone()) }); return Some(BufferSlice::new(self, buffer_index, range, slice)); } - std::hint::spin_loop(); + hint::spin_loop(); } // If the enqueuing are still ongoing, just save the dequeuing state in order to retry. self.dequeuing_length.store( @@ -564,7 +565,7 @@ where }; if let Some(ref mut backoff) = backoff { for _ in 0..1 << *backoff { - std::hint::spin_loop(); + hint::spin_loop(); } if *backoff < BACKOFF_LIMIT { *backoff += 1; diff --git a/src/synchronized.rs b/src/synchronized.rs index 1ad0954..1e5b123 100644 --- a/src/synchronized.rs +++ b/src/synchronized.rs @@ -1,3 +1,23 @@ +//! Synchronization primitives for [`Queue`]. +//! +//! It supports both synchronous and asynchronous API. [`SynchronizedQueue`] is just an alias +//! for a [`Queue`] using [`SynchronizedNotifier`]. +//! +//! # Examples +//! ```rust +//! # use std::sync::Arc; +//! # use swap_buffer_queue::SynchronizedQueue; +//! # use swap_buffer_queue::buffer::VecBuffer; +//! let queue: Arc>> = +//! Arc::new(SynchronizedQueue::with_capacity(1)); +//! let queue_clone = queue.clone(); +//! std::thread::spawn(move || { +//! queue_clone.enqueue([0]).unwrap(); +//! queue_clone.enqueue([1]).unwrap(); +//! }); +//! assert_eq!(queue.dequeue().unwrap()[0], 0); +//! assert_eq!(queue.dequeue().unwrap()[0], 1); +//! ``` use std::{ fmt, future::poll_fn, @@ -380,12 +400,11 @@ where } #[cfg(feature = "stream")] - #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] /// Returns an stream over the element of the queue (see [`BufferIter`](crate::buffer::BufferIter)). /// /// # Examples /// ``` - /// # use futures::StreamExt; + /// # use futures_util::StreamExt; /// # use swap_buffer_queue::SynchronizedQueue; /// # use swap_buffer_queue::buffer::VecBuffer; /// # tokio_test::block_on(async { @@ -402,8 +421,8 @@ where /// assert_eq!(stream.next().await, None); /// # }) /// ``` - pub fn stream(&self) -> impl futures::Stream + '_ { - use futures::{stream, StreamExt}; + pub fn stream(&self) -> impl futures_core::Stream + '_ { + use futures_util::{stream, StreamExt}; stream::repeat_with(|| stream::once(self.dequeue_async())) .flatten() .take_while(|res| { diff --git a/src/utils.rs b/src/utils.rs index 7b270f2..9445382 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,4 @@ -use std::{ +use core::{ mem, mem::MaybeUninit, ops::{Deref, DerefMut}, diff --git a/src/write.rs b/src/write.rs index 53ef37c..a27d85d 100644 --- a/src/write.rs +++ b/src/write.rs @@ -29,12 +29,11 @@ use std::ops::{Deref, DerefMut}; mod array; -#[cfg(feature = "std")] -#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +#[cfg(feature = "alloc")] mod vec; pub use array::WriteArrayBuffer; -#[cfg(feature = "std")] +#[cfg(feature = "alloc")] pub use vec::WriteVecBuffer; /// A bytes slice with a `HEADER_SIZE`-bytes header and a `TRAILER_SIZE`-bytes trailer. diff --git a/src/write/vec.rs b/src/write/vec.rs index 92c05d5..c58f786 100644 --- a/src/write/vec.rs +++ b/src/write/vec.rs @@ -1,4 +1,5 @@ -use std::{ +use alloc::boxed::Box; +use core::{ cell::{Cell, UnsafeCell}, ops::Range, };