Skip to content

Commit

Permalink
minor changes to get Node shr_ptr / wait for server before send request
Browse files Browse the repository at this point in the history
  • Loading branch information
FabianEP11 committed May 2, 2024
1 parent 16a10b4 commit 0e287db
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 83 deletions.
29 changes: 17 additions & 12 deletions include/rr/ros2_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Ros2Client : public temoto_resource_registrar::RrClientBase
//client_ = nh_.serviceClient<ServiceClass>(id());

node_ = rclcpp::Node::make_shared(name);
RCLCPP_INFO_STREAM(rclcpp::get_logger("ros2_client.h"), "constructing Ros2Client: " << id());
client_ = node_->create_client<ServiceClass>(id());
}

Expand All @@ -51,6 +52,15 @@ class Ros2Client : public temoto_resource_registrar::RrClientBase
void invoke(ServiceClass &request)
{

RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "invoke ServiceClass");
while (!client_->wait_for_service(std::chrono::seconds(1))) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "Interrupted while waiting for the service. Exiting.");
}
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "service not available, waiting again...");
}


RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "invoke request in async for server:");
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), id().c_str());

Expand All @@ -74,8 +84,13 @@ class Ros2Client : public temoto_resource_registrar::RrClientBase
*/
void invoke(Ros2Query<ServiceClass> &wrapped_request)
{
//ROS_INFO_STREAM("invoke for Ros2Query wrapper started");
//ServiceClass service_call = wrapped_request.rosQuery();
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "invoke wrapped_request");
while (!client_->wait_for_service(std::chrono::seconds(1)) && rclcpp::ok()) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "Interrupted while waiting for the service. Exiting.");
}
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "service not available, waiting again...");
}

RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "invoke request in async for server:");
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), id().c_str());
Expand All @@ -92,18 +107,10 @@ class Ros2Client : public temoto_resource_registrar::RrClientBase
{
RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "NOK");
}

//invoke(service_call);

//wrapped_request = Ros2Query<ServiceClass>(service_call);
//ROS_INFO_STREAM("invoke for Ros2Query wrapper completed");
}

void registerUserStatusCb(const std::string &request_id, const UserStatusCb &user_status_cb)
{
//ROS_INFO_STREAM("registerUserStatusCb " << " - " << id() << " request: " << request_id);

// RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "registerUserStatusCb - " + id() + " request: " + request_id);
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), ("registerUserStatusCb - " + id() + " request: " + request_id).c_str());
status_callbacks_[request_id] = user_status_cb;

Expand All @@ -120,8 +127,6 @@ class Ros2Client : public temoto_resource_registrar::RrClientBase
//ROS_INFO_STREAM("Determinging if client cas callback for id " << request_id << " nr of callbacks: " << status_callbacks_.size());
if (hasRegisteredCb(request_id))
{
//ROS_INFO_STREAM("!Executing user CB!");

RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "req size: %i", status.serialised_request_.size());
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "res size: %i", status.serialised_response_.size());

Expand Down
97 changes: 38 additions & 59 deletions include/rr/ros2_resource_registrar.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,27 @@ namespace temoto_resource_registrar
public:
/**
* @brief Construct a new Resource Registrar Ros 2 object. Take into account that names need to be unique.
* They are used as adress fragments for inter-register communications.
* They are used as address fragments for inter-register communications.
*
* @param name
*/
ResourceRegistrarRos2(const std::string &name) : RrBase(name), Node(name)
{
}

rclcpp::Node::SharedPtr getResourceRegistrarNode() {
return shared_from_this();
}

~ResourceRegistrarRos2()
{
//ROS_INFO_STREAM("Destroying RR Ros " << name());
//ROS_INFO_STREAM("unloading clients");
// RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Destroying RR Ros");
// RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "unloading clients");
for (const std::string &client_id : clients_.getIds())
{
try
{
//ROS_INFO_STREAM("unloading client " << client_id);
// RCLCPP_INFO_STREAM(rclcpp::get_logger("rclcpp"), "unloading client" << client_id);
unloadClient(client_id);
}
catch (...)
Expand Down Expand Up @@ -95,7 +99,8 @@ namespace temoto_resource_registrar
const temoto_resource_registrar::Status &)>
status_func = NULL)
{
//ROS_INFO_STREAM("calling " << rr << " server " << server);
std::string message = "calling " + std::string(rr) + " server " + std::string(server);
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), message.c_str());

