Skip to content

Commit

Permalink
Add support for snapshot with file compression
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov committed Oct 27, 2024
1 parent 169c6b4 commit f1c36b9
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "rosbag2_cpp/writer.hpp"

#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_storage/ros_helper.hpp"

#include "mock_converter_factory.hpp"
#include "mock_metadata_io.hpp"
Expand Down Expand Up @@ -624,6 +625,82 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_com
}
}

TEST_F(SequentialCompressionWriterTest, snapshot_writes_with_splitting_and_file_compression)
{
tmp_dir_storage_options_.max_bagfile_size = 0;
tmp_dir_storage_options_.max_cache_size = 200;
tmp_dir_storage_options_.snapshot_mode = true;

initializeFakeFileStorage();
// Expect a single write call when the snapshot is triggered
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(1);

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
1,
1,
kDefaultCompressionQueueThreadsPriority
};
initializeWriter(compression_options);

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

std::string msg_content = "Hello";
auto msg_length = msg_content.length();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length);

writer_->open(tmp_dir_storage_options_, {rmw_format, rmw_format});
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (size_t i = 0; i < 100; i++) {
writer_->write(message);
}
writer_->take_snapshot();
writer_->close();

EXPECT_THAT(closed_files.size(), 2);
EXPECT_THAT(opened_files.size(), 2);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 2);
ASSERT_EQ(closed_files.size(), 2);

for (size_t i = 0; i < 2; i++) {
auto expected_closed = fs::path(tmp_dir_storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i) + "." + DefaultTestCompressor);
auto expected_opened = (i == 1) ?
// The last opened file shall be empty string when we do "writer->close();"
"" : fs::path(tmp_dir_storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i + 1));
ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str());
ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str());
}
}

INSTANTIATE_TEST_SUITE_P(
SequentialCompressionWriterTestQueueSizes,
SequentialCompressionWriterTest,
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ bool SequentialWriter::take_snapshot()
// Note: Information about start, duration and num messages for the current file in metadata_
// will be updated in the write_messages(..), when cache_consumer call it as a callback.
message_cache_->notify_data_ready();
(void)split_bagfile_local(true);
split_bagfile();
return true;
}

Expand Down

0 comments on commit f1c36b9

Please sign in to comment.