diff --git a/rtic-time/CHANGELOG.md b/rtic-time/CHANGELOG.md index 22772d16518e..d318dab41ac7 100644 --- a/rtic-time/CHANGELOG.md +++ b/rtic-time/CHANGELOG.md @@ -7,6 +7,10 @@ For each category, *Added*, *Changed*, *Fixed* add new entries at the top! ## Unreleased +### Changed + +- Replace `async` implementations of `delay`/`delay_until`/`timeout`/`timeout_at` with strucs to reduce memory usage. + ## v2.0.0 - 2024-05-29 ### Added diff --git a/rtic-time/src/timer_queue.rs b/rtic-time/src/timer_queue.rs index 357deb24dd77..c4cd7eb0a4bb 100644 --- a/rtic-time/src/timer_queue.rs +++ b/rtic-time/src/timer_queue.rs @@ -3,15 +3,10 @@ use crate::linked_list::{self, Link, LinkedList}; use crate::TimeoutError; -use core::future::{poll_fn, Future}; +use core::future::Future; use core::pin::Pin; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use core::task::{Poll, Waker}; -use futures_util::{ - future::{select, Either}, - pin_mut, -}; -use rtic_common::dropper::OnDrop; mod backend; mod tick_type; @@ -67,26 +62,6 @@ pub struct TimerQueue { initialized: AtomicBool, } -/// This is needed to make the async closure in `delay_until` accept that we "share" -/// the link possible between threads. -struct LinkPtr(*mut Option>>); - -impl Clone for LinkPtr { - fn clone(&self) -> Self { - LinkPtr(self.0) - } -} - -impl LinkPtr { - /// This will dereference the pointer stored within and give out an `&mut`. - unsafe fn get(&mut self) -> &mut Option>> { - &mut *self.0 - } -} - -unsafe impl Send for LinkPtr {} -unsafe impl Sync for LinkPtr {} - impl Default for TimerQueue { fn default() -> Self { Self::new() @@ -112,7 +87,7 @@ impl TimerQueue { pub fn initialize(&self, backend: Backend) { self.initialized.store(true, Ordering::SeqCst); - // Don't run drop on `Mono` + // Don't run drop on `Backend` core::mem::forget(backend); } @@ -164,29 +139,29 @@ impl TimerQueue { } /// Timeout at a specific time. - pub async fn timeout_at( + pub fn timeout_at( &self, instant: Backend::Ticks, future: F, - ) -> Result { - let delay = self.delay_until(instant); - - pin_mut!(future); - pin_mut!(delay); - - match select(future, delay).await { - Either::Left((r, _)) => Ok(r), - Either::Right(_) => Err(TimeoutError), + ) -> Timeout<'_, Backend, F> { + Timeout { + delay: Delay:: { + instant, + queue: &self.queue, + link_ptr: None, + marker: AtomicUsize::new(0), + }, + future, } } /// Timeout after at least a specific duration. #[inline] - pub async fn timeout_after( + pub fn timeout_after( &self, duration: Backend::Ticks, future: F, - ) -> Result { + ) -> Timeout<'_, Backend, F> { let now = Backend::now(); let mut timeout = now.wrapping_add(duration); if now != timeout { @@ -195,12 +170,12 @@ impl TimerQueue { // Wait for one period longer, because by definition timers have an uncertainty // of one period, so waiting for 'at least' needs to compensate for that. - self.timeout_at(timeout, future).await + self.timeout_at(timeout, future) } /// Delay for at least some duration of time. #[inline] - pub async fn delay(&self, duration: Backend::Ticks) { + pub fn delay(&self, duration: Backend::Ticks) -> Delay<'_, Backend> { let now = Backend::now(); let mut timeout = now.wrapping_add(duration); if now != timeout { @@ -209,79 +184,111 @@ impl TimerQueue { // Wait for one period longer, because by definition timers have an uncertainty // of one period, so waiting for 'at least' needs to compensate for that. - self.delay_until(timeout).await; + self.delay_until(timeout) } /// Delay to some specific time instant. - pub async fn delay_until(&self, instant: Backend::Ticks) { + pub fn delay_until(&self, instant: Backend::Ticks) -> Delay<'_, Backend> { if !self.initialized.load(Ordering::Relaxed) { panic!( "The timer queue is not initialized with a monotonic, you need to run `initialize`" ); } + Delay:: { + instant, + queue: &self.queue, + link_ptr: None, + marker: AtomicUsize::new(0), + } + } +} - let mut link_ptr: Option>> = None; +/// Future returned by `delay` and `delay_until`. +pub struct Delay<'q, Backend: TimerQueueBackend> { + instant: Backend::Ticks, + queue: &'q LinkedList>, + link_ptr: Option>>, + marker: AtomicUsize, +} - // Make this future `Drop`-safe - // SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it. - let mut link_ptr = - LinkPtr(&mut link_ptr as *mut Option>>); - let mut link_ptr2 = link_ptr.clone(); +impl<'q, Backend: TimerQueueBackend> Future for Delay<'q, Backend> { + type Output = (); - let queue = &self.queue; - let marker = &AtomicUsize::new(0); + fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + // SAFETY: We ensure we never move anything out of this. + let this = unsafe { self.get_unchecked_mut() }; - let dropper = OnDrop::new(|| { - queue.delete(marker.load(Ordering::Relaxed)); - }); + if Backend::now().is_at_least(this.instant) { + return Poll::Ready(()); + } - poll_fn(|cx| { - if Backend::now().is_at_least(instant) { - return Poll::Ready(()); + // SAFETY: this is dereferenced only here and in `drop`. As the queue deletion is done only + // in `drop` we can't do this access concurrently with queue removal. + let link = &mut this.link_ptr; + if link.is_none() { + let link_ref = link.insert(Link::new(WaitingWaker { + waker: cx.waker().clone(), + release_at: this.instant, + was_popped: AtomicBool::new(false), + })); + + // SAFETY(new_unchecked): The address to the link is stable as it is defined + // outside this stack frame. + // SAFETY(insert): `link_ref` lfetime comes from `link_ptr` which itself is owned by + // the `Delay` struct. The `Delay::drop` impl ensures that the link is removed from the + // queue on drop, which happens before the struct and thus `link_ptr` goes out of + // scope. + let (head_updated, addr) = unsafe { this.queue.insert(Pin::new_unchecked(link_ref)) }; + this.marker.store(addr, Ordering::Relaxed); + if head_updated { + Backend::pend_interrupt() } + } - // SAFETY: This pointer is only dereferenced here and on drop of the future - // which happens outside this `poll_fn`'s stack frame, so this mutable access cannot - // happen at the same time as `dropper` runs. - let link = unsafe { link_ptr2.get() }; - if link.is_none() { - let link_ref = link.insert(Link::new(WaitingWaker { - waker: cx.waker().clone(), - release_at: instant, - was_popped: AtomicBool::new(false), - })); - - // SAFETY(new_unchecked): The address to the link is stable as it is defined - //outside this stack frame. - // SAFETY(insert): `link_ref` lifetime comes from `link_ptr` that is shadowed, and - // we make sure in `dropper` that the link is removed from the queue before - // dropping `link_ptr` AND `dropper` makes sure that the shadowed `link_ptr` lives - // until the end of the stack frame. - let (head_updated, addr) = unsafe { queue.insert(Pin::new_unchecked(link_ref)) }; - - marker.store(addr, Ordering::Relaxed); - - if head_updated { - // Pend the monotonic handler if the queue head was updated. - Backend::pend_interrupt() - } + Poll::Pending + } +} + +impl<'q, Backend: TimerQueueBackend> Drop for Delay<'q, Backend> { + fn drop(&mut self) { + // SAFETY: Drop cannot be run at the same time as poll, so we can't end up + // derefencing this concurrently to the one in `poll`. + match self.link_ptr.as_ref() { + None => return, + // If it was popped from the queue there is no need to run delete + Some(link) if link.val.was_popped.load(Ordering::Relaxed) => return, + _ => {} + } + self.queue.delete(self.marker.load(Ordering::Relaxed)); + } +} + +/// Future returned by `timeout` and `timeout_at`. +pub struct Timeout<'q, Backend: TimerQueueBackend, F> { + delay: Delay<'q, Backend>, + future: F, +} + +impl<'q, Backend: TimerQueueBackend, F: Future> Future for Timeout<'q, Backend, F> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + let inner = unsafe { self.get_unchecked_mut() }; + + { + let f = unsafe { Pin::new_unchecked(&mut inner.future) }; + if let Poll::Ready(v) = f.poll(cx) { + return Poll::Ready(Ok(v)); } + } - Poll::Pending - }) - .await; - - // SAFETY: We only run this and dereference the pointer if we have - // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference - // of this pointer is in the `poll_fn`. - if let Some(link) = unsafe { link_ptr.get() } { - if link.val.was_popped.load(Ordering::Relaxed) { - // If it was popped from the queue there is no need to run delete - dropper.defuse(); + { + let d = unsafe { Pin::new_unchecked(&mut inner.delay) }; + if d.poll(cx).is_ready() { + return Poll::Ready(Err(TimeoutError)); } - } else { - // Make sure that our link is deleted from the list before we drop this stack - drop(dropper); } + + Poll::Pending } }