Skip to content

Commit

Permalink
Code review
Browse files Browse the repository at this point in the history
Signed-off-by: Nicola Loi <[email protected]>
  • Loading branch information
nicolaloi committed Oct 28, 2024
1 parent a00bdc4 commit 43fa0f8
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 70 deletions.
10 changes: 7 additions & 3 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102
choices=['debug', 'info', 'warn', 'error', 'fatal'],
help='Logging level.')
parser.add_argument(
'--progress-bar', action='store_true', default=False,
help='Print a progress bar of the playback player.')
'--progress-bar', type=float, default=10.0,
help='Print a progress bar of the playback player at a specific frequency in Hz. '
'Default is %(default)d. '
'Negative values mark an update for every published message, while '
' a zero value disables the progress bar.',
metavar='PROGRESS_BAR_FREQUENCY')

def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int:
nano_scale = 1000 * 1000 * 1000
Expand Down Expand Up @@ -246,7 +250,7 @@ def main(self, *, args): # noqa: D102
play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION
else:
play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION
play_options.disable_progress_bar = not args.progress_bar
play_options.progress_bar_print_frequency = args.progress_bar

player = Player(args.log_level)
try:
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ class PlayOptions:
delay: float
disable_keyboard_controls: bool
disable_loan_message: bool
disable_progress_bar: bool
exclude_regex_to_filter: str
exclude_service_events_to_filter: List[str]
exclude_topics_to_filter: List[str]
loop: bool
node_prefix: str
playback_duration: float
playback_until_timestamp: int
progress_bar_print_frequency: float
publish_service_requests: bool
rate: float
read_ahead_queue_size: int
Expand Down
3 changes: 1 addition & 2 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,7 @@ PYBIND11_MODULE(_transport, m) {
"playback_until_timestamp",
&PlayOptions::getPlaybackUntilTimestamp,
&PlayOptions::setPlaybackUntilTimestamp)
.def_readwrite(
"disable_progress_bar", &PlayOptions::disable_progress_bar)
.def_readwrite("progress_bar_print_frequency", &PlayOptions::progress_bar_print_frequency)
.def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout)
.def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message)
.def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests)
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ struct PlayOptions
// The source of the service request
ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION;

