Skip to content

Commit

Permalink
Implement service record phase 2
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <[email protected]>
  • Loading branch information
Barry-Xu-2018 committed Aug 1, 2023
1 parent 288ebda commit f709616
Show file tree
Hide file tree
Showing 22 changed files with 1,003 additions and 117 deletions.
15 changes: 13 additions & 2 deletions ros2bag/ros2bag/verb/burst.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'delay of message playback.')
parser.add_argument(
'--topics', type=str, default=[], nargs='+',
help='topics to replay, separated by space. If none specified, all topics will be '
'replayed.')
help='topics to replay, separated by space. At least one topic needs to be '
"specified. If this parameter isn\'t specified, all topics will be replayed.")
parser.add_argument(
'--services', type=str, default=[], nargs='+',
help='services to replay, separated by space. At least one service needs to be '
"specified. If this parameter isn\'t specified, all services will be replayed.")
parser.add_argument(
'--qos-profile-overrides-path', type=FileType('r'),
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')
Expand Down Expand Up @@ -90,6 +94,13 @@ def main(self, *, args): # noqa: D102
play_options.node_prefix = NODE_NAME_PREFIX
play_options.rate = 1.0
play_options.topics_to_filter = args.topics
# Convert service name to service event topic name
services = []
if args.services and len(args.services) != 0:
for s in args.services:
name = '/' + s if s[0] != '/' else s
services.append(name + '/_service_event')
play_options.services_to_filter = services
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = False
play_options.topic_remapping_options = topic_remapping
Expand Down
27 changes: 24 additions & 3 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,22 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'--topics', type=str, default=[], nargs='+',
help='topics to replay, separated by space. If none specified, all topics will be '
'replayed.')
parser.add_argument(
'--services', type=str, default=[], nargs='+',
help='services to replay, separated by space. At least one service needs to be '
'specified.')
parser.add_argument(
'-e', '--regex', default='',
help='filter topics by regular expression to replay, separated by space. If none '
'specified, all topics will be replayed.')
parser.add_argument(
'-x', '--exclude', default='',
'--exclude-topics', default='',
help='regular expressions to exclude topics from replay, separated by space. If none '
'specified, all topics will be replayed.')
parser.add_argument(
'--exclude-services', default='',
help='regular expressions to exclude services from replay, separated by space. If '
'none specified, all services will be replayed.')
parser.add_argument(
'--qos-profile-overrides-path', type=FileType('r'),
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')
Expand Down Expand Up @@ -183,8 +191,21 @@ def main(self, *, args): # noqa: D102
play_options.node_prefix = NODE_NAME_PREFIX
play_options.rate = args.rate
play_options.topics_to_filter = args.topics
play_options.topics_regex_to_filter = args.regex
play_options.topics_regex_to_exclude = args.exclude

# Convert service name to service event topic name
services = []
if args.services and len(args.services) != 0:
for s in args.services:
name = '/' + s if s[0] != '/' else s
services.append(name + '/_service_event')
play_options.services_to_filter = services

play_options.regex_to_filter = args.regex
play_options.topics_regex_to_exclude = args.exclude_topics
if args.exclude_services:
play_options.services_regex_to_exclude = args.exclude_services + '/_service_event'
else:
play_options.services_regex_to_exclude = args.exclude_services
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = args.loop
play_options.topic_remapping_options = topic_remapping
Expand Down
17 changes: 12 additions & 5 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,22 @@ PYBIND11_MODULE(_storage, m) {

pybind11::class_<rosbag2_storage::StorageFilter>(m, "StorageFilter")
.def(
pybind11::init<std::vector<std::string>, std::string, std::string>(),
pybind11::init<
std::vector<std::string>, std::vector<std::string>, std::string, std::string, std::string>(),
pybind11::arg("topics") = std::vector<std::string>(),
pybind11::arg("topics_regex") = "",
pybind11::arg("topics_regex_to_exclude") = "")
pybind11::arg("services") = std::vector<std::string>(),
pybind11::arg("regex") = "",
pybind11::arg("topics_regex_to_exclude") = "",
pybind11::arg("services_regex_to_exclude") = "")
.def_readwrite("topics", &rosbag2_storage::StorageFilter::topics)
.def_readwrite("topics_regex", &rosbag2_storage::StorageFilter::topics_regex)
.def_readwrite("services", &rosbag2_storage::StorageFilter::services)
.def_readwrite("regex", &rosbag2_storage::StorageFilter::regex)
.def_readwrite(
"topics_regex_to_exclude",
&rosbag2_storage::StorageFilter::topics_regex_to_exclude);
&rosbag2_storage::StorageFilter::topics_regex_to_exclude)
.def_readwrite(
"services_regex_to_exclude",
&rosbag2_storage::StorageFilter::services_regex_to_exclude);

