Skip to content

Commit

Permalink
Store serialized metadata in MCAP file (#1423)
Browse files Browse the repository at this point in the history
* Store serialized metadata in MCAP file when calling update_metadata()

Signed-off-by: Michael Orlov <[email protected]>

* Remove ROS_DISTRO key-value from mcap::Metadata

- ROS_DISTRO now stored inside serialized metadata

Signed-off-by: Michael Orlov <[email protected]>

* Fix for test failure on Windows

- Use `UnorderedElementsAreArray` for checking
`metadata.topics_with_message_count`.
- The list of the metadata.topics_with_message_count forming from the
 mcap_reader_->channels() which is unordered_map.
 The order of the elements in the unordered_map is unpredictable and
 can vary on different platforms.

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov authored Sep 25, 2023
1 parent 96c11b0 commit a029ef9
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 33 deletions.
73 changes: 44 additions & 29 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage
#endif

private:
void read_metadata();
void open_impl(const std::string & uri, const std::string & preset_profile,
rosbag2_storage::storage_interfaces::IOFlag io_flag,
const std::string & storage_config_uri);
Expand Down Expand Up @@ -370,12 +371,43 @@ void MCAPStorage::open_impl(const std::string & uri, const std::string & preset_
metadata_.relative_file_paths = {get_relative_file_path()};
}

/** BaseInfoInterface **/
rosbag2_storage::BagMetadata MCAPStorage::get_metadata()
void MCAPStorage::read_metadata()
{
ensure_summary_read();
const auto & mcap_metadatas = mcap_reader_->metadataIndexes();
auto range = mcap_metadatas.equal_range("rosbag2");
mcap::Status status{};
mcap::Record mcap_record{};
mcap::Metadata mcap_metadata{};
for (auto i = range.first; i != range.second; ++i) {
status = mcap::McapReader::ReadRecord(*data_source_, i->second.offset, &mcap_record);
if (!status.ok()) {
OnProblem(status);
continue;
}
status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata);
if (!status.ok()) {
OnProblem(status);
continue;
}
std::string serialized_metadata;
try {
serialized_metadata = mcap_metadata.metadata.at("serialized_metadata");
} catch (const std::out_of_range & /* err */) {
RCUTILS_LOG_WARN_NAMED(
LOG_NAME, "Metadata record with name 'rosbag2' did not contain key 'serialized_metadata'.");
}
if (!serialized_metadata.empty()) {
YAML::Node metadata_node = YAML::Load(serialized_metadata);
YAML::convert<rosbag2_storage::BagMetadata>::decode(metadata_node, metadata_);
}
try {
metadata_.ros_distro = mcap_metadata.metadata.at("ROS_DISTRO");
} catch (const std::out_of_range & /* err */) {
// Ignor this error. In new versions ROS_DISTRO stored inside `serialized_metadata`.
}
}

metadata_.version = 2;
metadata_.storage_identifier = get_storage_identifier();
metadata_.bag_size = get_bagfile_size();
metadata_.relative_file_paths = {get_relative_file_path()};
Expand Down Expand Up @@ -420,35 +452,16 @@ rosbag2_storage::BagMetadata MCAPStorage::get_metadata()
} else {
topic_info.message_count = 0;
}

metadata_.topics_with_message_count.push_back(topic_info);
}
}

const auto & mcap_metadatas = mcap_reader_->metadataIndexes();
auto range = mcap_metadatas.equal_range("rosbag2");
mcap::Status status{};
mcap::Record mcap_record{};
mcap::Metadata mcap_metadata{};
for (auto i = range.first; i != range.second; ++i) {
status = mcap::McapReader::ReadRecord(*data_source_, i->second.offset, &mcap_record);
if (!status.ok()) {
OnProblem(status);
continue;
}
status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata);
if (!status.ok()) {
OnProblem(status);
continue;
}
try {
metadata_.ros_distro = mcap_metadata.metadata.at("ROS_DISTRO");
} catch (const std::out_of_range & /* err */) {
RCUTILS_LOG_ERROR_NAMED(
LOG_NAME, "Metadata record with name 'rosbag2' did not contain key 'ROS_DISTRO'.");
}
break;
/** BaseInfoInterface **/
rosbag2_storage::BagMetadata MCAPStorage::get_metadata()
{
if (opened_as_ == rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY) {
read_metadata();
}

return metadata_;
}

