Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress-bar for ros2 bag play #1836

Draft
wants to merge 5 commits into
base: rolling
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'--log-level', type=str, default='info',
choices=['debug', 'info', 'warn', 'error', 'fatal'],
help='Logging level.')
parser.add_argument(
'--progress-bar', type=float, default=10.0,
help='Print a progress bar of the playback player at a specific frequency in Hz. '
'Default is %(default)d. '
'Negative values mark an update for every published message, while '
' a zero value disables the progress bar.',
metavar='PROGRESS_BAR_FREQUENCY')

def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int:
nano_scale = 1000 * 1000 * 1000
Expand Down Expand Up @@ -243,6 +250,7 @@ def main(self, *, args): # noqa: D102
play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION
else:
play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION
play_options.progress_bar_print_frequency = args.progress_bar

player = Player(args.log_level)
try:
Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class PlayOptions:
node_prefix: str
playback_duration: float
playback_until_timestamp: int
progress_bar_print_frequency: float
publish_service_requests: bool
rate: float
read_ahead_queue_size: int
Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ PYBIND11_MODULE(_transport, m) {
"playback_until_timestamp",
&PlayOptions::getPlaybackUntilTimestamp,
&PlayOptions::setPlaybackUntilTimestamp)
.def_readwrite("progress_bar_print_frequency", &PlayOptions::progress_bar_print_frequency)
.def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout)
.def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message)
.def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ struct PlayOptions

// The source of the service request
ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION;

