Skip to content

Commit

Permalink
Add SplitBagfile recording service. (#1115)
Browse files Browse the repository at this point in the history
* feat(rosbag2_cpp): Add SplitBagfile recording service.

Fixes #1087

Tested from the command line and verified that below command closed one
log file and opened another.

ros2 service call /rosbag2_recorder/split_bagfile
rosbag2_interfaces/srv/SplitBagfile

Signed-off-by: Rick Shanor rshanor <[email protected]>

* feat(rosbag2_cpp): Add unit tests for SplitBagfile feature.

Also address PR comments from @MichaelOrlov and deal with rebase merge
conflicts.

Signed-off-by: Rick Shanor rshanor <[email protected]>

* fix(rosbag2_cpp): Remove unnecessary ManualSplitSequentialWriter.

After making split_bagfile public, this class was no longer necessary.

Signed-off-by: Rick Shanor rshanor <[email protected]>

* ci(rosbag2_transport): Mark test_record_services as xfail.

Signed-off-by: Rick Shanor rshanor <[email protected]>

Signed-off-by: Rick Shanor rshanor <[email protected]>
(cherry picked from commit e6f7fd7)

# Conflicts:
#	rosbag2_cpp/src/rosbag2_cpp/writer.cpp
#	rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp
#	rosbag2_cpp/test/rosbag2_cpp/fake_data.hpp
#	rosbag2_interfaces/CMakeLists.txt
#	rosbag2_transport/CMakeLists.txt
  • Loading branch information
rshanor authored and MichaelOrlov committed Oct 18, 2022
1 parent b71944a commit b69b5ff
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 8 deletions.
5 changes: 5 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class ROSBAG2_CPP_PUBLIC Writer final
*/
bool take_snapshot();

/**
* Close the current bagfile and opens the next bagfile.
*/
void split_bagfile();

/**
* Remove a new topic in the underlying storage.
* If creation of subscription fails remove the topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface
*/
virtual bool take_snapshot() = 0;

/**
* Close the current bagfile and opens the next bagfile.
*/
virtual void split_bagfile() = 0;

virtual void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) = 0;
};

Expand Down
8 changes: 5 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
*/
void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) override;

/**
* \brief Closes the current backed storage and opens the next bagfile.
*/
void split_bagfile() override;

protected:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
Expand All @@ -146,9 +151,6 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter

rosbag2_storage::BagMetadata metadata_;

// Closes the current backed storage and opens the next bagfile.
virtual void split_bagfile();

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time) const;
Expand Down
10 changes: 10 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,17 @@ bool Writer::take_snapshot()
return writer_impl_->take_snapshot();
}

<<<<<<< HEAD
void Writer::write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
=======
void Writer::split_bagfile()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
return writer_impl_->split_bagfile();
}

