From e46c7f030ffc24c9214714ddc5c144abb9e51110 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 26 Oct 2024 13:20:28 -0700 Subject: [PATCH 1/7] Make snapshot writing into a new file each time when it is triggered - Note. Snapshot now became a blocking call and mutually exclusive with writer::write(message) method to avoid race conditions. i.e. blocking the same writer_mutex_ Signed-off-by: Michael Orlov --- rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 1 + .../rosbag2_cpp/writers/sequential_writer.cpp | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index e44753f12..cdda9f88a 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -94,6 +94,7 @@ void Writer::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type bool Writer::take_snapshot() { + std::lock_guard writer_lock(writer_mutex_); return writer_impl_->take_snapshot(); } diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index bd56eb8ad..1c0bbeda8 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -409,7 +409,7 @@ void SequentialWriter::write(std::shared_ptrnotify_data_ready(); + (void)split_bagfile_local(true); return true; } @@ -528,6 +531,16 @@ void SequentialWriter::write_messages( return; } storage_->write(messages); + if (storage_options_.snapshot_mode) { + // Update FileInformation about the last file in metadata in case of snapshot mode + const auto first_msg_timestamp = std::chrono::time_point( + std::chrono::nanoseconds(messages.front()->recv_timestamp)); + const auto last_msg_timestamp = std::chrono::time_point( + std::chrono::nanoseconds(messages.back()->recv_timestamp)); + metadata_.files.back().starting_time = first_msg_timestamp; + metadata_.files.back().duration = last_msg_timestamp - first_msg_timestamp; + metadata_.files.back().message_count = messages.size(); + } std::lock_guard lock(topics_info_mutex_); for (const auto & msg : messages) { if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) { From 169c6b46473cf23f638c83ad8cdd9322c9271c86 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 26 Oct 2024 13:49:46 -0700 Subject: [PATCH 2/7] Add unit test to make sure that snapshot writing in the new file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Clemens Mühlbacher Signed-off-by: Michael Orlov --- .../rosbag2_cpp/test_sequential_writer.cpp | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index cb0229f52..2f0f0396a 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -580,6 +580,91 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception) EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error); } +TEST_F(SequentialWriterTest, snapshot_writes_with_splitting) +{ + storage_options_.max_bagfile_size = 0; + storage_options_.max_cache_size = 200; + storage_options_.snapshot_mode = true; + + // Expect a single write call when the snapshot is triggered + EXPECT_CALL( + *storage_, write( + An> &>()) + ).Times(1); + + ON_CALL( + *storage_, + write(An>())).WillByDefault( + [this](std::shared_ptr) { + fake_storage_size_ += 1; + }); + + ON_CALL(*storage_, get_bagfile_size).WillByDefault( + [this]() { + return fake_storage_size_.load(); + }); + + ON_CALL(*metadata_io_, write_metadata).WillByDefault( + [this](const std::string &, const rosbag2_storage::BagMetadata & metadata) { + fake_metadata_ = metadata; + }); + + ON_CALL(*storage_, get_relative_file_path).WillByDefault( + [this]() { + return fake_storage_uri_; + }); + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + std::string rmw_format = "rmw_format"; + + std::string msg_content = "Hello"; + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = + rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length); + + writer_->open(storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (size_t i = 0; i < 100; i++) { + writer_->write(message); + } + writer_->take_snapshot(); + + EXPECT_THAT(closed_files.size(), 1); + EXPECT_THAT(opened_files.size(), 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_EQ(opened_files.size(), 1); + ASSERT_EQ(closed_files.size(), 1); + + auto expected_closed = fs::path(storage_options_.uri) / (bag_base_dir_ + "_0"); + auto expected_opened = fs::path(storage_options_.uri) / (bag_base_dir_ + "_1"); + ASSERT_STREQ(closed_files[0].c_str(), expected_closed.generic_string().c_str()); + ASSERT_STREQ(opened_files[0].c_str(), expected_opened.generic_string().c_str()); +} + TEST_F(SequentialWriterTest, split_event_calls_callback) { const uint64_t max_bagfile_size = 3; From f1c36b909fe7f516a215fd4b29e532029589d9e1 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 26 Oct 2024 22:40:26 -0700 Subject: [PATCH 3/7] Add support for snapshot with file compression Signed-off-by: Michael Orlov --- .../test_sequential_compression_writer.cpp | 77 +++++++++++++++++++ .../rosbag2_cpp/writers/sequential_writer.cpp | 2 +- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 2b5071952..8b67f5ec7 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -29,6 +29,7 @@ #include "rosbag2_cpp/writer.hpp" #include "rosbag2_storage/storage_options.hpp" +#include "rosbag2_storage/ros_helper.hpp" #include "mock_converter_factory.hpp" #include "mock_metadata_io.hpp" @@ -624,6 +625,82 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_com } } +TEST_F(SequentialCompressionWriterTest, snapshot_writes_with_splitting_and_file_compression) +{ + tmp_dir_storage_options_.max_bagfile_size = 0; + tmp_dir_storage_options_.max_cache_size = 200; + tmp_dir_storage_options_.snapshot_mode = true; + + initializeFakeFileStorage(); + // Expect a single write call when the snapshot is triggered + EXPECT_CALL( + *storage_, write( + An> &>()) + ).Times(1); + + rosbag2_compression::CompressionOptions compression_options { + DefaultTestCompressor, + rosbag2_compression::CompressionMode::FILE, + 1, + 1, + kDefaultCompressionQueueThreadsPriority + }; + initializeWriter(compression_options); + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + std::string rmw_format = "rmw_format"; + + std::string msg_content = "Hello"; + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = + rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length); + + writer_->open(tmp_dir_storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (size_t i = 0; i < 100; i++) { + writer_->write(message); + } + writer_->take_snapshot(); + writer_->close(); + + EXPECT_THAT(closed_files.size(), 2); + EXPECT_THAT(opened_files.size(), 2); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_EQ(opened_files.size(), 2); + ASSERT_EQ(closed_files.size(), 2); + + for (size_t i = 0; i < 2; i++) { + auto expected_closed = fs::path(tmp_dir_storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i) + "." + DefaultTestCompressor); + auto expected_opened = (i == 1) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(tmp_dir_storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i + 1)); + ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str()); + ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str()); + } +} + INSTANTIATE_TEST_SUITE_P( SequentialCompressionWriterTestQueueSizes, SequentialCompressionWriterTest, diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 1c0bbeda8..1459bfa9b 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -447,7 +447,7 @@ bool SequentialWriter::take_snapshot() // Note: Information about start, duration and num messages for the current file in metadata_ // will be updated in the write_messages(..), when cache_consumer call it as a callback. message_cache_->notify_data_ready(); - (void)split_bagfile_local(true); + split_bagfile(); return true; } From 92a435b0c50e9d07e995054752a07e0965402c4d Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 27 Oct 2024 07:32:54 -0700 Subject: [PATCH 4/7] Rename newly added tests to avoid misunderstanding Signed-off-by: Michael Orlov --- .../rosbag2_compression/test_sequential_compression_writer.cpp | 2 +- rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 8b67f5ec7..df6c488a9 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -625,7 +625,7 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_com } } -TEST_F(SequentialCompressionWriterTest, snapshot_writes_with_splitting_and_file_compression) +TEST_F(SequentialCompressionWriterTest, snapshot_writes_to_new_file_with_file_compression) { tmp_dir_storage_options_.max_bagfile_size = 0; tmp_dir_storage_options_.max_cache_size = 200; diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index 2f0f0396a..5246fb032 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -580,7 +580,7 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception) EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error); } -TEST_F(SequentialWriterTest, snapshot_writes_with_splitting) +TEST_F(SequentialWriterTest, snapshot_writes_to_new_file) { storage_options_.max_bagfile_size = 0; storage_options_.max_cache_size = 200; From 90f035ae20a1795a1e2a2b9746608ec29d4a2174 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Thu, 31 Oct 2024 11:41:03 -0700 Subject: [PATCH 5/7] Address review comments in tests Signed-off-by: Michael Orlov --- .../rosbag2_cpp/writers/sequential_writer.cpp | 3 + .../rosbag2_cpp/test_sequential_writer.cpp | 137 ++++++++++++++++-- 2 files changed, 129 insertions(+), 11 deletions(-) diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 1459bfa9b..21585ee6d 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -78,11 +78,13 @@ void SequentialWriter::init_metadata() metadata_.storage_identifier = storage_->get_storage_identifier(); metadata_.starting_time = std::chrono::time_point( std::chrono::nanoseconds::max()); + metadata_.duration = std::chrono::nanoseconds(0); metadata_.relative_file_paths = {strip_parent_path(storage_->get_relative_file_path())}; rosbag2_storage::FileInformation file_info{}; file_info.path = strip_parent_path(storage_->get_relative_file_path()); file_info.starting_time = std::chrono::time_point( std::chrono::nanoseconds::max()); + file_info.duration = std::chrono::nanoseconds(0); file_info.message_count = 0; metadata_.custom_data = storage_options_.custom_data; metadata_.files = {file_info}; @@ -540,6 +542,7 @@ void SequentialWriter::write_messages( metadata_.files.back().starting_time = first_msg_timestamp; metadata_.files.back().duration = last_msg_timestamp - first_msg_timestamp; metadata_.files.back().message_count = messages.size(); + metadata_.message_count += messages.size(); } std::lock_guard lock(topics_info_mutex_); for (const auto & msg : messages) { diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index 5246fb032..5bb79ff53 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -580,11 +580,31 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception) EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error); } -TEST_F(SequentialWriterTest, snapshot_writes_to_new_file) +TEST_F(SequentialWriterTest, snapshot_writes_to_new_file_with_bag_split) { storage_options_.max_bagfile_size = 0; storage_options_.max_cache_size = 200; storage_options_.snapshot_mode = true; + const rcutils_time_point_value_t first_msg_timestamp = 100; + const size_t num_msgs_to_write = 100; + const std::string topic_name = "test_topic"; + std::string msg_content = "Hello"; + const size_t serialized_msg_buffer_length = msg_content.length(); + const size_t num_expected_msgs = storage_options_.max_cache_size / serialized_msg_buffer_length; + const size_t expected_start_time = first_msg_timestamp + (num_msgs_to_write - num_expected_msgs); + const auto expected_last_msg_timestamp = (first_msg_timestamp + num_msgs_to_write - 1); + const size_t expected_duration = expected_last_msg_timestamp - expected_start_time; + // Prepare vector of messages + std::vector messages; + for (size_t i = 0; i < num_msgs_to_write; i++) { + auto message = std::make_shared(); + message->recv_timestamp = first_msg_timestamp + static_cast(i); + message->send_timestamp = first_msg_timestamp + static_cast(i); + message->topic_name = topic_name; + message->serialized_data = + rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length()); + messages.push_back(message); + } // Expect a single write call when the snapshot is triggered EXPECT_CALL( @@ -630,17 +650,10 @@ TEST_F(SequentialWriterTest, snapshot_writes_to_new_file) std::string rmw_format = "rmw_format"; - std::string msg_content = "Hello"; - auto msg_length = msg_content.length(); - auto message = std::make_shared(); - message->topic_name = "test_topic"; - message->serialized_data = - rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length); - writer_->open(storage_options_, {rmw_format, rmw_format}); writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); - for (size_t i = 0; i < 100; i++) { + for (const auto & message : messages) { writer_->write(message); } writer_->take_snapshot(); @@ -659,10 +672,112 @@ TEST_F(SequentialWriterTest, snapshot_writes_to_new_file) ASSERT_EQ(opened_files.size(), 1); ASSERT_EQ(closed_files.size(), 1); - auto expected_closed = fs::path(storage_options_.uri) / (bag_base_dir_ + "_0"); - auto expected_opened = fs::path(storage_options_.uri) / (bag_base_dir_ + "_1"); + const auto expected_closed = fs::path(storage_options_.uri) / (bag_base_dir_ + "_0"); + const auto expected_opened = fs::path(storage_options_.uri) / (bag_base_dir_ + "_1"); ASSERT_STREQ(closed_files[0].c_str(), expected_closed.generic_string().c_str()); ASSERT_STREQ(opened_files[0].c_str(), expected_opened.generic_string().c_str()); + + // Check metadata + ASSERT_EQ(v_intercepted_update_metadata_.size(), 3u); + // The v_intercepted_update_metadata_[0] is the very first metadata saved from the writer's + // constructor. We don't update it during the snapshot, and it doesn't make sense checking it. + // The v_intercepted_update_metadata_[1] is the metadata written right before closing the file + // with the new snapshot. + // The v_intercepted_update_metadata_[2] is the metadata written when we are opening a new file + // after switching to a new storage. + EXPECT_EQ(v_intercepted_update_metadata_[1].message_count, num_expected_msgs); + EXPECT_EQ(v_intercepted_update_metadata_[2].message_count, num_expected_msgs); + EXPECT_EQ( + std::chrono::time_point_cast( + v_intercepted_update_metadata_[1].starting_time).time_since_epoch().count(), + first_msg_timestamp); + + ASSERT_FALSE(v_intercepted_update_metadata_[1].files.empty()); + const auto & first_file_info = v_intercepted_update_metadata_[1].files[0]; + EXPECT_STREQ(first_file_info.path.c_str(), std::string(bag_base_dir_ + "_0").c_str()); + EXPECT_EQ(first_file_info.message_count, num_expected_msgs); + EXPECT_EQ( + std::chrono::time_point_cast( + first_file_info.starting_time).time_since_epoch().count(), + expected_start_time); + EXPECT_EQ(first_file_info.duration.count(), expected_duration); +} + +TEST_F(SequentialWriterTest, snapshot_can_be_called_twice) +{ + storage_options_.max_bagfile_size = 0; + storage_options_.max_cache_size = 200; + storage_options_.snapshot_mode = true; + const size_t num_msgs_to_write = 100; + + // Expect to call write method twice. Once per each snapshot. + EXPECT_CALL( + *storage_, write( + An> &>()) + ).Times(2); + + ON_CALL(*storage_, get_relative_file_path).WillByDefault( + [this]() { + return fake_storage_uri_; + }); + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + std::string rmw_format = "rmw_format"; + + writer_->open(storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + std::string msg_content = "Hello"; + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = + rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length()); + + for (size_t i = 0; i < num_msgs_to_write / 2; i++) { + writer_->write(message); + } + writer_->take_snapshot(); + + for (size_t i = num_msgs_to_write / 2; i < num_msgs_to_write; i++) { + writer_->write(message); + } + writer_->take_snapshot(); + + EXPECT_THAT(closed_files.size(), 2); + EXPECT_THAT(opened_files.size(), 2); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_EQ(opened_files.size(), 2); + ASSERT_EQ(closed_files.size(), 2); + + for (size_t i = 0; i < opened_files.size(); i++) { + const auto expected_closed = fs::path(storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i)); + const auto expected_opened = fs::path(storage_options_.uri) / + (bag_base_dir_ + "_" + std::to_string(i + 1)); + ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str()); + ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str()); + } } TEST_F(SequentialWriterTest, split_event_calls_callback) From 4b62921a53882092f15b1a874270dc5cbb17934e Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Thu, 31 Oct 2024 11:43:40 -0700 Subject: [PATCH 6/7] Change order of includes in the test_sequential_compression_writer.cpp Signed-off-by: Michael Orlov --- .../rosbag2_compression/test_sequential_compression_writer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index df6c488a9..941cb7210 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -28,8 +28,8 @@ #include "rosbag2_cpp/writer.hpp" -#include "rosbag2_storage/storage_options.hpp" #include "rosbag2_storage/ros_helper.hpp" +#include "rosbag2_storage/storage_options.hpp" #include "mock_converter_factory.hpp" #include "mock_metadata_io.hpp" From 8344e79fa4eafae31f0f7c1263da39385d7296fc Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Thu, 31 Oct 2024 21:33:18 -0700 Subject: [PATCH 7/7] Update metadata_.message_count unconditionally in write_messages(..) Signed-off-by: Michael Orlov --- rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 21585ee6d..2653c3e04 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -542,8 +542,8 @@ void SequentialWriter::write_messages( metadata_.files.back().starting_time = first_msg_timestamp; metadata_.files.back().duration = last_msg_timestamp - first_msg_timestamp; metadata_.files.back().message_count = messages.size(); - metadata_.message_count += messages.size(); } + metadata_.message_count += messages.size(); std::lock_guard lock(topics_info_mutex_); for (const auto & msg : messages) { if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) {