From a029ef9a435d5e118f9c4b500e8271d3998f684d Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 24 Sep 2023 21:42:36 -0700 Subject: [PATCH] Store serialized metadata in MCAP file (#1423) * Store serialized metadata in MCAP file when calling update_metadata() Signed-off-by: Michael Orlov * Remove ROS_DISTRO key-value from mcap::Metadata - ROS_DISTRO now stored inside serialized metadata Signed-off-by: Michael Orlov * Fix for test failure on Windows - Use `UnorderedElementsAreArray` for checking `metadata.topics_with_message_count`. - The list of the metadata.topics_with_message_count forming from the mcap_reader_->channels() which is unordered_map. The order of the elements in the unordered_map is unpredictable and can vary on different platforms. Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov --- rosbag2_storage_mcap/src/mcap_storage.cpp | 73 ++++++---- .../test_mcap_storage.cpp | 125 +++++++++++++++++- 2 files changed, 165 insertions(+), 33 deletions(-) diff --git a/rosbag2_storage_mcap/src/mcap_storage.cpp b/rosbag2_storage_mcap/src/mcap_storage.cpp index 716967a12b..bcc0269671 100644 --- a/rosbag2_storage_mcap/src/mcap_storage.cpp +++ b/rosbag2_storage_mcap/src/mcap_storage.cpp @@ -228,6 +228,7 @@ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage #endif private: + void read_metadata(); void open_impl(const std::string & uri, const std::string & preset_profile, rosbag2_storage::storage_interfaces::IOFlag io_flag, const std::string & storage_config_uri); @@ -370,12 +371,43 @@ void MCAPStorage::open_impl(const std::string & uri, const std::string & preset_ metadata_.relative_file_paths = {get_relative_file_path()}; } -/** BaseInfoInterface **/ -rosbag2_storage::BagMetadata MCAPStorage::get_metadata() +void MCAPStorage::read_metadata() { ensure_summary_read(); + const auto & mcap_metadatas = mcap_reader_->metadataIndexes(); + auto range = mcap_metadatas.equal_range("rosbag2"); + mcap::Status status{}; + mcap::Record mcap_record{}; + mcap::Metadata mcap_metadata{}; + for (auto i = range.first; i != range.second; ++i) { + status = mcap::McapReader::ReadRecord(*data_source_, i->second.offset, &mcap_record); + if (!status.ok()) { + OnProblem(status); + continue; + } + status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata); + if (!status.ok()) { + OnProblem(status); + continue; + } + std::string serialized_metadata; + try { + serialized_metadata = mcap_metadata.metadata.at("serialized_metadata"); + } catch (const std::out_of_range & /* err */) { + RCUTILS_LOG_WARN_NAMED( + LOG_NAME, "Metadata record with name 'rosbag2' did not contain key 'serialized_metadata'."); + } + if (!serialized_metadata.empty()) { + YAML::Node metadata_node = YAML::Load(serialized_metadata); + YAML::convert::decode(metadata_node, metadata_); + } + try { + metadata_.ros_distro = mcap_metadata.metadata.at("ROS_DISTRO"); + } catch (const std::out_of_range & /* err */) { + // Ignor this error. In new versions ROS_DISTRO stored inside `serialized_metadata`. + } + } - metadata_.version = 2; metadata_.storage_identifier = get_storage_identifier(); metadata_.bag_size = get_bagfile_size(); metadata_.relative_file_paths = {get_relative_file_path()}; @@ -420,35 +452,16 @@ rosbag2_storage::BagMetadata MCAPStorage::get_metadata() } else { topic_info.message_count = 0; } - metadata_.topics_with_message_count.push_back(topic_info); } +} - const auto & mcap_metadatas = mcap_reader_->metadataIndexes(); - auto range = mcap_metadatas.equal_range("rosbag2"); - mcap::Status status{}; - mcap::Record mcap_record{}; - mcap::Metadata mcap_metadata{}; - for (auto i = range.first; i != range.second; ++i) { - status = mcap::McapReader::ReadRecord(*data_source_, i->second.offset, &mcap_record); - if (!status.ok()) { - OnProblem(status); - continue; - } - status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata); - if (!status.ok()) { - OnProblem(status); - continue; - } - try { - metadata_.ros_distro = mcap_metadata.metadata.at("ROS_DISTRO"); - } catch (const std::out_of_range & /* err */) { - RCUTILS_LOG_ERROR_NAMED( - LOG_NAME, "Metadata record with name 'rosbag2' did not contain key 'ROS_DISTRO'."); - } - break; +/** BaseInfoInterface **/ +rosbag2_storage::BagMetadata MCAPStorage::get_metadata() +{ + if (opened_as_ == rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY) { + read_metadata(); } - return metadata_; } @@ -831,7 +844,9 @@ void MCAPStorage::update_metadata(const rosbag2_storage::BagMetadata & bag_metad mcap::Metadata metadata; metadata.name = "rosbag2"; - metadata.metadata = {{"ROS_DISTRO", bag_metadata.ros_distro}}; + YAML::Node metadata_node = YAML::convert::encode(bag_metadata); + std::string serialized_metadata = YAML::Dump(metadata_node); + metadata.metadata = {{"serialized_metadata", serialized_metadata}}; mcap::Status status = mcap_writer_->write(metadata); if (!status.ok()) { OnProblem(status); diff --git a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp index f94520bf05..28fe66ae92 100644 --- a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp +++ b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp @@ -14,8 +14,8 @@ #include "rclcpp/serialization.hpp" #include "rclcpp/serialized_message.hpp" -#include "rcpputils/env.hpp" #include "rcpputils/filesystem_helper.hpp" +#include "rcutils/logging_macros.h" #include "rosbag2_storage/storage_factory.hpp" #ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS #include "rosbag2_storage/storage_options.hpp" @@ -31,6 +31,52 @@ using namespace ::testing; // NOLINT using TemporaryDirectoryFixture = rosbag2_test_common::TemporaryDirectoryFixture; +class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFixture +{ +public: + McapStorageTestFixture() = default; + + std::shared_ptr make_serialized_message(const std::string & message) + { + rclcpp::Serialization serialization; + + std_msgs::msg::String std_string_msg; + std_string_msg.data = message; + auto serialized_msg = std::make_shared(); + serialization.serialize_message(&std_string_msg, serialized_msg.get()); + + auto ret = std::make_shared(); + *ret = serialized_msg->release_rcl_serialized_message(); + return ret; + } + + std::shared_ptr write_messages_to_mcap( + std::vector> & messages, + std::shared_ptr rw_storage = nullptr) + { + if (nullptr == rw_storage) { + rosbag2_storage::StorageFactory factory; + rosbag2_storage::StorageOptions options; + auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag"; + options.uri = uri.string(); + options.storage_id = "mcap"; + rw_storage = factory.open_read_write(options); + } + + for (auto msg : messages) { + const rosbag2_storage::TopicMetadata & topic_metadata = std::get<2>(msg); + rw_storage->create_topic(topic_metadata, std::get<3>(msg)); + auto bag_message = std::make_shared(); + bag_message->serialized_data = make_serialized_message(std::get<0>(msg)); + bag_message->time_stamp = std::get<1>(msg); + bag_message->topic_name = topic_metadata.name; + rw_storage->write(bag_message); + } + return rw_storage; + } +}; + namespace rosbag2_storage { bool operator==(const TopicInformation & lhs, const TopicInformation & rhs) @@ -39,6 +85,80 @@ bool operator==(const TopicInformation & lhs, const TopicInformation & rhs) } } // namespace rosbag2_storage +TEST_F(McapStorageTestFixture, can_store_and_read_metadata_correctly) +{ + const std::string storage_id = "mcap"; + auto uri = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string(); + auto expected_bag = rcpputils::fs::path(temporary_dir_path_) / "rosbag.mcap"; + const rosbag2_storage::MessageDefinition definition = {"std_msgs/msg/String", "ros2msg", + "string data", ""}; + + std::vector string_messages = {"first message", "second message", "third message"}; + std::vector topics = {"topic1", "topic2"}; + + rosbag2_storage::TopicMetadata topic_metadata_1 = {topics[0], "std_msgs/msg/String", "cdr", + "qos_profile1", "type_hash1"}; + rosbag2_storage::TopicMetadata topic_metadata_2 = {topics[1], "std_msgs/msg/String", "cdr", + "qos_profile2", "type_hash2"}; + + std::vector> + messages = { + std::make_tuple(string_messages[0], static_cast(1e9), topic_metadata_1, definition), + std::make_tuple(string_messages[1], static_cast(2e9), topic_metadata_1, definition), + std::make_tuple(string_messages[2], static_cast(3e9), topic_metadata_2, definition), + }; + + rosbag2_storage::StorageFactory factory; + rosbag2_storage::StorageOptions options; + options.uri = uri; + options.storage_id = storage_id; + + { + auto writer = factory.open_read_write(options); + writer->create_topic({"topic1", "type1", "rmw1", "qos_profile1", "type_hash1"}, {}); + writer->create_topic({"topic2", "type2", "rmw2", "qos_profile2", "type_hash2"}, {}); + (void)write_messages_to_mcap(messages, writer); + auto metadata = writer->get_metadata(); + metadata.ros_distro = "rolling"; + metadata.custom_data["key1"] = "value1"; + writer->update_metadata(metadata); + } + + options.uri = expected_bag.string(); + options.storage_id = storage_id; + auto reader = factory.open_read_only(options); + const auto metadata = reader->get_metadata(); + + EXPECT_THAT(metadata.storage_identifier, Eq("mcap")); + EXPECT_THAT(metadata.relative_file_paths, ElementsAreArray({expected_bag.string()})); + + EXPECT_THAT( + metadata.topics_with_message_count, + UnorderedElementsAreArray({ + rosbag2_storage::TopicInformation{ + rosbag2_storage::TopicMetadata{"topic2", "type2", "rmw2", "qos_profile2", "type_hash2"}, + 1u}, + rosbag2_storage::TopicInformation{ + rosbag2_storage::TopicMetadata{"topic1", "type1", "rmw1", "qos_profile1", "type_hash1"}, + 2u}, + })); + EXPECT_THAT(metadata.message_count, Eq(3u)); + + const auto current_distro = "rolling"; + EXPECT_EQ(metadata.ros_distro, current_distro); + + EXPECT_THAT( + metadata.starting_time, + Eq(std::chrono::time_point(std::chrono::seconds(1)))); + EXPECT_THAT(metadata.duration, Eq(std::chrono::seconds(2))) + << "metadata.duration=" << metadata.duration.count(); + + EXPECT_EQ(metadata.custom_data.size(), 1); + EXPECT_THAT(metadata.custom_data, + UnorderedElementsAreArray({std::pair("key1", "value1")})); +} + TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file) { auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag"; @@ -118,9 +238,6 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file) 1u}})); EXPECT_THAT(metadata.message_count, Eq(1u)); - const auto current_distro = rcpputils::get_env_var("ROS_DISTRO"); - EXPECT_EQ(metadata.ros_distro, current_distro); - EXPECT_TRUE(reader->has_next()); std_msgs::msg::String msg;