Ros2Query<QueryType> wrapped_base_query(query);

Expand All @@ -109,10 +114,10 @@ namespace temoto_resource_registrar
server,
wrapped_base_query,
status_func);
message = "calling " + std::string(rr) + " server " + std::string(server) + " done";
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), message.c_str());

return wrapped_base_query;

//ROS_INFO_STREAM("calling " << rr << " server " << server << " done");
}

/**
Expand All @@ -127,11 +132,7 @@ namespace temoto_resource_registrar
*/
bool unload(const std::string &rr, const std::string &id)
{
//ROS_INFO_STREAM("unload Called for rr " << rr << " id: " << id);

RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "unload called");
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), rr.c_str());
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), id.c_str());
RCLCPP_INFO_STREAM(rclcpp::get_logger("rclcpp"), "unload Called for rr " << rr << " id: " << id);

std::string client_name = IDUtils::generateUnload(rr);
initClient<rr_interfaces::srv::UnloadComponent>(client_name, unload_clients_, unload_callback_group_);
Expand All @@ -153,8 +154,14 @@ namespace temoto_resource_registrar
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "[inner service] callback finished");
};

auto result = unload_clients_[client_name]->async_send_request(request, inner_client_callback);
while (!unload_clients_[client_name]->wait_for_service(std::chrono::seconds(1)) && rclcpp::ok()) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(rclcpp::get_logger("ros2_r_r.h"), "Interrupted while waiting for the service. Exiting.");
}
RCLCPP_INFO(rclcpp::get_logger("ros2_r_r.h"), "service not available, waiting again...");
}

auto result = unload_clients_[client_name]->async_send_request(request, inner_client_callback);
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "waiting for future");

while (!query_complete && rclcpp::ok())
Expand All @@ -163,10 +170,6 @@ namespace temoto_resource_registrar
}
return res;

//bool res = unload_clients_[client_name]->call(unload_srv);
//ROS_INFO_STREAM("result: " << res);

//return res;
}

