From f1c36b909fe7f516a215fd4b29e532029589d9e1 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 26 Oct 2024 22:40:26 -0700 Subject: [PATCH] Add support for snapshot with file compression Signed-off-by: Michael Orlov --- .../test_sequential_compression_writer.cpp | 77 +++++++++++++++++++ .../rosbag2_cpp/writers/sequential_writer.cpp | 2 +- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 2b5071952..8b67f5ec7 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -29,6 +29,7 @@ #include "rosbag2_cpp/writer.hpp" #include "rosbag2_storage/storage_options.hpp" +#include "rosbag2_storage/ros_helper.hpp" #include "mock_converter_factory.hpp" #include "mock_metadata_io.hpp" @@ -624,6 +625,82 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_com } } +TEST_F(SequentialCompressionWriterTest, snapshot_writes_with_splitting_and_file_compression) +{ + tmp_dir_storage_options_.max_bagfile_size = 0; + tmp_dir_storage_options_.max_cache_size = 200; + tmp_dir_storage_options_.snapshot_mode = true; + + initializeFakeFileStorage(); + // Expect a single write call when the snapshot is triggered + EXPECT_CALL( + *storage_, write( + An> &>()) + ).Times(1); + + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::FILE, + 1, + 1, + kDefaultCompressionQueueThreadsPriority + }; + initializeWriter(compression_options); + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + std::string rmw_format = "rmw_format"; + + std::string msg_content = "Hello"; + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = + rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length); + + writer_->open(tmp_dir_storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (size_t i = 0; i < 100; i++) { + writer_->write(message); + } + writer_->take_snapshot(); + writer_->close(); + + EXPECT_THAT(closed_files.size(), 2); + EXPECT_THAT(opened_files.size(), 2); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_EQ(opened_files.size(), 2); + ASSERT_EQ(closed_files.size(), 2); + + for (size_t i = 0; i < 2; i++) { + auto expected_closed = fs::path(tmp_dir_storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i) + "." + DefaultTestCompressor); + auto expected_opened = (i == 1) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(tmp_dir_storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i + 1)); + ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str()); + ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str()); + } +} + INSTANTIATE_TEST_SUITE_P( SequentialCompressionWriterTestQueueSizes, SequentialCompressionWriterTest, diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 1c0bbeda8..1459bfa9b 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -447,7 +447,7 @@ bool SequentialWriter::take_snapshot() // Note: Information about start, duration and num messages for the current file in metadata_ // will be updated in the write_messages(..), when cache_consumer call it as a callback. message_cache_->notify_data_ready(); - (void)split_bagfile_local(true); + split_bagfile(); return true; }