diff --git a/Cargo.toml b/Cargo.toml index db55060c96b3..1fe854c38216 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ name = "rtic" [dependencies] cortex-m = "0.7.0" cortex-m-rtic-macros = { path = "macros", version = "0.6.0-alpha.5" } +rtic-actor-traits = { path = "actor-traits" } rtic-monotonic = "0.1.0-alpha.2" rtic-core = "0.3.1" heapless = "0.7.7" @@ -42,13 +43,18 @@ version = "0.5.2" [target.x86_64-unknown-linux-gnu.dev-dependencies] trybuild = "1" +[features] +memory-watermark = ["cortex-m-rtic-macros/memory-watermark"] + [profile.release] codegen-units = 1 lto = true [workspace] members = [ + "actor-traits", "macros", + "post-spy", "xtask", ] diff --git a/actor-traits/Cargo.toml b/actor-traits/Cargo.toml new file mode 100644 index 000000000000..9c4ae8172d4f --- /dev/null +++ b/actor-traits/Cargo.toml @@ -0,0 +1,6 @@ +[package] +edition = "2018" +name = "rtic-actor-traits" +version = "0.1.0" + +[dependencies] diff --git a/actor-traits/src/lib.rs b/actor-traits/src/lib.rs new file mode 100644 index 000000000000..5384f94fb348 --- /dev/null +++ b/actor-traits/src/lib.rs @@ -0,0 +1,9 @@ +#![no_std] + +pub trait Post { + fn post(&mut self, message: M) -> Result<(), M>; +} + +pub trait Receive { + fn receive(&mut self, message: M); +} diff --git a/examples/actor-capacity.rs b/examples/actor-capacity.rs new file mode 100644 index 000000000000..7d96746008be --- /dev/null +++ b/examples/actor-capacity.rs @@ -0,0 +1,60 @@ +#![no_main] +#![no_std] + +use panic_semihosting as _; + +#[rtic::app(device = lm3s6965, dispatchers = [GPIOA])] +mod app { + use core::sync::atomic::{AtomicU8, Ordering}; + + use cortex_m_semihosting::{debug, hprintln}; + use rtic_actor_traits::Receive; + + struct Actor; + + struct Message; + + static CALL_COUNT: AtomicU8 = AtomicU8::new(0); + + impl Receive for Actor { + fn receive(&mut self, _: Message) { + hprintln!("Actor::receive was called").ok(); + CALL_COUNT.store(CALL_COUNT.load(Ordering::Relaxed) + 1, Ordering::Relaxed); + } + } + + #[actors] + struct Actors { + #[subscribe(Message, capacity = 2)] + actor: Actor, + } + + #[init] + fn init(mut cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) { + assert!(cx.poster.post(Message).is_ok()); + assert!(cx.poster.post(Message).is_ok()); + assert!(cx.poster.post(Message).is_err()); + + ( + Shared {}, + Local {}, + init::Monotonics(), + Actors { actor: Actor }, + ) + } + + #[idle] + fn idle(_: idle::Context) -> ! { + assert_eq!(2, CALL_COUNT.load(Ordering::Relaxed)); + + loop { + debug::exit(debug::EXIT_SUCCESS); + } + } + + #[shared] + struct Shared {} + + #[local] + struct Local {} +} diff --git a/examples/actor-init.rs b/examples/actor-init.rs new file mode 100644 index 000000000000..ecadef5b35a0 --- /dev/null +++ b/examples/actor-init.rs @@ -0,0 +1,47 @@ +#![no_main] +#![no_std] + +use panic_semihosting as _; + +#[rtic::app(device = lm3s6965, dispatchers = [GPIOA])] +mod app { + use cortex_m_semihosting::{debug, hprintln}; + use rtic_actor_traits::Receive; + + #[derive(Debug)] + struct Actor { + state: i32, + } + + struct AssertActorWasInitialized; + + const INITIAL_STATE: i32 = 42; + + impl Receive for Actor { + fn receive(&mut self, _: AssertActorWasInitialized) { + assert_eq!(INITIAL_STATE, self.state); + hprintln!("OK").ok(); + debug::exit(debug::EXIT_SUCCESS); + } + } + + #[actors] + struct Actors { + #[subscribe(AssertActorWasInitialized)] + #[init(Actor { state: INITIAL_STATE })] + actor: Actor, + } + + #[init] + fn init(mut cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) { + cx.poster.post(AssertActorWasInitialized).ok(); + + (Shared {}, Local {}, init::Monotonics(), Actors {}) + } + + #[shared] + struct Shared {} + + #[local] + struct Local {} +} diff --git a/examples/actor-post.rs b/examples/actor-post.rs new file mode 100644 index 000000000000..dbc5c4521bc1 --- /dev/null +++ b/examples/actor-post.rs @@ -0,0 +1,69 @@ +#![no_main] +#![no_std] + +use panic_semihosting as _; + +#[rtic::app(device = lm3s6965, dispatchers = [GPIOA])] +mod app { + use core::sync::atomic::{AtomicBool, Ordering}; + + use cortex_m_semihosting::{debug, hprintln}; + use rtic_actor_traits::Receive; + + struct Actor; + + const PAYLOAD: i32 = 42; + + struct Message { + payload: i32, + } + + static RECEIVE_WAS_CALLED: AtomicBool = AtomicBool::new(false); + + impl Receive for Actor { + fn receive(&mut self, m: Message) { + hprintln!("Actor::receive was called").ok(); + + RECEIVE_WAS_CALLED.store(true, Ordering::Relaxed); + + assert_eq!(PAYLOAD, m.payload); + } + } + + #[actors] + struct Actors { + #[subscribe(Message)] + actor: Actor, + } + + #[init] + fn init(mut cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) { + cx.poster.post(Message { payload: PAYLOAD }).ok(); + + // receive invocation withheld + assert!(!RECEIVE_WAS_CALLED.load(Ordering::Relaxed)); + + ( + Shared {}, + Local {}, + init::Monotonics(), + Actors { actor: Actor }, + ) + } + + #[idle] + fn idle(_: idle::Context) -> ! { + // receive invocation must have executed by now + assert!(RECEIVE_WAS_CALLED.load(Ordering::Relaxed)); + + loop { + debug::exit(debug::EXIT_SUCCESS); + } + } + + #[shared] + struct Shared {} + + #[local] + struct Local {} +} diff --git a/examples/actor-publish-failure.rs b/examples/actor-publish-failure.rs new file mode 100644 index 000000000000..9a8444b769dd --- /dev/null +++ b/examples/actor-publish-failure.rs @@ -0,0 +1,87 @@ +#![no_main] +#![no_std] + +use panic_semihosting as _; + +#[rtic::app(device = lm3s6965, dispatchers = [GPIOA, GPIOB])] +mod app { + use core::sync::atomic::{AtomicBool, Ordering}; + + use cortex_m_semihosting::{debug, hprintln}; + use rtic_actor_traits::Receive; + + struct A; + struct B; + + #[derive(Default)] + struct M { + must_not_be_cloned: bool, + } + + impl Clone for M { + fn clone(&self) -> Self { + assert!(!self.must_not_be_cloned); + M { + must_not_be_cloned: self.must_not_be_cloned, + } + } + } + + impl Receive for A { + fn receive(&mut self, _: M) { + static WAS_CALLED_EXACTLY_ONCE: AtomicBool = AtomicBool::new(false); + hprintln!("A::receive was called").ok(); + assert!(!WAS_CALLED_EXACTLY_ONCE.load(Ordering::Relaxed)); + WAS_CALLED_EXACTLY_ONCE.store(true, Ordering::Relaxed); + } + } + + impl Receive for B { + fn receive(&mut self, _: M) { + static WAS_CALLED_EXACTLY_ONCE: AtomicBool = AtomicBool::new(false); + hprintln!("B::receive was called").ok(); + assert!(!WAS_CALLED_EXACTLY_ONCE.load(Ordering::Relaxed)); + WAS_CALLED_EXACTLY_ONCE.store(true, Ordering::Relaxed); + } + } + + #[actors] + struct Actors { + #[subscribe(M, capacity = 2)] + #[init(A)] + a: A, + + #[subscribe(M, capacity = 1)] + #[init(B)] + b: B, + } + + #[init] + fn init(cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) { + let mut poster = cx.poster; + assert!(poster.post(M::default()).is_ok()); + + // B's message queue is full so message must NOT be cloned + // this must also NOT trigger task A even if it has capacity + assert!(poster + .post(M { + must_not_be_cloned: true + }) + .is_err()); + + (Shared {}, Local {}, init::Monotonics(), Actors {}) + } + + #[idle] + fn idle(_: idle::Context) -> ! { + loop { + debug::exit(debug::EXIT_SUCCESS) + } + } + + #[local] + struct Local {} + + #[shared] + struct Shared {} +} diff --git a/examples/actor-publish.rs b/examples/actor-publish.rs new file mode 100644 index 000000000000..2fe9eb547f19 --- /dev/null +++ b/examples/actor-publish.rs @@ -0,0 +1,99 @@ +#![no_main] +#![no_std] + +use panic_semihosting as _; + +#[rtic::app(device = lm3s6965, dispatchers = [GPIOA, GPIOB])] +mod app { + use core::sync::atomic::{AtomicBool, Ordering}; + + use cortex_m_semihosting::{debug, hprintln}; + use rtic_actor_traits::Receive; + + struct A; + struct B; + + const PAYLOAD: i32 = 42; + + static CLONE_WAS_CALLED_EXACTLY_ONCE: AtomicBool = AtomicBool::new(false); + + struct M { + payload: i32, + } + + impl Clone for M { + fn clone(&self) -> Self { + assert!(!CLONE_WAS_CALLED_EXACTLY_ONCE.load(Ordering::Relaxed)); + CLONE_WAS_CALLED_EXACTLY_ONCE.store(true, Ordering::Relaxed); + + // `derive(Clone)` implementation + Self { + payload: self.payload.clone(), + } + } + } + + static A_RECEIVE_WAS_CALLED: AtomicBool = AtomicBool::new(false); + + impl Receive for A { + fn receive(&mut self, m: M) { + hprintln!("A::receive was called").ok(); + + assert_eq!(PAYLOAD, m.payload); + A_RECEIVE_WAS_CALLED.store(true, Ordering::Relaxed); + } + } + + static B_RECEIVE_WAS_CALLED: AtomicBool = AtomicBool::new(false); + + impl Receive for B { + fn receive(&mut self, m: M) { + hprintln!("B::receive was called").ok(); + + assert_eq!(PAYLOAD, m.payload); + B_RECEIVE_WAS_CALLED.store(true, Ordering::Relaxed); + } + } + + #[actors] + struct Actors { + #[subscribe(M, capacity = 2)] + #[init(A)] + a: A, + + #[subscribe(M, capacity = 1)] + #[init(B)] + b: B, + } + + #[init] + fn init(cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) { + let mut poster = cx.poster; + assert!(poster.post(M { payload: PAYLOAD }).is_ok()); + + assert!(CLONE_WAS_CALLED_EXACTLY_ONCE.load(Ordering::Relaxed)); + + // receive invocations withheld + assert!(!A_RECEIVE_WAS_CALLED.load(Ordering::Relaxed)); + assert!(!B_RECEIVE_WAS_CALLED.load(Ordering::Relaxed)); + + (Shared {}, Local {}, init::Monotonics(), Actors {}) + } + + #[idle] + fn idle(_: idle::Context) -> ! { + // receive invocations must have executed by now + assert!(A_RECEIVE_WAS_CALLED.load(Ordering::Relaxed)); + assert!(B_RECEIVE_WAS_CALLED.load(Ordering::Relaxed)); + + loop { + debug::exit(debug::EXIT_SUCCESS) + } + } + + #[local] + struct Local {} + + #[shared] + struct Shared {} +} diff --git a/examples/actor-watermark.rs b/examples/actor-watermark.rs new file mode 100644 index 000000000000..0d3dc9e34485 --- /dev/null +++ b/examples/actor-watermark.rs @@ -0,0 +1,68 @@ +// This example depends on the `memory-watermark` feature +#![no_main] +#![no_std] + +use panic_semihosting as _; + +#[rtic::app(device = lm3s6965, dispatchers = [GPIOA])] +mod app { + use cortex_m_semihosting::{debug, hprintln}; + use rtic_actor_traits::Receive; + + struct Actor; + + struct Message; + + impl Receive for Actor { + fn receive(&mut self, _: Message) { + hprintln!("Actor::receive was called").ok(); + } + } + + #[actors] + struct Actors { + #[subscribe(Message, capacity = 2)] + actor: Actor, + } + + #[init] + fn init(mut cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) { + assert_eq!(0, actor::SUBSCRIPTIONS[0].watermark()); + assert_eq!("Message", actor::SUBSCRIPTIONS[0].message_type); + assert_eq!(2, actor::SUBSCRIPTIONS[0].capacity); + + assert!(cx.poster.post(Message).is_ok()); + assert_eq!(1, actor::SUBSCRIPTIONS[0].watermark()); + + // monotonically increasing + assert!(cx.poster.post(Message).is_ok()); + assert_eq!(2, actor::SUBSCRIPTIONS[0].watermark()); + + // bounded value + assert!(cx.poster.post(Message).is_err()); + assert_eq!(2, actor::SUBSCRIPTIONS[0].watermark()); + + ( + Shared {}, + Local {}, + init::Monotonics(), + Actors { actor: Actor }, + ) + } + + #[idle] + fn idle(_: idle::Context) -> ! { + // monotonically increasing: does not decrease after receive is called + assert_eq!(2, actor::SUBSCRIPTIONS[0].watermark()); + + loop { + debug::exit(debug::EXIT_SUCCESS); + } + } + + #[shared] + struct Shared {} + + #[local] + struct Local {} +} diff --git a/macros/Cargo.toml b/macros/Cargo.toml index 56bfd5cdc224..454b68eabb0a 100644 --- a/macros/Cargo.toml +++ b/macros/Cargo.toml @@ -22,4 +22,11 @@ proc-macro2 = "1" proc-macro-error = "1" quote = "1" syn = "1" -rtic-syntax = "0.5.0-alpha.4" +indexmap = "1.0.2" + +[dependencies.rtic-syntax] +branch = "actor" +git = "https://github.com/rtic-rs/rtic-syntax" + +[features] +memory-watermark = [] diff --git a/macros/src/analyze.rs b/macros/src/analyze.rs index 6b2613887934..72df14d5aa6b 100644 --- a/macros/src/analyze.rs +++ b/macros/src/analyze.rs @@ -29,6 +29,7 @@ pub fn app(analysis: P, app: &App) -> P { .software_tasks .values() .map(|task| task.args.priority) + .chain(app.actors.values().map(|actor| actor.priority)) .collect::>(); // map from priorities to interrupts (holding name and attributes) diff --git a/macros/src/check.rs b/macros/src/check.rs index 374fcedd0960..d3f3bb862e55 100644 --- a/macros/src/check.rs +++ b/macros/src/check.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::{cell::Cell, collections::HashSet}; use proc_macro2::Span; use rtic_syntax::{analyze::Analysis, ast::App}; @@ -30,14 +30,18 @@ pub fn app(app: &App, _analysis: &Analysis) -> parse::Result { // Check that there are enough external interrupts to dispatch the software tasks and the timer // queue handler - let mut first = None; + let first = Cell::new(None); let priorities = app .software_tasks .iter() .map(|(name, task)| { - first = Some(name); + first.set(Some(name)); task.args.priority }) + .chain(app.actors.iter().map(|(name, ao)| { + first.set(Some(name)); + ao.priority + })) .collect::>(); let need = priorities.len(); @@ -46,14 +50,14 @@ pub fn app(app: &App, _analysis: &Analysis) -> parse::Result { let s = { format!( "not enough interrupts to dispatch \ - all software tasks (need: {}; given: {})", + all software tasks / actors (need: {}; given: {})", need, given ) }; - // If not enough tasks and first still is None, may cause + // If not enough interrupts and `first` still is None, may cause // "custom attribute panicked" due to unwrap on None - return Err(parse::Error::new(first.unwrap().span(), &s)); + return Err(parse::Error::new(first.get().unwrap().span(), &s)); } // Check that all exceptions are valid; only exceptions with configurable priorities are diff --git a/macros/src/codegen.rs b/macros/src/codegen.rs index 63a4e3cadc58..0162f94ed518 100644 --- a/macros/src/codegen.rs +++ b/macros/src/codegen.rs @@ -4,6 +4,7 @@ use rtic_syntax::ast::App; use crate::{analyze::Analysis, check::Extra}; +mod actors; mod assertions; mod dispatchers; mod hardware_tasks; @@ -83,6 +84,8 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 { } )); + let mod_app_actors = actors::codegen(app, analysis, extra); + let (mod_app_shared_resources, mod_shared_resources) = shared_resources::codegen(app, analysis, extra); let (mod_app_local_resources, mod_local_resources) = @@ -210,6 +213,8 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 { #(#mod_app_timer_queue)* + #mod_app_actors + #(#mains)* } ) diff --git a/macros/src/codegen/actors.rs b/macros/src/codegen/actors.rs new file mode 100644 index 000000000000..0372817002a5 --- /dev/null +++ b/macros/src/codegen/actors.rs @@ -0,0 +1,299 @@ +use indexmap::IndexMap; +use proc_macro2::TokenStream as TokenStream2; +use quote::{format_ident, quote}; +use rtic_syntax::ast::App; + +use crate::{analyze::Analysis, check::Extra, codegen::util}; + +pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 { + // Generate a `Poster` type, and `Post` implementations for every message that a task has + // subscribed to. + + let mut map: IndexMap<_, Vec<_>> = IndexMap::new(); + for (name, obj) in &app.actors { + for (subscription_index, subscription) in obj.subscriptions.iter().enumerate() { + map.entry(subscription.ty.clone()) + .or_default() + .push((name, subscription_index)); + } + } + + let post_impls = map.iter().map(|(message_ty, pairs)| { + let last_index = pairs.len() - 1; + let any_is_full = pairs + .iter() + .map(|(actor_name, subscription_index)| { + let post_name = util::actor_post(actor_name, *subscription_index); + + quote!(#post_name::is_full()) + }) + .collect::>(); + let posts = pairs + .iter() + .enumerate() + .map(|(i, (actor_name, subscription_index))| { + let post_name = util::actor_post(actor_name, *subscription_index); + + if i == last_index { + // avoid Clone on last message + quote!(#post_name(message)?;) + } else { + quote!(#post_name(message.clone())?;) + } + }) + .collect::>(); + + quote! { + impl rtic::export::Post<#message_ty> for Poster { + fn post(&mut self, message: #message_ty) -> Result<(), #message_ty> { + // TODO(micro-optimization) do the `clone`-ing *outside* the critical section + // Atomically posts all messages + rtic::export::interrupt::free(|_| unsafe { + if false #(|| #any_is_full)* { + return Err(message) + } + #(#posts)* + Ok(()) + })?; + Ok(()) + } + } + } + }); + + // Actor receive "task" functions + let mut task_functions = vec![]; + for (name, actor) in &app.actors { + let actor_ty = &actor.ty; + for (subscription_index, subscription) in actor.subscriptions.iter().enumerate() { + let function_name = &util::internal_actor_receive_task(name, subscription_index); + let actor_state = util::actor_state_ident(name); + let input_ty = &subscription.ty; + let refmut = if actor.init.is_none() { + quote!(&mut *#actor_state.get_mut_unchecked().as_mut_ptr()) + } else { + quote!(#actor_state.get_mut_unchecked()) + }; + + task_functions.push(quote!( + fn #function_name(message: #input_ty) { + // NOTE(safety) all the Receive methods of an actor instance run at the same + // priority so no lock required + unsafe { + <#actor_ty as rtic::export::Receive<#input_ty>>::receive( + #refmut, + message, + ) + } + } + )); + } + } + + // "Spawn" infrastructure + let mut spawn_infra = vec![]; + for (actor_name, actor) in &app.actors { + for (subscription_index, subscription) in actor.subscriptions.iter().enumerate() { + let capacity = subscription.capacity; + let message_ty = &subscription.ty; + + let cap_lit = util::capacity_literal(capacity as usize); + let cap_lit_p1 = util::capacity_literal(capacity as usize + 1); + let pseudo_task_name = util::actor_receive_task(actor_name, subscription_index); + let inputs_ident = util::inputs_ident(&pseudo_task_name); + let elems = (0..capacity).map(|_| quote!(core::mem::MaybeUninit::uninit())); + + let uninit_section = util::link_section_uninit(); + spawn_infra.push(quote!( + #uninit_section + // /// Buffer that holds the inputs of a task + #[doc(hidden)] + static #inputs_ident: rtic::RacyCell<[core::mem::MaybeUninit<#message_ty>; #cap_lit]> = + rtic::RacyCell::new([#(#elems,)*]); + )); + + let fq_ident = util::fq_ident(&pseudo_task_name); + + let fq_ty = quote!(rtic::export::SCFQ<#cap_lit_p1>); + let fq_expr = quote!(rtic::export::Queue::new()); + + spawn_infra.push(quote!( + // /// Queue version of a free-list that keeps track of empty slots in + // /// the following buffers + #[doc(hidden)] + static #fq_ident: rtic::RacyCell<#fq_ty> = rtic::RacyCell::new(#fq_expr); + )); + + let priority = actor.priority; + let t = util::spawn_t_ident(priority); + let device = &extra.device; + let enum_ = util::interrupt_ident(); + let interrupt = &analysis + .interrupts + .get(&priority) + .expect("RTIC-ICE: interrupt identifer not found") + .0; + + let call_update_watermark = if cfg!(feature = "memory-watermark") { + let update_watermark = util::update_watermark(subscription_index); + quote!( + #actor_name::#update_watermark(#fq_ident.get_unchecked().len()); + ) + } else { + quote!() + }; + + let rq = util::rq_ident(priority); + let dequeue = quote!(#fq_ident.get_mut_unchecked().dequeue()); + let post_name = util::actor_post(actor_name, subscription_index); + spawn_infra.push(quote!( + /// Safety: needs to be wrapped in a critical section + unsafe fn #post_name(message: #message_ty) -> Result<(), #message_ty> { + unsafe { + if let Some(index) = #dequeue { + #call_update_watermark + #inputs_ident + .get_mut_unchecked() + .get_unchecked_mut(usize::from(index)) + .as_mut_ptr() + .write(message); + + #rq.get_mut_unchecked().enqueue_unchecked((#t::#pseudo_task_name, index)); + + rtic::pend(#device::#enum_::#interrupt); + + Ok(()) + } else { + Err(message) + } + } + } + + mod #post_name { + /// Safety: needs to be wrapped in a critical section + pub unsafe fn is_full() -> bool { + // this is the queue version of a "free list" when it's empty the message + // queue of the task is full (= no more messages can be posted) + super::#fq_ident.get_unchecked().len() == 0 + } + } + )); + } + } + + // watermark API + let watermark_api = if cfg!(feature = "memory-watermark") { + watermark_api(app) + } else { + quote!() + }; + + quote! { + // Make `Post` methods available in the app module. + use rtic::export::Post as _; + + #[derive(Clone, Copy)] + pub struct Poster; + + #(#post_impls)* + + #(#task_functions)* + + #(#spawn_infra)* + + #watermark_api + } +} + +fn watermark_api(app: &App) -> TokenStream2 { + let mut actor_mods = vec![]; + for (actor_name, actor) in &app.actors { + if actor.subscriptions.is_empty() { + // skip disconnected actors + continue; + } + + let mut mod_items = vec![]; + let mut subscriptions_elements = vec![]; + for (subscription_index, subscription) in actor.subscriptions.iter().enumerate() { + let capacity = util::capacity_literal(subscription.capacity.into()); + + let counter = format_ident!("COUNTER{}", subscription_index); + let update_watermark = util::update_watermark(subscription_index); + let watermark = util::watermark(subscription_index); + mod_items.push(quote!( + static #counter: AtomicUsize = AtomicUsize::new(0); + + pub fn #update_watermark(fq_len: usize) { + let new_usage = #capacity - fq_len; + if new_usage > #counter.load(Ordering::Relaxed) { + #counter.store(new_usage, Ordering::Relaxed) + } + } + + pub fn #watermark() -> usize { + #counter.load(Ordering::Relaxed) + } + )); + + let ty = &subscription.ty; + let message_type = quote!(#ty).to_string(); + subscriptions_elements.push(quote!( + Subscription { + capacity: #capacity, + message_type: #message_type, + watermark: #watermark, + } + )); + } + + actor_mods.push(quote!( + pub mod #actor_name { + use core::sync::atomic::{AtomicUsize, Ordering}; + + use super::Subscription; + + pub static SUBSCRIPTIONS: &[Subscription] = + &[#(#subscriptions_elements),*]; + + #(#mod_items)* + } + )) + } + + if actor_mods.is_empty() { + // all actors are disconnected + return quote!(); + } + + // NOTE this API could live in a crate like `rtic-core` + let subscription_api = quote!( + pub struct Subscription { + pub capacity: usize, + pub message_type: &'static str, + watermark: fn() -> usize, + } + + impl Subscription { + pub fn watermark(&self) -> usize { + (self.watermark)() + } + } + + impl core::fmt::Debug for Subscription { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("Subscription") + .field("capacity", &self.capacity) + .field("message_type", &self.message_type) + .field("watermark", &self.watermark()) + .finish() + } + } + ); + + quote!( + #subscription_api + + #(#actor_mods)* + ) +} diff --git a/macros/src/codegen/dispatchers.rs b/macros/src/codegen/dispatchers.rs index 57103acd0b5b..dbb43918a376 100644 --- a/macros/src/codegen/dispatchers.rs +++ b/macros/src/codegen/dispatchers.rs @@ -1,6 +1,6 @@ use proc_macro2::TokenStream as TokenStream2; use quote::quote; -use rtic_syntax::ast::App; +use rtic_syntax::{analyze::Spawnee, ast::App}; use crate::{analyze::Analysis, check::Extra, codegen::util}; @@ -14,15 +14,25 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec { + let cfgs = &app.software_tasks[name].cfgs; - quote!( - #(#cfgs)* - #name - ) + quote!( + #(#cfgs)* + #name + ) + } + + Spawnee::Actor { + name, + subscription_index, + } => { + let task_name = util::actor_receive_task(name, *subscription_index); + quote!(#task_name) + } }) .collect::>(); @@ -65,32 +75,58 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec { - let #tupled = - #inputs + .map(|spawnee| match spawnee { + Spawnee::Task { name } => { + let task = &app.software_tasks[name]; + let cfgs = &task.cfgs; + let fq = util::fq_ident(name); + let inputs = util::inputs_ident(name); + let (_, tupled, pats, _) = util::regroup_inputs(&task.inputs); + + quote!( + #(#cfgs)* + #t::#name => { + let #tupled = + #inputs .get_unchecked() - .get_unchecked(usize::from(index)) - .as_ptr() - .read(); - #fq.get_mut_unchecked().split().0.enqueue_unchecked(index); - let priority = &rtic::export::Priority::new(PRIORITY); - #name( - #name::Context::new(priority) - #(,#pats)* - ) - } - ) + .get_unchecked(usize::from(index)) + .as_ptr() + .read(); + #fq.get_mut_unchecked().split().0.enqueue_unchecked(index); + let priority = &rtic::export::Priority::new(PRIORITY); + #name( + #name::Context::new(priority) + #(,#pats)* + ) + } + ) + } + + Spawnee::Actor { + name: actor_name, + subscription_index, + } => { + let task_name = util::actor_receive_task(actor_name, *subscription_index); + let fq = util::fq_ident(&task_name); + let inputs = util::inputs_ident(&task_name); + let function_name = + util::internal_actor_receive_task(actor_name, *subscription_index); + + quote!( + #t::#task_name => { + let input = + #inputs + .get_unchecked() + .get_unchecked(usize::from(index)) + .as_ptr() + .read(); + #fq.get_mut_unchecked().split().0.enqueue_unchecked(index); + #function_name(input) + } + ) + } }) .collect::>(); diff --git a/macros/src/codegen/init.rs b/macros/src/codegen/init.rs index 2de3e73481c1..e6a1e6af545b 100644 --- a/macros/src/codegen/init.rs +++ b/macros/src/codegen/init.rs @@ -61,6 +61,28 @@ pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> CodegenResult { ) }) .collect(); + let actors_struct = if let Some(actors) = &init.user_actors_struct { + let fields = app + .actors + .iter() + .filter_map(|(name, ao)| { + if ao.init.is_none() { + let ty = &ao.ty; + Some(quote!(#name: #ty,)) + } else { + None + } + }) + .collect::>(); + + quote!( + struct #actors { + #(#fields)* + } + ) + } else { + quote!() + }; root_init.push(quote! { struct #shared { #(#shared_resources)* @@ -69,11 +91,17 @@ pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> CodegenResult { struct #local { #(#local_resources)* } + + #actors_struct }); // let locals_pat = locals_pat.iter(); - let user_init_return = quote! {#shared, #local, #name::Monotonics}; + let user_init_return = if let Some(actors) = &init.user_actors_struct { + quote! {#shared, #local, #name::Monotonics, #actors} + } else { + quote! {#shared, #local, #name::Monotonics} + }; let user_init = quote!( #(#attrs)* @@ -97,8 +125,13 @@ pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> CodegenResult { } // let locals_new = locals_new.iter(); + let let_pat = if init.user_actors_struct.is_some() { + quote!((shared_resources, local_resources, mut monotonics, actors)) + } else { + quote!((shared_resources, local_resources, mut monotonics)) + }; let call_init = quote! { - let (shared_resources, local_resources, mut monotonics) = #name(#name::Context::new(core.into())); + let #let_pat = #name(#name::Context::new(core.into())); }; root_init.push(module::codegen( diff --git a/macros/src/codegen/local_resources.rs b/macros/src/codegen/local_resources.rs index ff534862499d..e02f89a01d32 100644 --- a/macros/src/codegen/local_resources.rs +++ b/macros/src/codegen/local_resources.rs @@ -66,5 +66,30 @@ pub fn codegen( )); } + // Actor states + for (actor_name, actor) in &app.actors { + let mangled_name = util::actor_state_ident(actor_name); + let ty = &actor.ty; + + let item = if let Some(init) = &actor.init { + quote!( + #[allow(non_upper_case_globals)] + #[doc(hidden)] + static #mangled_name: rtic::RacyCell<#ty> = rtic::RacyCell::new(#init); + ) + } else { + let uninit_section = util::link_section_uninit(); + + quote!( + #[allow(non_upper_case_globals)] + #[doc(hidden)] + #uninit_section + static #mangled_name: rtic::RacyCell> = rtic::RacyCell::new(core::mem::MaybeUninit::uninit()); + ) + }; + + mod_app.push(item); + } + (mod_app, TokenStream2::new()) } diff --git a/macros/src/codegen/module.rs b/macros/src/codegen/module.rs index 6011c9bc0429..7d1427cca90e 100644 --- a/macros/src/codegen/module.rs +++ b/macros/src/codegen/module.rs @@ -45,9 +45,13 @@ pub fn codegen( pub cs: rtic::export::CriticalSection<#lt> )); + fields.push(quote!(poster: Poster)); + values.push(quote!(cs: rtic::export::CriticalSection::new())); values.push(quote!(core)); + + values.push(quote!(poster: Poster)); } Context::Idle => {} @@ -223,6 +227,32 @@ pub fn codegen( let internal_spawn_ident = util::internal_task_ident(name, "spawn"); + let dequeue = if cfg!(feature = "memory-watermark") { + let update_watermark = util::mark_internal_name("update_watermark"); + let capacity = spawnee.args.capacity; + module_items.push(quote!( + static WATERMARK: core::sync::atomic::AtomicU8 = core::sync::atomic::AtomicU8::new(0); + pub fn #update_watermark(fq_len: u8) { + let new_usage = #capacity - fq_len; + if new_usage > WATERMARK.load(core::sync::atomic::Ordering::Relaxed) { + WATERMARK.store(new_usage, core::sync::atomic::Ordering::Relaxed) + } + } + + pub fn max_buffer_usage() -> u8 { + WATERMARK.load(core::sync::atomic::Ordering::Relaxed) + } + )); + + quote!({ + let index = #fq.get_mut_unchecked().dequeue(); + #name::#update_watermark(#fq.get_unchecked().len()); + index + }) + } else { + quote!(#fq.get_mut_unchecked().dequeue()) + }; + // Spawn caller items.push(quote!( @@ -232,7 +262,7 @@ pub fn codegen( let input = #tupled; unsafe { - if let Some(index) = rtic::export::interrupt::free(|_| #fq.get_mut_unchecked().dequeue()) { + if let Some(index) = rtic::export::interrupt::free(|_| #dequeue) { #inputs .get_mut_unchecked() .get_unchecked_mut(usize::from(index)) diff --git a/macros/src/codegen/post_init.rs b/macros/src/codegen/post_init.rs index 5624b20a7259..6190f760786a 100644 --- a/macros/src/codegen/post_init.rs +++ b/macros/src/codegen/post_init.rs @@ -45,6 +45,24 @@ pub fn codegen(app: &App, analysis: &Analysis) -> Vec { } } + // Initialize actors + for name in app.actors.iter().filter_map(|(name, actor)| { + if actor.init.is_none() { + Some(name) + } else { + None + } + }) { + let mangled_name = util::actor_state_ident(name); + stmts.push(quote!( + // Resource is a RacyCell> + // - `get_mut_unchecked` to obtain `MaybeUninit` + // - `as_mut_ptr` to obtain a raw pointer to `MaybeUninit` + // - `write` the defined value for the late resource T + #mangled_name.get_mut_unchecked().as_mut_ptr().write(actors.#name); + )) + } + for (i, (monotonic, _)) in app.monotonics.iter().enumerate() { // For future use // let doc = format!(" RTIC internal: {}:{}", file!(), line!()); diff --git a/macros/src/codegen/pre_init.rs b/macros/src/codegen/pre_init.rs index 69f16fe376d5..b24e209e5589 100644 --- a/macros/src/codegen/pre_init.rs +++ b/macros/src/codegen/pre_init.rs @@ -23,6 +23,18 @@ pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> Vec Ident { Span::call_site(), ) } + +pub fn actor_state_ident(name: &Ident) -> Ident { + mark_internal_name(&format!("actor_{}_state", name)) +} + +pub fn actor_receive_task(name: &Ident, subscription_index: usize) -> Ident { + format_ident!("{}_receive_{}", name, subscription_index) +} + +pub fn internal_actor_receive_task(name: &Ident, subscription_index: usize) -> Ident { + mark_internal_name(&actor_receive_task(name, subscription_index).to_string()) +} + +pub fn actor_post(name: &Ident, subscription_index: usize) -> Ident { + mark_internal_name(&format!("{}_post_{}", name, subscription_index)) +} + +pub fn update_watermark(subscription_index: usize) -> Ident { + mark_internal_name(&format!("update_watermark{}", subscription_index)) +} + +pub fn watermark(subscription_index: usize) -> Ident { + mark_internal_name(&format!("watermark{}", subscription_index)) +} diff --git a/post-spy/Cargo.toml b/post-spy/Cargo.toml new file mode 100644 index 000000000000..364e64ea46c4 --- /dev/null +++ b/post-spy/Cargo.toml @@ -0,0 +1,7 @@ +[package] +edition = "2018" +name = "rtic-post-spy" +version = "0.1.0" + +[dependencies] +rtic-actor-traits = { path = "../actor-traits" } diff --git a/post-spy/src/lib.rs b/post-spy/src/lib.rs new file mode 100644 index 000000000000..4b35c703a1bc --- /dev/null +++ b/post-spy/src/lib.rs @@ -0,0 +1,76 @@ +use std::any::Any; + +use rtic_actor_traits::Post; + +/// An implementation of `Post` that accepts "any" message type and lets you inspect all `post`-ed +/// messages +#[derive(Default)] +pub struct PostSpy { + posted_messages: Vec>, +} + +impl PostSpy { + /// Returns an *iterator* over the posted messages + /// + /// Note that you must specify *which* type of message you want to retrieve (the `T` in the + /// signature) + /// In practice, this will most likely mean using "turbo fish" syntax to specify the type: + /// `post_spy.posted_messages::()` + pub fn posted_messages(&self) -> impl Iterator + where + T: Any, + { + self.posted_messages + .iter() + .filter_map(|message| message.downcast_ref()) + } +} + +impl Post for PostSpy +where + M: Any, +{ + fn post(&mut self, message: M) -> Result<(), M> { + self.posted_messages.push(Box::new(message)); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn post_and_inspect() { + let mut spy = PostSpy::default(); + assert_eq!(None, spy.posted_messages::().next()); + spy.post(42).unwrap(); + assert_eq!(vec![&42], spy.posted_messages::().collect::>()); + } + + #[test] + fn can_post_two_types_to_the_same_spy() { + #[derive(Debug, PartialEq)] + struct MessageA(i32); + #[derive(Debug, PartialEq)] + struct MessageB(i32); + + let mut post_spy = PostSpy::default(); + post_spy.post(MessageA(0)).unwrap(); + post_spy.post(MessageB(1)).unwrap(); + post_spy.post(MessageA(2)).unwrap(); + post_spy.post(MessageB(3)).unwrap(); + + // peek *only* `MessageA` messages in `post` order + assert_eq!( + vec![&MessageA(0), &MessageA(2)], + post_spy.posted_messages::().collect::>() + ); + + // peek *only* `MessageB` messages in `post` order + assert_eq!( + vec![&MessageB(1), &MessageB(3)], + post_spy.posted_messages::().collect::>() + ); + } +} diff --git a/src/export.rs b/src/export.rs index 8fdcb67ef830..1690e949c913 100644 --- a/src/export.rs +++ b/src/export.rs @@ -14,6 +14,7 @@ pub use cortex_m::{ pub use heapless::sorted_linked_list::SortedLinkedList; pub use heapless::spsc::Queue; pub use heapless::BinaryHeap; +pub use rtic_actor_traits::{Post, Receive}; pub use rtic_monotonic as monotonic; pub type SCFQ = Queue; diff --git a/ui/actor-extern-interrupt-not-enough.rs b/ui/actor-extern-interrupt-not-enough.rs new file mode 100644 index 000000000000..a52dc9074553 --- /dev/null +++ b/ui/actor-extern-interrupt-not-enough.rs @@ -0,0 +1,20 @@ +#![no_main] + +#[rtic::app(device = lm3s6965)] +mod app { + #[actors] + struct Actors { + a: A, + } + + #[shared] + struct Shared {} + + #[local] + struct Local {} + + #[init] + fn init(cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) { + (Shared {}, Local {}, init::Monotonics {}, Actors { a: A }) + } +} diff --git a/ui/actor-extern-interrupt-not-enough.stderr b/ui/actor-extern-interrupt-not-enough.stderr new file mode 100644 index 000000000000..692107a5f783 --- /dev/null +++ b/ui/actor-extern-interrupt-not-enough.stderr @@ -0,0 +1,5 @@ +error: not enough interrupts to dispatch all software tasks / actors (need: 1; given: 0) + --> $DIR/actor-extern-interrupt-not-enough.rs:7:9 + | +7 | a: A, + | ^ diff --git a/ui/extern-interrupt-not-enough.stderr b/ui/extern-interrupt-not-enough.stderr index a667c5882452..daf66f8c9da7 100644 --- a/ui/extern-interrupt-not-enough.stderr +++ b/ui/extern-interrupt-not-enough.stderr @@ -1,4 +1,4 @@ -error: not enough interrupts to dispatch all software tasks (need: 1; given: 0) +error: not enough interrupts to dispatch all software tasks / actors (need: 1; given: 0) --> $DIR/extern-interrupt-not-enough.rs:17:8 | 17 | fn a(_: a::Context) {}