/**
Expand Down Expand Up @@ -213,17 +216,14 @@ namespace temoto_resource_registrar
std::shared_ptr<typename QueryType::Response>>>
getRosChildQueries(const std::string &id, const std::string &server_name)
{
//ROS_DEBUG_STREAM("getting RR queries for id " << id << " and server " << server_name);
std::map<std::string, std::pair<std::string, std::string>> serialised_queries = getChildQueries(id, server_name);

//ROS_DEBUG_STREAM("Found " << serialised_queries.size() << " queries");
std::map<std::string,
std::pair<
std::shared_ptr<typename QueryType::Request>,
std::shared_ptr<typename QueryType::Response>>>
ros_queries;

//ROS_DEBUG_STREAM("Deserializing strings to query type");
for (const auto &el : serialised_queries)
{
std::pair<
Expand All @@ -235,7 +235,6 @@ namespace temoto_resource_registrar

ros_queries[el.first] = request_response_pair;
}
//ROS_DEBUG_STREAM("Returning...");
return ros_queries;
}

Expand All @@ -245,7 +244,6 @@ namespace temoto_resource_registrar
std::shared_ptr<typename QueryType::Response>>>
getServerQueries(const std::string &server)
{
//ROS_INFO_STREAM("getServerQueries printing before processign catalog.");
std::string server_name = IDUtils::generateServerName(name(), server);
rr_catalog_->print();
std::vector<std::pair<
Expand All @@ -258,22 +256,31 @@ namespace temoto_resource_registrar
out.push_back(deSerializeQuery<QueryType>(queryContainer));
}

//ROS_INFO_STREAM("getServerQueries printing after processign catalog.");
rr_catalog_->print();

return out;
}

template <class QueryType>
void updateQueryResponse(const std::string &server,
const std::pair<
std::shared_ptr<typename QueryType::Request>,
std::shared_ptr<typename QueryType::Response>> &call)
const std::pair<
std::shared_ptr<typename QueryType::Request>,
std::shared_ptr<typename QueryType::Response>> &call)
{
//ROS_INFO_STREAM("updateQueryResponse for server " << server);
auto &requestSharedPtr = call.first;
auto &responseSharedPtr = call.second;

std::string serialized_request = temoto_resource_registrar::MessageSerializer::serializeMessage<typename QueryType::Request>(sanityzeData<typename QueryType::Request>(call.first));
std::string serialized_response = temoto_resource_registrar::MessageSerializer::serializeMessage<typename QueryType::Response>(sanityzeData<typename QueryType::Response>(call.second));
if (!requestSharedPtr || !responseSharedPtr)
{
// Handle invalid shared pointers
return;
}

// Serialize the data directly from shared pointers
std::string serialized_request = temoto_resource_registrar::MessageSerializer::serializeMessage<typename QueryType::Request>(requestSharedPtr);
std::string serialized_response = temoto_resource_registrar::MessageSerializer::serializeMessage<typename QueryType::Response>(responseSharedPtr);

// Update the query
updateQuery(server, serialized_request, serialized_response);
}

Expand Down Expand Up @@ -603,8 +610,6 @@ namespace temoto_resource_registrar

auto request = std::make_shared<rr_interfaces::srv::StatusComponent::Request>();

//temoto_resource_registrar::StatusComponent status_srv;
//ROS_INFO_STREAM("callStatusClient");
auto container = rr_catalog_->findOriginalContainer(status_data.id_);
if (!container.empty_)
{
Expand All @@ -630,48 +635,28 @@ namespace temoto_resource_registrar
request->status = static_cast<int>(status_data.state_);
request->message = status_data.message_;

//ROS_INFO_STREAM("calling status client " << client_name << " target id: " << status_data.id_);
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), ("Calling client with id: " + client_name + " target id: " + status_data.id_).c_str());
auto result = status_clients_[client_name]->async_send_request(request);
return true;
//return rclcpp::spin_until_future_complete(this, result) == rclcpp::executor::FutureReturnCode::SUCCESS;

};

virtual std::map<std::string, std::pair<std::string, std::string>> callDataFetchClient(const std::string &target_rr,
const std::string &origin_rr,
const std::string &server_name)
{
//ROS_INFO_STREAM("callDataFetchClient");

std::string client_name = IDUtils::generateFetch(target_rr);
initClient<rr_interfaces::srv::DataFetchComponent>(client_name, fetch_clients_, fetch_callback_group_);

auto request = std::make_shared<rr_interfaces::srv::DataFetchComponent::Request>();

//temoto_resource_registrar::DataFetchComponent data_fetch_srv;

request->origin_rr = origin_rr;
request->server_name = server_name;

//ROS_INFO_STREAM("calling data fetch client " << client_name << " - " << data_fetch_srv.request);

//fetch_clients_[client_name]->call(data_fetch_srv);
auto result = fetch_clients_[client_name]->async_send_request(request);

std::map<std::string, std::pair<std::string, std::string>> res;
/*
if (rclcpp::spin_until_future_complete(this, result) == rclcpp::executor::FutureReturnCode::SUCCESS)
{
int c = 0;
for (const auto &id : result.get()->ids)
{
std::pair<std::string, std::string> req_res(result.get()->serialized_requests.at(c),
result.get()->serialized_responses.at(c));
res[id] = req_res;
c++;
}
}
*/

return res;
}

