Skip to content

Commit

Permalink
feat: add "alloc" feature
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Jul 12, 2024
1 parent b7f2d15 commit 45817aa
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 43 deletions.
30 changes: 16 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,31 @@ description = "A buffering MPSC queue."
homepage = "https://github.com/wyfo/swap-buffer-queue"
readme = "README.md"
keywords = [
"atomic",
"lock-free",
"no-std",
"mpsc",
"async",
"atomic",
"lock-free",
"no-std",
"mpsc",
"async",
]
categories = [
"concurrency",
"data-structures",
"no-std",
"concurrency",
"data-structures",
"no-std",
]
license = "MIT"
repository = "https://github.com/wyfo/swap-buffer-queue"

[features]
default = ["stream"]
std = []
stream = ["std", "dep:futures"]
default = ["std"]
alloc = []
std = ["alloc"]
stream = ["dep:futures-core", "dep:futures-util"]
write = []

[dependencies]
crossbeam-utils = { version = "0.8", default-features = false }
futures = { version = "0.3", optional = true }
futures-core = { version = "0.3", default-features = false, optional = true }
futures-util = { version = "0.3", default-features = false, optional = true }

[dev-dependencies]
tokio-test = "0.4"
Expand All @@ -45,6 +47,6 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] }
[package.metadata.docs.rs]
all-features = true
rustdoc-args = [
"--cfg",
"docsrs",
"--cfg",
"docsrs",
]
10 changes: 5 additions & 5 deletions src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! [`Buffer`] definition and simple implementations.
use std::{
use core::{
fmt,
iter::FusedIterator,
marker::PhantomData,
Expand All @@ -12,12 +12,11 @@ use std::{
use crate::queue::Queue;

mod array;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[cfg(feature = "alloc")]
mod vec;

pub use array::ArrayBuffer;
#[cfg(feature = "std")]
#[cfg(feature = "alloc")]
pub use vec::VecBuffer;

/// [`Queue`] buffer. It is used together with [`InsertIntoBuffer`].
Expand Down Expand Up @@ -114,8 +113,9 @@ where
// don't loop on iterator, because `ExactSizeIterator` is not a sufficient guarantee
// for unsafe code
for i in index..(index + self.0.len()) {
const ERROR: &str = "iterator exhausted before reaching its exact size";
// SAFETY: function contract encompass `CellBuffer::insert` one
unsafe { buffer.insert(i, self.0.next().unwrap()) };
unsafe { buffer.insert(i, self.0.next().expect(ERROR)) };
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/buffer/array.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cell::Cell, mem::MaybeUninit, ops::Range};
use core::{cell::Cell, mem::MaybeUninit, ops::Range};

use crate::{
buffer::{Buffer, CellBuffer, Drain},
Expand Down
3 changes: 2 additions & 1 deletion src/buffer/vec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{cell::Cell, mem::MaybeUninit, ops::Range};
use alloc::boxed::Box;
use core::{cell::Cell, mem::MaybeUninit, ops::Range};

use crate::buffer::{Buffer, CellBuffer, Drain, Resize};

Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Queue error types.
use std::fmt;
use core::fmt;

/// Error returned by [`Queue::try_enqueue`](crate::Queue::try_enqueue).
///
Expand Down
11 changes: 3 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#![forbid(missing_docs)]
#![forbid(unsafe_op_in_unsafe_fn)]
#![forbid(clippy::undocumented_unsafe_blocks)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![cfg_attr(not(feature = "std"), no_std)]
//! # swap-buffer-queue
//! A buffering MPSC queue.
Expand Down Expand Up @@ -41,28 +41,23 @@
//! assert_eq!(slice.into_iter().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4]);
//! ```
#[cfg(not(feature = "std"))]
extern crate core as std;
#[cfg(feature = "alloc")]
extern crate alloc;

pub mod buffer;
pub mod error;
mod loom;
pub mod notify;
mod queue;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
mod synchronized;
mod utils;
#[cfg(feature = "write")]
#[cfg_attr(docsrs, doc(cfg(feature = "write")))]
pub mod write;
#[cfg(feature = "write")]
#[cfg_attr(docsrs, doc(cfg(feature = "write")))]
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub mod write_vectored;

pub use queue::Queue;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub use synchronized::{SynchronizedNotifier, SynchronizedQueue};
9 changes: 5 additions & 4 deletions src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt, num::NonZeroUsize, ops::Range};
use core::{fmt, num::NonZeroUsize, ops::Range};

use crossbeam_utils::CachePadded;

Expand Down Expand Up @@ -281,6 +281,7 @@ where
/// queue.try_enqueue([0]).unwrap();
/// assert_eq!(queue.len(), 1);
/// ```
#[inline]
pub fn len(&self) -> usize {
let enqueuing =
EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Relaxed));
Expand Down Expand Up @@ -406,7 +407,7 @@ where
// Spin in case of concurrent modifications, except when the buffer is full ofc.
if enqueuing.remaining_capacity() != 0 {
for _ in 0..1 << backoff {
std::hint::spin_loop();
hint::spin_loop();
}
if backoff < BACKOFF_LIMIT {
backoff += 1;
Expand Down Expand Up @@ -467,7 +468,7 @@ where
.with_mut(|buf| unsafe { (*buf).slice(range.clone()) });
return Some(BufferSlice::new(self, buffer_index, range, slice));
}
std::hint::spin_loop();
hint::spin_loop();
}
// If the enqueuing are still ongoing, just save the dequeuing state in order to retry.
self.dequeuing_length.store(
Expand Down Expand Up @@ -564,7 +565,7 @@ where
};
if let Some(ref mut backoff) = backoff {
for _ in 0..1 << *backoff {
std::hint::spin_loop();
hint::spin_loop();
}
if *backoff < BACKOFF_LIMIT {
*backoff += 1;
Expand Down
27 changes: 23 additions & 4 deletions src/synchronized.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
//! Synchronization primitives for [`Queue`].
//!
//! It supports both synchronous and asynchronous API. [`SynchronizedQueue`] is just an alias
//! for a [`Queue`] using [`SynchronizedNotifier`].
//!
//! # Examples
//! ```rust
//! # use std::sync::Arc;
//! # use swap_buffer_queue::SynchronizedQueue;
//! # use swap_buffer_queue::buffer::VecBuffer;
//! let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
//! Arc::new(SynchronizedQueue::with_capacity(1));
//! let queue_clone = queue.clone();
//! std::thread::spawn(move || {
//! queue_clone.enqueue([0]).unwrap();
//! queue_clone.enqueue([1]).unwrap();
//! });
//! assert_eq!(queue.dequeue().unwrap()[0], 0);
//! assert_eq!(queue.dequeue().unwrap()[0], 1);
//! ```
use std::{
fmt,
future::poll_fn,
Expand Down Expand Up @@ -380,12 +400,11 @@ where
}

#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
/// Returns an stream over the element of the queue (see [`BufferIter`](crate::buffer::BufferIter)).
///
/// # Examples
/// ```
/// # use futures::StreamExt;
/// # use futures_util::StreamExt;
/// # use swap_buffer_queue::SynchronizedQueue;
/// # use swap_buffer_queue::buffer::VecBuffer;
/// # tokio_test::block_on(async {
Expand All @@ -402,8 +421,8 @@ where
/// assert_eq!(stream.next().await, None);
/// # })
/// ```
pub fn stream(&self) -> impl futures::Stream<Item = B::Value> + '_ {
use futures::{stream, StreamExt};
pub fn stream(&self) -> impl futures_core::Stream<Item = B::Value> + '_ {
use futures_util::{stream, StreamExt};
stream::repeat_with(|| stream::once(self.dequeue_async()))
.flatten()
.take_while(|res| {
Expand Down
2 changes: 1 addition & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{
use core::{
mem,
mem::MaybeUninit,
ops::{Deref, DerefMut},
Expand Down
5 changes: 2 additions & 3 deletions src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@
use std::ops::{Deref, DerefMut};

mod array;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[cfg(feature = "alloc")]
mod vec;

pub use array::WriteArrayBuffer;
#[cfg(feature = "std")]
#[cfg(feature = "alloc")]
pub use vec::WriteVecBuffer;

/// A bytes slice with a `HEADER_SIZE`-bytes header and a `TRAILER_SIZE`-bytes trailer.
Expand Down
3 changes: 2 additions & 1 deletion src/write/vec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
use alloc::boxed::Box;
use core::{
cell::{Cell, UnsafeCell},
ops::Range,
};
Expand Down

0 comments on commit 45817aa

Please sign in to comment.