Skip to content

Commit

Permalink
Refactor native video decoder for code reuse (rerun-io#7641)
Browse files Browse the repository at this point in the history
### What
* Part of rerun-io#7606 

This refactors the native decoder so that we can easily slot in a
different native decoder (ffmpeg!).

The innermost trait is now `SyncDecoder` which gets pushed a chunk, and
blocks while producing frames (or errors). Around that is an
`AsyncDecoder` that runs the `SyncDecoder` on a background thread to
produce a non-blocking interface.
Finally in `re_renderer` there is the `NativeDecoder` that wraps
`SyncDecoder` and handles texture uploads.

There is a lot of code moved, but very little code actually _changed_.
It's just another layer of abstraction introduced.

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/7641?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/7641?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!
* [x] If have noted any breaking changes to the log API in
`CHANGELOG.md` and the migration guide

- [PR Build Summary](https://build.rerun.io/pr/7641)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
  • Loading branch information
emilk authored Oct 8, 2024
1 parent 46e0b3b commit d90a697
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 199 deletions.
10 changes: 8 additions & 2 deletions crates/store/re_video/examples/frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ fn main() {
let video = std::fs::read(video_path).expect("failed to read video");
let video = re_video::VideoData::load_mp4(&video).expect("failed to load video");

let sync_decoder = Box::new(
re_video::decode::av1::SyncDav1dDecoder::new().expect("Failed to start AV1 decoder"),
);

println!(
"{} {}x{}",
video.gops.len(),
Expand All @@ -38,14 +42,16 @@ fn main() {
progress.enable_steady_tick(Duration::from_millis(100));

let frames = Arc::new(Mutex::new(Vec::new()));
let mut decoder = re_video::decode::av1::Decoder::new("debug_name".to_owned(), {
let on_output = {
let frames = frames.clone();
let progress = progress.clone();
move |frame| {
progress.inc(1);
frames.lock().push(frame);
}
});
};
let mut decoder =
re_video::decode::AsyncDecoder::new("debug_name".to_owned(), sync_decoder, on_output);

let start = Instant::now();
for sample in &video.samples {
Expand Down
163 changes: 163 additions & 0 deletions crates/store/re_video/src/decode/async_decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};

use crossbeam::channel::{unbounded, Receiver, Sender};

use super::{Chunk, Frame, OutputCallback, Result, SyncDecoder};

enum Command {
Chunk(Chunk),
Flush { on_done: Sender<()> },
Reset,
Stop,
}

#[derive(Clone)]
struct Comms {
/// Set when it is time to die
should_stop: Arc<AtomicBool>,

/// Incremented on each call to [`AsyncDecoder::reset`].
/// Decremented each time the decoder thread receives [`Command::Reset`].
num_outstanding_resets: Arc<AtomicU64>,
}

impl Default for Comms {
fn default() -> Self {
Self {
should_stop: Arc::new(AtomicBool::new(false)),
num_outstanding_resets: Arc::new(AtomicU64::new(0)),
}
}
}

/// Runs a [`SyncDecoder`] in a background thread, for non-blocking video decoding.
pub struct AsyncDecoder {
/// Where the decoding happens
_thread: std::thread::JoinHandle<()>,

/// Commands sent to the decoder thread.
command_tx: Sender<Command>,

/// Instant communication to the decoder thread (circumventing the command queue).
comms: Comms,
}

impl AsyncDecoder {
pub fn new(
debug_name: String,
mut sync_decoder: Box<dyn SyncDecoder + Send>,
on_output: impl Fn(Result<Frame>) + Send + Sync + 'static,
) -> Self {
re_tracing::profile_function!();

let (command_tx, command_rx) = unbounded();
let comms = Comms::default();

let thread = std::thread::Builder::new()
.name("av1_decoder".into())
.spawn({
let comms = comms.clone();
move || {
econtext::econtext_data!("Video", debug_name.clone());

decoder_thread(sync_decoder.as_mut(), &comms, &command_rx, &on_output);
re_log::debug!("Closing decoder thread for {debug_name}");
}
})
.expect("failed to spawn decoder thread");

Self {
_thread: thread,
command_tx,
comms,
}
}

// NOTE: The interface is all `&mut self` to avoid certain types of races.
pub fn decode(&mut self, chunk: Chunk) {
re_tracing::profile_function!();
self.command_tx.send(Command::Chunk(chunk)).ok();
}

/// Resets the decoder.
///
/// This does not block, all chunks sent to `decode` before this point will be discarded.
// NOTE: The interface is all `&mut self` to avoid certain types of races.
pub fn reset(&mut self) {
re_tracing::profile_function!();

// Increment resets first…
self.comms
.num_outstanding_resets
.fetch_add(1, Ordering::Release);

// …so it is visible on the decoder thread when it gets the `Reset` command.
self.command_tx.send(Command::Reset).ok();
}

/// Blocks until all pending frames have been decoded.
// NOTE: The interface is all `&mut self` to avoid certain types of races.
pub fn flush(&mut self) {
re_tracing::profile_function!();
let (tx, rx) = crossbeam::channel::bounded(0);
self.command_tx.send(Command::Flush { on_done: tx }).ok();
rx.recv().ok();
}
}

impl Drop for AsyncDecoder {
fn drop(&mut self) {
re_tracing::profile_function!();

// Set `should_stop` first…
self.comms.should_stop.store(true, Ordering::Release);

// …so it is visible on the decoder thread when it gets the `Stop` command.
self.command_tx.send(Command::Stop).ok();

// NOTE: we don't block here. The decoder thread will finish soon enough.
}
}

fn decoder_thread(
decoder: &mut dyn SyncDecoder,
comms: &Comms,
command_rx: &Receiver<Command>,
on_output: &OutputCallback,
) {
#![allow(clippy::debug_assert_with_mut_call)]

while let Ok(command) = command_rx.recv() {
if comms.should_stop.load(Ordering::Acquire) {
re_log::debug!("Should stop");
return;
}

// If we're waiting for a reset we should ignore all other commands until we receive it.
let has_outstanding_reset = 0 < comms.num_outstanding_resets.load(Ordering::Acquire);

match command {
Command::Chunk(chunk) => {
if !has_outstanding_reset {
decoder.submit_chunk(&comms.should_stop, chunk, on_output);
}
}
Command::Flush { on_done } => {
on_done.send(()).ok();
}
Command::Reset => {
decoder.reset();
comms.num_outstanding_resets.fetch_sub(1, Ordering::Release);
}
Command::Stop => {
re_log::debug!("Stop");
return;
}
}
}

re_log::debug!("Disconnected");
}
Loading

0 comments on commit d90a697

Please sign in to comment.