Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional fixes for yadu/raii-client. #304

Merged
merged 3 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 70 additions & 46 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "attachment_helpers.hpp"
#include "cdr.hpp"
Expand All @@ -45,14 +46,15 @@ namespace rmw_zenoh_cpp
// way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an
// rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the
// query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory.
// The next 2 variables are used to avoid that situation. Any time a query is initiated via
// rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the
// query reply, num_in_flight_ is decremented.
// When shutdown() is called, is_shutdown_ is set to true.
// If num_in_flight_ is 0, the data associated with this structure is freed.
// If num_in_flight_ is *not* 0, then the data associated with this structure is maintained.
// In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query
// callback, the query callback will free up the structure.
// The next 3 global variables are used to avoid that situation. Any time a query is initiated via
// rmw_send_request(), num_in_flight_ is incremented. When Zenoh calls the callback for the
// query drop, num_in_flight_map->second is decremented.
// When ClientData is destroyed, it checks to see if there are things in flight. If there are,
// it leaves this ClientData pointer both in the num_in_flight_map and the deleted_clients map.
// When the client_data_handler() is called on these destroyed objects, it knows that it cannot
// dereference the data anymore, and it gets out early. When client_data_drop() is called, it
// decrements num_in_flight_map->second, and if that drops to zero, drops the pointer address
// completely from deleted_clients.
//
// There is one case which is not handled by this, which has to do with timeouts. The query
// timeout is currently set to essentially infinite. Thus, if a query is in-flight but never
Expand All @@ -65,6 +67,7 @@ namespace rmw_zenoh_cpp
static std::mutex num_in_flight_mutex;
static std::unordered_map<const ClientData *, std::size_t> num_in_flight_map = {};
static std::unordered_set<const ClientData *> deleted_clients = {};

///=============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
Expand All @@ -77,8 +80,6 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
return;
}

