gRPC spec and codec update - add support for arrow metadata (rerun-io#7907)

### What

Metadata associated with a remote store recording will have some mandatory
fields and additional arbitrary fields, which we represent as a single-row
Arrow record batch.
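
For illustration, a minimal sketch of such a single-row batch, built with the
`arrow2` API the codec already uses (the column names here are hypothetical;
the actual mandatory fields are whatever the store defines):

```rust
use arrow2::array::{Array, Int64Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};

// Two "mandatory" columns plus one arbitrary, user-defined column.
let schema = Schema::from(vec![
    Field::new("id", DataType::Utf8, false),
    Field::new("registration_time", DataType::Int64, false),
    Field::new("vehicle_id", DataType::Utf8, true),
]);

// Exactly one row: a single value per column.
let batch: Chunk<Box<dyn Array>> = Chunk::new(vec![
    Box::new(Utf8Array::<i32>::from_slice(["rec_001"])) as _,
    Box::new(Int64Array::from_slice([1_698_451_200])) as _, // e.g. a unix timestamp
    Box::new(Utf8Array::<i32>::from_slice(["truck_17"])) as _,
]);
assert_eq!(batch.len(), 1);
```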

When fetching a recording's metadata from the store, we expect to get back
just that single-row record batch, containing both the mandatory columns and
any additional columns as defined during registration _and_ subsequent
updates.

We rely on the codec for simple serialization and deserialization of this
data, which once again relies on Arrow IPC.
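
Concretely, reusing `schema` and `batch` from the sketch above, the round trip
through the new `RecordingMetadata` helpers added in this PR looks roughly
like this (assuming the crate is consumed as `re_remote_store_types`):

```rust
use re_remote_store_types::v0::{EncoderVersion, RecordingMetadata};

// Serialize the single-row batch into the metadata payload (Arrow IPC under the hood)...
let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &schema, &batch).unwrap();

// ...and decode it back out; both sides should agree.
let (decoded_schema, decoded_batch) = metadata.data().unwrap();
assert_eq!(schema, decoded_schema);
assert_eq!(batch, decoded_batch);
```

A batch with more than one row is rejected with `CodecError::InvalidArgument`,
as the new tests further down verify.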

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/7907?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/7907?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!
* [x] I have noted any breaking changes to the log API in
`CHANGELOG.md` and the migration guide

- [PR Build Summary](https://build.rerun.io/pr/7907)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
zehiko authored Oct 28, 2024
1 parent 0b9fb3d commit 2383754
Showing 3 changed files with 172 additions and 48 deletions.
@@ -15,10 +15,22 @@ service StorageNode {
// ---------------- RegisterRecording ------------------

message RegisterRecordingsRequest {
// human readable description of the recording
string description = 1;
// information about recording's backing storage
// TODO(zehiko) add separate info about the "source" recording
ObjectStorage obj_storage = 2;
// TODO(zehiko) should this be auto-discoverable?
// type of recording
RecordingType typ = 3;
// (optional) any additional metadata that should be associated with the recording
// You can associate an arbitrary number of columns with a specific recording
RecordingMetadata metadata = 4;
}

// Recording metadata is a single-row arrow record batch
message RecordingMetadata {
EncoderVersion encoder_version = 1;
bytes payload = 2;
}

message ObjectStorage {
@@ -51,13 +63,8 @@ message GetRecordingMetadataRequest {
}

message GetRecordingMetadataResponse {
RecordingMetadata metadata = 1;
}

message RecordingMetadata {
RecordingId id = 1;
Schema schema = 2;
repeated TimeMetadata time_metadata = 3;
RecordingMetadata metadata = 2;
}

message TimeMetadata {
crates/store/re_remote_store_types/src/codec.rs (141 additions, 32 deletions)
@@ -1,8 +1,11 @@
use arrow2::array::Array as ArrowArray;
use arrow2::chunk::Chunk as ArrowChunk;
use arrow2::datatypes::Schema as ArrowSchema;
use arrow2::error::Error as ArrowError;
use arrow2::io::ipc::{read, write};
use re_dataframe::TransportChunk;

use crate::v0::EncoderVersion;
use crate::v0::{EncoderVersion, RecordingMetadata};

#[derive(Debug, thiserror::Error)]
pub enum CodecError {
@@ -23,6 +26,9 @@ pub enum CodecError {

#[error("Unknown message header")]
UnknownMessageHeader,

#[error("Invalid argument: {0}")]
InvalidArgument(String),
}

#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
@@ -73,14 +79,7 @@ impl TransportMessageV0 {
let mut data: Vec<u8> = Vec::new();
MessageHader::RECORD_BATCH.encode(&mut data)?;

let options = write::WriteOptions { compression: None };
let mut sw = write::StreamWriter::new(&mut data, options);

sw.start(&chunk.schema, None)
.map_err(CodecError::ArrowSerialization)?;
sw.write(&chunk.data, None)
.map_err(CodecError::ArrowSerialization)?;
sw.finish().map_err(CodecError::ArrowSerialization)?;
write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?;

Ok(data)
}
@@ -94,29 +93,14 @@
match header {
MessageHader::NO_DATA => Ok(Self::NoData),
MessageHader::RECORD_BATCH => {
let metadata = read::read_stream_metadata(&mut reader)
.map_err(CodecError::ArrowSerialization)?;
let mut stream = read::StreamReader::new(&mut reader, metadata, None);

let schema = stream.schema().clone();
// there should be at least one record batch in the stream
// TODO(zehiko) isn't there a "read one record batch from bytes" arrow2 function??
let stream_state = stream
.next()
.ok_or(CodecError::MissingRecordBatch)?
.map_err(CodecError::ArrowSerialization)?;

match stream_state {
read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState),
read::StreamState::Some(chunk) => {
let tc = TransportChunk {
schema: schema.clone(),
data: chunk,
};

Ok(Self::RecordBatch(tc))
}
}
let (schema, data) = read_arrow_from_bytes(&mut reader)?;

let tc = TransportChunk {
schema: schema.clone(),
data,
};

Ok(Self::RecordBatch(tc))
}
_ => Err(CodecError::UnknownMessageHeader),
}
@@ -154,11 +138,95 @@ pub fn decode(version: EncoderVersion, data: &[u8]) -> Result<Option<TransportCh
}
}

impl RecordingMetadata {
/// Create `RecordingMetadata` from arrow schema and arrow record batch
pub fn try_from(
version: EncoderVersion,
schema: &ArrowSchema,
unit_batch: &ArrowChunk<Box<dyn ArrowArray>>,
) -> Result<Self, CodecError> {
if unit_batch.len() > 1 {
return Err(CodecError::InvalidArgument(format!(
"metadata record batch can only have a single row, batch with {} rows given",
unit_batch.len()
)));
}

match version {
EncoderVersion::V0 => {
let mut data: Vec<u8> = Vec::new();
write_arrow_to_bytes(&mut data, schema, unit_batch)?;

Ok(Self {
encoder_version: version as i32,
payload: data,
})
}
}
}

/// Get metadata as arrow data
pub fn data(&self) -> Result<(ArrowSchema, ArrowChunk<Box<dyn ArrowArray>>), CodecError> {
let mut reader = std::io::Cursor::new(self.payload.clone());

let encoder_version = EncoderVersion::try_from(self.encoder_version)
.map_err(|err| CodecError::InvalidArgument(err.to_string()))?;

match encoder_version {
EncoderVersion::V0 => read_arrow_from_bytes(&mut reader),
}
}
}

/// Helper function that serializes given arrow schema and record batch into bytes
/// using Arrow IPC format.
fn write_arrow_to_bytes<W: std::io::Write>(
writer: &mut W,
schema: &ArrowSchema,
data: &ArrowChunk<Box<dyn ArrowArray>>,
) -> Result<(), CodecError> {
let options = write::WriteOptions { compression: None };
let mut sw = write::StreamWriter::new(writer, options);

sw.start(schema, None)
.map_err(CodecError::ArrowSerialization)?;
sw.write(data, None)
.map_err(CodecError::ArrowSerialization)?;
sw.finish().map_err(CodecError::ArrowSerialization)?;

Ok(())
}

/// Helper function that deserializes raw bytes into arrow schema and record batch
/// using Arrow IPC format.
fn read_arrow_from_bytes<R: std::io::Read>(
reader: &mut R,
) -> Result<(ArrowSchema, ArrowChunk<Box<dyn ArrowArray>>), CodecError> {
let metadata = read::read_stream_metadata(reader).map_err(CodecError::ArrowSerialization)?;
let mut stream = read::StreamReader::new(reader, metadata, None);

let schema = stream.schema().clone();
// there should be at least one record batch in the stream
let stream_state = stream
.next()
.ok_or(CodecError::MissingRecordBatch)?
.map_err(CodecError::ArrowSerialization)?;

match stream_state {
read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState),
read::StreamState::Some(chunk) => Ok((schema, chunk)),
}
}

#[cfg(test)]
mod tests {

use arrow2::chunk::Chunk as ArrowChunk;
use arrow2::{array::Int32Array, datatypes::Field, datatypes::Schema as ArrowSchema};
use re_dataframe::external::re_chunk::{Chunk, RowId};
use re_log_types::{example_components::MyPoint, Timeline};

use crate::v0::RecordingMetadata;
use crate::{
codec::{decode, encode, CodecError, TransportMessageV0},
v0::EncoderVersion,
@@ -250,4 +318,45 @@ mod tests {

assert_eq!(expected_chunk, decoded_chunk);
}

#[test]
fn test_recording_metadata_serialization() {
let expected_schema = ArrowSchema::from(vec![Field::new(
"my_int",
arrow2::datatypes::DataType::Int32,
false,
)]);
let my_ints = Int32Array::from_slice([42]);
let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]);

let metadata =
RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk)
.unwrap();

let (schema, chunk) = metadata.data().unwrap();

assert_eq!(expected_schema, schema);
assert_eq!(expected_chunk, chunk);
}

#[test]
fn test_recording_metadata_fails_with_non_unit_batch() {
let expected_schema = ArrowSchema::from(vec![Field::new(
"my_int",
arrow2::datatypes::DataType::Int32,
false,
)]);
// more than 1 row in the batch
let my_ints = Int32Array::from_slice([41, 42]);

let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]);

let metadata =
RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk);

assert!(matches!(
metadata.err().unwrap(),
CodecError::InvalidArgument(_)
));
}
}
crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs (17 additions, 9 deletions)
@@ -244,13 +244,28 @@ impl ErrorCode {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RegisterRecordingsRequest {
/// human readable description of the recording
#[prost(string, tag = "1")]
pub description: ::prost::alloc::string::String,
/// information about recording's backing storage
/// TODO(zehiko) add separate info about the "source" recording
#[prost(message, optional, tag = "2")]
pub obj_storage: ::core::option::Option<ObjectStorage>,
/// TODO(zehiko) should this be auto-discoverable?
/// type of recording
#[prost(enumeration = "RecordingType", tag = "3")]
pub typ: i32,
/// (optional) any additional metadata that should be associated with the recording
/// You can associate an arbitrary number of columns with a specific recording
#[prost(message, optional, tag = "4")]
pub metadata: ::core::option::Option<RecordingMetadata>,
}
/// Recording metadata is a single-row arrow record batch
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RecordingMetadata {
#[prost(enumeration = "EncoderVersion", tag = "1")]
pub encoder_version: i32,
#[prost(bytes = "vec", tag = "2")]
pub payload: ::prost::alloc::vec::Vec<u8>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ObjectStorage {
@@ -288,17 +303,10 @@ pub struct GetRecordingMetadataRequest {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetRecordingMetadataResponse {
#[prost(message, optional, tag = "1")]
pub metadata: ::core::option::Option<RecordingMetadata>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RecordingMetadata {
#[prost(message, optional, tag = "1")]
pub id: ::core::option::Option<RecordingId>,
#[prost(message, optional, tag = "2")]
pub schema: ::core::option::Option<Schema>,
#[prost(message, repeated, tag = "3")]
pub time_metadata: ::prost::alloc::vec::Vec<TimeMetadata>,
pub metadata: ::core::option::Option<RecordingMetadata>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TimeMetadata {
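
Putting the pieces together, a registration request might be assembled roughly
like this (a sketch against the generated prost types above; the crate path,
description, and recording-type value are illustrative):

```rust
use re_remote_store_types::v0::{EncoderVersion, RecordingMetadata, RegisterRecordingsRequest};

/// `ipc_bytes` is assumed to hold the Arrow-IPC-encoded single-row batch,
/// e.g. the `payload` produced by `RecordingMetadata::try_from`.
fn register_request(ipc_bytes: Vec<u8>) -> RegisterRecordingsRequest {
    RegisterRecordingsRequest {
        description: "drive through downtown".to_owned(),
        obj_storage: None, // storage details for the recording would go here
        typ: 0,            // a `RecordingType` value, carried as i32 by prost
        metadata: Some(RecordingMetadata {
            encoder_version: EncoderVersion::V0 as i32,
            payload: ipc_bytes,
        }),
    }
}
```

The `metadata` field is optional at registration time, matching the
`(optional)` note in the proto comment above.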
