Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make snapshot writing into a new file each time it is triggered #1842

Merged
merged 7 commits into from
Nov 1, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "rosbag2_cpp/writer.hpp"

#include "rosbag2_storage/ros_helper.hpp"
#include "rosbag2_storage/storage_options.hpp"

#include "mock_converter_factory.hpp"
Expand Down Expand Up @@ -624,6 +625,82 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_com
}
}

TEST_F(SequentialCompressionWriterTest, snapshot_writes_to_new_file_with_file_compression)
{
tmp_dir_storage_options_.max_bagfile_size = 0;
tmp_dir_storage_options_.max_cache_size = 200;
tmp_dir_storage_options_.snapshot_mode = true;

initializeFakeFileStorage();
// Expect a single write call when the snapshot is triggered
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(1);

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
christophebedard marked this conversation as resolved.
Show resolved Hide resolved
1,
1,
kDefaultCompressionQueueThreadsPriority
};
initializeWriter(compression_options);

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

std::string msg_content = "Hello";
auto msg_length = msg_content.length();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length);

writer_->open(tmp_dir_storage_options_, {rmw_format, rmw_format});
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (size_t i = 0; i < 100; i++) {
writer_->write(message);
}
writer_->take_snapshot();
writer_->close();

EXPECT_THAT(closed_files.size(), 2);
EXPECT_THAT(opened_files.size(), 2);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 2);
ASSERT_EQ(closed_files.size(), 2);

for (size_t i = 0; i < 2; i++) {
auto expected_closed = fs::path(tmp_dir_storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i) + "." + DefaultTestCompressor);
auto expected_opened = (i == 1) ?
// The last opened file shall be empty string when we do "writer->close();"
"" : fs::path(tmp_dir_storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i + 1));
ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str());
ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str());
}
}

INSTANTIATE_TEST_SUITE_P(
SequentialCompressionWriterTestQueueSizes,
SequentialCompressionWriterTest,
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void Writer::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type

bool Writer::take_snapshot()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
return writer_impl_->take_snapshot();
}

Expand Down
20 changes: 18 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ void SequentialWriter::init_metadata()
metadata_.storage_identifier = storage_->get_storage_identifier();
metadata_.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds::max());
metadata_.duration = std::chrono::nanoseconds(0);
metadata_.relative_file_paths = {strip_parent_path(storage_->get_relative_file_path())};
rosbag2_storage::FileInformation file_info{};
file_info.path = strip_parent_path(storage_->get_relative_file_path());
file_info.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds::max());
file_info.duration = std::chrono::nanoseconds(0);
file_info.message_count = 0;
metadata_.custom_data = storage_options_.custom_data;
metadata_.files = {file_info};
Expand Down Expand Up @@ -409,7 +411,7 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
is_first_message_ = false;
}

if (should_split_bagfile(message_timestamp)) {
if (!storage_options_.snapshot_mode && should_split_bagfile(message_timestamp)) {
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
split_bagfile();
metadata_.files.back().starting_time = message_timestamp;
}
Expand Down Expand Up @@ -441,10 +443,13 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
bool SequentialWriter::take_snapshot()
{
if (!storage_options_.snapshot_mode) {
ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snaphot called when snapshot mode is disabled");
ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snapshot called when snapshot mode is disabled");
return false;
}
// Note: Information about start, duration and num messages for the current file in metadata_
// will be updated in the write_messages(..), when cache_consumer call it as a callback.
message_cache_->notify_data_ready();
split_bagfile();
return true;
}

Expand Down Expand Up @@ -528,6 +533,17 @@ void SequentialWriter::write_messages(
return;
}
storage_->write(messages);
if (storage_options_.snapshot_mode) {
// Update FileInformation about the last file in metadata in case of snapshot mode
const auto first_msg_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages.front()->recv_timestamp));
const auto last_msg_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages.back()->recv_timestamp));
metadata_.files.back().starting_time = first_msg_timestamp;
metadata_.files.back().duration = last_msg_timestamp - first_msg_timestamp;
metadata_.files.back().message_count = messages.size();
}
metadata_.message_count += messages.size();
std::lock_guard<std::mutex> lock(topics_info_mutex_);
for (const auto & msg : messages) {
if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) {
Expand Down
200 changes: 200 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,206 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception)
EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error);
}

