diff --git a/dom/examples/todo/src/filter.rs b/dom/examples/todo/src/filter.rs index cf3d008ff..e70a61e92 100644 --- a/dom/examples/todo/src/filter.rs +++ b/dom/examples/todo/src/filter.rs @@ -49,7 +49,7 @@ pub fn filter_link(to_set: Visibility) -> Li { mox! {
  • {% "{}", to_set } diff --git a/dom/examples/todo/src/item.rs b/dom/examples/todo/src/item.rs index 0bb45419d..c69866e7d 100644 --- a/dom/examples/todo/src/item.rs +++ b/dom/examples/todo/src/item.rs @@ -97,7 +97,8 @@ mod tests { pub async fn single_item() { let root = document().create_element("div"); crate::App::boot_fn(&[Todo::new("weeeee")], root.clone(), || { - let todo = &illicit::expect::>>()[0]; + let todos_key = &illicit::expect::>>(); + let todo = &todos_key.commit_at_root()[0]; todo_item(todo) }); diff --git a/dom/examples/todo/src/main_section.rs b/dom/examples/todo/src/main_section.rs index 45e658112..6039e53ca 100644 --- a/dom/examples/todo/src/main_section.rs +++ b/dom/examples/todo/src/main_section.rs @@ -35,8 +35,8 @@ pub fn toggle(default_checked: bool) -> Span { #[illicit::from_env(todos: &Key>, visibility: &Key)] pub fn todo_list() -> Ul { let mut list = ul().class("todo-list"); - for todo in todos.iter() { - if visibility.should_show(todo) { + for todo in todos.commit_at_root().iter() { + if visibility.commit_at_root().should_show(todo) { list = list.child(todo_item(todo)); } } @@ -46,17 +46,18 @@ pub fn todo_list() -> Ul { #[topo::nested] #[illicit::from_env(todos: &Key>)] pub fn main_section() -> Section { - let num_complete = todos.iter().filter(|t| t.completed).count(); + let num_complete = todos.commit_at_root().iter().filter(|t| t.completed).count(); let mut section = section().class("main"); - if !todos.is_empty() { - section = section.child(toggle(num_complete == todos.len())); + if !todos.commit_at_root().is_empty() { + section = section.child(toggle(num_complete == todos.commit_at_root().len())); } section = section.child(todo_list()); - if !todos.is_empty() { - section = section.child(filter_footer(num_complete, todos.len() - num_complete)); + if !todos.commit_at_root().is_empty() { + section = + section.child(filter_footer(num_complete, todos.commit_at_root().len() - num_complete)); } section.build() diff --git a/src/lib.rs b/src/lib.rs index 2ef4e8d10..2e4df8029 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,7 +73,7 @@ use std::{ use topo::CallId; /// Applied to impl blocks, this macro defines a new "updater" wrapper type that -/// holds a [`crate::Key`] and forwards all receiver-mutating methods. Useful +/// holds a [`Key`] and forwards all receiver-mutating methods. Useful /// for defining interactions for a stateful component with less boilerplate. /// /// Requires the name of the updater struct to generate in the arguments to the @@ -285,22 +285,21 @@ where /// let mut rt = RunLoop::new(|| state(|| 0u64)); /// /// let track_wakes = BoolWaker::new(); -/// rt.set_state_change_waker(waker(track_wakes.clone())); /// -/// let (first_commit, first_key) = rt.run_once(); +/// let (first_commit, first_key) = rt.run_once_with(waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(0); // this is a no-op -/// assert_eq!(*first_key, 0, "no updates yet"); +/// assert_eq!(*first_key.commit_at_root(), 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(1); -/// assert_eq!(*first_key, 0, "update only enqueued, not yet committed"); +/// assert_eq!(*first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// /// let (second_commit, second_key) = rt.run_once(); // this commits the pending update -/// assert_eq!(*second_key, 1); +/// assert_eq!(*second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars"); @@ -331,22 +330,21 @@ where /// let mut rt = RunLoop::new(|| cache_state(&epoch.load(Ordering::Relaxed), |e| *e)); /// /// let track_wakes = BoolWaker::new(); -/// rt.set_state_change_waker(futures::task::waker(track_wakes.clone())); /// -/// let (first_commit, first_key) = rt.run_once(); +/// let (first_commit, first_key) = rt.run_once_with(futures::task::waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(0); // this is a no-op -/// assert_eq!(*first_key, 0, "no updates yet"); +/// assert_eq!(*first_key.commit_at_root(), 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.set(1); -/// assert_eq!(*first_key, 0, "update only enqueued, not yet committed"); +/// assert_eq!(*first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// /// let (second_commit, second_key) = rt.run_once(); // this commits the pending update -/// assert_eq!(*second_key, 1); +/// assert_eq!(*second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars"); @@ -363,15 +361,15 @@ where /// assert!(!track_wakes.is_woken()); /// /// third_key.set(2); -/// assert_eq!(*third_key, 2); +/// assert_eq!(*third_key.commit_at_root(), 2); /// assert!(!track_wakes.is_woken()); /// /// third_key.set(3); -/// assert_eq!(*third_key, 2); +/// assert_eq!(*third_key.commit_at_root(), 2); /// assert!(track_wakes.is_woken()); /// /// let (fourth_commit, fourth_key) = rt.run_once(); -/// assert_eq!(*fourth_key, 3); +/// assert_eq!(*fourth_key.commit_at_root(), 3); /// assert_eq!(*fourth_commit, 3); /// assert_eq!(*third_commit, 2); /// assert!(!track_wakes.is_woken()); @@ -630,7 +628,7 @@ where /// /// Reads through a commit are not guaranteed to be the latest value visible to /// the runtime. Commits should be shared and used within the context of a -/// single [`crate::runtime::Revision`], being re-loaded from the state variable +/// single [`runtime::Revision`], being re-loaded from the state variable /// each time. /// /// See [`state`] and [`cache_state`] for examples. @@ -681,7 +679,6 @@ where /// See [`state`] and [`cache_state`] for examples. pub struct Key { id: CallId, - commit_at_root: Commit, var: Arc>>, } @@ -691,6 +688,11 @@ impl Key { self.id } + /// Returns the `Commit` of the current `Revision` + pub fn commit_at_root(&self) -> Commit { + self.var.lock().current_commit().clone() + } + /// Runs `updater` with a reference to the state variable's latest value, /// and enqueues a commit to the variable if `updater` returns `Some`. /// Returns the `Revision` at which the state variable was last rooted @@ -717,22 +719,21 @@ impl Key { /// let mut rt = RunLoop::new(|| state(|| 0u64)); /// /// let track_wakes = BoolWaker::new(); - /// rt.set_state_change_waker(waker(track_wakes.clone())); /// - /// let (first_commit, first_key) = rt.run_once(); + /// let (first_commit, first_key) = rt.run_once_with(waker(track_wakes.clone())); /// assert_eq!(*first_commit, 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.update(|_| None); // this is a no-op - /// assert_eq!(*first_key, 0, "no updates yet"); + /// assert_eq!(*first_key.commit_at_root(), 0, "no updates yet"); /// assert!(!track_wakes.is_woken(), "no updates yet"); /// /// first_key.update(|prev| Some(prev + 1)); - /// assert_eq!(*first_key, 0, "update only enqueued, not yet committed"); + /// assert_eq!(*first_key.commit_at_root(), 0, "update only enqueued, not yet committed"); /// assert!(track_wakes.is_woken()); /// /// let (second_commit, second_key) = rt.run_once(); // this commits the pending update - /// assert_eq!(*second_key, 1); + /// assert_eq!(*second_key.commit_at_root(), 1); /// assert_eq!(*second_commit, 1); /// assert_eq!(*first_commit, 0, "previous value still held by previous pointer"); /// assert!(!track_wakes.is_woken(), "wakes only come from updating state vars"); @@ -744,16 +745,6 @@ impl Key { var.enqueue_commit(new); } } - - /// Set a new value for the state variable, immediately taking effect. - fn force(&self, new: State) { - self.var.lock().enqueue_commit(new); - } - - // TODO(#197) delete this and remove the Deref impl - fn refresh(&mut self) { - self.commit_at_root = runtime::Var::root(self.var.clone()).0; - } } impl Key @@ -788,15 +779,7 @@ where impl Clone for Key { fn clone(&self) -> Self { - Self { id: self.id, commit_at_root: self.commit_at_root.clone(), var: self.var.clone() } - } -} - -impl Deref for Key { - type Target = State; - - fn deref(&self) -> &Self::Target { - self.commit_at_root.deref() + Self { id: self.id, var: self.var.clone() } } } @@ -805,7 +788,7 @@ where State: Debug, { fn fmt(&self, f: &mut Formatter) -> FmtResult { - self.commit_at_root.fmt(f) + self.commit_at_root().fmt(f) } } @@ -814,7 +797,7 @@ where State: Display, { fn fmt(&self, f: &mut Formatter) -> FmtResult { - self.commit_at_root.fmt(f) + self.commit_at_root().fmt(f) } } diff --git a/src/runtime.rs b/src/runtime.rs index 417b2dcc5..639f58cca 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -11,10 +11,12 @@ use futures::{ task::{noop_waker, LocalSpawn, SpawnError}, }; use illicit::AsContext; +use parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}; use std::{ fmt::{Debug, Formatter, Result as FmtResult}, rc::Rc, - task::Waker, + sync::{atomic::AtomicBool, Arc}, + task::{Poll, Waker}, }; pub(crate) use context::Context; @@ -41,6 +43,13 @@ impl std::fmt::Debug for Revision { } } +#[derive(Debug)] +pub(crate) struct RevisionControlSystem { + revision: Revision, + waker: Waker, + pending_changes: AtomicBool, +} + /// A [`Runtime`] is the primary integration point between moxie and an /// embedder. Each independent instance is responsible for an event loop and /// tracks time using a [`Revision`] which it increments on each iteration of @@ -65,16 +74,18 @@ impl std::fmt::Debug for Revision { /// /// ## Change notifications /// -/// Each runtime should be provided with a [`std::task::Waker`] that will notify +/// Each runtime should be provided with a [`Waker`] that will notify /// the embedding environment to run the loop again. This is done by calling -/// [`Runtime::set_state_change_waker`]. +/// [`Runtime::set_state_change_waker`] or +/// [`Runtime::poll_once`][Runtime::poll_once] +/// [`(_with)`][Runtime::poll_once_with]. /// /// For scenarios without an obvious "main thread" this can be done for you by -/// binding a root function to a [`RunLoop`] which implements -/// [`std::future::Future`] and can be spawned as a task onto an executor. For -/// more nuanced scenarios it can be necessary to write your own waker to ensure -/// scheduling compatible with the embedding environment. By default a no-op -/// waker is provided. +/// binding a root function to a [`RunLoop`] which implements +/// [`futures::Stream`] and can be spawned as a task onto an executor. +/// For more nuanced scenarios it can be necessary to write your own waker to +/// ensure scheduling compatible with the embedding environment. By default a +/// no-op waker is provided. /// /// The most common way of notifying a runtime of a change is to update a /// state variable. @@ -119,10 +130,9 @@ impl std::fmt::Debug for Revision { /// /// [dyn-cache]: https://docs.rs/dyn-cache pub struct Runtime { - revision: Revision, + rcs: Arc>, cache: SharedLocalCache, spawner: Spawner, - wk: Waker, } impl Default for Runtime { @@ -137,23 +147,77 @@ impl Runtime { pub fn new() -> Self { Self { spawner: Spawner(Rc::new(JunkSpawner)), - revision: Revision(0), cache: SharedLocalCache::default(), - wk: noop_waker(), + rcs: Arc::new(RwLock::new(RevisionControlSystem { + revision: Revision(0), + waker: noop_waker(), + // require the first revision to be forced? + pending_changes: AtomicBool::new(false), + })), } } - /// The current revision of the runtime, or how many times `run_once` has - /// been invoked. + /// The current revision of the runtime, or how many runs occurred. pub fn revision(&self) -> Revision { - self.revision + self.rcs.read().revision } - /// Runs the root closure once with access to the runtime context, - /// increments the runtime's `Revision`, and drops any cached values - /// which were not marked alive. + /// Runs the root closure once with access to the runtime context and drops + /// any cached values which were not marked alive. `Revision` is + /// incremented at the start of a run. pub fn run_once(&mut self, op: impl FnOnce() -> Out) -> Out { - self.revision.0 += 1; + self.execute(op, self.rcs.write()) + } + + /// Runs the root closure once with access to the runtime context and drops + /// any cached values which were not marked alive. `Waker` is set for the + /// next `Revision`, which starts after the start of the run. + pub fn run_once_with(&mut self, op: impl FnOnce() -> Out, waker: Waker) -> Out { + let mut rcs_write = self.rcs.write(); + rcs_write.waker = waker; + self.execute(op, rcs_write) + } + + /// Forces the next `Revision` without any changes. + pub fn force(&self) { + self.rcs.read().pending_changes.store(true, std::sync::atomic::Ordering::Relaxed); + } + + /// If change occured durig the last `Revision` then calls `run_once` + /// else returns `Poll::Pending`. It is required to force your + /// first revision to register your state variables! + pub fn poll_once(&mut self, op: impl FnOnce() -> Out) -> Poll { + // Avoid write lock + let rcs_read = self.rcs.upgradable_read(); + if !rcs_read.pending_changes.load(std::sync::atomic::Ordering::Relaxed) { + Poll::Pending + } else { + let rcs_write = RwLockUpgradableReadGuard::upgrade(rcs_read); + Poll::Ready(self.execute(op, rcs_write)) + } + } + + /// If change occured durig the last `Revision` then calls `run_once_with` + /// else returns [`Poll::Pending`]. It is required to force your + /// first revision to register your state variables! + pub fn poll_once_with(&mut self, op: impl FnOnce() -> Out, waker: Waker) -> Poll { + let mut rcs_write = self.rcs.write(); + rcs_write.waker = waker; + if !rcs_write.pending_changes.load(std::sync::atomic::Ordering::Relaxed) { + Poll::Pending + } else { + Poll::Ready(self.execute(op, rcs_write)) + } + } + + fn execute( + &self, + op: impl FnOnce() -> Out, + mut rcs_write: RwLockWriteGuard, + ) -> Out { + rcs_write.revision.0 += 1; + rcs_write.pending_changes.store(false, std::sync::atomic::Ordering::Relaxed); + drop(rcs_write); let ret = self.context_handle().offer(|| topo::call(op)); @@ -161,12 +225,17 @@ impl Runtime { ret } - /// Sets the [`std::task::Waker`] which will be called when state variables - /// receive commits. By default the runtime no-ops on a state change, + /// Sets the [`Waker`] which will be called when state variables + /// receive commits or if current `Revision` already has any received + /// commits. By default the runtime no-ops on a state change, /// which is probably the desired behavior if the embedding system will /// call `Runtime::run_once` on a regular interval regardless. - pub fn set_state_change_waker(&mut self, wk: Waker) { - self.wk = wk; + pub fn set_state_change_waker(&mut self, waker: Waker) { + let mut rcs_write = self.rcs.write(); + rcs_write.waker = waker; + if rcs_write.pending_changes.load(std::sync::atomic::Ordering::Relaxed) { + rcs_write.waker.wake_by_ref(); + } } /// Sets the executor that will be used to spawn normal priority tasks. diff --git a/src/runtime/context.rs b/src/runtime/context.rs index f14a0e3eb..d53a70b98 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -1,28 +1,24 @@ -use super::{Revision, Spawner, Var}; +use super::{Revision, RevisionControlSystem, Spawner, Var}; use crate::{Commit, Key}; use dyn_cache::local::SharedLocalCache; use futures::future::abortable; -use std::{ - borrow::Borrow, - future::Future, - task::{Poll, Waker}, -}; +use parking_lot::RwLock; +use std::{borrow::Borrow, future::Future, sync::Arc, task::Poll}; /// A handle to the current [`Runtime`] which is offered via [`illicit`] /// contexts and provides access to the current revision, cache storage, /// task spawning, and the waker for the loop. #[derive(Debug)] pub(crate) struct Context { - revision: Revision, + rcs: Arc>, pub cache: SharedLocalCache, spawner: Spawner, - waker: Waker, } impl Context { /// Returns the revision for which this context was created. pub fn revision(&self) -> Revision { - self.revision + self.rcs.read().revision } /// Load a [`crate::state::Var`] with the provided argument and initializer. @@ -40,7 +36,7 @@ impl Context { { let var = self .cache - .cache(id, arg, |arg| Var::new(topo::CallId::current(), self.waker.clone(), init(arg))); + .cache(id, arg, |arg| Var::new(topo::CallId::current(), self.rcs.clone(), init(arg))); Var::root(var) } @@ -68,28 +64,31 @@ impl Context { Output: 'static, Ret: 'static, { - let (_, set_result): (_, Key>) = self.cache_state(id, &(), |()| Poll::Pending); - let mut set_result2 = set_result.clone(); - self.cache.hold(id, arg, |arg| { - // before we spawn the new task we need to mark it pending - set_result.force(Poll::Pending); + let var = self.cache.cache_with( + id, + arg, + |arg| { + // before we spawn the new task we need to mark it pending + let var = Var::new(topo::CallId::current(), self.rcs.clone(), Poll::Pending); - let (fut, aborter) = abortable(init(arg)); - let task = async move { - if let Ok(to_store) = fut.await { - set_result.update(|_| Some(Poll::Ready(to_store))); - } - }; - self.spawner - .0 - .spawn_local_obj(Box::pin(task).into()) - .expect("that set_task_executor has been called"); - scopeguard::guard(aborter, |a| a.abort()) - }); + let (fut, aborter) = abortable(init(arg)); - set_result2.refresh(); + let var2 = var.clone(); + let task = async move { + if let Ok(to_store) = fut.await { + var2.lock().enqueue_commit(Poll::Ready(to_store)); + } + }; + self.spawner + .0 + .spawn_local_obj(Box::pin(task).into()) + .expect("that set_task_executor has been called"); + (var, scopeguard::guard(aborter, |a| a.abort())) + }, + |(var, _)| var.clone(), + ); - match &*set_result2 { + match *Var::root(var).0 { Poll::Ready(ref stored) => Poll::Ready(with(stored)), Poll::Pending => Poll::Pending, } @@ -98,11 +97,6 @@ impl Context { impl super::Runtime { pub(crate) fn context_handle(&self) -> Context { - Context { - revision: self.revision, - spawner: self.spawner.clone(), - cache: self.cache.clone(), - waker: self.wk.clone(), - } + Context { rcs: self.rcs.clone(), spawner: self.spawner.clone(), cache: self.cache.clone() } } } diff --git a/src/runtime/runloop.rs b/src/runtime/runloop.rs index b8e648d2d..d4a24fa16 100644 --- a/src/runtime/runloop.rs +++ b/src/runtime/runloop.rs @@ -11,8 +11,8 @@ use std::{ /// A [`Runtime`] that is bound with a particular root function. /// /// If running in a context with an async executor, can be consumed as a -/// [`futures::Stream`] of [`crate::runtime::Revision`]s in order to provide -/// the [`super::Runtime`] with a [`std::task::Waker`]. +/// [`futures::Stream`] in order to provide +/// the [`Runtime`] with a [`Waker`]. pub struct RunLoop { inner: Runtime, root: Root, @@ -25,6 +25,9 @@ impl super::Runtime { where Root: FnMut() -> Out, { + // RunLoop always forces it's first revision? + // or maybe just check if current revision is 0 + self.force(); RunLoop { inner: self, root } } } @@ -35,7 +38,8 @@ where { /// Creates a new `Runtime` attached to the provided root function. pub fn new(root: Root) -> RunLoop { - RunLoop { root, inner: Runtime::new() } + // maybe only there force first revision + Runtime::new().looped(root) } /// Returns the runtime's current Revision. @@ -43,8 +47,9 @@ where self.inner.revision() } - /// Sets the [`std::task::Waker`] which will be called when state variables - /// change. + /// Sets the [`Waker`] which will be called when state variables + /// changes or if current `Revision` already has any state variables + /// changed. pub fn set_state_change_waker(&mut self, wk: Waker) { self.inner.set_state_change_waker(wk); } @@ -54,12 +59,38 @@ where self.inner.set_task_executor(sp); } - /// Run the root function once within this runtime's context, returning the - /// result. + /// Runs the root closure once with access to the runtime context, returning + /// the result. `Revision` is incremented at the start of a run. pub fn run_once(&mut self) -> Out { self.inner.run_once(&mut self.root) } + /// Runs the root closure once with access to the runtime context, returning + /// the result. `Waker` is set for the next `Revision`, which starts after + /// the start of the run. + pub fn run_once_with(&mut self, waker: Waker) -> Out { + self.inner.run_once_with(&mut self.root, waker) + } + + /// Forces the next `Revision` without any changes. + pub fn force(&self) { + self.inner.force() + } + + /// If change occured durig the last `Revision` then calls `run_once` + /// else returns `Poll::Pending`. Note that RunLoop always forces it's first + /// run (for now?) + pub fn poll_once(&mut self) -> Poll { + self.inner.poll_once(&mut self.root) + } + + /// If change occured durig the last `Revision` then calls `run_once_with` + /// else returns [`Poll::Pending`]. Note that RunLoop always forces it's + /// first run (for now?) + pub fn poll_once_with(&mut self, waker: Waker) -> Poll { + self.inner.poll_once_with(&mut self.root, waker) + } + /// Poll this runtime without exiting. Discards any value returned from the /// root function. The future yields in between revisions and is woken on /// state changes. @@ -79,14 +110,12 @@ impl Stream for RunLoop where Root: FnMut() -> Out + Unpin, { - type Item = (Revision, Out); + type Item = Out; - /// This `Stream` implementation runs a single revision for each call to - /// `poll_next`, always returning `Poll::Ready(Some(...))`. + /// This `Stream` implementation yields until state change occurred or + /// future fully [loads][crate::load]. fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll> { let this = self.get_mut(); - this.inner.set_state_change_waker(cx.waker().clone()); - let out = this.run_once(); - Poll::Ready(Some((this.inner.revision, out))) + this.poll_once_with(cx.waker().clone()).map(Some) } } diff --git a/src/runtime/var.rs b/src/runtime/var.rs index e9334464b..418efab0c 100644 --- a/src/runtime/var.rs +++ b/src/runtime/var.rs @@ -1,20 +1,28 @@ use crate::{Commit, Key}; -use parking_lot::Mutex; -use std::{sync::Arc, task::Waker}; +use parking_lot::{Mutex, RwLock}; +use std::sync::Arc; + +use super::{Revision, RevisionControlSystem}; /// The underlying container of state variables. Vends copies of the latest /// [`Commit`] for [`Key`]s. pub(crate) struct Var { current: Commit, id: topo::CallId, - pending: Option>, - waker: Waker, + // can only contain commit from current revision + // make proper checks inside methods! + pending: Option<(Revision, Commit)>, + rcs: Arc>, } impl Var { - pub fn new(id: topo::CallId, waker: Waker, inner: State) -> Arc> { + pub fn new( + id: topo::CallId, + rcs: Arc>, + inner: State, + ) -> Arc> { let current = Commit { id, inner: Arc::new(inner) }; - Arc::new(Mutex::new(Var { id, current, waker, pending: None })) + Arc::new(Mutex::new(Var { id, current, rcs, pending: None })) } /// Attach this `Var` to its callsite, performing any pending commit and @@ -22,25 +30,54 @@ impl Var { pub fn root(var: Arc>) -> (Commit, Key) { let (id, commit_at_root) = { let mut var = var.lock(); - if let Some(pending) = var.pending.take() { - var.current = pending; + // This function is always called within it's context + let current = Revision::current(); + + // Replace current commit with pending commit if it is from the past revision + match var.pending.take() { + Some((revision, commit)) if revision < current => var.current = commit, + still_pending => var.pending = still_pending, } + (var.id, var.current.clone()) }; - (commit_at_root.clone(), Key { id, commit_at_root, var }) + (commit_at_root, Key { id, var }) } /// Returns a reference to the latest value, pending or committed. pub fn latest(&self) -> &State { - &self.pending.as_ref().unwrap_or(&self.current) + self.pending.as_ref().map(|(_r, c)| c).unwrap_or(&self.current) + } + + /// Returns a reference to the current commit. + pub fn current_commit(&mut self) -> &Commit { + let current = self.rcs.read().revision; + + // Replace current commit with pending commit if it is from the past revision + match self.pending.take() { + Some((revision, commit)) if revision < current => self.current = commit, + still_pending => self.pending = still_pending, + } + + &self.current } /// Initiate a commit to the state variable. The commit will actually /// complete asynchronously when the state variable is next rooted in a /// topological function, flushing the pending commit. pub fn enqueue_commit(&mut self, state: State) { - self.pending = Some(Commit { inner: Arc::new(state), id: self.id }); - self.waker.wake_by_ref(); + let new_commit = Commit { inner: Arc::new(state), id: self.id }; + let rcs_read = self.rcs.read(); + let current = rcs_read.revision; + + // Replace current commit with pending commit if it is from the past revision + match self.pending.replace((current, new_commit)) { + Some((revision, old_commit)) if revision < current => self.current = old_commit, + _ => (), + } + + rcs_read.pending_changes.store(true, std::sync::atomic::Ordering::Relaxed); + rcs_read.waker.wake_by_ref(); } }