Expand Down Expand Up @@ -831,7 +844,9 @@ void MCAPStorage::update_metadata(const rosbag2_storage::BagMetadata & bag_metad

mcap::Metadata metadata;
metadata.name = "rosbag2";
metadata.metadata = {{"ROS_DISTRO", bag_metadata.ros_distro}};
YAML::Node metadata_node = YAML::convert<rosbag2_storage::BagMetadata>::encode(bag_metadata);
std::string serialized_metadata = YAML::Dump(metadata_node);
metadata.metadata = {{"serialized_metadata", serialized_metadata}};
mcap::Status status = mcap_writer_->write(metadata);
if (!status.ok()) {
OnProblem(status);
Expand Down
125 changes: 121 additions & 4 deletions rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rcpputils/env.hpp"
#include "rcpputils/filesystem_helper.hpp"
#include "rcutils/logging_macros.h"
#include "rosbag2_storage/storage_factory.hpp"
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
#include "rosbag2_storage/storage_options.hpp"
Expand All @@ -31,6 +31,52 @@
using namespace ::testing; // NOLINT
using TemporaryDirectoryFixture = rosbag2_test_common::TemporaryDirectoryFixture;

class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFixture
{
public:
McapStorageTestFixture() = default;

std::shared_ptr<rcutils_uint8_array_t> make_serialized_message(const std::string & message)
{
rclcpp::Serialization<std_msgs::msg::String> serialization;

std_msgs::msg::String std_string_msg;
std_string_msg.data = message;
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serialization.serialize_message(&std_string_msg, serialized_msg.get());

auto ret = std::make_shared<rcutils_uint8_array_t>();
*ret = serialized_msg->release_rcl_serialized_message();
return ret;
}

std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> write_messages_to_mcap(
std::vector<std::tuple<std::string, int64_t, rosbag2_storage::TopicMetadata,
rosbag2_storage::MessageDefinition>> & messages,
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> rw_storage = nullptr)
{
if (nullptr == rw_storage) {
rosbag2_storage::StorageFactory factory;
rosbag2_storage::StorageOptions options;
auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag";
options.uri = uri.string();
options.storage_id = "mcap";
rw_storage = factory.open_read_write(options);
}

for (auto msg : messages) {
const rosbag2_storage::TopicMetadata & topic_metadata = std::get<2>(msg);
rw_storage->create_topic(topic_metadata, std::get<3>(msg));
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = make_serialized_message(std::get<0>(msg));
bag_message->time_stamp = std::get<1>(msg);
bag_message->topic_name = topic_metadata.name;
rw_storage->write(bag_message);
}
return rw_storage;
}
};

namespace rosbag2_storage
{
bool operator==(const TopicInformation & lhs, const TopicInformation & rhs)
Expand All @@ -39,6 +85,80 @@ bool operator==(const TopicInformation & lhs, const TopicInformation & rhs)
}
} // namespace rosbag2_storage

TEST_F(McapStorageTestFixture, can_store_and_read_metadata_correctly)
{
const std::string storage_id = "mcap";
auto uri = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string();
auto expected_bag = rcpputils::fs::path(temporary_dir_path_) / "rosbag.mcap";
const rosbag2_storage::MessageDefinition definition = {"std_msgs/msg/String", "ros2msg",
"string data", ""};

std::vector<std::string> string_messages = {"first message", "second message", "third message"};
std::vector<std::string> topics = {"topic1", "topic2"};

rosbag2_storage::TopicMetadata topic_metadata_1 = {topics[0], "std_msgs/msg/String", "cdr",
"qos_profile1", "type_hash1"};
rosbag2_storage::TopicMetadata topic_metadata_2 = {topics[1], "std_msgs/msg/String", "cdr",
"qos_profile2", "type_hash2"};

std::vector<std::tuple<std::string, int64_t, rosbag2_storage::TopicMetadata,
rosbag2_storage::MessageDefinition>>
messages = {
std::make_tuple(string_messages[0], static_cast<int64_t>(1e9), topic_metadata_1, definition),
std::make_tuple(string_messages[1], static_cast<int64_t>(2e9), topic_metadata_1, definition),
std::make_tuple(string_messages[2], static_cast<int64_t>(3e9), topic_metadata_2, definition),
};

rosbag2_storage::StorageFactory factory;
rosbag2_storage::StorageOptions options;
options.uri = uri;
options.storage_id = storage_id;

{
auto writer = factory.open_read_write(options);
writer->create_topic({"topic1", "type1", "rmw1", "qos_profile1", "type_hash1"}, {});
writer->create_topic({"topic2", "type2", "rmw2", "qos_profile2", "type_hash2"}, {});
(void)write_messages_to_mcap(messages, writer);
auto metadata = writer->get_metadata();
metadata.ros_distro = "rolling";
metadata.custom_data["key1"] = "value1";
writer->update_metadata(metadata);
}

options.uri = expected_bag.string();
options.storage_id = storage_id;
auto reader = factory.open_read_only(options);
const auto metadata = reader->get_metadata();

EXPECT_THAT(metadata.storage_identifier, Eq("mcap"));
EXPECT_THAT(metadata.relative_file_paths, ElementsAreArray({expected_bag.string()}));

EXPECT_THAT(
metadata.topics_with_message_count,
UnorderedElementsAreArray({
rosbag2_storage::TopicInformation{
rosbag2_storage::TopicMetadata{"topic2", "type2", "rmw2", "qos_profile2", "type_hash2"},
1u},
rosbag2_storage::TopicInformation{
rosbag2_storage::TopicMetadata{"topic1", "type1", "rmw1", "qos_profile1", "type_hash1"},
2u},
}));
EXPECT_THAT(metadata.message_count, Eq(3u));

const auto current_distro = "rolling";
EXPECT_EQ(metadata.ros_distro, current_distro);

EXPECT_THAT(
metadata.starting_time,
Eq(std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::seconds(1))));
EXPECT_THAT(metadata.duration, Eq(std::chrono::seconds(2)))
<< "metadata.duration=" << metadata.duration.count();

EXPECT_EQ(metadata.custom_data.size(), 1);
EXPECT_THAT(metadata.custom_data,
UnorderedElementsAreArray({std::pair<std::string, std::string>("key1", "value1")}));
}

TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
{
auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag";
Expand Down Expand Up @@ -118,9 +238,6 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
1u}}));
EXPECT_THAT(metadata.message_count, Eq(1u));

const auto current_distro = rcpputils::get_env_var("ROS_DISTRO");
EXPECT_EQ(metadata.ros_distro, current_distro);

EXPECT_TRUE(reader->has_next());

std_msgs::msg::String msg;
Expand Down

0 comments on commit a029ef9

Please sign in to comment.