Skip to content

Commit

Permalink
Merge branch 'main' into antoine/settings-screen
Browse files Browse the repository at this point in the history
  • Loading branch information
abey79 committed Nov 5, 2024
2 parents e0c13e2 + 8b2b7b3 commit 562e9d3
Showing 1 changed file with 116 additions and 56 deletions.
172 changes: 116 additions & 56 deletions crates/store/re_video/src/decode/ffmpeg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{
collections::BTreeMap,
io::Write,
process::ChildStdin,
sync::{atomic::AtomicBool, Arc},
};

Expand All @@ -12,6 +12,7 @@ use ffmpeg_sidecar::{
command::FfmpegCommand,
event::{FfmpegEvent, LogLevel},
};
use parking_lot::Mutex;

use crate::Time;

Expand Down Expand Up @@ -87,6 +88,36 @@ enum FfmpegFrameData {
EndOfStream,
}

/// Wraps an stdin with a shared shutdown boolean.
struct StdinWithShutdown {
shutdown: Arc<AtomicBool>,
stdin: ChildStdin,
}

impl StdinWithShutdown {
// Don't use `std::io::ErrorKind::Interrupted` because it has special meaning for default implementations of the `Write` trait,
// causing it to continue.
const SHUTDOWN_ERROR_KIND: std::io::ErrorKind = std::io::ErrorKind::Other;
}

impl std::io::Write for StdinWithShutdown {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if self.shutdown.load(std::sync::atomic::Ordering::Acquire) {
Err(std::io::Error::new(Self::SHUTDOWN_ERROR_KIND, "shutdown"))
} else {
self.stdin.write(buf)
}
}

fn flush(&mut self) -> std::io::Result<()> {
if self.shutdown.load(std::sync::atomic::Ordering::Acquire) {
Err(std::io::Error::new(Self::SHUTDOWN_ERROR_KIND, "shutdown"))
} else {
self.stdin.flush()
}
}
}

struct FfmpegProcessAndListener {
ffmpeg: FfmpegChild,

Expand All @@ -100,7 +131,10 @@ struct FfmpegProcessAndListener {
write_thread: Option<std::thread::JoinHandle<()>>,

/// If true, the write thread will not report errors. Used upon exit, so the write thread won't log spam on the hung up stdin.
suppress_write_error_reports: Arc<AtomicBool>,
stdin_shutdown: Arc<AtomicBool>,

/// On output instance used by the threads.
on_output: Arc<Mutex<Option<Arc<OutputCallback>>>>,
}

impl FfmpegProcessAndListener {
Expand Down Expand Up @@ -151,6 +185,12 @@ impl FfmpegProcessAndListener {
let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded();
let (frame_data_tx, frame_data_rx) = crossbeam::channel::unbounded();

let stdin_shutdown = Arc::new(AtomicBool::new(false));

// Mutex protect `on_output` so that we can shut down the threads at a defined point in time at which we
// no longer receive any new frames or errors from this process.
let on_output = Arc::new(Mutex::new(Some(on_output)));

let listen_thread = std::thread::Builder::new()
.name(format!("ffmpeg-reader for {debug_name}"))
.spawn({
Expand All @@ -166,20 +206,21 @@ impl FfmpegProcessAndListener {
}
})
.expect("Failed to spawn ffmpeg listener thread");

let suppress_write_error_reports = Arc::new(AtomicBool::new(false));
let write_thread = std::thread::Builder::new()
.name(format!("ffmpeg-writer for {debug_name}"))
.spawn({
let on_output = on_output.clone();
let ffmpeg_stdin = ffmpeg.take_stdin().ok_or(Error::NoStdin)?;
let suppress_write_error_reports = suppress_write_error_reports.clone();
let mut ffmpeg_stdin = StdinWithShutdown {
stdin: ffmpeg_stdin,
shutdown: stdin_shutdown.clone(),
};
move || {
write_ffmpeg_input(
ffmpeg_stdin,
&mut ffmpeg_stdin,
&frame_data_rx,
on_output.as_ref(),
&avcc,
&suppress_write_error_reports,
);
}
})
Expand All @@ -191,38 +232,65 @@ impl FfmpegProcessAndListener {
frame_data_tx,
listen_thread: Some(listen_thread),
write_thread: Some(write_thread),
suppress_write_error_reports,
stdin_shutdown,
on_output,
})
}
}

