Skip to content

Commit

Permalink
Discriminate when the Client has gone from when the Client has not co…
Browse files Browse the repository at this point in the history
…mpletely matched

Attempt to backport #467 to Foxy in an ABI compatible way.

Signed-off-by: Jacob Perron <[email protected]>
  • Loading branch information
jacobperron committed Nov 9, 2020
1 parent 6f1a610 commit 55eabb2
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 49 deletions.
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ rmw_create_service(
delete info->pub_listener_;
}
});
info->pub_listener_ = new (std::nothrow) ServicePubListener();
info->pub_listener_ = new (std::nothrow) PatchedServicePubListener();
if (!info->pub_listener_) {
RMW_SET_ERROR_MSG("failed to create service response publisher listener");
return nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <list>
#include <mutex>
#include <unordered_set>
#include <unordered_map>

#include "fastcdr/FastBuffer.h"

Expand All @@ -37,6 +38,15 @@

class ServiceListener;
class ServicePubListener;
class PatchedServicePubListener;

enum class client_present_t
{
FAILURE, // an error occurred when checking
MAYBE, // reader not matched, writer still present
YES, // reader matched
GONE // neither reader nor writer
};

typedef struct CustomServiceInfo
{
Expand All @@ -62,6 +72,130 @@ typedef struct CustomServiceRequest
: buffer_(nullptr) {}
} CustomServiceRequest;

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
public:
ServicePubListener() = default;

template<class Rep, class Period>
bool wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
} else {
return;
}
cv_.notify_all();
}

protected:
std::mutex & getMutex()
{
return mutex_;
}

std::unordered_set<eprosima::fastrtps::rtps::GUID_t, rmw_fastrtps_shared_cpp::hash_fastrtps_guid> &
getSubscriptions()
{
return subscriptions_;
}

std::condition_variable & getConditionVariable()
{
return cv_;
}

private:
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

// Wrapper around ServicePubListener to fix issue when matching clients in an ABI compatible way
// See original patch: https://github.com/ros2/rmw_fastrtps/pull/467
class PatchedServicePubListener : public ServicePubListener
{
public:
using clients_endpoints_map_t =
std::unordered_map<eprosima::fastrtps::rtps::GUID_t,
eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(getMutex());
std::unordered_set<
eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid> & subscriptions_ = getSubscriptions();
std::condition_variable & cv_ = getConditionVariable();
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
auto endpoint = clients_endpoints_.find(matchingInfo.remoteEndpointGuid);
if (endpoint != clients_endpoints_.end()) {
clients_endpoints_.erase(endpoint->second);
clients_endpoints_.erase(matchingInfo.remoteEndpointGuid);
}
} else {
return;
}
cv_.notify_all();
}

client_present_t
check_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid)
{
// Check if the guid is still in the map
if (clients_endpoints_.find(guid) == clients_endpoints_.end()) {
// Client is gone
return client_present_t::GONE;
}
// Wait for subscription
if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) {
return client_present_t::MAYBE;
}
return client_present_t::YES;
}

// Accesors
clients_endpoints_map_t & clients_endpoints()
{
std::lock_guard<std::mutex> lock(getMutex());
return clients_endpoints_;
}

private:
clients_endpoints_map_t clients_endpoints_;
};

class ServiceListener : public eprosima::fastrtps::SubscriberListener
{
public:
Expand All @@ -72,6 +206,22 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
(void)info_;
}

void
onSubscriptionMatched(
eprosima::fastrtps::Subscriber * sub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) sub;
PatchedServicePubListener * pub_listener = static_cast<PatchedServicePubListener *>(info_->pub_listener_);
if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
auto endpoint = pub_listener->clients_endpoints().find(
matchingInfo.remoteEndpointGuid);
if (endpoint != pub_listener->clients_endpoints().end()) {
pub_listener->clients_endpoints().erase(endpoint->second);
pub_listener->clients_endpoints().erase(matchingInfo.remoteEndpointGuid);
}
}
}

void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
Expand All @@ -95,6 +245,14 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
request.sample_identity_.writer_guid() = reader_guid;
}

// Save both guids in the clients_endpoints map
PatchedServicePubListener * pub_listener =
static_cast<PatchedServicePubListener *>(info_->pub_listener_);
const eprosima::fastrtps::rtps::GUID_t & writer_guid =
request.sample_info_.sample_identity.writer_guid();
pub_listener->clients_endpoints().emplace(reader_guid, writer_guid);
pub_listener->clients_endpoints().emplace(writer_guid, reader_guid);

std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
Expand Down Expand Up @@ -169,49 +327,4 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
public:
ServicePubListener() = default;

template<class Rep, class Period>
bool wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
} else {
return;
}
cv_.notify_all();
}

private:
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
9 changes: 6 additions & 3 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ __rmw_send_response(
if ((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0) {
// Related guid is a reader, so it is the response subscription guid.
// Wait for the response writer to be matched with it.
auto listener = info->pub_listener_;
if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) {
auto listener = static_cast<PatchedServicePubListener *>(info->pub_listener_);
client_present_t ret = listener->check_for_subscription(related_guid);
if (ret == client_present_t::GONE) {
return RMW_RET_OK;
} else if (ret == client_present_t::MAYBE) {
RMW_SET_ERROR_MSG("client will not receive response");
return RMW_RET_ERROR;
return RMW_RET_TIMEOUT;
}
}

Expand Down

0 comments on commit 55eabb2

Please sign in to comment.