Skip to content

Commit

Permalink
Implement CandyTypedDeque over the queue API
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Sep 11, 2024
1 parent 01cc6e4 commit 12e9db0
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 32 deletions.
47 changes: 33 additions & 14 deletions src/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ enum QueuePos {
Tail,
}

pub struct QueueIterator<'a, 'b> {
pub struct QueueIterator<'a> {
store: &'a CandyStore,
queue_key: &'b [u8],
queue_key: Vec<u8>,
curr: Option<u64>,
end: Option<u64>,
fwd: bool,
}

impl<'a, 'b> Iterator for QueueIterator<'a, 'b> {
impl<'a> Iterator for QueueIterator<'a> {
type Item = Result<(usize, Vec<u8>)>;
fn next(&mut self) -> Option<Self::Item> {
if self.curr.is_none() {
match self.store.fetch_queue(self.queue_key) {
match self.store.fetch_queue(&self.queue_key) {
Ok(queue) => match queue {
Some(queue) => {
if self.fwd {
Expand All @@ -59,7 +59,7 @@ impl<'a, 'b> Iterator for QueueIterator<'a, 'b> {
}
match self
.store
.get_raw(&self.store.make_queue_item_key(self.queue_key, curr))
.get_raw(&self.store.make_queue_item_key(&self.queue_key, curr))
{
Ok(v) => {
match v {
Expand Down Expand Up @@ -314,26 +314,45 @@ impl CandyStore {
Ok(indices)
}

pub fn iter_queue<'a, 'b, B: AsRef<[u8]> + ?Sized>(
&'a self,
queue_key: &'b B,
) -> QueueIterator<'a, 'b> {
pub fn peek_queue_head<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<Vec<u8>>> {
let Some(res) = self.iter_queue(queue_key).next() else {
return Ok(None);
};
let (_, v) = res?;
Ok(Some(v))
}

pub fn peek_queue_tail<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<Vec<u8>>> {
let Some(res) = self.iter_queue_backwards(queue_key).next() else {
return Ok(None);
};
let (_, v) = res?;
Ok(Some(v))
}

pub fn iter_queue<'a, B: AsRef<[u8]> + ?Sized>(&'a self, queue_key: &B) -> QueueIterator<'a> {
QueueIterator {
store: &self,
queue_key: queue_key.as_ref(),
queue_key: queue_key.as_ref().to_owned(),
curr: None,
end: None,
fwd: true,
}
}

pub fn iter_queue_backwards<'a, 'b, B: AsRef<[u8]> + ?Sized>(
pub fn iter_queue_backwards<'a, B: AsRef<[u8]> + ?Sized>(
&'a self,
queue_key: &'b B,
) -> QueueIterator<'a, 'b> {
queue_key: &B,
) -> QueueIterator<'a> {
QueueIterator {
store: &self,
queue_key: queue_key.as_ref(),
queue_key: queue_key.as_ref().to_owned(),
curr: None,
end: None,
fwd: false,
Expand Down
62 changes: 44 additions & 18 deletions src/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,17 +579,18 @@ where
}
}

/// A wrapper around [CandyTypedList] that's specialized for double-ended queues - only allows pushing
/// and popping from either the head or the tail. The keys are auto-generated internally and are not exposed to
/// the caller
/// A wrapper around [CandyStore] that exposes the queue API in a typed manner. See [CandyTypedStore] for more
/// info
pub struct CandyTypedDeque<L, V> {
pub list: CandyTypedList<L, EncodableUuid, V>,
store: Arc<CandyStore>,
_phantom: PhantomData<(L, V)>,
}

impl<L, V> Clone for CandyTypedDeque<L, V> {
fn clone(&self) -> Self {
Self {
list: self.list.clone(),
store: self.store.clone(),
_phantom: Default::default(),
}
}
}
Expand All @@ -601,7 +602,8 @@ where
{
pub fn new(store: Arc<CandyStore>) -> Self {
Self {
list: CandyTypedList::new(store),
store,
_phantom: Default::default(),
}
}

Expand All @@ -615,7 +617,9 @@ where
L: Borrow<Q1>,
V: Borrow<Q2>,
{
self.list.push_head(list_key, val)?;
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let val = val.to_bytes::<LE>();
self.store.push_to_queue_head(&list_key, &val)?;
Ok(())
}

Expand All @@ -629,7 +633,9 @@ where
L: Borrow<Q1>,
V: Borrow<Q2>,
{
self.list.push_tail(list_key, val)?;
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let val = val.to_bytes::<LE>();
self.store.push_to_queue_tail(&list_key, &val)?;
Ok(())
}

Expand All @@ -638,31 +644,47 @@ where
where
L: Borrow<Q>,
{
Ok(self.list.pop_head(list_key)?.map(|kv| kv.1))
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let Some(v) = self.store.pop_queue_head(&list_key)? else {
return Ok(None);
};
Ok(Some(from_bytes::<V>(&v)?))
}

/// Pops a value from the end (tail) of the queue
pub fn pop_tail<Q: ?Sized + Encode>(&self, list_key: &Q) -> Result<Option<V>>
where
L: Borrow<Q>,
{
Ok(self.list.pop_tail(list_key)?.map(|kv| kv.1))
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let Some(v) = self.store.pop_queue_tail(&list_key)? else {
return Ok(None);
};
Ok(Some(from_bytes::<V>(&v)?))
}

/// Peek at the value from the beginning (head) of the queue
pub fn peek_head<Q: ?Sized + Encode>(&self, list_key: &Q) -> Result<Option<V>>
where
L: Borrow<Q>,
{
Ok(self.list.peek_head(list_key)?.map(|kv| kv.1))
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let Some(v) = self.store.peek_queue_head(&list_key)? else {
return Ok(None);
};
Ok(Some(from_bytes::<V>(&v)?))
}

/// Peek at the value from the end (tail) of the queue
pub fn peek_tail<Q: ?Sized + Encode>(&self, list_key: &Q) -> Result<Option<V>>
where
L: Borrow<Q>,
{
Ok(self.list.peek_tail(list_key)?.map(|kv| kv.1))
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let Some(v) = self.store.peek_queue_tail(&list_key)? else {
return Ok(None);
};
Ok(Some(from_bytes::<V>(&v)?))
}

/// See [CandyTypedList::iter]
Expand All @@ -673,9 +695,10 @@ where
where
L: Borrow<Q>,
{
self.list.iter(list_key).map(|res| match res {
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
self.store.iter_queue(&list_key).map(|res| match res {
Err(e) => Err(e),
Ok((_, v)) => Ok(v),
Ok((_, v)) => Ok(from_bytes::<V>(&v).unwrap()),
})
}

Expand All @@ -687,9 +710,12 @@ where
where
L: Borrow<Q>,
{
self.list.iter_backwards(list_key).map(|res| match res {
Err(e) => Err(e),
Ok((_, v)) => Ok(v),
})
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
self.store
.iter_queue_backwards(&list_key)
.map(|res| match res {
Err(e) => Err(e),
Ok((_, v)) => Ok(from_bytes::<V>(&v).unwrap()),
})
}
}
4 changes: 4 additions & 0 deletions tests/test_queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ fn test_queues() -> Result<()> {
db.push_to_queue_tail("work", "item1")?;
db.push_to_queue_tail("work", "item2")?;
db.push_to_queue_tail("work", "item3")?;

assert_eq!(db.peek_queue_head("work")?, Some("item1".into()));
assert_eq!(db.peek_queue_tail("work")?, Some("item3".into()));

assert_eq!(db.pop_queue_head("work")?, Some("item1".into()));
assert_eq!(db.pop_queue_head("work")?, Some("item2".into()));
assert_eq!(db.pop_queue_head("work")?, Some("item3".into()));
Expand Down

0 comments on commit 12e9db0

Please sign in to comment.