Skip to content

Commit

Permalink
Merge pull request RustAudio#5 from mitchmindtree/timestamp_alsa
Browse files Browse the repository at this point in the history
WIP - Add ALSA timestamp implementation.
  • Loading branch information
mitchmindtree authored Apr 29, 2020
2 parents 777a6b2 + 163f0cc commit 9a00961
Showing 1 changed file with 116 additions and 23 deletions.
139 changes: 116 additions & 23 deletions src/host/alsa/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, SupportedStreamConfig,
SupportedStreamConfigRange, SupportedStreamConfigsError,
};
use std::convert::TryInto;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::vec::IntoIter as VecIntoIter;
Expand Down Expand Up @@ -202,16 +203,25 @@ impl Device {
num_descriptors
};

// Check to see if we can retrieve valid timestamps from the device.
// Related: https://bugs.freedesktop.org/show_bug.cgi?id=88503
let ts = handle.status()?.get_htstamp();
let creation_instant = match (ts.tv_sec, ts.tv_nsec) {
(0, 0) => Some(std::time::Instant::now()),
_ => None,
};

handle.start()?;

let stream_inner = StreamInner {
channel: handle,
sample_format,
num_descriptors,
num_channels: conf.channels as u16,
conf: conf.clone(),
buffer_len,
period_len,
can_pause,
creation_instant,
};

Ok(stream_inner)
Expand Down Expand Up @@ -421,8 +431,8 @@ struct StreamInner {
// Format of the samples.
sample_format: SampleFormat,

// Number of channels, ie. number of samples per frame.
num_channels: u16,
// The configuration used to open this stream.
conf: StreamConfig,

// Number of samples that can fit in the buffer.
buffer_len: usize,
Expand All @@ -432,6 +442,16 @@ struct StreamInner {

// Whether or not the hardware supports pausing the stream.
can_pause: bool,

// In the case that the device does not return valid timestamps via `get_htstamp`, this field
// will be `Some` and will contain an `Instant` representing the moment the stream was created.
//
// If this field is `Some`, then the stream will use the duration since this instant as a
// source for timestamps.
//
// If this field is `None` then the elapsed duration between `get_trigger_htstamp` and
// `get_htstamp` is used.
creation_instant: Option<std::time::Instant>,
}

// Assume that the ALSA library is built with thread safe option.
Expand Down Expand Up @@ -479,18 +499,17 @@ fn input_stream_worker(
PollDescriptorsFlow::Continue => continue,
PollDescriptorsFlow::Return => return,
PollDescriptorsFlow::Ready {
available_frames: _,
avail_frames: _,
delay_frames,
stream_type,
} => {
assert_eq!(
stream_type,
StreamType::Input,
"expected input stream, but polling descriptors indicated output",
);
report_error(
process_input(stream, &mut ctxt.buffer, data_callback),
error_callback,
);
let res = process_input(stream, &mut ctxt.buffer, delay_frames, data_callback);
report_error(res, error_callback);
}
}
}
Expand All @@ -514,21 +533,24 @@ fn output_stream_worker(
PollDescriptorsFlow::Continue => continue,
PollDescriptorsFlow::Return => return,
PollDescriptorsFlow::Ready {
available_frames,
avail_frames,
delay_frames,
stream_type,
} => {
assert_eq!(
stream_type,
StreamType::Output,
"expected output stream, but polling descriptors indicated input",
);
process_output(
let res = process_output(
stream,
&mut ctxt.buffer,
available_frames,
avail_frames,
delay_frames,
data_callback,
error_callback,
);
report_error(res, error_callback);
}
}
}
Expand All @@ -555,7 +577,8 @@ enum PollDescriptorsFlow {
Return,
Ready {
stream_type: StreamType,
available_frames: usize,
avail_frames: usize,
delay_frames: usize,
},
}

Expand Down Expand Up @@ -614,7 +637,8 @@ fn poll_descriptors_and_prepare_buffer(
}
};
// Get the number of available samples for reading/writing.
let available_samples = get_available_samples(stream)?;
let (avail_frames, delay_frames) = get_avail_delay(stream)?;
let available_samples = avail_frames * stream.conf.channels as usize;