// client_data could be a dangling pointer if this callback was triggered after
// the last reference to ClientData::SharedPtr was dropped from NodeData.
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
if (deleted_clients.count(client_data) > 0) {
RMW_ZENOH_LOG_INFO_NAMED(
Expand All @@ -90,12 +91,6 @@ void client_data_handler(z_owned_reply_t * reply, void * data)

// See the comment about the "num_in_flight" class variable in the ClientData class for
// why we need to do this.
// Note: This called could lead to UB since the ClientData * could have been deallocated
// if the query reply is received after NodeData was destructed. ie even though we do not
// erase the ClientData from NodeData's clients_ map when rmw_destroy_client is called,
// the clients_ maps would get erased when the NodeData's destructor is invoked.
// This is an edge case that should be resolved once we switch to zenoh-cpp and capture
// weaK_ptr<ClientData> in this callback instead.
if (client_data->is_shutdown()) {
return;
}
Expand Down Expand Up @@ -138,15 +133,20 @@ void client_data_drop(void * data)

// See the comment about the "num_in_flight" class variable in the ClientData class for
// why we need to do this.
// Note: This called could lead to UB since the ClientData * could have been deallocated
// if the query reply is received after NodeData was destructed. ie even though we do not
// erase the ClientData from NodeData's clients_ map when rmw_destroy_client is called,
// the clients_ maps would get erased when the NodeData's destructor is invoked.
// This is an edge case that should be resolved once we switch to zenoh-cpp and capture
// weaK_ptr<ClientData> in this callback instead.
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
auto num_in_flight_it = num_in_flight_map.find(client_data);
if (num_in_flight_it != num_in_flight_map.end()) {
--num_in_flight_it->second;
if (num_in_flight_it == num_in_flight_map.end()) {
// This should never happen
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to find object in num_in_flight_map. Report this bug."
);
return;
}

--num_in_flight_it->second;
if (num_in_flight_it->second == 0 && deleted_clients.count(client_data) > 0) {
deleted_clients.erase(client_data);
}
}

Expand Down Expand Up @@ -178,8 +178,8 @@ std::shared_ptr<ClientData> ClientData::make(
service_members->request_members_->data);
auto response_members = static_cast<const message_type_support_callbacks_t *>(
service_members->response_members_->data);
auto request_type_support = std::make_unique<RequestTypeSupport>(service_members);
auto response_type_support = std::make_unique<ResponseTypeSupport>(service_members);
auto request_type_support = std::make_shared<RequestTypeSupport>(service_members);
auto response_type_support = std::make_shared<ResponseTypeSupport>(service_members);

// Note: Service request/response types will contain a suffix Request_ or Response_.
// We remove the suffix when appending the type to the liveliness tokens for
Expand Down Expand Up @@ -233,15 +233,21 @@ std::shared_ptr<ClientData> ClientData::make(
return nullptr;
}

auto client_data = std::shared_ptr<ClientData>(
new ClientData{
node,
std::move(entity),
request_members,
response_members,
std::move(request_type_support),
std::move(response_type_support)
});
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
std::vector<std::shared_ptr<ClientData>> duplicate_pointers;
std::shared_ptr<ClientData> client_data;
do {
client_data = std::shared_ptr<ClientData>(
new ClientData{
node,
entity,
request_members,
response_members,
request_type_support,
response_type_support
});
duplicate_pointers.push_back(client_data);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we store the instantiated client_data ptr in this vector?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we didn't do this, then we could conceivably get the same address over and over again. Consider the case where we have the pointer 0x1 in deleted_clients. The first call to new ClientData might return 0x1. At that point, we'd detect that this is in deleted_clients, and loop around again. But right before we make the second call to new ClientData, we drop the reference to the old one, at which point it may be freed. At that point the allocator might decide to give us address 0x1 again, and we'd never make any progress here.

By holding all of the shared_ptr in a vector like this, we guarantee that the allocator will give us a new address. And then at the end of the function, when std::vector goes out of scope, we'll free all of them.

} while (deleted_clients.count(client_data.get()) > 0);

client_data->keyexpr_ =
z_keyexpr_new(client_data->entity_->topic_info().value().topic_keyexpr_.c_str());
Expand Down Expand Up @@ -273,9 +279,7 @@ std::shared_ptr<ClientData> ClientData::make(
free_ros_keyexpr.cancel();
free_token.cancel();

// Erase from deleted_clients set if the memory address for the client data is reused.
num_in_flight_map.erase(client_data.get());
deleted_clients.erase(client_data.get());
num_in_flight_map[client_data.get()] = 0;

return client_data;
}
Expand All @@ -286,14 +290,14 @@ ClientData::ClientData(
std::shared_ptr<liveliness::Entity> entity,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::unique_ptr<RequestTypeSupport> request_type_support,
std::unique_ptr<ResponseTypeSupport> response_type_support)
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support)
: rmw_node_(rmw_node),
entity_(std::move(entity)),
request_type_support_impl_(request_type_support_impl),
response_type_support_impl_(response_type_support_impl),
request_type_support_(std::move(request_type_support)),
response_type_support_(std::move(response_type_support)),
request_type_support_(request_type_support),
response_type_support_(response_type_support),
wait_set_data_(nullptr),
sequence_number_(1),
is_shutdown_(false)
Expand Down Expand Up @@ -495,8 +499,9 @@ rmw_ret_t ClientData::send_request(
std::lock_guard<std::mutex> lock(num_in_flight_mutex);
auto num_in_flight_it = num_in_flight_map.find(this);
if (num_in_flight_it == num_in_flight_map.end()) {
num_in_flight_map[this] = 0;
num_in_flight_it = num_in_flight_map.find(this);
// This should never happen
RMW_SET_ERROR_MSG("failed to find object in num_in_flight_map");
return RMW_RET_ERROR;
}
num_in_flight_it->second++;
}
Expand Down Expand Up @@ -536,9 +541,28 @@ ClientData::~ClientData()
entity_->topic_info().value().name_.c_str()
);
}

std::lock_guard<std::mutex> lock(num_in_flight_mutex);
num_in_flight_map.erase(this);
deleted_clients.insert(this);
auto num_in_flight_it = num_in_flight_map.find(this);
if (num_in_flight_it != num_in_flight_map.end()) {
if (num_in_flight_it->second == 0) {
// If there is nothing in flight, we can remove this from the map
// with no further considerations.
num_in_flight_map.erase(this);
} else {
// Since there is still something in flight, we need to just add
// it to the deleted_clients; it will be deleted when the last
// outstanding query finishes.
deleted_clients.insert(this);
}
} else {
// This should never happen
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Error finding client /%s in num_in_flight_map. Report this bug.",
entity_->topic_info().value().name_.c_str()
);
}
}

//==============================================================================
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ class ClientData final
std::shared_ptr<liveliness::Entity> entity,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::unique_ptr<RequestTypeSupport> request_type_support,
std::unique_ptr<ResponseTypeSupport> response_type_support);
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support);

// Internal mutex.
mutable std::mutex mutex_;
Expand All @@ -129,8 +129,8 @@ class ClientData final
// Type support fields.
const void * request_type_support_impl_;
const void * response_type_support_impl_;
std::unique_ptr<RequestTypeSupport> request_type_support_;
std::unique_ptr<ResponseTypeSupport> response_type_support_;
std::shared_ptr<RequestTypeSupport> request_type_support_;
std::shared_ptr<ResponseTypeSupport> response_type_support_;
// Deque to store the replies in the order they arrive.
std::deque<std::unique_ptr<rmw_zenoh_cpp::ZenohReply>> reply_queue_;
// Wait set data.
Expand Down