Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct rcl entity lifecycles and fix spurious test failures #386

Merged
merged 20 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion rclrs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ path = "src/lib.rs"
# Please keep the list of dependencies alphabetically sorted,
# and also state why each dependency is needed.
[dependencies]
# Needed for dynamically finding type support libraries
# Needed for dynamically finding type support libraries
ament_rs = { version = "0.2", optional = true }

# Needed for uploading documentation to docs.rs
cfg-if = "1.0.0"

# Needed for clients
futures = "0.3"

# Needed to create a global mutex for managing the lifecycles of middleware entities
lazy_static = "1.4"

# Needed for dynamic messages
libloading = { version = "0.8", optional = true }

# Needed for the Message trait, among others
rosidl_runtime_rs = "0.4"

Expand Down
45 changes: 25 additions & 20 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,33 @@ use rosidl_runtime_rs::Message;

use crate::error::{RclReturnCode, ToResult};
use crate::MessageCow;
use crate::{rcl_bindings::*, RclrsError};
use crate::{rcl_bindings::*, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_client_t {}

/// Internal struct used by clients.
mxgrey marked this conversation as resolved.
Show resolved Hide resolved
pub struct ClientHandle {
rcl_client_mtx: Mutex<rcl_client_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
rcl_client: Mutex<rcl_client_t>,
node_handle: Arc<NodeHandle>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}

impl ClientHandle {
pub(crate) fn lock(&self) -> MutexGuard<rcl_client_t> {
self.rcl_client_mtx.lock().unwrap()
self.rcl_client.lock().unwrap()
}
}

impl Drop for ClientHandle {
fn drop(&mut self) {
let rcl_client = self.rcl_client_mtx.get_mut().unwrap();
let rcl_node_mtx = &mut *self.rcl_node_mtx.lock().unwrap();
let rcl_client = self.rcl_client.get_mut().unwrap();
let mut rcl_node = self.node_handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
// SAFETY: No preconditions for this function
unsafe {
rcl_client_fini(rcl_client, rcl_node_mtx);
rcl_client_fini(rcl_client, &mut *rcl_node);
}
}
}
Expand Down Expand Up @@ -74,7 +75,7 @@ where
T: rosidl_runtime_rs::Service,
{
/// Creates a new client.
pub(crate) fn new(rcl_node_mtx: Arc<Mutex<rcl_node_t>>, topic: &str) -> Result<Self, RclrsError>
pub(crate) fn new(node_handle: Arc<NodeHandle>, topic: &str) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_client`], see the struct's documentation for the rationale
where
Expand All @@ -97,19 +98,23 @@ where
// The rcl_node is kept alive because it is co-owned by the client.
// The topic name and the options are copied by this function, so they can be dropped
// afterwards.
rcl_client_init(
&mut rcl_client,
&*rcl_node_mtx.lock().unwrap(),
type_support,
topic_c_string.as_ptr(),
&client_options,
)
.ok()?;
{
let rcl_node = node_handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
rcl_client_init(
&mut rcl_client,
&*rcl_node,
type_support,
topic_c_string.as_ptr(),
&client_options,
)
.ok()?;
}
}

let handle = Arc::new(ClientHandle {
rcl_client_mtx: Mutex::new(rcl_client),
rcl_node_mtx,
rcl_client: Mutex::new(rcl_client),
node_handle,
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Expand Down Expand Up @@ -245,8 +250,8 @@ where
///
pub fn service_is_ready(&self) -> Result<bool, RclrsError> {
let mut is_ready = false;
let client = &mut *self.handle.rcl_client_mtx.lock().unwrap();
let node = &mut *self.handle.rcl_node_mtx.lock().unwrap();
let client = &mut *self.handle.rcl_client.lock().unwrap();
let node = &mut *self.handle.node_handle.rcl_node.lock().unwrap();

unsafe {
// SAFETY both node and client are guaranteed to be valid here
Expand Down
150 changes: 129 additions & 21 deletions rclrs/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,23 @@ use std::vec::Vec;
use crate::rcl_bindings::*;
use crate::{RclrsError, ToResult};

lazy_static::lazy_static! {
/// This is locked whenever initializing or dropping any middleware entity
/// because we have found issues in RCL and some RMW implementations that
/// make it unsafe to simultaneously initialize and/or drop various types of
/// entities. It seems these C and C++ based libraries will regularly use
/// unprotected global variables in their object initialization and cleanup.
pub(crate) static ref ENTITY_LIFECYCLE_MUTEX: Mutex<()> = Mutex::new(());
}

impl Drop for rcl_context_t {
fn drop(&mut self) {
unsafe {
// The context may be invalid when rcl_init failed, e.g. because of invalid command
// line arguments.
// SAFETY: No preconditions for this function.
if rcl_context_is_valid(self) {
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
// SAFETY: These functions have no preconditions besides a valid rcl_context
rcl_shutdown(self);
rcl_context_fini(self);
Expand All @@ -39,16 +49,26 @@ unsafe impl Send for rcl_context_t {}
/// - the allocator used (left as the default by `rclrs`)
///
pub struct Context {
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
pub(crate) handle: Arc<ContextHandle>,
}

/// This struct manages the lifetime and access to the rcl context. It will also
/// account for the lifetimes of any dependencies, if we need to add
/// dependencies in the future (currently there are none). It is not strictly
/// necessary to decompose `Context` and `ContextHandle` like this, but we are
/// doing it to be consistent with the lifecycle management of other rcl
/// bindings in this library.
pub(crate) struct ContextHandle {
pub(crate) rcl_context: Mutex<rcl_context_t>,
}

impl Context {
/// Creates a new context.
///
/// Usually, this would be called with `std::env::args()`, analogously to `rclcpp::init()`.
/// Usually this would be called with `std::env::args()`, analogously to `rclcpp::init()`.
/// See also the official "Passing ROS arguments to nodes via the command-line" tutorial.
///
/// Creating a context can fail in case the args contain invalid ROS arguments.
/// Creating a context will fail if the args contain invalid ROS arguments.
///
/// # Example
/// ```
Expand All @@ -58,6 +78,21 @@ impl Context {
/// assert!(Context::new(invalid_remapping).is_err());
/// ```
pub fn new(args: impl IntoIterator<Item = String>) -> Result<Self, RclrsError> {
Self::new_with_options(args, InitOptions::new())
}

/// Same as [`Context::new`] except you can additionally provide initialization options.
///
/// # Example
/// ```
/// use rclrs::{Context, InitOptions};
/// let context = Context::new_with_options([], InitOptions::new().with_domain_id(Some(5))).unwrap();
/// assert_eq!(context.domain_id(), 5);
/// ````
pub fn new_with_options(
args: impl IntoIterator<Item = String>,
options: InitOptions,
) -> Result<Self, RclrsError> {
// SAFETY: Getting a zero-initialized value is always safe
let mut rcl_context = unsafe { rcl_get_zero_initialized_context() };
let cstring_args: Vec<CString> = args
Expand All @@ -74,48 +109,121 @@ impl Context {
unsafe {
// SAFETY: No preconditions for this function.
let allocator = rcutils_get_default_allocator();
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_init_options = rcl_get_zero_initialized_init_options();
// SAFETY: Passing in a zero-initialized value is expected.
// In the case where this returns not ok, there's nothing to clean up.
rcl_init_options_init(&mut rcl_init_options, allocator).ok()?;
let mut rcl_init_options = options.into_rcl(allocator)?;
// SAFETY: This function does not store the ephemeral init_options and c_args
// pointers. Passing in a zero-initialized rcl_context is expected.
let ret = rcl_init(
c_args.len() as i32,
if c_args.is_empty() {
std::ptr::null()
} else {
c_args.as_ptr()
},
&rcl_init_options,
&mut rcl_context,
)
.ok();
let ret = {
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
rcl_init(
c_args.len() as i32,
if c_args.is_empty() {
std::ptr::null()
} else {
c_args.as_ptr()
},
&rcl_init_options,
&mut rcl_context,
)
.ok()
};
// SAFETY: It's safe to pass in an initialized object.
// Early return will not leak memory, because this is the last fini function.
rcl_init_options_fini(&mut rcl_init_options).ok()?;
// Move the check after the last fini()
ret?;
}
Ok(Self {
rcl_context_mtx: Arc::new(Mutex::new(rcl_context)),
handle: Arc::new(ContextHandle {
rcl_context: Mutex::new(rcl_context),
}),
})
}

/// Returns the ROS domain ID that the context is using.
///
/// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1].
/// It can be set through the `ROS_DOMAIN_ID` environment variable.
///
/// [1]: https://docs.ros.org/en/rolling/Concepts/About-Domain-ID.html
pub fn domain_id(&self) -> usize {
let mut domain_id: usize = 0;
let ret = unsafe {
rcl_context_get_domain_id(
&mut *self.handle.rcl_context.lock().unwrap(),
&mut domain_id,
)
};

debug_assert_eq!(ret, 0);
domain_id
}

/// Checks if the context is still valid.
///
/// This will return `false` when a signal has caused the context to shut down (currently
/// unimplemented).
pub fn ok(&self) -> bool {
// This will currently always return true, but once we have a signal handler, the signal
// handler could call `rcl_shutdown()`, hence making the context invalid.
let rcl_context = &mut *self.rcl_context_mtx.lock().unwrap();
let rcl_context = &mut *self.handle.rcl_context.lock().unwrap();
// SAFETY: No preconditions for this function.
unsafe { rcl_context_is_valid(rcl_context) }
}
}

/// Additional options for initializing the Context.
#[derive(Default, Clone)]
pub struct InitOptions {
/// The domain ID that should be used by the Context. Set to None to ask for
/// the default behavior, which is to set the domain ID according to the
/// [ROS_DOMAIN_ID][1] environment variable.
///
/// [1]: https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Domain-ID.html#the-ros-domain-id
domain_id: Option<usize>,
}

impl InitOptions {
/// Create a new InitOptions with all default values.
pub fn new() -> InitOptions {
Self::default()
}

/// Transform an InitOptions into a new one with a certain domain_id
pub fn with_domain_id(mut self, domain_id: Option<usize>) -> InitOptions {
self.domain_id = domain_id;
self
}

/// Set the domain_id of an InitOptions, or reset it to the default behavior
/// (determined by environment variables) by providing None.
pub fn set_domain_id(&mut self, domain_id: Option<usize>) {
self.domain_id = domain_id;
}

/// Get the domain_id that will be provided by these InitOptions.
pub fn domain_id(&self) -> Option<usize> {
self.domain_id
}

fn into_rcl(self, allocator: rcutils_allocator_s) -> Result<rcl_init_options_t, RclrsError> {
unsafe {
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_init_options = rcl_get_zero_initialized_init_options();
// SAFETY: Passing in a zero-initialized value is expected.
// In the case where this returns not ok, there's nothing to clean up.
rcl_init_options_init(&mut rcl_init_options, allocator).ok()?;

// We only need to set the domain_id if the user asked for something
// other than None. When the user asks for None, that is equivalent
// to the default value in rcl_init_options.
if let Some(domain_id) = self.domain_id {
rcl_init_options_set_domain_id(&mut rcl_init_options, domain_id);
}
Ok(rcl_init_options)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 3 additions & 1 deletion rclrs/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ impl SingleThreadedExecutor {
for node in { self.nodes_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) })
.filter(|node| unsafe {
rcl_context_is_valid(&*node.handle.context_handle.rcl_context.lock().unwrap())
})
{
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;
Expand Down
Loading
Loading