Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support zero-copy intra-process publishing #306

Merged
merged 15 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions image_transport/include/image_transport/camera_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ class CameraPublisher
const sensor_msgs::msg::Image::ConstSharedPtr & image,
const sensor_msgs::msg::CameraInfo::ConstSharedPtr & info) const;

/*!
* \brief Publish an (image, info) pair on the topics associated with this CameraPublisher.
*/
IMAGE_TRANSPORT_PUBLIC
void publish(
sensor_msgs::msg::Image::UniquePtr image,
sensor_msgs::msg::CameraInfo::UniquePtr info) const;

/*!
* \brief Publish an (image, info) pair with given timestamp on the topics associated with
* this CameraPublisher.
Expand All @@ -123,6 +131,19 @@ class CameraPublisher
sensor_msgs::msg::Image & image, sensor_msgs::msg::CameraInfo & info,
rclcpp::Time stamp) const;

/*!
* \brief Publish an (image, info) pair with given timestamp on the topics associated with
* this CameraPublisher.
*
* Convenience version, which sets the timestamps of both image and info to stamp before
* publishing.
*/
IMAGE_TRANSPORT_PUBLIC
void publish(
sensor_msgs::msg::Image::UniquePtr image,
sensor_msgs::msg::CameraInfo::UniquePtr info,
rclcpp::Time stamp) const;

/*!
* \brief Shutdown the advertisements associated with this Publisher.
*/
Expand Down
6 changes: 6 additions & 0 deletions image_transport/include/image_transport/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ class Publisher
IMAGE_TRANSPORT_PUBLIC
void publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) const;

/*!
* \brief Publish an image on the topics associated with this Publisher.
*/
IMAGE_TRANSPORT_PUBLIC
void publish(sensor_msgs::msg::Image::UniquePtr message) const;

/*!
* \brief Shutdown the advertisements associated with this Publisher.
*/
Expand Down
21 changes: 21 additions & 0 deletions image_transport/include/image_transport/publisher_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#ifndef IMAGE_TRANSPORT__PUBLISHER_PLUGIN_HPP_
#define IMAGE_TRANSPORT__PUBLISHER_PLUGIN_HPP_

#include <stdexcept>
#include <string>
#include <vector>

Expand Down Expand Up @@ -59,6 +60,14 @@ class PublisherPlugin
*/
virtual std::string getTransportName() const = 0;

/**
* \brief Check whether this plugin supports publishing using UniquePtr.
*/
virtual bool supportsUniquePtrPub() const
{
return false;
}

/**
* \brief Advertise a topic, simple version.
*/
Expand Down Expand Up @@ -95,6 +104,18 @@ class PublisherPlugin
publish(*message);
}

/**
* \brief Publish an image using the transport associated with this PublisherPlugin.
ahcorde marked this conversation as resolved.
Show resolved Hide resolved
* This version of the function can be used to optimize cases where the Plugin can
* avoid doing copies of the data when having the ownership to the image message.
* Plugins that can take advantage of message ownership should overwrite this method
* along with supportsUniquePtrPub().
*/
virtual void publishUniquePtr(sensor_msgs::msg::Image::UniquePtr /*message*/) const
{
throw std::logic_error("publishUniquePtr() is not implemented.");
}

/**
* \brief Publish an image using the transport associated with this PublisherPlugin.
* This version of the function can be used to optimize cases where you don't want to
Expand Down
19 changes: 19 additions & 0 deletions image_transport/include/image_transport/raw_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#define IMAGE_TRANSPORT__RAW_PUBLISHER_HPP_

#include <string>
#include <utility>

#include "sensor_msgs/msg/image.hpp"

Expand All @@ -56,12 +57,30 @@ class RawPublisher : public SimplePublisherPlugin<sensor_msgs::msg::Image>
return "raw";
}

virtual bool supportsUniquePtrPub() const
{
return true;
}

protected:
[[deprecated("Use publish(const sensor_msgs::msg::Image&, const PublisherT&) instead.")]]
virtual void publish(const sensor_msgs::msg::Image & message, const PublishFn & publish_fn) const
bjsowa marked this conversation as resolved.
Show resolved Hide resolved
{
publish_fn(message);
}

virtual void publish(const sensor_msgs::msg::Image & message, const PublisherT & publisher) const
{
publisher->publish(message);
}

virtual void publish(
sensor_msgs::msg::Image::UniquePtr message,
const PublisherT & publisher) const
{
publisher->publish(std::move(message));
}

virtual std::string getTopicToAdvertise(const std::string & base_topic) const
{
return base_topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
#define IMAGE_TRANSPORT__SIMPLE_PUBLISHER_PLUGIN_HPP_

#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include "rclcpp/node.hpp"
#include "rclcpp/logger.hpp"
Expand Down Expand Up @@ -89,7 +91,20 @@ class SimplePublisherPlugin : public PublisherPlugin
return;
}

publish(message, bindInternalPublisher(simple_impl_->pub_.get()));
publish(message, simple_impl_->pub_);
}

void publishUniquePtr(sensor_msgs::msg::Image::UniquePtr message) const override
{
if (!simple_impl_ || !simple_impl_->pub_) {
auto logger = simple_impl_ ? simple_impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_ERROR(
logger,
"Call to publish() on an invalid image_transport::SimplePublisherPlugin");
return;
}

publish(std::move(message), simple_impl_->pub_);
}

void shutdown() override
Expand All @@ -112,20 +127,54 @@ class SimplePublisherPlugin : public PublisherPlugin
simple_impl_->pub_ = node->create_publisher<M>(transport_topic, qos, options);
}

typedef typename rclcpp::Publisher<M>::SharedPtr PublisherT;

//! Generic function for publishing the internal message type.
typedef std::function<void (const M &)> PublishFn;

/**
* \brief Publish an image using the specified publish function. Must be implemented by
* the subclass.
* \brief Publish an image using the specified publish function.
*
* \deprecated Use publish(const sensor_msgs::msg::Image&, const PublisherT&) instead.
*
* The PublishFn publishes the transport-specific message type. This indirection allows
* SimpleSubscriberPlugin to use this function for both normal broadcast publishing and
* single subscriber publishing (in subscription callbacks).
*/
virtual void publish(
const sensor_msgs::msg::Image & /*message*/,
const PublishFn & /*publish_fn*/) const
{
throw std::logic_error(
"publish(const sensor_msgs::msg::Image&, const PublishFn&) is not implemented.");
}

