diff --git a/examples/minimal_pub_sub/Cargo.toml b/examples/minimal_pub_sub/Cargo.toml index 12771635..0990d855 100644 --- a/examples/minimal_pub_sub/Cargo.toml +++ b/examples/minimal_pub_sub/Cargo.toml @@ -13,6 +13,10 @@ path = "src/minimal_subscriber.rs" name = "minimal_publisher" path = "src/minimal_publisher.rs" +[[bin]] +name = "minimal_two_nodes" +path = "src/minimal_two_nodes.rs" + [[bin]] name = "zero_copy_subscriber" path = "src/zero_copy_subscriber.rs" diff --git a/examples/minimal_pub_sub/src/minimal_two_nodes.rs b/examples/minimal_pub_sub/src/minimal_two_nodes.rs new file mode 100644 index 00000000..ddf685bd --- /dev/null +++ b/examples/minimal_pub_sub/src/minimal_two_nodes.rs @@ -0,0 +1,77 @@ +use std::env; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::{Arc, Mutex}; + +use anyhow::{Error, Result}; + +struct MinimalSubscriber { + num_messages: AtomicU32, + node: Arc, + subscription: Mutex>>>, +} + +impl MinimalSubscriber { + pub fn new(name: &str, topic: &str) -> Result, rclrs::RclrsError> { + let context = rclrs::Context::new(env::args())?; + let node = rclrs::create_node(&context, name)?; + let minimal_subscriber = Arc::new(MinimalSubscriber { + num_messages: 0.into(), + node, + subscription: None.into(), + }); + + let minimal_subscriber_aux = Arc::clone(&minimal_subscriber); + let subscription = minimal_subscriber + .node + .create_subscription::( + topic, + rclrs::QOS_PROFILE_DEFAULT, + move |msg: std_msgs::msg::String| { + minimal_subscriber_aux.callback(msg); + }, + )?; + *minimal_subscriber.subscription.lock().unwrap() = Some(subscription); + Ok(minimal_subscriber) + } + + fn callback(&self, msg: std_msgs::msg::String) { + self.num_messages.fetch_add(1, Ordering::SeqCst); + println!("[{}] I heard: '{}'", self.node.name(), msg.data); + println!( + "[{}] (Got {} messages so far)", + self.node.name(), + self.num_messages.load(Ordering::SeqCst) + ); + } +} + +fn main() -> Result<(), Error> { + let publisher_context = rclrs::Context::new(env::args())?; + let publisher_node = rclrs::create_node(&publisher_context, "minimal_publisher")?; + + let subscriber_node_one = MinimalSubscriber::new("minimal_subscriber_one", "topic")?; + let subscriber_node_two = MinimalSubscriber::new("minimal_subscriber_two", "topic")?; + + let publisher = publisher_node + .create_publisher::("topic", rclrs::QOS_PROFILE_DEFAULT)?; + + std::thread::spawn(move || -> Result<(), rclrs::RclrsError> { + let mut message = std_msgs::msg::String::default(); + let mut publish_count: u32 = 1; + loop { + message.data = format!("Hello, world! {}", publish_count); + println!("Publishing: [{}]", message.data); + publisher.publish(&message)?; + publish_count += 1; + std::thread::sleep(std::time::Duration::from_millis(500)); + } + }); + + let executor = rclrs::SingleThreadedExecutor::new(); + + executor.add_node(&publisher_node)?; + executor.add_node(&subscriber_node_one.node)?; + executor.add_node(&subscriber_node_two.node)?; + + executor.spin().map_err(|err| err.into()) +} diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs new file mode 100644 index 00000000..f96dfc94 --- /dev/null +++ b/rclrs/src/executor.rs @@ -0,0 +1,81 @@ +use crate::rcl_bindings::rcl_context_is_valid; +use crate::{Node, RclReturnCode, RclrsError, WaitSet}; +use std::sync::{Arc, Mutex, Weak}; +use std::time::Duration; + +/// Single-threaded executor implementation. +pub struct SingleThreadedExecutor { + nodes_mtx: Mutex>>, +} + +impl Default for SingleThreadedExecutor { + fn default() -> Self { + Self::new() + } +} + +impl SingleThreadedExecutor { + /// Creates a new executor. + pub fn new() -> Self { + SingleThreadedExecutor { + nodes_mtx: Mutex::new(Vec::new()), + } + } + + /// Add a node to the executor. + pub fn add_node(&self, node: &Arc) -> Result<(), RclrsError> { + { self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node)); + Ok(()) + } + + /// Remove a node from the executor. + pub fn remove_node(&self, node: Arc) -> Result<(), RclrsError> { + { self.nodes_mtx.lock().unwrap() } + .retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false)); + Ok(()) + } + + /// Polls the nodes for new messages and executes the corresponding callbacks. + /// + /// This function additionally checks that the context is still valid. + pub fn spin_once(&self, timeout: Option) -> Result<(), RclrsError> { + for node in { self.nodes_mtx.lock().unwrap() } + .iter() + .filter_map(Weak::upgrade) + .filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) }) + { + let wait_set = WaitSet::new_for_node(&node)?; + let ready_entities = wait_set.wait(timeout)?; + + for ready_subscription in ready_entities.subscriptions { + ready_subscription.execute()?; + } + + for ready_client in ready_entities.clients { + ready_client.execute()?; + } + + for ready_service in ready_entities.services { + ready_service.execute()?; + } + } + + Ok(()) + } + + /// Convenience function for calling [`SingleThreadedExecutor::spin_once`] in a loop. + pub fn spin(&self) -> Result<(), RclrsError> { + while !{ self.nodes_mtx.lock().unwrap() }.is_empty() { + match self.spin_once(None) { + Ok(_) + | Err(RclrsError::RclError { + code: RclReturnCode::Timeout, + .. + }) => (), + error => return error, + } + } + + Ok(()) + } +} diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index cda4a382..9afec58b 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -9,6 +9,7 @@ mod arguments; mod client; mod context; mod error; +mod executor; mod node; mod parameter; mod publisher; @@ -30,11 +31,11 @@ pub use arguments::*; pub use client::*; pub use context::*; pub use error::*; +pub use executor::*; pub use node::*; pub use parameter::*; pub use publisher::*; pub use qos::*; -use rcl_bindings::rcl_context_is_valid; pub use rcl_bindings::rmw_request_id_t; pub use service::*; pub use subscription::*; @@ -51,46 +52,16 @@ pub use wait::*; /// /// [1]: crate::RclReturnCode pub fn spin_once(node: Arc, timeout: Option) -> Result<(), RclrsError> { - let wait_set = WaitSet::new_for_node(&node)?; - let ready_entities = wait_set.wait(timeout)?; - - for ready_subscription in ready_entities.subscriptions { - ready_subscription.execute()?; - } - - for ready_client in ready_entities.clients { - ready_client.execute()?; - } - - for ready_service in ready_entities.services { - ready_service.execute()?; - } - - Ok(()) + let executor = SingleThreadedExecutor::new(); + executor.add_node(&node)?; + executor.spin_once(timeout) } /// Convenience function for calling [`spin_once`] in a loop. -/// -/// This function additionally checks that the context is still valid. pub fn spin(node: Arc) -> Result<(), RclrsError> { - // The context_is_valid functions exists only to abstract away ROS distro differences - // SAFETY: No preconditions for this function. - let context_is_valid = { - let node = Arc::clone(&node); - move || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) } - }; - - while context_is_valid() { - match spin_once(Arc::clone(&node), None) { - Ok(_) - | Err(RclrsError::RclError { - code: RclReturnCode::Timeout, - .. - }) => (), - error => return error, - } - } - Ok(()) + let executor = SingleThreadedExecutor::new(); + executor.add_node(&node)?; + executor.spin() } /// Creates a new node in the empty namespace. diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 74eb5ac8..459d7e8e 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -238,11 +238,16 @@ impl Node { &self, topic: &str, qos: QoSProfile, - ) -> Result, RclrsError> + ) -> Result>, RclrsError> where T: Message, { - Publisher::::new(Arc::clone(&self.rcl_node_mtx), topic, qos) + let publisher = Arc::new(Publisher::::new( + Arc::clone(&self.rcl_node_mtx), + topic, + qos, + )?); + Ok(publisher) } /// Creates a [`Service`][1]. diff --git a/rclrs/src/wait.rs b/rclrs/src/wait.rs index 61bde77b..d8c0e3cb 100644 --- a/rclrs/src/wait.rs +++ b/rclrs/src/wait.rs @@ -337,7 +337,16 @@ impl WaitSet { // We cannot currently guarantee that the wait sets may not share content, but it is // mentioned in the doc comment for `add_subscription`. // Also, the rcl_wait_set is obviously valid. - unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok()?; + match unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok() { + Ok(_) => (), + Err(error) => match error { + RclrsError::RclError { code, msg } => match code { + RclReturnCode::WaitSetEmpty => (), + _ => return Err(RclrsError::RclError { code, msg }), + }, + _ => return Err(error), + }, + } let mut ready_entities = ReadyEntities { subscriptions: Vec::new(), clients: Vec::new(),