From f4cbb434ac8142aa81231243eefa29a841b9cfa3 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Thu, 5 Sep 2024 11:14:30 +0100 Subject: [PATCH 1/3] Correct loaned messages memory handling Instead of returning the loaned after executing the subscription, return it when the last ref of the message goes out of scope. - https://github.com/ros2/rclcpp/issues/2401 Signed-off-by: Mauro Passerino --- .../include/rclcpp/generic_subscription.hpp | 2 +- rclcpp/include/rclcpp/subscription.hpp | 7 ++-- rclcpp/include/rclcpp/subscription_base.hpp | 4 ++- rclcpp/src/rclcpp/executor.cpp | 36 ++++++++++++------- rclcpp/src/rclcpp/generic_subscription.cpp | 5 ++- .../node_interfaces/test_node_topics.cpp | 2 +- rclcpp/test/rclcpp/test_subscription.cpp | 4 +-- 7 files changed, 34 insertions(+), 26 deletions(-) diff --git a/rclcpp/include/rclcpp/generic_subscription.hpp b/rclcpp/include/rclcpp/generic_subscription.hpp index dd0e8be94d..289a9641b4 100644 --- a/rclcpp/include/rclcpp/generic_subscription.hpp +++ b/rclcpp/include/rclcpp/generic_subscription.hpp @@ -126,7 +126,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase /// This function is currently not implemented. RCLCPP_PUBLIC void handle_loaned_message( - void * loaned_message, const rclcpp::MessageInfo & message_info) override; + std::shared_ptr loaned_message, const rclcpp::MessageInfo & message_info) override; // Same as return_serialized_message() as the subscription is to serialized_messages only RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 4e552eb1df..4a5011f99a 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -330,7 +330,7 @@ class Subscription : public SubscriptionBase void handle_loaned_message( - void * loaned_message, + std::shared_ptr loaned_message, const rclcpp::MessageInfo & message_info) override { if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) { @@ -339,10 +339,7 @@ class Subscription : public SubscriptionBase return; } - auto typed_message = static_cast(loaned_message); - // message is loaned, so we have to make sure that the deleter does not deallocate the message - auto sptr = std::shared_ptr( - typed_message, [](ROSMessageType * msg) {(void) msg;}); + auto sptr = std::static_pointer_cast(loaned_message); std::chrono::time_point now; if (subscription_topic_statistics_) { diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 615f3852b6..6f00bd36cd 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -232,7 +232,9 @@ class SubscriptionBase : public std::enable_shared_from_this RCLCPP_PUBLIC virtual void - handle_loaned_message(void * loaned_message, const rclcpp::MessageInfo & message_info) = 0; + handle_loaned_message( + std::shared_ptr loaned_message, + const rclcpp::MessageInfo & message_info) = 0; /// Return the message borrowed in create_message. /** \param[in] message Shared pointer to the returned message. */ diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 69131cc111..61f017ac43 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -558,19 +558,29 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription) } return true; }, - [&]() {subscription->handle_loaned_message(loaned_msg, message_info);}); - if (nullptr != loaned_msg) { - rcl_ret_t ret = rcl_return_loaned_message_from_subscription( - subscription->get_subscription_handle().get(), loaned_msg); - if (RCL_RET_OK != ret) { - RCLCPP_ERROR( - rclcpp::get_logger("rclcpp"), - "rcl_return_loaned_message_from_subscription() failed for subscription on topic " - "'%s': %s", - subscription->get_topic_name(), rcl_get_error_string().str); - } - loaned_msg = nullptr; - } + [&]() + { + // Create a shared_ptr with a custom deleter to return the loaned + // to the RMW when last reference goes out of scope. This allows the user + // to store copies of the message outside the subscription callback. + auto loaned_message_ptr = std::shared_ptr( + loaned_msg, + [subscription](void* loaned_msg_ptr) + { + rcl_ret_t ret = rcl_return_loaned_message_from_subscription( + subscription->get_subscription_handle().get(), + loaned_msg_ptr); + if (RCL_RET_OK != ret) { + RCLCPP_ERROR( + rclcpp::get_logger("rclcpp"), + "rcl_return_loaned_message_from_subscription() failed for subscription on topic '%s': %s", + subscription->get_topic_name(), rcl_get_error_string().str); + } + }); + + // Pass the shared_ptr to the user's callback + subscription->handle_loaned_message(loaned_message_ptr, message_info); + }); } else { // This case is taking a copy of the message data from the middleware via // inter-process communication. diff --git a/rclcpp/src/rclcpp/generic_subscription.cpp b/rclcpp/src/rclcpp/generic_subscription.cpp index ae28354b98..151722ca80 100644 --- a/rclcpp/src/rclcpp/generic_subscription.cpp +++ b/rclcpp/src/rclcpp/generic_subscription.cpp @@ -54,9 +54,8 @@ GenericSubscription::handle_serialized_message( any_callback_.dispatch(message, message_info); } -void -GenericSubscription::handle_loaned_message( - void * message, const rclcpp::MessageInfo & message_info) +void GenericSubscription::handle_loaned_message( + std::shared_ptr message, const rclcpp::MessageInfo & message_info) { (void) message; (void) message_info; diff --git a/rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp b/rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp index e8f873f693..ad4f982f6d 100644 --- a/rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp +++ b/rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp @@ -77,7 +77,7 @@ class TestSubscription : public rclcpp::SubscriptionBase create_serialized_message() override {return nullptr;} void handle_message(std::shared_ptr &, const rclcpp::MessageInfo &) override {} - void handle_loaned_message(void *, const rclcpp::MessageInfo &) override {} + void handle_loaned_message(std::shared_ptr, const rclcpp::MessageInfo &) override {} void handle_serialized_message( const std::shared_ptr &, const rclcpp::MessageInfo &) override {} void return_message(std::shared_ptr &) override {} diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 06f6f9785b..80c86d6dad 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -354,9 +354,9 @@ TEST_F(TestSubscription, handle_loaned_message) { auto callback = [](std::shared_ptr) {}; auto sub = node_->create_subscription("topic", 10, callback); - test_msgs::msg::Empty msg; + auto msg = std::make_shared(); rclcpp::MessageInfo message_info; - EXPECT_NO_THROW(sub->handle_loaned_message(&msg, message_info)); + EXPECT_NO_THROW(sub->handle_loaned_message(msg, message_info)); } /* From e31d34fa0e8501a403214b8efe2e8ef5fae27e15 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 6 Sep 2024 22:32:46 +0100 Subject: [PATCH 2/3] Add unit test: Shared memory & loaned msgs Signed-off-by: Mauro Passerino --- rclcpp/test/rclcpp/CMakeLists.txt | 4 + rclcpp/test/rclcpp/test_shared_memory.cpp | 376 ++++++++++++++++++++++ 2 files changed, 380 insertions(+) create mode 100644 rclcpp/test/rclcpp/test_shared_memory.cpp diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index dd4fcc1faa..d5d711aba1 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -135,6 +135,10 @@ ament_add_gtest(test_loaned_message test_loaned_message.cpp) ament_add_test_label(test_loaned_message mimick) target_link_libraries(test_loaned_message ${PROJECT_NAME} mimick ${test_msgs_TARGETS}) +ament_add_gtest(test_shared_memory test_shared_memory.cpp) +ament_add_test_label(test_shared_memory mimick) +target_link_libraries(test_shared_memory ${PROJECT_NAME} mimick ${test_msgs_TARGETS}) + ament_add_gtest(test_memory_strategy test_memory_strategy.cpp) target_link_libraries(test_memory_strategy ${PROJECT_NAME} ${test_msgs_TARGETS}) diff --git a/rclcpp/test/rclcpp/test_shared_memory.cpp b/rclcpp/test/rclcpp/test_shared_memory.cpp new file mode 100644 index 0000000000..b1d5d873b9 --- /dev/null +++ b/rclcpp/test/rclcpp/test_shared_memory.cpp @@ -0,0 +1,376 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "rclcpp/experimental/executors/events_executor/events_executor.hpp" +#include "rclcpp/rclcpp.hpp" +#include "test_msgs/msg/basic_types.hpp" + +using MessageT = test_msgs::msg::BasicTypes; +using rclcpp::experimental::executors::EventsExecutor; +using namespace std::chrono_literals; + +/* +* The last reference to a loaned message has to return the loaned to the RMW +* when the message goes out of scope. This allows for memory to be +* reused for future loaned messages. +* +* Among the owners of the (shared) loaned messages we have: +* - User (storing messages beyond the subscription callback) +* - Subscriber (storing messages until subscription is executed) +* - Publisher (storing messages for late joiner subscriptions) +* +* This unit test intends to test: +* - Test that loaned are returned when expected, and check that memory is reused after return: +* - User is the last owner +* - Publisher is the last owner +* - Subscriber is the last owner +* - Test that memory is indeed shared: +* - User stores and overrides messages: +* - Other subscription not yet executed (messages queued) should get +* the modified version of messages. +* - Publisher transient local messages should be affected: +* - A late late joiner subscription should receive modified messages +* - Tests only to show current behaviour (which may be not the expected) +* - Test max publisher depth setting (otherwise loaned are not used) +* - Max loans that can be requested without publishing the messages +* - Double delivery issue on transient local subscriptions +*/ + +class PubSubNode : public rclcpp::Node { +public: + PubSubNode(std::string name, uint16_t pub_depth, uint16_t sub_depth, uint16_t user_depth) + : Node(name, rclcpp::NodeOptions().use_intra_process_comms(false)), user_depth_(user_depth) + { + std::cout << "Created node: " << name << ": [pub_depth, sub_depth, user_depth] = [" + << pub_depth << ", " << sub_depth << ", " << user_depth << "]" << std::endl; + + pub_ = this->create_publisher("topic", rclcpp::QoS(pub_depth).transient_local()); + sub_ = this->create_subscription("topic", + rclcpp::QoS(sub_depth).transient_local(), + [this](MessageT::SharedPtr msg) -> void + { + store_message(msg); + // std::stringstream ss; + // std::for_each(user_msgs_.begin(), user_msgs_.end(), + // [&ss](const auto &msg) { ss << msg->uint16_value << ", "; }); + // RCLCPP_INFO(this->get_logger(), "Subscription callback - User msgs: [%s]", ss.str().c_str()); + }); + } + + // Request N loaned messages and publish them + void borrow_and_publish_loaned(uint16_t n = 1) + { + for (uint16_t i = 0; i < n; i++) + { + auto msg = pub_->borrow_loaned_message(); + loaned_is_reused(&msg.get()); + msg.get().uint16_value = count_++; + // RCLCPP_INFO(this->get_logger(), "Publishing: '%d'", msg.get().uint16_value); + pub_->publish(std::move(msg)); + std::this_thread::sleep_for(1ms); + } + } + + // Request N loaned messages but do not publish them, so they're not released + void borrow_and_store_loaned_messages(uint32_t n) + { + for (uint16_t i = 0; i < n; i++) { + auto loaned_msg = pub_->borrow_loaned_message(); + loaned_msgs_.emplace_back(std::move(loaned_msg)); + } + } + + // Check if the requested loan is reusing memory from a previous loan + bool loaned_is_reused(void * msg_ptr) + { + if (std::find(loaned_history_.begin(), + loaned_history_.end(), msg_ptr) != loaned_history_.end()) + { + memory_reused_ = true; + return true; + } + loaned_history_.push_back(msg_ptr); + return false; + } + + // Copy the loaned messages to be stored beyond the subscriptions callback + void store_message(MessageT::SharedPtr msg) + { + user_msgs_.push_back(msg); + + // But keep only the last N messages + if (user_msgs_.size() > user_depth_) { + user_msgs_.erase(user_msgs_.begin()); + } + } + + // User overrides messages for testing purposes + void user_override_messages(uint16_t val) + { + std::for_each(user_msgs_.begin(), user_msgs_.end(), + [val](auto & msg) { msg->uint16_value = val; }); + + // Print the overriden messages + // std::stringstream ss; + // std::for_each(user_msgs_.begin(), user_msgs_.end(), + // [&ss](const auto &msg) { ss << msg->uint16_value << ", "; }); + // RCLCPP_INFO(this->get_logger(), "Overriden messages: [%s]", ss.str().c_str()); + } + + bool memory_is_reused() { return memory_reused_; }; + + uint16_t get_msg_value(uint16_t index) + { + return user_msgs_[index]->uint16_value; + }; + + size_t get_number_of_stored_messages() + { + return user_msgs_.size(); + }; + + std::vector get_user_messages() + { + return user_msgs_; + } + + rclcpp::Publisher::SharedPtr pub_; + rclcpp::Subscription::SharedPtr sub_; + std::vector user_msgs_; + std::vector> loaned_msgs_; + std::vector loaned_history_; + uint16_t user_depth_; + uint16_t count_{0}; + bool memory_reused_{false}; +}; + + +class TestSharedMemory : public ::testing::Test +{ +protected: + void SetUp() + { + rclcpp::init(0, nullptr); + executor = std::make_unique(); + executor_thread = std::thread([&]() + { + executor->spin(); + }); + } + + void TearDown() + { + executor->cancel(); + if (executor_thread.joinable()) + { + executor_thread.join(); + } + rclcpp::shutdown(); + } + + bool loaned_addresses_match( + std::vector loaned_msgs_1, + std::vector loaned_msgs_2) + { + if (loaned_msgs_1.size() != loaned_msgs_2.size()) { + return false; + } + for (size_t i = 0; i < loaned_msgs_1.size(); ++i) { + if (loaned_msgs_1[i].get() != loaned_msgs_2[i].get()) { + return false; + } + } + return true; + } + + std::unique_ptr executor; + std::thread executor_thread; +}; + +// By controlling the depth of the publisher/subscription/user buffers, we can +// set the different scenarios for who is the last entity holding references +// to the messages, to test the right behaviour when last message goes out of scope. +TEST_F(TestSharedMemory, user_is_last_loan_owner) +{ + uint16_t user_depth = 10; + auto pub_sub_node = std::make_shared("pub_sub_node", 1, 1, user_depth); + auto idle_node = std::make_shared("idle_node", 1, 1 ,1); + executor->add_node(pub_sub_node); + + // Publish 11 messages. User will delete oldest message (thus returning + // the loan) + pub_sub_node->borrow_and_publish_loaned(user_depth + 1); + EXPECT_EQ(pub_sub_node->get_number_of_stored_messages(), user_depth); + EXPECT_FALSE(pub_sub_node->memory_is_reused()); + EXPECT_EQ(pub_sub_node->get_msg_value(9), user_depth); + + // Publish 1 message more, memory should be reused since loan was returned + pub_sub_node->borrow_and_publish_loaned(1); + EXPECT_EQ(pub_sub_node->get_number_of_stored_messages(), user_depth); + EXPECT_TRUE(pub_sub_node->memory_is_reused()); + EXPECT_EQ(pub_sub_node->get_msg_value(9), user_depth + 1); +} + +TEST_F(TestSharedMemory, pub_is_last_loan_owner) +{ + uint16_t pub_depth = 10; + auto pub_sub_node = std::make_shared("pub_sub_node", pub_depth, 1, 1); + auto idle_node = std::make_shared("idle_node", 1, 1 ,1); + executor->add_node(pub_sub_node); + + // Publish 11 messages. Publisher will delete oldest message (thus returning + // the loan) + pub_sub_node->borrow_and_publish_loaned(pub_depth + 1); + EXPECT_EQ(pub_sub_node->get_number_of_stored_messages(), 1); + EXPECT_FALSE(pub_sub_node->memory_is_reused()); + EXPECT_EQ(pub_sub_node->get_msg_value(0), pub_depth); + + // Publish 1 message more, memory should be reused since loan was returned + pub_sub_node->borrow_and_publish_loaned(1); + EXPECT_TRUE(pub_sub_node->memory_is_reused()); + EXPECT_EQ(pub_sub_node->get_msg_value(0), pub_depth + 1); +} + +TEST_F(TestSharedMemory, sub_is_last_loan_owner) +{ + uint16_t sub_depth = 10; + auto pub_sub_node = std::make_shared("pub_sub_node", 1, 1, 1); + auto idle_node = std::make_shared("idle_node", 1, sub_depth, 1); + executor->add_node(pub_sub_node); + + // Publish 11 messages. Idle subscription will delete oldest message (thus returning + // the loan) + pub_sub_node->borrow_and_publish_loaned(sub_depth + 1); + EXPECT_EQ(pub_sub_node->get_number_of_stored_messages(), 1); + EXPECT_FALSE(pub_sub_node->memory_is_reused()); + EXPECT_EQ(pub_sub_node->get_msg_value(0), sub_depth); + + // Publish 1 message more, memory should be reused since loan was returned + pub_sub_node->borrow_and_publish_loaned(1); + EXPECT_TRUE(pub_sub_node->memory_is_reused()); + EXPECT_EQ(pub_sub_node->get_msg_value(0), sub_depth + 1); +} + +TEST_F(TestSharedMemory, memory_is_shared) +{ + // Publish 10 messages then overriden by the user. + // Idle subscription is then executed: it should receive the modified messages. + auto pub_sub_node = std::make_shared("pub_sub_node", 15, 1, 15); + auto late_joiner_node = std::make_shared("late_joiner_node", 1, 10 , 15); + executor->add_node(pub_sub_node); + + pub_sub_node->borrow_and_publish_loaned(10); + pub_sub_node->user_override_messages(42); + EXPECT_EQ(pub_sub_node->get_msg_value(9), 42); + + // Execute subscription which had the messages stored in its queue + executor->add_node(late_joiner_node); + std::this_thread::sleep_for(1s); + + EXPECT_EQ(late_joiner_node->get_number_of_stored_messages(), 10); + // Verify it has the modified version + EXPECT_EQ(late_joiner_node->get_msg_value(9), 42); + + auto pub_sub_loaned = pub_sub_node->get_user_messages(); + auto sub_loaned = late_joiner_node->get_user_messages(); + EXPECT_TRUE(loaned_addresses_match(pub_sub_loaned, sub_loaned)); +} + +// FastDDS: Show that the publisher gets loans from memory still in use: +// The pool size is determined by the pub depth. The pub should throw when +// asking for loans if the previous were not returned, instead of siliently +// provide again a previous loaned, which is overriden +TEST_F(TestSharedMemory, publisher_overrides_message) +{ + // The user will store 5 messages before returning loans + auto pub_sub_node = std::make_shared("pub_sub_node", 1, 1, 5); + executor->add_node(pub_sub_node); + + // Publish 2 messages. Memory should not be reused since user still hasn't + // returned the loans + pub_sub_node->borrow_and_publish_loaned(3); + EXPECT_EQ(pub_sub_node->get_number_of_stored_messages(), 3); + EXPECT_EQ(pub_sub_node->get_msg_value(0), 0); + EXPECT_EQ(pub_sub_node->get_msg_value(1), 1); + EXPECT_FALSE(pub_sub_node->memory_is_reused()); +} + +// Test limits: +// CycloneDDS: Show that publisher depth can't be more than 16, otherwise +// middleware refuses to loan messages (local allocator will be used) +TEST_F(TestSharedMemory, max_pub_depth) +{ + uint16_t max_depth = 16; + auto pub_sub_node = std::make_shared("pub_sub_node", max_depth + 1 , 1, 2); + executor->add_node(pub_sub_node); + pub_sub_node->borrow_and_publish_loaned(2); + EXPECT_EQ(pub_sub_node->get_number_of_stored_messages(), 2); + EXPECT_FALSE(pub_sub_node->memory_is_reused()); +} + +// CycloneDDS suffers from the "double delivery issue" in which a late joiner subscription +// receives both interprocess & loaned messages, before segfaulting +TEST_F(TestSharedMemory, double_delivery) +{ + // Publish 10 messages and let the user override all of them. + // Then a 2nd subscription (late joiner) is created, which should receive the modified messages. + // CycloneDDS: The late joiner subscription first receives the 10 original messages, + // (inter-process msgs?) and on the next publisher->publish, it receives the 10 loaned (modified) messages. + // before a segfault. + auto pub_sub_node = std::make_shared("pub_sub_node", 15, 1, 15); + executor->add_node(pub_sub_node); + + pub_sub_node->borrow_and_publish_loaned(10); + pub_sub_node->user_override_messages(42); + + auto late_joiner_node = std::make_shared("late_joiner_node", 1, 20 , 20); + // Give some time for the subscription to receive transient local messages + std::this_thread::sleep_for(50ms); + executor->add_node(late_joiner_node); + // Give some time for the executor to execute subscription and store the msgs + std::this_thread::sleep_for(50ms); + EXPECT_EQ(late_joiner_node->get_number_of_stored_messages(), 10); + + // If memory is shared, we should have obtained the modified messages + EXPECT_EQ(late_joiner_node->get_msg_value(9), 42); + + // If memory is shared, both subscriptions should hold the same messages + auto pub_sub_loaned = pub_sub_node->get_user_messages(); + auto late_sub_loaned = late_joiner_node->get_user_messages(); + + // FastDDS: This fails + EXPECT_TRUE(loaned_addresses_match(pub_sub_loaned, late_sub_loaned)); + + // CycloneDDS: After this publish, the subscription receives the (modified) + // loaned messages and program segfaults + pub_sub_node->borrow_and_publish_loaned(1); + std::this_thread::sleep_for(50ms); + EXPECT_EQ(late_joiner_node->get_number_of_stored_messages(), 11); + EXPECT_EQ(late_joiner_node->get_msg_value(10), 10); +} + +// Depending on the publisher depth is the amount of loans we can make without throwing. +// FastDDS: throws after loaning the 3th msg and not sending it (if pub_depth = 1) +// CycloneDDS: throws after loaning the 9th msg and not sending it (if pub_depth = 1) +TEST_F(TestSharedMemory, max_loaned_stored) +{ + auto pub_sub_node = std::make_shared("pub_sub_node", 1, 1, 1); + EXPECT_NO_THROW(pub_sub_node->borrow_and_store_loaned_messages(3)); +} From 273ed5e3a14c3cc5901dcb0b35a8b242da078aad Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 13 Sep 2024 16:55:00 +0100 Subject: [PATCH 3/3] Remove tests now that FastDDS works Signed-off-by: Mauro Passerino --- rclcpp/test/rclcpp/test_shared_memory.cpp | 53 ++++++++--------------- 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/rclcpp/test/rclcpp/test_shared_memory.cpp b/rclcpp/test/rclcpp/test_shared_memory.cpp index b1d5d873b9..e4bdf07f3a 100644 --- a/rclcpp/test/rclcpp/test_shared_memory.cpp +++ b/rclcpp/test/rclcpp/test_shared_memory.cpp @@ -51,6 +51,8 @@ using namespace std::chrono_literals; * - Test max publisher depth setting (otherwise loaned are not used) * - Max loans that can be requested without publishing the messages * - Double delivery issue on transient local subscriptions +* +* Note: FastDDS requires using DYNAMIC_REUSABLE as history memory policy. */ class PubSubNode : public rclcpp::Node { @@ -84,7 +86,7 @@ class PubSubNode : public rclcpp::Node { msg.get().uint16_value = count_++; // RCLCPP_INFO(this->get_logger(), "Publishing: '%d'", msg.get().uint16_value); pub_->publish(std::move(msg)); - std::this_thread::sleep_for(1ms); + std::this_thread::sleep_for(10ms); } } @@ -293,38 +295,6 @@ TEST_F(TestSharedMemory, memory_is_shared) EXPECT_TRUE(loaned_addresses_match(pub_sub_loaned, sub_loaned)); } -// FastDDS: Show that the publisher gets loans from memory still in use: -// The pool size is determined by the pub depth. The pub should throw when -// asking for loans if the previous were not returned, instead of siliently -// provide again a previous loaned, which is overriden -TEST_F(TestSharedMemory, publisher_overrides_message) -{ - // The user will store 5 messages before returning loans - auto pub_sub_node = std::make_shared("pub_sub_node", 1, 1, 5); - executor->add_node(pub_sub_node); - - // Publish 2 messages. Memory should not be reused since user still hasn't - // returned the loans - pub_sub_node->borrow_and_publish_loaned(3); - EXPECT_EQ(pub_sub_node->get_number_of_stored_messages(), 3); - EXPECT_EQ(pub_sub_node->get_msg_value(0), 0); - EXPECT_EQ(pub_sub_node->get_msg_value(1), 1); - EXPECT_FALSE(pub_sub_node->memory_is_reused()); -} - -// Test limits: -// CycloneDDS: Show that publisher depth can't be more than 16, otherwise -// middleware refuses to loan messages (local allocator will be used) -TEST_F(TestSharedMemory, max_pub_depth) -{ - uint16_t max_depth = 16; - auto pub_sub_node = std::make_shared("pub_sub_node", max_depth + 1 , 1, 2); - executor->add_node(pub_sub_node); - pub_sub_node->borrow_and_publish_loaned(2); - EXPECT_EQ(pub_sub_node->get_number_of_stored_messages(), 2); - EXPECT_FALSE(pub_sub_node->memory_is_reused()); -} - // CycloneDDS suffers from the "double delivery issue" in which a late joiner subscription // receives both interprocess & loaned messages, before segfaulting TEST_F(TestSharedMemory, double_delivery) @@ -366,11 +336,26 @@ TEST_F(TestSharedMemory, double_delivery) EXPECT_EQ(late_joiner_node->get_msg_value(10), 10); } +/* Test limits: +// CycloneDDS: Show that publisher depth can't be more than 16, otherwise +// middleware refuses to loan messages (local allocator will be used) +TEST_F(TestSharedMemory, max_pub_depth) +{ + uint16_t max_depth = 16; + auto pub_sub_node = std::make_shared("pub_sub_node", max_depth + 1 , 1, 2); + executor->add_node(pub_sub_node); + pub_sub_node->borrow_and_publish_loaned(2); + EXPECT_EQ(pub_sub_node->get_number_of_stored_messages(), 2); + EXPECT_FALSE(pub_sub_node->memory_is_reused()); +} + // Depending on the publisher depth is the amount of loans we can make without throwing. // FastDDS: throws after loaning the 3th msg and not sending it (if pub_depth = 1) // CycloneDDS: throws after loaning the 9th msg and not sending it (if pub_depth = 1) TEST_F(TestSharedMemory, max_loaned_stored) { auto pub_sub_node = std::make_shared("pub_sub_node", 1, 1, 1); - EXPECT_NO_THROW(pub_sub_node->borrow_and_store_loaned_messages(3)); + EXPECT_NO_THROW(pub_sub_node->borrow_and_store_loaned_messages(2)); + EXPECT_THROW(pub_sub_node->borrow_and_store_loaned_messages(1), rclcpp::exceptions::RCLError); } +*/