pybind11::class_<rosbag2_storage::MessageDefinition>(m, "MessageDefinition")
.def(
Expand Down
4 changes: 3 additions & 1 deletion rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,10 @@ PYBIND11_MODULE(_transport, m) {
.def_readwrite("node_prefix", &PlayOptions::node_prefix)
.def_readwrite("rate", &PlayOptions::rate)
.def_readwrite("topics_to_filter", &PlayOptions::topics_to_filter)
.def_readwrite("topics_regex_to_filter", &PlayOptions::topics_regex_to_filter)
.def_readwrite("services_to_filter", &PlayOptions::services_to_filter)
.def_readwrite("regex_to_filter", &PlayOptions::regex_to_filter)
.def_readwrite("topics_regex_to_exclude", &PlayOptions::topics_regex_to_exclude)
.def_readwrite("services_regex_to_exclude", &PlayOptions::services_regex_to_exclude)
.def_property(
"topic_qos_profile_overrides",
&PlayOptions::getTopicQoSProfileOverrides,
Expand Down
20 changes: 15 additions & 5 deletions rosbag2_storage/include/rosbag2_storage/storage_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,28 @@ struct StorageFilter
{
// Topic names to whitelist when reading a bag. Only messages matching these
// specified topics will be returned. If list is empty, the filter is ignored
// and all messages are returned.
// and all messages of topics are returned.
std::vector<std::string> topics;

// Regular expression of topic names to whitelist when playing a bag.
// Only messages matching these specified topics will be played.
// Service names to whitelist when reading a bag. Only messages matching these
// specified service will be returned. If list is empty, the filter is ignored
// and all messages of services are returned.
std::vector<std::string> services;

// Regular expression of topic names and service name to whitelist when playing a bag.
// Only messages matching these specified topics or services will be played.
// If list is empty, the filter is ignored and all messages are played.
std::string topics_regex = "";
std::string regex = "";

// Regular expression of topic names to exclude when playing a bag.
// Only messages not matching these specified topics will be played.
// If list is empty, the filter is ignored and all messages are played.
// If list is empty, the filter is ignored and all messages of topics are played.
std::string topics_regex_to_exclude = "";

// Regular expression of topic names to exclude when playing a bag.
// Only messages not matching these specified services will be played.
// If list is empty, the filter is ignored and all messages of services are played.
std::string services_regex_to_exclude = "";
};

} // namespace rosbag2_storage
Expand Down
61 changes: 49 additions & 12 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,26 +521,63 @@ void MCAPStorage::reset_iterator()
options.endTime = mcap::MaxTime;
}
options.readOrder = read_order_;
if (!storage_filter_.topics.empty()) {
options.topicFilter = [this](std::string_view topic) {

auto filter_process = [this](std::string_view topic) {
if (!storage_filter_.topics.empty()) {
for (const auto & match_topic : storage_filter_.topics) {
if (match_topic == topic) {
return true;
}
}
return false;
};
}
}

if (!storage_filter_.services.empty()) {
for (const auto & match_service : storage_filter_.services) {
if (match_service == topic) {
return true;
}
}
}

bool topics_regex_to_exclude_match = false;
bool services_regex_to_exclude_match = false;
std::string topic_string(topic);

if (!storage_filter_.topics_regex_to_exclude.empty()) {
std::smatch m;
std::regex re(storage_filter_.topics_regex_to_exclude);
topics_regex_to_exclude_match = std::regex_match(topic_string, m, re);
}

if (!storage_filter_.services_regex_to_exclude.empty()) {
std::smatch m;
std::regex re(storage_filter_.services_regex_to_exclude);
services_regex_to_exclude_match = std::regex_match(topic_string, m, re);
}

#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_FILTER_TOPIC_REGEX
if (!storage_filter_.topics_regex.empty()) {
options.topicFilter = [this](std::string_view topic) {
if (!storage_filter_.regex.empty()) {
std::smatch m;
std::string topic_string(topic);
std::regex re(storage_filter_.topics_regex);
return std::regex_match(topic_string, m, re);
};
}
std::regex re(storage_filter_.regex);

if (std::regex_match(topic_string, m, re) && !topics_regex_to_exclude_match &&
!services_regex_to_exclude_match) {
return true;
} else {
return false;
}
}
#endif

if ((storage_filter_.topics.empty() && !topics_regex_to_exclude_match) &&
(storage_filter_.services.empty() && !services_regex_to_exclude_match)) {
return true;
}

return false;
};
options.topicFilter = filter_process;

linear_view_ =
std::make_unique<mcap::LinearMessageView>(mcap_reader_->readMessages(OnProblem, options));
linear_iterator_ = std::make_unique<mcap::LinearMessageView::Iterator>(linear_view_->begin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ void SqliteStorage::prepare_for_reading()
"FROM messages JOIN topics ON messages.topic_id = topics.id WHERE ";
std::vector<std::string> where_conditions;

std::string topic_and_service_list;
// add topic filter
if (!storage_filter_.topics.empty()) {
// Construct string for selected topics
Expand All @@ -531,13 +532,39 @@ void SqliteStorage::prepare_for_reading()
topic_list += ",";
}
}
where_conditions.push_back("(topics.name IN (" + topic_list + "))");
topic_and_service_list = "(topics.name IN (" + topic_list + "))";
}
// add topic filter based on regular expression
if (!storage_filter_.topics_regex.empty()) {

// add service filter
if (!storage_filter_.services.empty()) {
// Construct string for selected topics
where_conditions.push_back("(topics.name REGEXP '" + storage_filter_.topics_regex + "')");
std::string service_list{""};
for (auto & service : storage_filter_.services) {
service_list += "'" + service + "'";
if (&service != &storage_filter_.topics.back()) {
service_list += ",";
}
}

topic_and_service_list = topic_and_service_list +
std::string(topic_and_service_list.empty() ? "" : " OR ") +
"(topics.name IN (" + service_list + "))";
}

std::string list_and_regex = topic_and_service_list;
// add topic filter based on regular expression
if (!storage_filter_.regex.empty()) {
std::string regex = "(topics.name REGEXP '" + storage_filter_.regex + "')";
list_and_regex = list_and_regex +
std::string(!list_and_regex.empty() ? " OR " : "") +
regex;
}

if (!list_and_regex.empty()) {
where_conditions.push_back(list_and_regex);
}

std::string exclude_topics_services;
// exclude topics based on regular expressions
if (!storage_filter_.topics_regex_to_exclude.empty()) {
// Construct string for selected topics
Expand All @@ -546,6 +573,14 @@ void SqliteStorage::prepare_for_reading()
"(SELECT topics.name FROM topics WHERE topics.name REGEXP '" +
storage_filter_.topics_regex_to_exclude + "'))");
}
// exclude service based on regular expressions
if (!storage_filter_.services_regex_to_exclude.empty()) {
// Construct string for selected topics
where_conditions.push_back(
"(topics.name NOT IN "
"(SELECT topics.name FROM topics WHERE topics.name REGEXP '" +
storage_filter_.services_regex_to_exclude + "'))");
}