/**
* \brief Publish an image using the specified publisher.
*/
virtual void publish(
const sensor_msgs::msg::Image & message,
const PublishFn & publish_fn) const = 0;
const PublisherT & publisher) const
{
// Fallback to old, deprecated method
publish(message, bindInternalPublisher(publisher.get()));
}

/**
* \brief Publish an image using the specified publisher.
*
* This version of the function can be used to optimize cases where the Plugin can
* avoid doing copies of the data when having the ownership to the image message.
* Plugins that can take advantage of message ownership should overwrite this method
* along with supportsUniquePtrPub().
*/
virtual void publish(
sensor_msgs::msg::Image::UniquePtr /*message*/,
const PublisherT & /*publisher*/) const
{
throw std::logic_error(
"publish(sensor_msgs::msg::Image::UniquePtr, const PublisherT&) is not implemented.");
}

/**
* \brief Return the communication topic name for a given base topic.
Expand All @@ -148,7 +197,7 @@ class SimplePublisherPlugin : public PublisherPlugin

rclcpp::Node * node_;
rclcpp::Logger logger_;
typename rclcpp::Publisher<M>::SharedPtr pub_;
PublisherT pub_;
};

std::unique_ptr<SimplePublisherPluginImpl> simple_impl_;
Expand Down
39 changes: 36 additions & 3 deletions image_transport/src/camera_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include <memory>
#include <string>
#include <utility>

#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/logging.hpp"
Expand Down Expand Up @@ -124,7 +125,6 @@ void CameraPublisher::publish(
const sensor_msgs::msg::CameraInfo & info) const
{
if (!impl_ || !impl_->isValid()) {
// TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
Expand All @@ -141,7 +141,6 @@ void CameraPublisher::publish(
const sensor_msgs::msg::CameraInfo::ConstSharedPtr & info) const
{
if (!impl_ || !impl_->isValid()) {
// TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
Expand All @@ -153,12 +152,27 @@ void CameraPublisher::publish(
impl_->info_pub_->publish(*info);
}

void CameraPublisher::publish(
sensor_msgs::msg::Image::UniquePtr image,
sensor_msgs::msg::CameraInfo::UniquePtr info) const
{
if (!impl_ || !impl_->isValid()) {
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
"Call to publish() on an invalid image_transport::CameraPublisher");
return;
}

impl_->image_pub_.publish(std::move(image));
bjsowa marked this conversation as resolved.
Show resolved Hide resolved
impl_->info_pub_->publish(std::move(info));
}

void CameraPublisher::publish(
sensor_msgs::msg::Image & image, sensor_msgs::msg::CameraInfo & info,
rclcpp::Time stamp) const
{
if (!impl_ || !impl_->isValid()) {
// TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
Expand All @@ -172,6 +186,25 @@ void CameraPublisher::publish(
impl_->info_pub_->publish(info);
}

void CameraPublisher::publish(
sensor_msgs::msg::Image::UniquePtr image,
sensor_msgs::msg::CameraInfo::UniquePtr info,
rclcpp::Time stamp) const
{
if (!impl_ || !impl_->isValid()) {
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
"Call to publish() on an invalid image_transport::CameraPublisher");
return;
}

image->header.stamp = stamp;
info->header.stamp = stamp;
impl_->image_pub_.publish(std::move(image));
impl_->info_pub_->publish(std::move(info));
}

void CameraPublisher::shutdown()
{
if (impl_) {
Expand Down
31 changes: 31 additions & 0 deletions image_transport/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "image_transport/publisher.hpp"

#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>
Expand Down Expand Up @@ -202,6 +203,36 @@ void Publisher::publish(const sensor_msgs::msg::Image::ConstSharedPtr & message)
}
}

void Publisher::publish(sensor_msgs::msg::Image::UniquePtr message) const
{
if (!impl_ || !impl_->isValid()) {
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(logger, "Call to publish() on an invalid image_transport::Publisher");
return;
}

std::vector<std::shared_ptr<PublisherPlugin>> pubs_take_reference;
std::optional<std::shared_ptr<PublisherPlugin>> pub_takes_ownership = std::nullopt;
ahcorde marked this conversation as resolved.
Show resolved Hide resolved

for (const auto & pub : impl_->publishers_) {
if (pub->getNumSubscribers() > 0) {
if (pub->supportsUniquePtrPub() && !pub_takes_ownership.has_value()) {
pub_takes_ownership = pub;
} else {
pubs_take_reference.push_back(pub);
}
}
}

for (const auto & pub : pubs_take_reference) {
pub->publish(*message);
}

if (pub_takes_ownership.has_value()) {
pub_takes_ownership.value()->publishUniquePtr(std::move(message));
}
}

void Publisher::shutdown()
{
if (impl_) {
Expand Down