Skip to content

Commit

Permalink
cleanup events executor class
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Soragna <[email protected]>
  • Loading branch information
alsora committed Feb 22, 2021
1 parent bb491bd commit 191f47a
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 62 deletions.
19 changes: 9 additions & 10 deletions rclcpp/include/rclcpp/executors/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ namespace executors

/// Events executor implementation
/**
* This executor is a variation of the standard one that does not uses a waitset.
* The executor uses an events queue and a timers manager to execute entities from its
* This executor uses an events queue and a timers manager to execute entities from its
* associated nodes and callback groups.
* This provides improved performance as it allows to skip all the waitset maintenance operations.
* The RMW listener APIs are used to collect new events.
*
* This executor tries to reduce as much as possible the amount of maintenance operations.
* This allows to use customized `EventsQueue` classes to achieve different goals such
* as very low CPU usage, bounded memory requirement, determinism, etc.
*
* The executor uses a weak ownership model and it locks entities only while executing
* their related events.
*
* To run this executor:
* rclcpp::executors::EventsExecutor executor;
Expand Down Expand Up @@ -204,8 +210,6 @@ class EventsExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(EventsExecutor)

using EventQueue = std::queue<rmw_listener_event_t>;

// Executor callback: Push new events into the queue and trigger cv.
// This function is called by the DDS entities when an event happened,
// like a subscription receiving a message.
Expand All @@ -229,11 +233,6 @@ class EventsExecutor : public rclcpp::Executor
this_executor->events_queue_cv_.notify_one();
}

/// Extract and execute events from the queue until it is empty
RCLCPP_PUBLIC
void
consume_all_events(EventQueue & queue);

// Execute a single event
RCLCPP_PUBLIC
void
Expand Down
23 changes: 12 additions & 11 deletions rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

#include <queue>

#include "rclcpp/executors/events_executor_entities_collector.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rmw/listener_event_types.h"

Expand All @@ -30,10 +30,14 @@ namespace buffers
{

/**
* @brief This abstract class is intended to be used as
* a wrapper around a queue. The derived classes should chose
* which container to use and the strategies for push and prune
* events from the queue.
* @brief This abstract class can be used to implement different types of queues
* where `rmw_listener_event_t` can be stored.
* The derived classes should choose which underlying container to use and
* the strategy for pushing and popping events.
* For example a queue implementation may be bounded or unbounded and have
* different pruning strategies.
* Implementations may or may not check the validity of events and decide how to handle
* the situation where an event is not valid anymore (e.g. a subscription history cache overruns)
*/
class EventsQueue
{
Expand All @@ -56,9 +60,7 @@ class EventsQueue
push(const rmw_listener_event_t & event) = 0;

/**
* @brief removes front element from the queue
* The element removed is the "oldest" element in the queue whose
* value can be retrieved by calling member front().
* @brief removes front element from the queue.
*/
RCLCPP_PUBLIC
virtual
Expand Down Expand Up @@ -101,14 +103,13 @@ class EventsQueue
init() = 0;

/**
* @brief gets a queue with all events accumulated on it since
* the last call. The member queue is empty when the call returns.
* @brief pops out all events stored in the object into an output queue.
* @return queue with events
*/
RCLCPP_PUBLIC
virtual
std::queue<rmw_listener_event_t>
get_all_events() = 0;
pop_all_events() = 0;
};

} // namespace buffers
Expand Down
16 changes: 6 additions & 10 deletions rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ namespace buffers
{

/**
* @brief This class provides a simple queue implementation
* based on a std::queue. As the objective is having a CPU peformant
* queue, it does not performs any checks about the size of
* the queue, so the queue size could grow unbounded.
* It does not implement any pruning mechanisms.
* @brief This class implements an EventsQueue as a simple wrapper around a std::queue.
* It does not perform any checks about the size of queue, which can grow
* unbounded without being pruned.
* The simplicity of this implementation makes it suitable for optimizing CPU usage.
*/
class SimpleEventsQueue : public EventsQueue
{
Expand All @@ -53,9 +52,7 @@ class SimpleEventsQueue : public EventsQueue
}

/**
* @brief removes front element from the queue
* The element removed is the "oldest" element in the queue whose
* value can be retrieved by calling member front().
* @brief removes front event from the queue.
*/
RCLCPP_PUBLIC
virtual
Expand Down Expand Up @@ -114,7 +111,6 @@ class SimpleEventsQueue : public EventsQueue
std::swap(event_queue_, local_queue);
}


/**
* @brief gets a queue with all events accumulated on it since
* the last call. The member queue is empty when the call returns.
Expand All @@ -123,7 +119,7 @@ class SimpleEventsQueue : public EventsQueue
RCLCPP_PUBLIC
virtual
std::queue<rmw_listener_event_t>
get_all_events()
pop_all_events()
{
std::queue<rmw_listener_event_t> local_queue;
std::swap(event_queue_, local_queue);
Expand Down
25 changes: 10 additions & 15 deletions rclcpp/src/rclcpp/executors/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,19 @@ EventsExecutor::spin()
std::unique_lock<std::mutex> push_lock(push_mutex_);
// We wait here until something has been pushed to the event queue
events_queue_cv_.wait(push_lock, has_event_predicate);
// Local event queue to allow entities to push events while we execute them
EventQueue execution_event_queue = events_queue_->get_all_events();
// Move all events into a local events queue to allow entities to push while we execute them
std::queue<rmw_listener_event_t> execution_events_queue = events_queue_->pop_all_events();
// Unlock the mutex
push_lock.unlock();
// Consume all available events, this queue will be empty at the end of the function
this->consume_all_events(execution_event_queue);
// Consume all available events
while (!execution_events_queue.empty()) {
rmw_listener_event_t event = execution_events_queue.front();
execution_events_queue.pop();
this->execute_event(event);
}
}

// Stop the timers manager thread when we are done spinning
timers_manager_->stop();
}

Expand Down Expand Up @@ -235,17 +241,6 @@ EventsExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
this->remove_node(node_ptr->get_node_base_interface(), notify);
}

void
EventsExecutor::consume_all_events(EventQueue & event_queue)
{
while (!event_queue.empty()) {
rmw_listener_event_t event = event_queue.front();
event_queue.pop();

this->execute_event(event);
}
}

void
EventsExecutor::execute_event(const rmw_listener_event_t & event)
{
Expand Down
18 changes: 2 additions & 16 deletions rclcpp/test/rclcpp/executors/test_events_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,7 @@

using namespace std::chrono_literals;

class TestEventsQueue : public ::testing::Test
{
public:
void SetUp()
{
rclcpp::init(0, nullptr);
}

void TearDown()
{
rclcpp::shutdown();
}
};

TEST_F(TestEventsQueue, SimpleQueueTest)
TEST(TestEventsQueue, SimpleQueueTest)

This comment has been minimized.

Copy link
@mauropasse

mauropasse Feb 22, 2021

Collaborator

What's the diff between TEST_F and TEST ?

{
// Create a SimpleEventsQueue and a local queue
auto simple_queue = std::make_unique<rclcpp::experimental::buffers::SimpleEventsQueue>();
Expand All @@ -54,7 +40,7 @@ TEST_F(TestEventsQueue, SimpleQueueTest)
// Pop one message
simple_queue->pop();

local_events_queue = simple_queue->get_all_events();
local_events_queue = simple_queue->pop_all_events();

// We should have (11 - 1) events in the local queue
size_t local_queue_size = local_events_queue.size();
Expand Down

0 comments on commit 191f47a

Please sign in to comment.