Skip to content

Commit

Permalink
Make sure to always write out end-of-stream markers during encoding (r…
Browse files Browse the repository at this point in the history
…erun-io#7796)

Saving Rerun RRD data can happen in many, many different ways, and it
must always make sure to include the end-of-stream marker.
This is unfortunately not trivial because many things want to partial
move out of `Encoder`, which rules out `impl Drop`.

This PR introduces a `DroppableEncoder` that can be used in many cases,
or makes sure to properly `finish()` otherwise.

This fixes half of the issues described in:
* rerun-io#7791
  • Loading branch information
teh-cmc authored Oct 17, 2024
1 parent f0e4076 commit 8233130
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 17 deletions.
4 changes: 2 additions & 2 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl RetryableFileReader {
mod tests {
use re_build_info::CrateVersion;
use re_chunk::RowId;
use re_log_encoding::{decoder, encoder::Encoder};
use re_log_encoding::{decoder, encoder::DroppableEncoder};
use re_log_types::{
ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
};
Expand Down Expand Up @@ -247,7 +247,7 @@ mod tests {
.open(rrd_file_path.to_str().unwrap())
.unwrap();

let mut encoder = Encoder::new(
let mut encoder = DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
re_log_encoding::EncodingOptions::UNCOMPRESSED,
rrd_file,
Expand Down
82 changes: 77 additions & 5 deletions crates/store/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,66 @@ pub fn encode_to_bytes<'a>(

// ----------------------------------------------------------------------------

/// An [`Encoder`] that properly closes the stream on drop.
///
/// When dropped, it will automatically insert an end-of-stream marker, if that wasn't already done manually.
pub struct DroppableEncoder<W: std::io::Write> {
encoder: Encoder<W>,

/// Tracks whether the end-of-stream marker has been written out already.
is_finished: bool,
}

impl<W: std::io::Write> DroppableEncoder<W> {
#[inline]
pub fn new(
version: CrateVersion,
options: EncodingOptions,
write: W,
) -> Result<Self, EncodeError> {
Ok(Self {
encoder: Encoder::new(version, options, write)?,
is_finished: false,
})
}

/// Returns the size in bytes of the encoded data.
#[inline]
pub fn append(&mut self, message: &LogMsg) -> Result<u64, EncodeError> {
self.encoder.append(message)
}

#[inline]
pub fn finish(&mut self) -> Result<(), EncodeError> {
if !self.is_finished {
self.encoder.finish()?;
}

self.is_finished = true;

Ok(())
}

#[inline]
pub fn flush_blocking(&mut self) -> std::io::Result<()> {
self.encoder.flush_blocking()
}
}

impl<W: std::io::Write> std::ops::Drop for DroppableEncoder<W> {
fn drop(&mut self) {
if !self.is_finished {
if let Err(err) = self.finish() {
re_log::warn!("encoder couldn't be finished: {err}");
}
}
}
}

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
///
/// Prefer [`DroppableEncoder`] if possible, make sure to call [`Encoder::finish`] when appropriate
/// otherwise.
pub struct Encoder<W: std::io::Write> {
compression: Compression,
write: W,
Expand Down Expand Up @@ -120,15 +179,20 @@ impl<W: std::io::Write> Encoder<W> {
}
}

// NOTE: This cannot be done in a `Drop` implementation because of `Self::into_inner` which
// does a partial move.
#[inline]
pub fn finish(&mut self) -> Result<(), EncodeError> {
MessageHeader::EndOfStream.encode(&mut self.write)?;
Ok(())
}

#[inline]
pub fn flush_blocking(&mut self) -> std::io::Result<()> {
self.write.flush()
}

#[inline]
pub fn into_inner(self) -> W {
self.write
}
Expand All @@ -142,7 +206,7 @@ pub fn encode(
write: &mut impl std::io::Write,
) -> Result<u64, EncodeError> {
re_tracing::profile_function!();
let mut encoder = Encoder::new(version, options, write)?;
let mut encoder = DroppableEncoder::new(version, options, write)?;
let mut size_bytes = 0;
for message in messages {
size_bytes += encoder.append(&message?)?;
Expand All @@ -158,7 +222,7 @@ pub fn encode_ref<'a>(
write: &mut impl std::io::Write,
) -> Result<u64, EncodeError> {
re_tracing::profile_function!();
let mut encoder = Encoder::new(version, options, write)?;
let mut encoder = DroppableEncoder::new(version, options, write)?;
let mut size_bytes = 0;
for message in messages {
size_bytes += encoder.append(message?)?;
Expand All @@ -177,32 +241,40 @@ pub fn encode_as_bytes(
for message in messages {
encoder.append(&message?)?;
}
encoder.finish()?;
Ok(bytes)
}

#[inline]
pub fn local_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
pub fn local_encoder() -> Result<DroppableEncoder<Vec<u8>>, EncodeError> {
DroppableEncoder::new(CrateVersion::LOCAL, EncodingOptions::COMPRESSED, Vec::new())
}

#[inline]
pub fn local_raw_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
Encoder::new(CrateVersion::LOCAL, EncodingOptions::COMPRESSED, Vec::new())
}

#[inline]
pub fn encode_as_bytes_local(
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
let mut encoder = local_raw_encoder()?;
for message in messages {
encoder.append(&message?)?;
}
encoder.finish()?;
Ok(encoder.into_inner())
}

#[inline]
pub fn encode_ref_as_bytes_local<'a>(
messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
let mut encoder = local_raw_encoder()?;
for message in messages {
encoder.append(message?)?;
}
encoder.finish()?;
Ok(encoder.into_inner())
}
6 changes: 3 additions & 3 deletions crates/store/re_log_encoding/src/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl FileSink {

let file = std::fs::File::create(&path)
.map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let encoder = crate::encoder::Encoder::new(
let encoder = crate::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
file,
Expand All @@ -97,7 +97,7 @@ impl FileSink {

re_log::debug!("Writing to stdout…");

let encoder = crate::encoder::Encoder::new(
let encoder = crate::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
std::io::stdout(),
Expand Down Expand Up @@ -127,7 +127,7 @@ impl FileSink {
/// Set `filepath` to `None` to stream to standard output.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
filepath: Option<&std::path::Path>,
mut encoder: crate::encoder::Encoder<W>,
mut encoder: crate::encoder::DroppableEncoder<W>,
rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, FileSinkError> {
let (name, target) = if let Some(filepath) = filepath {
Expand Down
4 changes: 2 additions & 2 deletions crates/top/re_sdk/src/binary_stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl BinaryStreamSink {

let (tx, rx) = std::sync::mpsc::channel();

let encoder = re_log_encoding::encoder::Encoder::new(
let encoder = re_log_encoding::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
storage.inner.clone(),
Expand Down Expand Up @@ -167,7 +167,7 @@ impl LogSink for BinaryStreamSink {

/// Spawn the encoder thread that will write log messages to the binary stream.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
mut encoder: re_log_encoding::encoder::Encoder<W>,
mut encoder: re_log_encoding::encoder::DroppableEncoder<W>,
rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, BinaryStreamSinkError> {
std::thread::Builder::new()
Expand Down
8 changes: 5 additions & 3 deletions crates/top/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::fmt;
use std::sync::Arc;

use parking_lot::Mutex;
use re_log_encoding::encoder::EncodeError;
use re_log_encoding::encoder::{encode_as_bytes_local, local_encoder};
use re_log_encoding::encoder::encode_as_bytes_local;
use re_log_encoding::encoder::{local_raw_encoder, EncodeError};
use re_log_types::{BlueprintActivationCommand, LogMsg, StoreId};

use crate::RecordingStream;
Expand Down Expand Up @@ -250,7 +250,7 @@ impl MemorySinkStorage {
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn concat_memory_sinks_as_bytes(sinks: &[&Self]) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_encoder()?;
let mut encoder = local_raw_encoder()?;

for sink in sinks {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
Expand All @@ -264,6 +264,8 @@ impl MemorySinkStorage {
}
}

encoder.finish()?;

Ok(encoder.into_inner())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun/src/commands/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ fn stream_to_rrd_on_disk(
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
let file =
std::fs::File::create(path).map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let mut encoder = re_log_encoding::encoder::Encoder::new(
let mut encoder = re_log_encoding::encoder::DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
encoding_options,
file,
Expand Down
3 changes: 2 additions & 1 deletion crates/top/rerun/src/commands/rrd/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl FilterCommand {
// TODO(cmc): encoding options & version should match the original.
let version = CrateVersion::LOCAL;
let options = re_log_encoding::EncodingOptions::COMPRESSED;
re_log_encoding::encoder::Encoder::new(version, options, &mut rrd_out)
re_log_encoding::encoder::DroppableEncoder::new(version, options, &mut rrd_out)
.context("couldn't init encoder")?
};

Expand All @@ -93,6 +93,7 @@ impl FilterCommand {
size_bytes += encoder.append(&msg).context("encoding failure")?;
}

drop(encoder);
rrd_out.flush().context("couldn't flush output")?;

Ok(size_bytes)
Expand Down

0 comments on commit 8233130

Please sign in to comment.