void Writer::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
>>>>>>> e6f7fd7 (Add SplitBagfile recording service. (#1115))
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
writer_impl_->write(message);
Expand Down
56 changes: 56 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2022, Foxglove Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "fake_data.hpp"

#include "rosbag2_storage/ros_helper.hpp"

void write_sample_split_bag(
const rosbag2_storage::StorageOptions & storage_options,
const std::vector<std::pair<rcutils_time_point_value_t, uint32_t>> & fake_messages,
size_t split_every)
{
std::string topic_name = "testtopic";

rosbag2_cpp::writers::SequentialWriter writer{};
writer.open(storage_options, rosbag2_cpp::ConverterOptions{});
writer.create_topic(
{
topic_name,
"test_msgs/ByteMultiArray",
"cdr",
""
});
for (size_t i = 0; i < fake_messages.size(); i++) {
if (i > 0 && (i % split_every == 0)) {
writer.split_bagfile();
}

const auto message = fake_messages[i];
rcutils_time_point_value_t time_stamp = message.first;
uint32_t value = message.second;

auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
msg->serialized_data = rosbag2_storage::make_serialized_message(&value, sizeof(value));
msg->time_stamp = time_stamp;
msg->topic_name = topic_name;
writer.write(msg);
}
writer.close();
}
29 changes: 29 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/fake_data.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2022, Foxglove Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__FAKE_DATA_HPP_
#define ROSBAG2_CPP__FAKE_DATA_HPP_

#include <utility>
#include <vector>

#include "rosbag2_cpp/writers/sequential_writer.hpp"

// Write vector of <timestamp, uint32_data_value> pairs to bag files, splitting every N messages
void write_sample_split_bag(
const rosbag2_storage::StorageOptions & storage_options,
const std::vector<std::pair<rcutils_time_point_value_t, uint32_t>> & fake_messages,
size_t split_every);

#endif // ROSBAG2_CPP__FAKE_DATA_HPP_
5 changes: 5 additions & 0 deletions rosbag2_interfaces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"srv/Seek.srv"
"srv/SetRate.srv"
"srv/Snapshot.srv"
<<<<<<< HEAD
=======
"srv/SplitBagfile.srv"
"srv/Stop.srv"
>>>>>>> e6f7fd7 (Add SplitBagfile recording service. (#1115))
"srv/TogglePaused.srv"
DEPENDENCIES builtin_interfaces
ADD_LINTER_TESTS
Expand Down
1 change: 1 addition & 0 deletions rosbag2_interfaces/srv/SplitBagfile.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
---
12 changes: 12 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,18 @@ function(create_tests_for_rmw_implementation)
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

<<<<<<< HEAD
=======
ament_add_test_label(test_record_services${target_suffix} xfail)

if(${rmw_implementation} MATCHES "rmw_cyclonedds(.*)")
ament_add_test_label(test_play_services__rmw_cyclonedds_cpp xfail)
endif()

if(${rmw_implementation} MATCHES "rmw_fastrtps(.*)")
ament_add_test_label(test_play_services__rmw_fastrtps_cpp xfail)
endif()
>>>>>>> e6f7fd7 (Add SplitBagfile recording service. (#1115))
endfunction()

if(BUILD_TESTING)
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rosbag2_cpp/writer.hpp"

#include "rosbag2_interfaces/srv/snapshot.hpp"
#include "rosbag2_interfaces/srv/split_bagfile.hpp"

#include "rosbag2_interfaces/msg/write_split_event.hpp"

Expand Down Expand Up @@ -156,6 +157,7 @@ class Recorder : public rclcpp::Node
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unordered_set<std::string> topic_unknown_types_;
rclcpp::Service<rosbag2_interfaces::srv::Snapshot>::SharedPtr srv_snapshot_;
rclcpp::Service<rosbag2_interfaces::srv::SplitBagfile>::SharedPtr srv_split_bagfile_;
std::atomic<bool> paused_ = false;
// Keyboard handler
std::shared_ptr<KeyboardHandler> keyboard_handler_;
Expand Down
10 changes: 10 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ void Recorder::record()
});
}

srv_split_bagfile_ = create_service<rosbag2_interfaces::srv::SplitBagfile>(
"~/split_bagfile",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::SplitBagfile::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::SplitBagfile::Response>/* response */)
{
writer_->split_bagfile();
});

// Start the thread that will publish events
event_publisher_thread_ = std::thread(&Recorder::event_publisher_thread_main, this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
return true;
}

void split_bagfile() override
{
auto info = std::make_shared<rosbag2_cpp::bag_events::BagSplitInfo>();
info->closed_file = "BagFile" + std::to_string(file_number_);
file_number_ += 1;
info->opened_file = "BagFile" + std::to_string(file_number_);
callback_manager_.execute_callbacks(rosbag2_cpp::bag_events::BagEvent::WRITE_SPLIT, info);
messages_per_file_ = 0;
split_bagfile_called_ = true;
}

bool split_bagfile_called()
{
return split_bagfile_called_;
}

void
add_event_callbacks(const rosbag2_cpp::bag_events::WriterEventCallbacks & callbacks) override
{
Expand Down Expand Up @@ -118,6 +134,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
rosbag2_cpp::bag_events::EventCallbackManager callback_manager_;
size_t file_number_ = 0;
const size_t max_messages_per_file_ = 5;
bool split_bagfile_called_ = false;
};

#endif // ROSBAG2_TRANSPORT__MOCK_SEQUENTIAL_WRITER_HPP_
44 changes: 39 additions & 5 deletions rosbag2_transport/test/rosbag2_transport/test_record_services.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rclcpp/rclcpp.hpp"

#include "rosbag2_interfaces/srv/snapshot.hpp"
#include "rosbag2_interfaces/srv/split_bagfile.hpp"
#include "rosbag2_transport/recorder.hpp"

#include "rosbag2_test_common/publication_manager.hpp"
Expand All @@ -40,9 +41,11 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
{
public:
using Snapshot = rosbag2_interfaces::srv::Snapshot;
using SplitBagfile = rosbag2_interfaces::srv::SplitBagfile;

RecordSrvsTest()
: RecordIntegrationTestFixture()
explicit RecordSrvsTest(const bool snapshot_mode)
: RecordIntegrationTestFixture(),
snapshot_mode_(snapshot_mode)
{}

~RecordSrvsTest() override
Expand All @@ -68,7 +71,7 @@ class RecordSrvsTest : public RecordIntegrationTestFixture

rosbag2_transport::RecordOptions record_options =
{false, false, {test_topic_}, "rmw_format", 100ms};
storage_options_.snapshot_mode = true;
storage_options_.snapshot_mode = snapshot_mode_;
storage_options_.max_cache_size = 200;
recorder_ = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options, recorder_name_);
Expand All @@ -80,6 +83,7 @@ class RecordSrvsTest : public RecordIntegrationTestFixture

const std::string ns = "/" + recorder_name_;
cli_snapshot_ = client_node_->create_client<Snapshot>(ns + "/snapshot");
cli_split_bagfile_ = client_node_->create_client<SplitBagfile>(ns + "/split_bagfile");

exec_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();

Expand All @@ -94,7 +98,10 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
pub_manager.run_publishers();

// Make sure expected service is present before starting test
ASSERT_TRUE(cli_snapshot_->wait_for_service(service_wait_timeout_));
if (snapshot_mode_) {
ASSERT_TRUE(cli_snapshot_->wait_for_service(service_wait_timeout_));
}
ASSERT_TRUE(cli_split_bagfile_->wait_for_service(service_wait_timeout_));
}

/// Send a service request, and expect it to successfully return within a reasonable timeout
Expand Down Expand Up @@ -135,9 +142,19 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
// Service clients
rclcpp::Node::SharedPtr client_node_;
rclcpp::Client<Snapshot>::SharedPtr cli_snapshot_;
rclcpp::Client<SplitBagfile>::SharedPtr cli_split_bagfile_;

bool snapshot_mode_;
};

