Skip to content

Commit

Permalink
Make snapshot writing into a new file each time when it is triggered
Browse files Browse the repository at this point in the history
- Note. Snapshot now became a blocking call and mutually exclusive with
writer::write(message) method to avoid race conditions.
i.e. blocking the same writer_mutex_

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov committed Oct 26, 2024
1 parent 786c3c4 commit e46c7f0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
1 change: 1 addition & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void Writer::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type

bool Writer::take_snapshot()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
return writer_impl_->take_snapshot();
}

Expand Down
17 changes: 15 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
is_first_message_ = false;
}

if (should_split_bagfile(message_timestamp)) {
if (!storage_options_.snapshot_mode && should_split_bagfile(message_timestamp)) {
split_bagfile();
metadata_.files.back().starting_time = message_timestamp;
}
Expand Down Expand Up @@ -441,10 +441,13 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
bool SequentialWriter::take_snapshot()
{
if (!storage_options_.snapshot_mode) {
ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snaphot called when snapshot mode is disabled");
ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snapshot called when snapshot mode is disabled");
return false;
}
// Note: Information about start, duration and num messages for the current file in metadata_
// will be updated in the write_messages(..), when cache_consumer call it as a callback.
message_cache_->notify_data_ready();
(void)split_bagfile_local(true);
return true;
}

Expand Down Expand Up @@ -528,6 +531,16 @@ void SequentialWriter::write_messages(
return;
}
storage_->write(messages);
if (storage_options_.snapshot_mode) {
// Update FileInformation about the last file in metadata in case of snapshot mode
const auto first_msg_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages.front()->recv_timestamp));
const auto last_msg_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages.back()->recv_timestamp));
metadata_.files.back().starting_time = first_msg_timestamp;
metadata_.files.back().duration = last_msg_timestamp - first_msg_timestamp;
metadata_.files.back().message_count = messages.size();
}
std::lock_guard<std::mutex> lock(topics_info_mutex_);
for (const auto & msg : messages) {
if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) {
Expand Down

0 comments on commit e46c7f0

Please sign in to comment.