// Disable to print a progress bar of the playback player
bool disable_progress_bar = true;
// Rate in Hz at which to print progress bar.
double progress_bar_print_frequency = 10.0;
};

} // namespace rosbag2_transport
Expand Down
155 changes: 93 additions & 62 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ rclcpp::QoS publisher_qos_for_topic(

namespace rosbag2_transport
{

enum class PlayerStatus : char
{
RUNNING = 'R',
PAUSED = 'P',
DELAYED = 'D',
};

constexpr Player::callback_handle_t Player::invalid_callback_handle;

class PlayerImpl
Expand Down Expand Up @@ -123,12 +131,13 @@ class PlayerImpl
virtual bool set_rate(double);

/// \brief Playing next message from queue when in pause.
/// \param in_burst_mode If true, it will not call the progress bar update to avoid delays.
/// \details This is blocking call and it will wait until next available message will be
/// published or rclcpp context shut down.
/// \note If internal player queue is starving and storage has not been completely loaded,
/// this method will wait until new element will be pushed to the queue.
/// \return true if player in pause mode and successfully played next message, otherwise false.
virtual bool play_next();
virtual bool play_next(bool in_burst_mode = false);

/// \brief Burst the next \p num_messages messages from the queue when paused.
/// \param num_messages The number of messages to burst from the queue. Specifying zero means no
Expand Down Expand Up @@ -298,12 +307,8 @@ class PlayerImpl
void configure_play_until_timestamp();
bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const;

template<char status>
void print_status() const;
void check_and_print_status() const;
void print_running_status() const;
void print_paused_status() const;
void print_delayed_status() const;
void print_progress_bar_help_str(double progress_bar_print_frequency) const;
void print_progress_bar_status(const PlayerStatus & status, bool force_update = false);

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
Expand Down Expand Up @@ -360,7 +365,11 @@ class PlayerImpl
std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;

// progress bar
const bool disable_progress_bar_;
const bool enable_progress_bar_;
const bool progress_bar_update_always_;
const rcutils_duration_value_t progress_bar_update_period_;
std::chrono::steady_clock::time_point progress_bar_last_time_printed_{};
rcutils_time_point_value_t recv_timestamp_last_published_;
};

PlayerImpl::PlayerImpl(
Expand All @@ -374,7 +383,11 @@ PlayerImpl::PlayerImpl(
play_options_(play_options),
keyboard_handler_(std::move(keyboard_handler)),
player_service_client_manager_(std::make_shared<PlayerServiceClientManager>()),
disable_progress_bar_(play_options.disable_progress_bar)
enable_progress_bar_(play_options.progress_bar_print_frequency != 0),
progress_bar_update_always_(play_options.progress_bar_print_frequency < 0),
progress_bar_update_period_(play_options.progress_bar_print_frequency != 0 ?
RCUTILS_S_TO_NS(1.0 / play_options.progress_bar_print_frequency) :
std::numeric_limits<int64_t>::max())
{
for (auto & topic : play_options_.topics_to_filter) {
topic = rclcpp::expand_topic_or_service_name(
Expand Down Expand Up @@ -423,6 +436,7 @@ PlayerImpl::PlayerImpl(
}
starting_time_secs_ = RCUTILS_NS_TO_S(static_cast<double>(starting_time_));
duration_secs_ = RCUTILS_NS_TO_S(static_cast<double>(bag_duration));
recv_timestamp_last_published_ = starting_time_;
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(
starting_time_, std::chrono::steady_clock::now,
std::chrono::milliseconds{100}, play_options_.start_paused);
Expand All @@ -433,13 +447,7 @@ PlayerImpl::PlayerImpl(
}
create_control_services();
add_keyboard_callbacks();
std::string progress_bar_help_str;
if (!disable_progress_bar_) {
progress_bar_help_str = "Bag Time and Duration [?]: ? = R running, P paused, D delayed";
} else {
progress_bar_help_str = "Bag Time and Duration progress bar disabled.";
}
RCLCPP_INFO_STREAM(owner_->get_logger(), progress_bar_help_str);
print_progress_bar_help_str(play_options.progress_bar_print_frequency);
}

PlayerImpl::~PlayerImpl()
Expand All @@ -460,10 +468,9 @@ PlayerImpl::~PlayerImpl()
if (reader_) {
reader_->close();
}
// print new line to keep on screen the latest progress bar
if (!disable_progress_bar_) {
printf("\n");
fflush(stdout);
// print new line to keep on screen the latest progress bar (which had a carriage return)
if (enable_progress_bar_) {
std::cout << std::endl;
}
}

Expand Down Expand Up @@ -515,9 +522,12 @@ bool PlayerImpl::play()
do {
if (delay > rclcpp::Duration(0, 0)) {
RCLCPP_INFO_STREAM(owner_->get_logger(), "Sleep " << delay.nanoseconds() << " ns");
print_delayed_status();
print_progress_bar_status(PlayerStatus::DELAYED);
std::chrono::nanoseconds delay_duration(delay.nanoseconds());
std::this_thread::sleep_for(delay_duration);
} else {
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
}
{
std::lock_guard<std::mutex> lk(reader_mutex_);
Expand Down Expand Up @@ -582,6 +592,8 @@ bool PlayerImpl::play()
is_in_playback_ = false;
playback_finished_cv_.notify_all();
}
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
});
return true;
}
Expand Down Expand Up @@ -632,21 +644,22 @@ void PlayerImpl::stop()
playback_thread_.join();
}
}
check_and_print_status();
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
}

void PlayerImpl::pause()
{
clock_->pause();
RCLCPP_INFO_STREAM(owner_->get_logger(), "Pausing play.");
print_paused_status();
print_progress_bar_status(PlayerStatus::PAUSED);
}

void PlayerImpl::resume()
{
clock_->resume();
RCLCPP_INFO_STREAM(owner_->get_logger(), "Resuming play.");
print_running_status();
print_progress_bar_status(PlayerStatus::RUNNING, true);
}

void PlayerImpl::toggle_paused()
Expand All @@ -673,7 +686,8 @@ bool PlayerImpl::set_rate(double rate)
} else {
RCLCPP_WARN_STREAM(owner_->get_logger(), "Failed to set rate to invalid value " << rate);
}
check_and_print_status();
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
return ok;
}

Expand Down Expand Up @@ -706,10 +720,11 @@ rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_fro
return nullptr;
}

bool PlayerImpl::play_next()
bool PlayerImpl::play_next(bool is_burst_mode)
{
if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state.");
print_progress_bar_status(PlayerStatus::RUNNING, true);
return false;
}

Expand All @@ -722,6 +737,7 @@ bool PlayerImpl::play_next()
// resume() or stop() from another thread while we were waiting on mutex.
if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state.");
print_progress_bar_status(PlayerStatus::RUNNING, true);
return false;
}
skip_message_in_main_play_loop_ = true;
Expand All @@ -741,7 +757,11 @@ bool PlayerImpl::play_next()
next_message_published = publish_message(message_ptr);
clock_->jump(message_ptr->recv_timestamp);

