Skip to content

Commit

Permalink
RAII cleanup wthin the ASIO backend
Browse files Browse the repository at this point in the history
- Shares the `Device`'s `driver` and `asio_streams` `Arc`s with the
`Stream`s to ensure they remain valid if the `Host` or `Device` are
dropped early.
- Ensures that a stream's callback is removed upon `Drop`.
  • Loading branch information
mitchmindtree committed Dec 31, 2019
1 parent 972cce0 commit 39ade49
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 13 deletions.
29 changes: 22 additions & 7 deletions asio-sys/src/bindings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ struct BufferSizes {
grans: c_long,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CallbackId(usize);

lazy_static! {
/// A global way to access all the callbacks.
///
Expand All @@ -244,7 +247,7 @@ lazy_static! {
/// Options are used so that when a callback is removed we don't change the Vec indices.
///
/// The indices are how we match a callback with a stream.
static ref BUFFER_CALLBACK: Mutex<Vec<Option<BufferCallback>>> = Mutex::new(Vec::new());
static ref BUFFER_CALLBACK: Mutex<Vec<(CallbackId, BufferCallback)>> = Mutex::new(Vec::new());
}

impl Asio {
Expand Down Expand Up @@ -589,12 +592,26 @@ impl Driver {
/// Adds a callback to the list of active callbacks.
///
/// The given function receives the index of the buffer currently ready for processing.
pub fn set_callback<F>(&self, callback: F)
///
/// Returns an ID uniquely associated with the given callback so that it may be removed later.
pub fn add_callback<F>(&self, callback: F) -> CallbackId
where
F: 'static + FnMut(i32) + Send,
{
let mut bc = BUFFER_CALLBACK.lock().unwrap();
bc.push(Some(BufferCallback(Box::new(callback))));
let id = bc
.last()
.map(|&(id, _)| CallbackId(id.0.checked_add(1).expect("stream ID overflowed")))
.unwrap_or(CallbackId(0));
let cb = BufferCallback(Box::new(callback));
bc.push((id, cb));
id
}

/// Remove the callback with the given ID.
pub fn remove_callback(&self, rem_id: CallbackId) {
let mut bc = BUFFER_CALLBACK.lock().unwrap();
bc.retain(|&(id, _)| id != rem_id);
}

/// Consumes and destroys the `Driver`, stopping the streams if they are running and releasing
Expand Down Expand Up @@ -863,10 +880,8 @@ extern "C" fn buffer_switch_time_info(
) -> *mut ai::ASIOTime {
// This lock is probably unavoidable, but locks in the audio stream are not great.
let mut bcs = BUFFER_CALLBACK.lock().unwrap();
for mut bc in bcs.iter_mut() {
if let Some(ref mut bc) = bc {
bc.run(double_buffer_index);
}
for &mut (_, ref mut bc) in bcs.iter_mut() {
bc.run(double_buffer_index);
}
time
}
Expand Down
37 changes: 31 additions & 6 deletions src/host/asio/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::Device;
use std;
use std::sync::atomic::{Ordering, AtomicBool};
use std::sync::Arc;
use super::parking_lot::Mutex;
use BackendSpecificError;
use BuildStreamError;
use Format;
Expand Down Expand Up @@ -41,6 +42,10 @@ struct SilenceAsioBuffer {

pub struct Stream {
playing: Arc<AtomicBool>,
// Ensure the `Driver` does not terminate until the last stream is dropped.
driver: Arc<sys::Driver>,
asio_streams: Arc<Mutex<sys::AsioStreams>>,
callback_id: sys::CallbackId,
}

impl Stream {
Expand All @@ -55,8 +60,6 @@ impl Stream {
}
}

// TODO: drop implementation

impl Device {
pub fn build_input_stream<D, E>(
&self,
Expand Down Expand Up @@ -91,7 +94,7 @@ impl Device {

// Set the input callback.
// This is most performance critical part of the ASIO bindings.
self.driver.set_callback(move |buffer_index| unsafe {
let callback_id = self.driver.add_callback(move |buffer_index| unsafe {
// If not playing return early.
if !playing.load(Ordering::SeqCst) {
return
Expand Down Expand Up @@ -217,10 +220,18 @@ impl Device {
}
});

let driver = self.driver.clone();
let asio_streams = self.asio_streams.clone();

// Immediately start the device?
self.driver.start().map_err(build_stream_err)?;

Ok(Stream { playing: stream_playing })
Ok(Stream {
playing: stream_playing,
driver,
asio_streams,
callback_id,
})
}

pub fn build_output_stream<D, E>(
Expand Down Expand Up @@ -255,7 +266,7 @@ impl Device {
let playing = Arc::clone(&stream_playing);
let asio_streams = self.asio_streams.clone();

self.driver.set_callback(move |buffer_index| unsafe {
let callback_id = self.driver.add_callback(move |buffer_index| unsafe {
// If not playing, return early.
if !playing.load(Ordering::SeqCst) {
return
Expand Down Expand Up @@ -421,10 +432,18 @@ impl Device {
}
});

let driver = self.driver.clone();
let asio_streams = self.asio_streams.clone();

// Immediately start the device?
self.driver.start().map_err(build_stream_err)?;

Ok(Stream { playing: stream_playing })
Ok(Stream {
playing: stream_playing,
driver,
asio_streams,
callback_id,
})
}

/// Create a new CPAL Input Stream.
Expand Down Expand Up @@ -508,6 +527,12 @@ impl Device {
}
}

impl Drop for Stream {
fn drop(&mut self) {
self.driver.remove_callback(self.callback_id);
}
}

impl Silence for i16 {
const SILENCE: Self = 0;
}
Expand Down

0 comments on commit 39ade49

Please sign in to comment.