TEST_F(SequentialWriterTest, snapshot_writes_to_new_file_with_bag_split)
{
storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = 200;
storage_options_.snapshot_mode = true;
const rcutils_time_point_value_t first_msg_timestamp = 100;
const size_t num_msgs_to_write = 100;
const std::string topic_name = "test_topic";
std::string msg_content = "Hello";
const size_t serialized_msg_buffer_length = msg_content.length();
const size_t num_expected_msgs = storage_options_.max_cache_size / serialized_msg_buffer_length;
const size_t expected_start_time = first_msg_timestamp + (num_msgs_to_write - num_expected_msgs);
const auto expected_last_msg_timestamp = (first_msg_timestamp + num_msgs_to_write - 1);
const size_t expected_duration = expected_last_msg_timestamp - expected_start_time;
// Prepare vector of messages
std::vector<rosbag2_storage::SerializedBagMessageSharedPtr> messages;
for (size_t i = 0; i < num_msgs_to_write; i++) {
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->recv_timestamp = first_msg_timestamp + static_cast<rcutils_time_point_value_t>(i);
message->send_timestamp = first_msg_timestamp + static_cast<rcutils_time_point_value_t>(i);
message->topic_name = topic_name;
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length());
messages.push_back(message);
}

// Expect a single write call when the snapshot is triggered
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(1);

ON_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
});

ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_.load();
});

ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
fake_metadata_ = metadata;
});

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (const auto & message : messages) {
writer_->write(message);
}
writer_->take_snapshot();

EXPECT_THAT(closed_files.size(), 1);
EXPECT_THAT(opened_files.size(), 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 1);
ASSERT_EQ(closed_files.size(), 1);

const auto expected_closed = fs::path(storage_options_.uri) / (bag_base_dir_ + "_0");
const auto expected_opened = fs::path(storage_options_.uri) / (bag_base_dir_ + "_1");
ASSERT_STREQ(closed_files[0].c_str(), expected_closed.generic_string().c_str());
ASSERT_STREQ(opened_files[0].c_str(), expected_opened.generic_string().c_str());

// Check metadata
ASSERT_EQ(v_intercepted_update_metadata_.size(), 3u);
// The v_intercepted_update_metadata_[0] is the very first metadata saved from the writer's
// constructor. We don't update it during the snapshot, and it doesn't make sense checking it.
// The v_intercepted_update_metadata_[1] is the metadata written right before closing the file
// with the new snapshot.
// The v_intercepted_update_metadata_[2] is the metadata written when we are opening a new file
// after switching to a new storage.
EXPECT_EQ(v_intercepted_update_metadata_[1].message_count, num_expected_msgs);
EXPECT_EQ(v_intercepted_update_metadata_[2].message_count, num_expected_msgs);
EXPECT_EQ(
std::chrono::time_point_cast<std::chrono::nanoseconds>(
v_intercepted_update_metadata_[1].starting_time).time_since_epoch().count(),
first_msg_timestamp);

ASSERT_FALSE(v_intercepted_update_metadata_[1].files.empty());
const auto & first_file_info = v_intercepted_update_metadata_[1].files[0];
EXPECT_STREQ(first_file_info.path.c_str(), std::string(bag_base_dir_ + "_0").c_str());
EXPECT_EQ(first_file_info.message_count, num_expected_msgs);
EXPECT_EQ(
std::chrono::time_point_cast<std::chrono::nanoseconds>(
first_file_info.starting_time).time_since_epoch().count(),
expected_start_time);
EXPECT_EQ(first_file_info.duration.count(), expected_duration);
}

TEST_F(SequentialWriterTest, snapshot_can_be_called_twice)
{
storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = 200;
storage_options_.snapshot_mode = true;
const size_t num_msgs_to_write = 100;

// Expect to call write method twice. Once per each snapshot.
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(2);

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

std::string msg_content = "Hello";
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length());

for (size_t i = 0; i < num_msgs_to_write / 2; i++) {
writer_->write(message);
}
writer_->take_snapshot();

for (size_t i = num_msgs_to_write / 2; i < num_msgs_to_write; i++) {
writer_->write(message);
}
writer_->take_snapshot();

EXPECT_THAT(closed_files.size(), 2);
EXPECT_THAT(opened_files.size(), 2);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 2);
ASSERT_EQ(closed_files.size(), 2);

for (size_t i = 0; i < opened_files.size(); i++) {
const auto expected_closed = fs::path(storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i));
const auto expected_opened = fs::path(storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i + 1));
ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str());
ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str());
}
christophebedard marked this conversation as resolved.
Show resolved Hide resolved
}

TEST_F(SequentialWriterTest, split_event_calls_callback)
{
const uint64_t max_bagfile_size = 3;
Expand Down
Loading