// Rate in Hz at which to print progress bar.
double progress_bar_print_frequency = 10.0;
};

} // namespace rosbag2_transport
Expand Down
121 changes: 117 additions & 4 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ rclcpp::QoS publisher_qos_for_topic(

namespace rosbag2_transport
{

enum class PlayerStatus : char
{
RUNNING = 'R',
PAUSED = 'P',
DELAYED = 'D',
};

constexpr Player::callback_handle_t Player::invalid_callback_handle;

class PlayerImpl
Expand Down Expand Up @@ -123,12 +131,13 @@ class PlayerImpl
virtual bool set_rate(double);

/// \brief Playing next message from queue when in pause.
/// \param in_burst_mode If true, it will not call the progress bar update to avoid delays.
/// \details This is blocking call and it will wait until next available message will be
/// published or rclcpp context shut down.
/// \note If internal player queue is starving and storage has not been completely loaded,
/// this method will wait until new element will be pushed to the queue.
/// \return true if player in pause mode and successfully played next message, otherwise false.
virtual bool play_next();
virtual bool play_next(bool in_burst_mode = false);

/// \brief Burst the next \p num_messages messages from the queue when paused.
/// \param num_messages The number of messages to burst from the queue. Specifying zero means no
Expand Down Expand Up @@ -298,6 +307,9 @@ class PlayerImpl
void configure_play_until_timestamp();
bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const;

void print_progress_bar_help_str(double progress_bar_print_frequency) const;
void print_progress_bar_status(const PlayerStatus & status, bool force_update = false);

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
std::atomic_bool cancel_wait_for_next_message_{false};
Expand Down Expand Up @@ -328,6 +340,8 @@ class PlayerImpl
std::condition_variable playback_finished_cv_;

rcutils_time_point_value_t starting_time_;
double starting_time_secs_;
double duration_secs_;

// control services
rclcpp::Service<rosbag2_interfaces::srv::Pause>::SharedPtr srv_pause_;
Expand All @@ -349,6 +363,13 @@ class PlayerImpl
std::vector<KeyboardHandler::callback_handle_t> keyboard_callbacks_;

std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;

// progress bar
const bool enable_progress_bar_;
const bool progress_bar_update_always_;
const rcutils_duration_value_t progress_bar_update_period_;
std::chrono::steady_clock::time_point progress_bar_last_time_printed_{};
rcutils_time_point_value_t recv_timestamp_last_published_;
};

PlayerImpl::PlayerImpl(
Expand All @@ -361,7 +382,12 @@ PlayerImpl::PlayerImpl(
storage_options_(storage_options),
play_options_(play_options),
keyboard_handler_(std::move(keyboard_handler)),
player_service_client_manager_(std::make_shared<PlayerServiceClientManager>())
player_service_client_manager_(std::make_shared<PlayerServiceClientManager>()),
enable_progress_bar_(play_options.progress_bar_print_frequency != 0),
progress_bar_update_always_(play_options.progress_bar_print_frequency < 0),
progress_bar_update_period_(play_options.progress_bar_print_frequency != 0 ?
RCUTILS_S_TO_NS(1.0 / play_options.progress_bar_print_frequency) :
std::numeric_limits<int64_t>::max())
{
for (auto & topic : play_options_.topics_to_filter) {
topic = rclcpp::expand_topic_or_service_name(
Expand Down Expand Up @@ -393,6 +419,7 @@ PlayerImpl::PlayerImpl(
// keep reader open until player is destroyed
reader_->open(storage_options_, {"", rmw_get_serialization_format()});
auto metadata = reader_->get_metadata();
rcutils_duration_value_t bag_duration = metadata.duration.count();
starting_time_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
metadata.starting_time.time_since_epoch()).count();
// If a non-default (positive) starting time offset is provided in PlayOptions,
Expand All @@ -405,7 +432,11 @@ PlayerImpl::PlayerImpl(
". Negative start offset ignored.");
} else {
starting_time_ += play_options_.start_offset;
bag_duration -= play_options_.start_offset;
}
starting_time_secs_ = RCUTILS_NS_TO_S(static_cast<double>(starting_time_));
duration_secs_ = RCUTILS_NS_TO_S(static_cast<double>(bag_duration));
recv_timestamp_last_published_ = starting_time_;
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(
starting_time_, std::chrono::steady_clock::now,
std::chrono::milliseconds{100}, play_options_.start_paused);
Expand All @@ -416,6 +447,7 @@ PlayerImpl::PlayerImpl(
}
create_control_services();
add_keyboard_callbacks();
print_progress_bar_help_str(play_options.progress_bar_print_frequency);
}

PlayerImpl::~PlayerImpl()
Expand All @@ -436,6 +468,10 @@ PlayerImpl::~PlayerImpl()
if (reader_) {
reader_->close();
}
// print new line to keep on screen the latest progress bar (which had a carriage return)
if (enable_progress_bar_) {
std::cout << std::endl;
}
}

const std::chrono::milliseconds
Expand Down Expand Up @@ -486,8 +522,12 @@ bool PlayerImpl::play()
do {
if (delay > rclcpp::Duration(0, 0)) {
RCLCPP_INFO_STREAM(owner_->get_logger(), "Sleep " << delay.nanoseconds() << " ns");
print_progress_bar_status(PlayerStatus::DELAYED);
std::chrono::nanoseconds delay_duration(delay.nanoseconds());
std::this_thread::sleep_for(delay_duration);
} else {
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
}
{
std::lock_guard<std::mutex> lk(reader_mutex_);
Expand Down Expand Up @@ -552,6 +592,8 @@ bool PlayerImpl::play()
is_in_playback_ = false;
playback_finished_cv_.notify_all();
}
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
});
return true;
}
Expand Down Expand Up @@ -602,18 +644,22 @@ void PlayerImpl::stop()
playback_thread_.join();
}
}
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
}

void PlayerImpl::pause()
{
clock_->pause();
RCLCPP_INFO_STREAM(owner_->get_logger(), "Pausing play.");
print_progress_bar_status(PlayerStatus::PAUSED);
}

void PlayerImpl::resume()
{
clock_->resume();
RCLCPP_INFO_STREAM(owner_->get_logger(), "Resuming play.");
print_progress_bar_status(PlayerStatus::RUNNING, true);
}

void PlayerImpl::toggle_paused()
Expand All @@ -640,6 +686,8 @@ bool PlayerImpl::set_rate(double rate)
} else {
RCLCPP_WARN_STREAM(owner_->get_logger(), "Failed to set rate to invalid value " << rate);
}
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
return ok;
}

Expand Down Expand Up @@ -672,10 +720,11 @@ rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_fro
return nullptr;
}

bool PlayerImpl::play_next()
bool PlayerImpl::play_next(bool is_burst_mode)
{
if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state.");
print_progress_bar_status(PlayerStatus::RUNNING, true);
return false;
}