impl Drop for FfmpegProcessAndListener {
fn drop(&mut self) {
re_tracing::profile_function!();
self.suppress_write_error_reports
.store(true, std::sync::atomic::Ordering::Relaxed);

// Stop all outputs from being written to - any attempt from here on out will fail and cause thread shutdown.
// This way, we ensure all ongoing writes are finished and won't get any more on_output callbacks from this process
// before we take any other action on the shutdown sequence.
{
self.on_output.lock().take();
}

// Notify (potentially wake up) the stdin write thread to stop it (it might be sleeping).
self.frame_data_tx.send(FfmpegFrameData::EndOfStream).ok();
// Kill stdin for the write thread. This helps cancelling ongoing stream write operations.
self.stdin_shutdown
.store(true, std::sync::atomic::Ordering::Release);

// Kill the ffmpeg process itself.
// This should wake up the listen thread if it is sleeping, but that may take a while.
self.ffmpeg.kill().ok();

if let Some(write_thread) = self.write_thread.take() {
if write_thread.join().is_err() {
re_log::error!("Failed to join ffmpeg listener thread.");
// Unfortunately, even with the above measures, it can still happen that the listen threads take occasionally 100ms and more to shut down.
// (very much depending on the system & OS, typical times may be low with large outliers)
// It is crucial that the threads come down eventually and rather timely so to avoid leaking resources.
// However, in order to avoid stalls, we'll let them finish in parallel.
//
// Since we disconnected the `on_output` callback from them, they won't influence any new instances.
if false {
{
re_tracing::profile_scope!("shutdown write thread");
if let Some(write_thread) = self.write_thread.take() {
if write_thread.join().is_err() {
re_log::error!("Failed to join ffmpeg listener thread.");
}
}
}
}
if let Some(listen_thread) = self.listen_thread.take() {
if listen_thread.join().is_err() {
re_log::error!("Failed to join ffmpeg listener thread.");
{
re_tracing::profile_scope!("shutdown listen thread");
if let Some(listen_thread) = self.listen_thread.take() {
if listen_thread.join().is_err() {
re_log::error!("Failed to join ffmpeg listener thread.");
}
}
}
}
}
}

fn write_ffmpeg_input(
mut ffmpeg_stdin: std::process::ChildStdin,
ffmpeg_stdin: &mut dyn std::io::Write,
frame_data_rx: &Receiver<FfmpegFrameData>,
on_output: &OutputCallback,
on_output: &Mutex<Option<Arc<OutputCallback>>>,
avcc: &re_mp4::Avc1Box,
suppress_write_error_reports: &AtomicBool,
) {
let mut state = NaluStreamState::default();

Expand All @@ -232,19 +300,18 @@ fn write_ffmpeg_input(
FfmpegFrameData::EndOfStream => break,
};

if let Err(err) =
write_avc_chunk_to_nalu_stream(avcc, &mut ffmpeg_stdin, &chunk, &mut state)
{
let write_error = matches!(err, Error::FailedToWriteToFfmpeg(_));
if !write_error
|| !suppress_write_error_reports.load(std::sync::atomic::Ordering::Relaxed)
{
(on_output)(Err(err.into()));
}
if let Err(err) = write_avc_chunk_to_nalu_stream(avcc, ffmpeg_stdin, &chunk, &mut state) {
let on_output = on_output.lock();
if let Some(on_output) = on_output.as_ref() {
let write_error = matches!(err, Error::FailedToWriteToFfmpeg(_));
on_output(Err(err.into()));

// This is unlikely to improve! Ffmpeg process likely died.
// By exiting here we hang up on the channel, making future attempts to push into it fail which should cause a reset eventually.
if write_error {
if write_error {
// This is unlikely to improve! Ffmpeg process likely died.
// By exiting here we hang up on the channel, making future attempts to push into it fail which should cause a reset eventually.
return;
}
} else {
return;
}
} else {
Expand All @@ -257,8 +324,8 @@ fn read_ffmpeg_output(
debug_name: &str,
ffmpeg_iterator: ffmpeg_sidecar::iter::FfmpegIterator,
frame_info_rx: &Receiver<FfmpegFrameInfo>,
on_output: &OutputCallback,
) {
on_output: &Mutex<Option<Arc<OutputCallback>>>,
) -> Option<()> {
/// Ignore some common output from ffmpeg:
fn should_ignore_log_msg(msg: &str) -> bool {
let patterns = [
Expand Down Expand Up @@ -310,19 +377,18 @@ fn read_ffmpeg_output(
}

FfmpegEvent::Log(LogLevel::Error, msg) => {
on_output(Err(Error::Ffmpeg(msg).into()));
(on_output.lock().as_ref()?)(Err(Error::Ffmpeg(msg).into()));
}

FfmpegEvent::Log(LogLevel::Fatal, msg) => {
on_output(Err(Error::FfmpegFatal(msg).into()));
return;
(on_output.lock().as_ref()?)(Err(Error::FfmpegFatal(msg).into()));
}

FfmpegEvent::Log(LogLevel::Unknown, msg) => {
if msg.contains("system signals, hard exiting") {
// That was probably us, killing the process.
re_log::debug!("FFmpeg process for {debug_name} was killed");
return;
return None;
}
if !should_ignore_log_msg(&msg) {
re_log::warn_once!("{debug_name} decoder: {msg}");
Expand All @@ -336,7 +402,7 @@ fn read_ffmpeg_output(

FfmpegEvent::Error(error) => {
// An error in ffmpeg sidecar itself, rather than ffmpeg.
on_output(Err(Error::FfmpegSidecar(error).into()));
(on_output.lock().as_ref()?)(Err(Error::FfmpegSidecar(error).into()));
}

FfmpegEvent::ParsedInput(input) => {
Expand Down Expand Up @@ -423,7 +489,7 @@ fn read_ffmpeg_output(
re_log::debug!(
"{debug_name} ffmpeg decoder frame info channel disconnected"
);
return;
return None;
};

// If the decodetimestamp did not increase, we're probably seeking backwards!
Expand Down Expand Up @@ -458,7 +524,7 @@ fn read_ffmpeg_output(
debug_assert_eq!(pix_fmt, "rgb24");
debug_assert_eq!(width as usize * height as usize * 3, data.len());

on_output(Ok(super::Frame {
(on_output.lock().as_ref()?)(Ok(super::Frame {
content: super::FrameContent {
data,
width,
Expand All @@ -476,7 +542,7 @@ fn read_ffmpeg_output(
FfmpegEvent::Done => {
// This happens on `pkill ffmpeg`, for instance.
re_log::debug!("{debug_name}'s ffmpeg is Done");
return;
return None;
}

FfmpegEvent::ParsedVersion(ffmpeg_version) => {
Expand All @@ -497,11 +563,13 @@ fn read_ffmpeg_output(
FfmpegEvent::OutputChunk(_) => {
// Something went seriously wrong if we end up here.
re_log::error!("Unexpected ffmpeg output chunk for {debug_name}");
on_output(Err(Error::UnexpectedFfmpegOutputChunk.into()));
return;
(on_output.lock().as_ref()?)(Err(Error::UnexpectedFfmpegOutputChunk.into()));
return None;
}
}
}

Some(())
}

/// Decode H.264 video via ffmpeg over CLI
Expand Down Expand Up @@ -606,20 +674,12 @@ fn write_avc_chunk_to_nalu_stream(
// Otherwise the decoder is not able to get the necessary information about how the video stream is encoded.
if chunk.is_sync && !state.previous_frame_was_idr {
for sps in &avcc.sequence_parameter_sets {
nalu_stream
.write_all(NAL_START_CODE)
.map_err(Error::FailedToWriteToFfmpeg)?;
nalu_stream
.write_all(&sps.bytes)
.map_err(Error::FailedToWriteToFfmpeg)?;
write_bytes(nalu_stream, NAL_START_CODE)?;
write_bytes(nalu_stream, &sps.bytes)?;
}
for pps in &avcc.picture_parameter_sets {
nalu_stream
.write_all(NAL_START_CODE)
.map_err(Error::FailedToWriteToFfmpeg)?;
nalu_stream
.write_all(&pps.bytes)
.map_err(Error::FailedToWriteToFfmpeg)?;
write_bytes(nalu_stream, NAL_START_CODE)?;
write_bytes(nalu_stream, &pps.bytes)?;
}
state.previous_frame_was_idr = true;
} else {
Expand Down

0 comments on commit 562e9d3

Please sign in to comment.