print_paused_status();
// Update the progress bar if we were not called in burst mode to avoid delays.
if (!is_burst_mode) {
recv_timestamp_last_published_ = message_ptr->recv_timestamp;
print_progress_bar_status(PlayerStatus::PAUSED);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
Expand All @@ -752,22 +772,23 @@ size_t PlayerImpl::burst(const size_t num_messages)
{
if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(owner_->get_logger(), "Burst can only be used when in the paused state.");
print_running_status();
print_progress_bar_status(PlayerStatus::RUNNING, true);
return 0;
}

uint64_t messages_played = 0;

for (auto ii = 0u; ii < num_messages || num_messages == 0; ++ii) {
if (play_next()) {
if (play_next(true)) {
++messages_played;
} else {
break;
}
}

RCLCPP_INFO_STREAM(owner_->get_logger(), "Burst " << messages_played << " messages.");
print_running_status();
print_progress_bar_status(clock_->is_paused() ?
PlayerStatus::PAUSED : PlayerStatus::RUNNING, true);
return messages_played;
}

Expand Down Expand Up @@ -993,7 +1014,8 @@ void PlayerImpl::play_messages_from_queue()
continue;
}
publish_message(message_ptr);
print_running_status();
recv_timestamp_last_published_ = message_ptr->recv_timestamp;
print_progress_bar_status(PlayerStatus::RUNNING);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
Expand Down Expand Up @@ -1565,48 +1587,57 @@ const rosbag2_transport::PlayOptions & PlayerImpl::get_play_options()
return play_options_;
}

template<char status>
void PlayerImpl::print_status() const
{
static_assert(status == 'R' || status == 'P' || status == 'D',
"Invalid status character");
double current_time_secs = RCUTILS_NS_TO_S(static_cast<double>(clock_->now()));
double progress_secs = current_time_secs - starting_time_secs_;
printf(" Bag Time %13.6f Duration %.6f/%.6f [%c] \r",
current_time_secs, progress_secs, duration_secs_, status);
fflush(stdout);
}

void PlayerImpl::check_and_print_status() const
void PlayerImpl::print_progress_bar_help_str(double progress_bar_print_frequency) const
{
if (!disable_progress_bar_) {
if (clock_->is_paused()) {
print_status<'P'>();
std::string progress_bar_help_str;
if (enable_progress_bar_) {
if (progress_bar_update_period_ < 0) {
progress_bar_help_str = "Progress bar enabled with updates for every message.";
} else {
print_status<'R'>();
std::ostringstream oss;
oss << "Progress bar enabled with updates at "
<< std::fixed << std::setprecision(1) << progress_bar_print_frequency << " Hz.";
progress_bar_help_str = oss.str();
}
progress_bar_help_str += " [?]: [R]unning, [P]aused, [D]elayed.";
} else {
progress_bar_help_str = "Progress bar disabled.";
}
RCLCPP_INFO_STREAM(owner_->get_logger(), progress_bar_help_str);
}

void PlayerImpl::print_running_status() const
void PlayerImpl::print_progress_bar_status(const PlayerStatus & status, bool force_update)
{
if (!disable_progress_bar_) {
print_status<'R'>();
if (!enable_progress_bar_) {
return;
}
}

void PlayerImpl::print_paused_status() const
{
if (!disable_progress_bar_) {
print_status<'P'>();
}
}
const std::chrono::steady_clock::time_point steady_time_now = std::chrono::steady_clock::now();

void PlayerImpl::print_delayed_status() const
{
if (!disable_progress_bar_) {
print_status<'D'>();
// Check if we should force an update of the progress bar, otherwise check time since last update.
// Force an update if:
// - the user-defined update period is negative;
// - or this function was called with a status other than RUNNING (since we want to update the
// progress bar when we pause or delay the playback, otherwise it will be stuck
// to the last update if not enough time has passed since it);
// - or this function was called with force_update set to true.
if (!(progress_bar_update_always_ || status != PlayerStatus::RUNNING || force_update)) {
if (std::chrono::duration_cast<std::chrono::nanoseconds>(
steady_time_now - progress_bar_last_time_printed_).count() < progress_bar_update_period_)
{
return;
}
}

const double current_time_secs = RCUTILS_NS_TO_S(
static_cast<double>(recv_timestamp_last_published_));
const double progress_secs = current_time_secs - starting_time_secs_;
// Print progress bar ending with carriage return '/r' so it will be overwritten by next print
std::cout << " Bag Time " << std::setw(13) << std::fixed << std::setprecision(6) <<
current_time_secs << " Duration " << std::fixed << std::setprecision(6) <<
progress_secs << "/" << duration_secs_ << " [" << static_cast<char>(status) <<
"]\r" << std::flush;
progress_bar_last_time_printed_ = steady_time_now;
}

///////////////////////////////
Expand Down

0 comments on commit 43fa0f8

Please sign in to comment.