Expand All @@ -688,6 +737,7 @@ bool PlayerImpl::play_next()
// resume() or stop() from another thread while we were waiting on mutex.
if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state.");
print_progress_bar_status(PlayerStatus::RUNNING, true);
return false;
}
skip_message_in_main_play_loop_ = true;
Expand All @@ -707,6 +757,11 @@ bool PlayerImpl::play_next()
next_message_published = publish_message(message_ptr);
clock_->jump(message_ptr->recv_timestamp);

// Update the progress bar if we were not called in burst mode to avoid delays.
if (!is_burst_mode) {
recv_timestamp_last_published_ = message_ptr->recv_timestamp;
print_progress_bar_status(PlayerStatus::PAUSED);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
Expand All @@ -717,20 +772,23 @@ size_t PlayerImpl::burst(const size_t num_messages)
{
if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(owner_->get_logger(), "Burst can only be used when in the paused state.");
print_progress_bar_status(PlayerStatus::RUNNING, true);
return 0;
}

uint64_t messages_played = 0;

for (auto ii = 0u; ii < num_messages || num_messages == 0; ++ii) {
if (play_next()) {
if (play_next(true)) {
++messages_played;
} else {
break;
}
}

RCLCPP_INFO_STREAM(owner_->get_logger(), "Burst " << messages_played << " messages.");
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
return messages_played;
}

Expand Down Expand Up @@ -956,6 +1014,8 @@ void PlayerImpl::play_messages_from_queue()
continue;
}
publish_message(message_ptr);
recv_timestamp_last_published_ = message_ptr->recv_timestamp;
print_progress_bar_status(PlayerStatus::RUNNING);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
Expand Down Expand Up @@ -1527,6 +1587,59 @@ const rosbag2_transport::PlayOptions & PlayerImpl::get_play_options()
return play_options_;
}

void PlayerImpl::print_progress_bar_help_str(double progress_bar_print_frequency) const
{
std::string progress_bar_help_str;
if (enable_progress_bar_) {
if (progress_bar_update_period_ < 0) {
progress_bar_help_str = "Progress bar enabled with updates for every message.";
} else {
std::ostringstream oss;
oss << "Progress bar enabled with updates at "
<< std::fixed << std::setprecision(1) << progress_bar_print_frequency << " Hz.";
progress_bar_help_str = oss.str();
}
progress_bar_help_str += " [?]: [R]unning, [P]aused, [D]elayed.";
} else {
progress_bar_help_str = "Progress bar disabled.";
}
RCLCPP_INFO_STREAM(owner_->get_logger(), progress_bar_help_str);
}

void PlayerImpl::print_progress_bar_status(const PlayerStatus & status, bool force_update)
{
if (!enable_progress_bar_) {
return;
}

const std::chrono::steady_clock::time_point steady_time_now = std::chrono::steady_clock::now();

// Check if we should force an update of the progress bar, otherwise check time since last update.
// Force an update if:
// - the user-defined update period is negative;
// - or this function was called with a status other than RUNNING (since we want to update the
// progress bar when we pause or delay the playback, otherwise it will be stuck
// to the last update if not enough time has passed since it);
// - or this function was called with force_update set to true.
if (!(progress_bar_update_always_ || status != PlayerStatus::RUNNING || force_update)) {
if (std::chrono::duration_cast<std::chrono::nanoseconds>(
steady_time_now - progress_bar_last_time_printed_).count() < progress_bar_update_period_)
{
return;
}
}

const double current_time_secs = RCUTILS_NS_TO_S(
static_cast<double>(recv_timestamp_last_published_));
const double progress_secs = current_time_secs - starting_time_secs_;
// Print progress bar ending with carriage return '/r' so it will be overwritten by next print
std::cout << " Bag Time " << std::setw(13) << std::fixed << std::setprecision(6) <<
current_time_secs << " Duration " << std::fixed << std::setprecision(6) <<
progress_secs << "/" << duration_secs_ << " [" << static_cast<char>(status) <<
"]\r" << std::flush;
progress_bar_last_time_printed_ = steady_time_now;
}

///////////////////////////////
// Player public interface

Expand Down
Loading