From 7af6ef944afde33aeafc035f9d6afefcd98f08ce Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Mon, 12 Jun 2023 23:21:53 -0700 Subject: [PATCH] Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301) * Gracefully handle SIGINT and SIGTERM signal for rosbag2 recorder - Call recorder->stop() and exec_->cancel() instead of rclcpp::shutdown Signed-off-by: Michael Orlov * Use singleton for static executor in the rosbag2_py::Recorder Signed-off-by: Michael Orlov * Rollback to the non-static executor and don't call executor->cancel() - In case when signal will arrive we will trigger our internal exit_ variable and wait while current exec_->spin_all(10msec) will exit. Signed-off-by: Michael Orlov * Spin recorder node in a separate thread for better handling exit - Run exec->spin() in a separate thread, because we need to call exec->cancel() after recorder->stop() to be able to send notifications about bag split and close. - Wait on conditional variable for exit_ flag Signed-off-by: Michael Orlov * Address race condition in rosbag2_py.test_record_cancel - Add `record_thread.join()` before trying to parse metadata. Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov (cherry picked from commit 46a23e9c0a6f4ab4e41b2b2d8471b0a25ab1be79) # Conflicts: # rosbag2_py/src/rosbag2_py/_transport.cpp # rosbag2_py/test/test_transport.py --- rosbag2_py/src/rosbag2_py/_transport.cpp | 69 ++++++++++++++++++++---- rosbag2_py/test/test_transport.py | 12 +++++ 2 files changed, 70 insertions(+), 11 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index bbfa21b73a..426ff05912 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -141,17 +141,20 @@ class Player class Recorder { -private: - std::unique_ptr exec_; - public: Recorder() { - rclcpp::init(0, nullptr); - exec_ = std::make_unique(); + auto init_options = rclcpp::InitOptions(); + init_options.shutdown_on_signal = false; + rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); + std::signal( SIGTERM, [](int /* signal */) { - rclcpp::shutdown(); + rosbag2_py::Recorder::cancel(); + }); + std::signal( + SIGINT, [](int /* signal */) { + rosbag2_py::Recorder::cancel(); }); } @@ -164,6 +167,8 @@ class Recorder const rosbag2_storage::StorageOptions & storage_options, RecordOptions & record_options) { + exit_ = false; + auto exec = std::make_unique(); if (record_options.rmw_serialization_format.empty()) { record_options.rmw_serialization_format = std::string(rmw_get_serialization_format()); } @@ -173,20 +178,55 @@ class Recorder std::move(writer), storage_options, record_options); recorder->record(); - exec_->add_node(recorder); - // Release the GIL for long-running record, so that calling Python code can use other threads + exec->add_node(recorder); + // Run exec->spin() in a separate thread, because we need to call exec->cancel() after + // recorder->stop() to be able to send notifications about bag split and close. + auto spin_thread = std::thread( + [&exec]() { + exec->spin(); + }); { + // Release the GIL for long-running record, so that calling Python code can use other threads py::gil_scoped_release release; - exec_->spin(); + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();}); + recorder->stop(); + } + exec->cancel(); + if (spin_thread.joinable()) { + spin_thread.join(); } + exec->remove_node(recorder); } - void cancel() + static void cancel() { - exec_->cancel(); + exit_ = true; + wait_for_exit_cv_.notify_all(); } + +protected: + static std::atomic_bool exit_; + static std::condition_variable wait_for_exit_cv_; + std::mutex wait_for_exit_mutex_; }; +<<<<<<< HEAD +======= +std::atomic_bool Recorder::exit_{false}; +std::condition_variable Recorder::wait_for_exit_cv_{}; + +// Return a RecordOptions struct with defaults set for rewriting bags. +rosbag2_transport::RecordOptions bag_rewrite_default_record_options() +{ + rosbag2_transport::RecordOptions options{}; + // We never want to drop messages when converting bags, so set the compression queue size to 0 + // (unbounded). + options.compression_queue_size = 0; + return options; +} + +>>>>>>> 46a23e9 (Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301)) // Simple wrapper to read the output config YAML into structs void bag_rewrite( const std::vector & input_options, @@ -283,8 +323,15 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Recorder") .def(py::init()) +<<<<<<< HEAD .def("record", &rosbag2_py::Recorder::record) .def("cancel", &rosbag2_py::Recorder::cancel) +======= + .def( + "record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"), + py::arg("node_name") = "rosbag2_recorder") + .def_static("cancel", &rosbag2_py::Recorder::cancel) +>>>>>>> 46a23e9 (Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301)) ; m.def( diff --git a/rosbag2_py/test/test_transport.py b/rosbag2_py/test/test_transport.py index 176d7e9fe2..0b31d83fcb 100644 --- a/rosbag2_py/test/test_transport.py +++ b/rosbag2_py/test/test_transport.py @@ -81,7 +81,19 @@ def test_record_cancel(tmp_path): recorder.cancel() +<<<<<<< HEAD metadata_path = Path(bag_path) / 'metadata.yaml' db3_path = Path(bag_path) / 'test_record_cancel_0.db3' assert wait_for(lambda: metadata_path.is_file() and db3_path.is_file(), +======= + metadata_io = rosbag2_py.MetadataIo() + assert wait_for(lambda: metadata_io.metadata_file_exists(bag_path), + timeout=rclpy.duration.Duration(seconds=3)) + record_thread.join() + + metadata = metadata_io.read_metadata(bag_path) + assert(len(metadata.relative_file_paths)) + storage_path = Path(metadata.relative_file_paths[0]) + assert wait_for(lambda: storage_path.is_file(), +>>>>>>> 46a23e9 (Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301)) timeout=rclpy.duration.Duration(seconds=3))