diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 7e511d4097..d94d1ccb87 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -82,7 +82,7 @@ class RecorderImpl std::shared_ptr writer_; rosbag2_storage::StorageOptions storage_options_; rosbag2_transport::RecordOptions record_options_; - std::atomic stop_discovery_; + std::atomic stop_discovery_ = false; std::unordered_map> subscriptions_; private: @@ -132,14 +132,15 @@ class RecorderImpl rclcpp::Service::SharedPtr srv_split_bagfile_; std::atomic paused_ = false; + std::atomic in_recording_ = false; std::shared_ptr keyboard_handler_; KeyboardHandler::callback_handle_t toggle_paused_key_callback_handle_ = KeyboardHandler::invalid_handle; // Variables for event publishing rclcpp::Publisher::SharedPtr split_event_pub_; - bool event_publisher_thread_should_exit_ = false; - bool write_split_has_occurred_ = false; + std::atomic event_publisher_thread_should_exit_ = false; + std::atomic write_split_has_occurred_ = false; rosbag2_cpp::bag_events::BagSplitInfo bag_split_info_; std::mutex event_publisher_thread_mutex_; std::condition_variable event_publisher_thread_wake_cv_; @@ -185,12 +186,17 @@ RecorderImpl::~RecorderImpl() stop(); } - void RecorderImpl::stop() { stop_discovery_ = true; if (discovery_future_.valid()) { - discovery_future_.wait(); + auto status = discovery_future_.wait_for(2 * record_options_.topic_polling_interval); + if (status != std::future_status::ready) { + RCLCPP_ERROR_STREAM( + node->get_logger(), + "discovery_future_.wait_for(" << record_options_.topic_polling_interval.count() << + ") return status: " << (status == std::future_status::timeout ? "timeout" : "deferred")); + } } paused_ = true; subscriptions_.clear(); @@ -204,10 +210,18 @@ void RecorderImpl::stop() if (event_publisher_thread_.joinable()) { event_publisher_thread_.join(); } + in_recording_ = false; + RCLCPP_INFO(node->get_logger(), "Recording stopped"); } void RecorderImpl::record() { + if (in_recording_.exchange(true)) { + RCLCPP_WARN_STREAM( + node->get_logger(), + "Called Recorder::record() while already in recording, dismissing request."); + return; + } paused_ = record_options_.start_paused; topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides; if (record_options_.rmw_serialization_format.empty()) { @@ -274,9 +288,8 @@ void RecorderImpl::record() // Start the thread that will publish events event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this); - split_event_pub_ = node->create_publisher( - "events/write_split", - 1); + split_event_pub_ = + node->create_publisher("events/write_split", 1); rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; callbacks.write_split_callback = [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { @@ -297,15 +310,13 @@ void RecorderImpl::record() discovery_future_ = std::async(std::launch::async, std::bind(&RecorderImpl::topics_discovery, this)); } + RCLCPP_INFO(node->get_logger(), "Recording..."); } void RecorderImpl::event_publisher_thread_main() { RCLCPP_INFO(node->get_logger(), "Event publisher thread: Starting"); - - bool should_exit = false; - - while (!should_exit) { + while (!event_publisher_thread_should_exit_.load()) { std::unique_lock lock(event_publisher_thread_mutex_); event_publisher_thread_wake_cv_.wait( lock, @@ -319,10 +330,7 @@ void RecorderImpl::event_publisher_thread_main() message.opened_file = bag_split_info_.opened_file; split_event_pub_->publish(message); } - - should_exit = event_publisher_thread_should_exit_; } - RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting"); }