Expand Down Expand Up @@ -759,14 +744,10 @@ namespace temoto_resource_registrar
void unloadCallback(const std::shared_ptr<rr_interfaces::srv::UnloadComponent::Request> req,
std::shared_ptr<rr_interfaces::srv::UnloadComponent::Response> res)
{
//ROS_INFO_STREAM("printing catalog before unload callback");

RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "unload called");
rr_catalog_->print();

//ROS_INFO_STREAM("unloadCallback " << req.target);
std::string id = req->target;
//ROS_INFO_STREAM("std::string id " << id);
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "local unload for id %s", id.c_str());
res->status = localUnload(id);
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "finished unload");
Expand All @@ -776,7 +757,6 @@ namespace temoto_resource_registrar
std::shared_ptr<rr_interfaces::srv::StatusComponent::Response> res)
{
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "got status");
//ROS_INFO_STREAM("statusCallback: " << req.target);
int status = req->status;
std::string target = req->target;
std::string message = req->message;
Expand All @@ -794,7 +774,6 @@ namespace temoto_resource_registrar
void dataFetchCallback(const std::shared_ptr<rr_interfaces::srv::DataFetchComponent::Request> req,
std::shared_ptr<rr_interfaces::srv::DataFetchComponent::Response> res)
{
//ROS_INFO_STREAM("syncCallback " << req);
std::map<UUID, std::pair<std::string, std::string>> resMap = handleDataFetch(req->origin_rr, req->server_name);
std::vector<std::string> ids, serialized_requests, serialized_responses;

Expand Down
18 changes: 6 additions & 12 deletions include/rr/ros2_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class Ros2Server : public temoto_resource_registrar::RrServerBase

/**
* @brief Executes the server message handling logic. A request and response are passed to the method. The request is checked
* for uniqueness int he catalog. If it is unique, a new entry is created and the user load callback is executed. In case
* for uniqueness in the catalog. If it is unique, a new entry is created and the user load callback is executed. In case
* it is not unique the corresponding request response is fetched from catalog storage and deserialized for the user.
*
* @param req
Expand Down Expand Up @@ -211,10 +211,9 @@ class Ros2Server : public temoto_resource_registrar::RrServerBase
}
else
{
//ROS_INFO("Request found. No storage needed. Fetching it... ");
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Request found. No storage needed. Fetching it... ");
std::shared_ptr<typename ServiceClass::Response> fetched_response = fetchResponse(request_id, wrapped_query);
//ROS_INFO("Fetching done...");
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Fetching done...");
fetched_response->temoto_metadata.request_id = generated_id;
res = fetched_response;
}
Expand All @@ -225,7 +224,9 @@ class Ros2Server : public temoto_resource_registrar::RrServerBase

void triggerCallback(const temoto_resource_registrar::Status &status) const
{
//ROS_INFO_STREAM("Triggering callback logic..." << id());
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Triggering callback logic...");
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), id().c_str());

if (member_status_cb_ != NULL)
{
std::shared_ptr<typename ServiceClass::Request> request = temoto_resource_registrar::MessageSerializer::deSerializeMessage<typename ServiceClass::Request>(status.serialised_request_);
Expand All @@ -247,22 +248,15 @@ class Ros2Server : public temoto_resource_registrar::RrServerBase
member_status_cb_;

private:
//ros::NodeHandle nh_;
//ros::ServiceServer service_;


//typename rclcpp::Client<ServiceClass>::SharedPtr client_;
typename rclcpp::Service<ServiceClass>::SharedPtr service_;

std::shared_ptr<rclcpp::Node> node_;

virtual void initialize()
{
//ROS_INFO_STREAM("Starting up server..." << id_);
//service_ = nh_.advertiseService(id_, &Ros2Server::serverCallback, this);
//ROS_INFO_STREAM("Starting up server done!!!");
//client_ = this->serverCallback<ServiceClass>(id());
service_ = node_->create_service<ServiceClass>(id(), std::bind(&Ros2Server::serverCallback, this, std::placeholders::_1, std::placeholders::_2));

RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "initialized server with ID:");
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), id().c_str());
}
Expand Down
Loading

0 comments on commit 0e287db

Please sign in to comment.