Skip to content

Commit

Permalink
fix tracking of num in flight without UB
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Oct 18, 2024
1 parent 4c9b106 commit 37e2e03
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 40 deletions.
80 changes: 60 additions & 20 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "attachment_helpers.hpp"
Expand All @@ -40,19 +41,49 @@

namespace rmw_zenoh_cpp
{
// A static mutex whose lifetime will extend beyond that of the internal mutex_.
// This is needed as the decrement_queries_in_flight callback that me be executed
// after ClientData is deallocated will try to lock an invalid mutex and thus lead
// to UB. This only happens in rclcpp_action/test_client.
// rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no
// way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an
// rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the
// query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory.
// The next 2 variables are used to avoid that situation. Any time a query is initiated via
// rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the
// query reply, num_in_flight_ is decremented.
// When shutdown() is called, is_shutdown_ is set to true.
// If num_in_flight_ is 0, the data associated with this structure is freed.
// If num_in_flight_ is *not* 0, then the data associated with this structure is maintained.
// In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query
// callback, the query callback will free up the structure.
//
// There is one case which is not handled by this, which has to do with timeouts. The query
// timeout is currently set to essentially infinite. Thus, if a query is in-flight but never
// returns, the memory in this structure will never be freed. There isn't much we can do about
// that at this time, but we may want to consider changing the timeout so that the memory can
// eventually be freed up.
//
// TODO(Yadunund): Remove these variables once we switch to zenoh-cpp and can capture
// weak_ptr<ClientData> in zenoh callbacks.
static std::mutex num_in_flight_mutex;
static std::unordered_map<const ClientData *, std::size_t> num_in_flight_map = {};
static std::unordered_set<const ClientData *> deleted_clients = {};
///=============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
auto client_data = static_cast<ClientData *>(data);
if (client_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t "
"Unable to obtain client_data_t from data in client_data_handler."
);
return;
}

// client_data could be a dangling pointer if this callback was triggered after
// the last reference to ClientData::SharedPtr was dropped from NodeData.
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
if (deleted_clients.count(client_data) > 0) {
RMW_ZENOH_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"client_data_handler triggered for ClientData that has been deleted. Ignoring..."
);
return;
}
Expand Down Expand Up @@ -113,7 +144,10 @@ void client_data_drop(void * data)
// the clients_ maps would get erased when the NodeData's destructor is invoked.
// This is an edge case that should be resolved once we switch to zenoh-cpp and capture
// weaK_ptr<ClientData> in this callback instead.
client_data->decrement_queries_in_flight();
auto num_in_flight_it = num_in_flight_map.find(client_data);
if (num_in_flight_it != num_in_flight_map.end()) {
--num_in_flight_it->second;
}
}

///=============================================================================
Expand Down Expand Up @@ -239,6 +273,10 @@ std::shared_ptr<ClientData> ClientData::make(
free_ros_keyexpr.cancel();
free_token.cancel();

// Erase from deleted_clients set if the memory address for the client data is reused.
num_in_flight_map.erase(client_data.get());
deleted_clients.erase(client_data.get());

return client_data;
}

Expand All @@ -258,7 +296,6 @@ ClientData::ClientData(
response_type_support_(std::move(response_type_support)),
wait_set_data_(nullptr),
sequence_number_(1),
num_in_flight_(0),
is_shutdown_(false)
{
// Do nothing.
Expand Down Expand Up @@ -456,7 +493,12 @@ rmw_ret_t ClientData::send_request(
// why we need to do this.
{
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
num_in_flight_++;
auto num_in_flight_it = num_in_flight_map.find(this);
if (num_in_flight_it == num_in_flight_map.end()) {
num_in_flight_map[this] = 0;
num_in_flight_it = num_in_flight_map.find(this);
}
num_in_flight_it->second++;
}
opts.attachment = z_bytes_map_as_attachment(&map);
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
Expand Down Expand Up @@ -494,6 +536,9 @@ ClientData::~ClientData()
entity_->topic_info().value().name_.c_str()
);
}
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
num_in_flight_map.erase(this);
deleted_clients.insert(this);
}

//==============================================================================
Expand Down Expand Up @@ -527,15 +572,6 @@ bool ClientData::detach_condition_and_queue_is_empty()
return reply_queue_.empty();
}

//==============================================================================
// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure
// for the use of this method.
void ClientData::decrement_queries_in_flight()
{
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
--num_in_flight_;
}

///=============================================================================
rmw_ret_t ClientData::shutdown()
{
Expand All @@ -558,10 +594,14 @@ rmw_ret_t ClientData::shutdown()
}

///=============================================================================
bool ClientData::query_in_flight()
bool ClientData::query_in_flight() const
{
std::lock_guard<std::mutex> lock(mutex_);
return num_in_flight_ > 0;
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
auto query_in_flight_it = num_in_flight_map.find(this);
if (query_in_flight_it != num_in_flight_map.end()) {
return query_in_flight_it->second > 0;
}
return false;
}

///=============================================================================
Expand Down
21 changes: 1 addition & 20 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ClientData final
// TODO(Yadunund): Remove this API once we are able to capture weak_ptr<ClientData>
// in the client closures to avoid the issue with queries in flight as described
// below.
bool query_in_flight();
bool query_in_flight() const;

// Check if this ClientData is shutdown.
bool is_shutdown() const;
Expand Down Expand Up @@ -139,25 +139,6 @@ class ClientData final
DataCallbackManager data_callback_mgr_;
// Sequence number for queries.
size_t sequence_number_;
// rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no
// way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an
// rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the
// query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory.
// The next 2 variables are used to avoid that situation. Any time a query is initiated via
// rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the
// query reply, num_in_flight_ is decremented.
// When shutdown() is called, is_shutdown_ is set to true.
// If num_in_flight_ is 0, the data associated with this structure is freed.
// If num_in_flight_ is *not* 0, then the data associated with this structure is maintained.
// In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query
// callback, the query callback will free up the structure.
//
// There is one case which is not handled by this, which has to do with timeouts. The query
// timeout is currently set to essentially infinite. Thus, if a query is in-flight but never
// returns, the memory in this structure will never be freed. There isn't much we can do about
// that at this time, but we may want to consider changing the timeout so that the memory can
// eventually be freed up.
size_t num_in_flight_;
// Shutdown flag.
bool is_shutdown_;
};
Expand Down

0 comments on commit 37e2e03

Please sign in to comment.