Skip to content

Commit

Permalink
flydra2: write CSV files in own thread
Browse files Browse the repository at this point in the history
The `csv` crate has (blocking) synchronous interface but we should not
block on IO an async task. Thus, we spawn a separate thread and do the
blocking IO in this separate thread.
  • Loading branch information
astraw committed Aug 7, 2023
1 parent 822dbdc commit 1585cb4
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 42 deletions.
2 changes: 1 addition & 1 deletion braid-offline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ where
let (writer_jh, r2) = tokio::join!(consume_future, reader_local_future);

writer_jh
.await
.join()
.expect("finish writer task 1")
.expect("finish writer task 2");
r2.expect("finish reader task");
Expand Down
2 changes: 1 addition & 1 deletion braid/braid-run/src/mainbrain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,7 @@ pub async fn run(phase1: StartupPhase1) -> Result<()> {

// Allow writer task time to finish writing.
writer_jh
.await
.join()
.expect("join writer task 1")
.expect("join writer task 2");

Expand Down
30 changes: 15 additions & 15 deletions flydra2/src/flydra2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ pub struct CoordProcessor {
pub cam_manager: ConnectedCamerasManager,
pub recon: Option<flydra_mvg::FlydraMultiCameraSystem<MyFloat>>, // TODO? keep reference
pub braidz_write_tx: tokio::sync::mpsc::Sender<SaveToDiskMsg>,
pub writer_join_handle: tokio::task::JoinHandle<Result<()>>,
pub writer_join_handle: std::thread::JoinHandle<Result<()>>,
model_servers: Vec<tokio::sync::mpsc::Sender<(SendType, TimeDataPassthrough)>>,
tracking_params: Arc<TrackingParams>,
/// Images of the "mini arenas" in use.
Expand Down Expand Up @@ -847,19 +847,19 @@ impl CoordProcessor {

let (braidz_write_tx, braidz_write_rx) = tokio::sync::mpsc::channel(10);

let braidz_write_rx = tokio_stream::wrappers::ReceiverStream::new(braidz_write_rx);
let braidz_write_rx = valve.wrap(braidz_write_rx);

let writer_future = writer_task_main(
braidz_write_rx,
cam_manager2,
recon2,
tracking_params2,
save_empty_data2d,
metadata_builder,
ignore_latency,
);
let writer_join_handle = handle.spawn(writer_future);
let writer_join_handle = std::thread::Builder::new()
.name("writer_task_main".to_string())
.spawn(move || {
writer_task_main(
braidz_write_rx,
cam_manager2,
recon2,
tracking_params2,
save_empty_data2d,
metadata_builder,
ignore_latency,
)
})?;

Ok(Self {
cam_manager,
Expand Down Expand Up @@ -921,7 +921,7 @@ impl CoordProcessor {
mut self,
frame_data_rx: S,
expected_framerate: Option<f32>,
) -> tokio::task::JoinHandle<Result<()>>
) -> std::thread::JoinHandle<Result<()>>
where
S: 'static + Send + futures::stream::Stream<Item = StreamItem> + std::fmt::Debug + Unpin,
{
Expand Down
42 changes: 18 additions & 24 deletions flydra2/src/write_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct WritingState {

reconstruction_latency_usec: Option<HistogramWritingState>,
reproj_dist_pixels: Option<HistogramWritingState>,
last_flush: std::time::Instant,
}

fn _test_writing_state_is_send() {
Expand Down Expand Up @@ -337,6 +338,7 @@ impl WritingState {
file_start_time,
reconstruction_latency_usec,
reproj_dist_pixels,
last_flush: std::time::Instant::now(),
})
}

Expand All @@ -359,6 +361,7 @@ impl WritingState {
self.textlog_wtr.flush()?;
self.trigger_clock_info_wtr.flush()?;
self.experiment_info_wtr.flush()?;
self.last_flush = std::time::Instant::now();
Ok(())
}
}
Expand Down Expand Up @@ -473,10 +476,8 @@ impl Drop for WritingState {
}

#[tracing::instrument]
pub(crate) async fn writer_task_main(
mut braidz_write_rx: stream_cancel::Valved<
tokio_stream::wrappers::ReceiverStream<SaveToDiskMsg>,
>,
pub(crate) fn writer_task_main(
mut braidz_write_rx: tokio::sync::mpsc::Receiver<SaveToDiskMsg>,
cam_manager: ConnectedCamerasManager,
recon: Option<flydra_mvg::FlydraMultiCameraSystem<MyFloat>>,
tracking_params: Arc<TrackingParams>,
Expand All @@ -492,22 +493,13 @@ pub(crate) async fn writer_task_main(
const FLUSH_INTERVAL: u64 = 1;
let flush_interval = Duration::from_secs(FLUSH_INTERVAL);

let mut flush_tick = tokio::time::interval(flush_interval);

use futures::stream::StreamExt;

tracing::debug!("Starting braidz writer task. {}:{}", file!(), line!());

loop {
tokio::select! {
opt_msg = braidz_write_rx.next() => {
if opt_msg.is_none() {
// sender disconnected. we can quit too.
// We rely on `writing_state.drop()` to flush and close
// everything.
break;
}
let msg = opt_msg.unwrap();
// TODO: improve flushing. Specifically, if we block for a long time
// without receiving a message here, we will not flush to disk.
match braidz_write_rx.blocking_recv() {
Some(msg) => {
match msg {
KalmanEstimate(ke) => {
let KalmanEstimateRecord {
Expand Down Expand Up @@ -640,15 +632,17 @@ pub(crate) async fn writer_task_main(
}
// simply drop data if no file opened
}
};

}
_ = flush_tick.tick() => {
// flush all writers
if let Some(ref mut ws) = writing_state {
ws.flush_all()?;
}
}
None => {
break;
}
}

if let Some(ref mut ws) = writing_state {
if ws.last_flush.elapsed() > flush_interval {
ws.flush_all()?;
}
}
}
tracing::info!("Done with braidz writer task.");
Expand Down
2 changes: 1 addition & 1 deletion strand-cam/src/strand-cam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ async fn frame_process_task(
file!(),
line!()
);
jh.await.unwrap().unwrap();
jh.join().unwrap().unwrap();
debug!(
"done waiting on flydratrax coord processor {}:{}",
file!(),
Expand Down

0 comments on commit 1585cb4

Please sign in to comment.