Skip to content

Commit

Permalink
Discriminate when the Client has gone from when the Client has not co…
Browse files Browse the repository at this point in the history
…mpletely matched (ros2#467)

* Workaround when the client is gone before server sends response

Signed-off-by: JLBuenoLopez-eProsima <[email protected]>

* Change add to the map to listener callback

Signed-off-by: JLBuenoLopez-eProsima <[email protected]>

* Apply suggestions from code review

Co-authored-by: Michel Hidalgo <[email protected]>
Signed-off-by: JLBuenoLopez-eProsima <[email protected]>

* Uncrustify

Signed-off-by: JLBuenoLopez-eProsima <[email protected]>

Co-authored-by: Michel Hidalgo <[email protected]>
  • Loading branch information
JLBuenoLopez and hidmic committed Oct 23, 2020
1 parent 087240d commit 7de5b0c
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 49 deletions.
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ rmw_create_service(
delete info->pub_listener_;
}
});
info->pub_listener_ = new (std::nothrow) ServicePubListener();
info->pub_listener_ = new (std::nothrow) ServicePubListener(info);
if (!info->pub_listener_) {
RMW_SET_ERROR_MSG("failed to create service response publisher listener");
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ rmw_create_service(
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
}
info->pub_listener_ = new ServicePubListener();
info->pub_listener_ = new ServicePubListener(info);
info->response_publisher_ =
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->response_publisher_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <list>
#include <mutex>
#include <unordered_set>
#include <unordered_map>

#include "fastcdr/FastBuffer.h"

Expand All @@ -38,6 +39,14 @@
class ServiceListener;
class ServicePubListener;

enum class client_present_t
{
FAILURE, // an error occurred when checking
MAYBE, // reader not matched, writer still present
YES, // reader matched
GONE // neither reader nor writer
};

typedef struct CustomServiceInfo
{
rmw_fastrtps_shared_cpp::TypeSupport * request_type_support_{nullptr};
Expand All @@ -62,6 +71,91 @@ typedef struct CustomServiceRequest
: buffer_(nullptr) {}
} CustomServiceRequest;

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;
using clients_endpoints_map_t =
std::unordered_map<eprosima::fastrtps::rtps::GUID_t,
eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

public:
explicit ServicePubListener(CustomServiceInfo * info)
: info_(info)
{
(void)info_;
}

void
onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
auto endpoint = clients_endpoints_.find(matchingInfo.remoteEndpointGuid);
if (endpoint != clients_endpoints_.end()) {
clients_endpoints_.erase(endpoint->second);
clients_endpoints_.erase(matchingInfo.remoteEndpointGuid);
}
} else {
return;
}
cv_.notify_all();
}

template<class Rep, class Period>
bool
wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

client_present_t
check_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid)
{
// Check if the guid is still in the map
if (clients_endpoints_.find(guid) == clients_endpoints_.end()) {
// Client is gone
return client_present_t::GONE;
}
// Wait for subscription
if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) {
return client_present_t::MAYBE;
}
return client_present_t::YES;
}

// Accesors
clients_endpoints_map_t & clients_endpoints()
{
std::lock_guard<std::mutex> lock(mutex_);
return clients_endpoints_;
}

private:
CustomServiceInfo * info_;
std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

class ServiceListener : public eprosima::fastrtps::SubscriberListener
{
public:
Expand All @@ -72,6 +166,21 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
(void)info_;
}

void
onSubscriptionMatched(
eprosima::fastrtps::Subscriber * sub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) sub;
if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
auto endpoint = info_->pub_listener_->clients_endpoints().find(
matchingInfo.remoteEndpointGuid);
if (endpoint != info_->pub_listener_->clients_endpoints().end()) {
info_->pub_listener_->clients_endpoints().erase(endpoint->second);
info_->pub_listener_->clients_endpoints().erase(matchingInfo.remoteEndpointGuid);
}
}
}

void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
Expand All @@ -95,6 +204,12 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
request.sample_identity_.writer_guid() = reader_guid;
}

// Save both guids in the clients_endpoints map
const eprosima::fastrtps::rtps::GUID_t & writer_guid =
request.sample_info_.sample_identity.writer_guid();
info_->pub_listener_->clients_endpoints().emplace(reader_guid, writer_guid);
info_->pub_listener_->clients_endpoints().emplace(writer_guid, reader_guid);

std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
Expand Down Expand Up @@ -169,49 +284,4 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
public:
ServicePubListener() = default;

template<class Rep, class Period>
bool wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
} else {
return;
}
cv_.notify_all();
}

private:
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
7 changes: 5 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,12 @@ __rmw_send_response(
// Related guid is a reader, so it is the response subscription guid.
// Wait for the response writer to be matched with it.
auto listener = info->pub_listener_;
if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) {
client_present_t ret = listener->check_for_subscription(related_guid);
if (ret == client_present_t::GONE) {
return RMW_RET_OK;
} else if (ret == client_present_t::MAYBE) {
RMW_SET_ERROR_MSG("client will not receive response");
return RMW_RET_ERROR;
return RMW_RET_TIMEOUT;
}
}

Expand Down

0 comments on commit 7de5b0c

Please sign in to comment.