class RecordSrvsSnapshotTest : public RecordSrvsTest
{
protected:
RecordSrvsSnapshotTest()
: RecordSrvsTest(true /*snapshot_mode*/) {}
};

TEST_F(RecordSrvsTest, trigger_snapshot)
TEST_F(RecordSrvsSnapshotTest, trigger_snapshot)
{
auto & writer = recorder_->get_writer_handle();
MockSequentialWriter & mock_writer =
Expand All @@ -152,3 +169,20 @@ TEST_F(RecordSrvsTest, trigger_snapshot)
successful_service_request<Snapshot>(cli_snapshot_);
EXPECT_THAT(mock_writer.get_messages().size(), Ne(0u));
}

class RecordSrvsSplitBagfileTest : public RecordSrvsTest
{
protected:
RecordSrvsSplitBagfileTest()
: RecordSrvsTest(false /*snapshot_mode*/) {}
};

TEST_F(RecordSrvsSplitBagfileTest, split_bagfile)
{
auto & writer = recorder_->get_writer_handle();
MockSequentialWriter & mock_writer =
static_cast<MockSequentialWriter &>(writer.get_implementation_handle());
EXPECT_FALSE(mock_writer.split_bagfile_called());
successful_service_request<SplitBagfile>(cli_split_bagfile_);
EXPECT_TRUE(mock_writer.split_bagfile_called());
}

0 comments on commit b69b5ff

Please sign in to comment.