// Only go on if there is at least `stream.period_len` samples.
if available_samples < stream.period_len {
Expand All @@ -624,26 +648,33 @@ fn poll_descriptors_and_prepare_buffer(
// Prepare the data buffer.
let buffer_size = stream.sample_format.sample_size() * available_samples;
buffer.resize(buffer_size, 0u8);
let available_frames = available_samples / stream.num_channels as usize;

Ok(PollDescriptorsFlow::Ready {
stream_type,
available_frames,
avail_frames,
delay_frames,
})
}

// Read input data from ALSA and deliver it to the user.
fn process_input(
stream: &StreamInner,
buffer: &mut [u8],
delay_frames: usize,
data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
) -> Result<(), BackendSpecificError> {
stream.channel.io().readi(buffer)?;
let sample_format = stream.sample_format;
let data = buffer.as_mut_ptr() as *mut ();
let len = buffer.len() / sample_format.sample_size();
let data = unsafe { Data::from_parts(data, len, sample_format) };
let info = crate::InputCallbackInfo {};
let callback = stream_timestamp(stream)?;
let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
let capture = callback
.sub(delay_duration)
.expect("`capture` is earlier than representation supported by `StreamInstant`");
let timestamp = crate::InputStreamTimestamp { callback, capture };
let info = crate::InputCallbackInfo { timestamp };
data_callback(&data, &info);

Ok(())
Expand All @@ -656,16 +687,23 @@ fn process_output(
stream: &StreamInner,
buffer: &mut [u8],
available_frames: usize,
delay_frames: usize,
data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
error_callback: &mut dyn FnMut(StreamError),
) {
) -> Result<(), BackendSpecificError> {
{
// We're now sure that we're ready to write data.
let sample_format = stream.sample_format;
let data = buffer.as_mut_ptr() as *mut ();
let len = buffer.len() / sample_format.sample_size();
let mut data = unsafe { Data::from_parts(data, len, sample_format) };
let info = crate::OutputCallbackInfo {};
let callback = stream_timestamp(stream)?;
let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
let playback = callback
.add(delay_duration)
.expect("`playback` occurs beyond representation supported by `StreamInstant`");
let timestamp = crate::OutputStreamTimestamp { callback, playback };
let info = crate::OutputCallbackInfo { timestamp };
data_callback(&mut data, &info);
}
loop {
Expand Down Expand Up @@ -693,6 +731,56 @@ fn process_output(
}
}
}
Ok(())
}

// Use the elapsed duration since the start of the stream.
//
// This ensures positive values that are compatible with our `StreamInstant` representation.
fn stream_timestamp(stream: &StreamInner) -> Result<crate::StreamInstant, BackendSpecificError> {
match stream.creation_instant {
None => {
let status = stream.channel.status()?;
let trigger_ts = status.get_trigger_htstamp();
let ts = status.get_htstamp();
let nanos = timespec_diff_nanos(ts, trigger_ts)
.try_into()
.unwrap_or_else(|_| {
panic!(
"get_htstamp `{:?}` was earlier than get_trigger_htstamp `{:?}`",
ts, trigger_ts
);
});
Ok(crate::StreamInstant::from_nanos(nanos))
}
Some(creation) => {
let now = std::time::Instant::now();
let duration = now.duration_since(creation);
let instant = crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128)
.expect("stream duration has exceeded `StreamInstant` representation");
Ok(instant)
}
}
}

// Adapted from `timestamp2ns` here:
// https://fossies.org/linux/alsa-lib/test/audio_time.c
fn timespec_to_nanos(ts: libc::timespec) -> i64 {
ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec
}

// Adapted from `timediff` here:
// https://fossies.org/linux/alsa-lib/test/audio_time.c
fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
timespec_to_nanos(a) - timespec_to_nanos(b)
}

// Convert the given duration in frames at the given sample rate to a `std::time::Duration`.
fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration {
let secsf = frames as f64 / rate.0 as f64;
let secs = secsf as u64;
let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
std::time::Duration::new(secs, nanos)
}

impl Stream {
Expand Down Expand Up @@ -759,16 +847,16 @@ impl StreamTrait for Stream {
}
}

// Determine the number of samples that are available to read/write.
fn get_available_samples(stream: &StreamInner) -> Result<usize, BackendSpecificError> {
match stream.channel.avail_update() {
// Determine the number of frames that are available to read/write along with the latency.
fn get_avail_delay(stream: &StreamInner) -> Result<(usize, usize), BackendSpecificError> {
match stream.channel.avail_delay() {
Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => {
// buffer underrun
// TODO: Notify the user some how.
Ok(stream.buffer_len)
Ok((stream.buffer_len, 0))
}
Err(err) => Err(err.into()),
Ok(available) => Ok(available as usize * stream.num_channels as usize),
Ok((avail, delay)) => Ok((avail as usize, delay as usize)),
}
}

Expand Down Expand Up @@ -830,6 +918,11 @@ fn set_sw_params_from_format(
(buffer, period)
};

sw_params.set_tstamp_mode(true)?;
// TODO:
// Pending new version of alsa-sys and alsa-rs getting published.
// sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?;

pcm_handle.sw_params(&sw_params)?;

Ok((buffer_len, period_len))
Expand Down

0 comments on commit 9a00961

Please sign in to comment.