const std::string direction_op = read_order_.reverse ? "<" : ">";
const std::string order_direction = read_order_.reverse ? "DESC" : "ASC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ TEST_F(StorageTestFixture, read_next_returns_filtered_messages_regex) {
readable_storage->open({db_filename, kPluginID});

rosbag2_storage::StorageFilter storage_filter;
storage_filter.topics_regex = "topic.*";
storage_filter.regex = "topic.*";
readable_storage->set_filter(storage_filter);

EXPECT_TRUE(readable_storage->has_next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_TEST_COMMON__SERVICE_CLIENT_MANAGER_HPP_
#define ROSBAG2_TEST_COMMON__SERVICE_CLIENT_MANAGER_HPP_
#ifndef ROSBAG2_TEST_COMMON__CLIENT_MANAGER_HPP_
#define ROSBAG2_TEST_COMMON__CLIENT_MANAGER_HPP_

#include <memory>
#include <string>
Expand All @@ -23,13 +23,14 @@

#include "rclcpp/rclcpp.hpp" // rclcpp must be included before the Windows specific includes.


namespace rosbag2_test_common
{
template<typename ServiceT>
class ServiceClientManager : public rclcpp::Node
class ClientManager : public rclcpp::Node
{
public:
explicit ServiceClientManager(
explicit ClientManager(
std::string service_name,
size_t client_number = 1,
bool service_event_contents = false,
Expand Down Expand Up @@ -122,4 +123,4 @@ class ServiceClientManager : public rclcpp::Node
};
} // namespace rosbag2_test_common

#endif // ROSBAG2_TEST_COMMON__SERVICE_CLIENT_MANAGER_HPP_
#endif // ROSBAG2_TEST_COMMON__CLIENT_MANAGER_HPP_
Loading

0 comments on commit f709616

Please sign in to comment.