From d3b459f016a40a77ba23f5e396f84a4ea20d86e5 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 29 Mar 2023 10:00:41 +0200 Subject: [PATCH] end-to-end batching 4: retire `MsgBundle` + batching support in transport layer (#1679) * Batching support at the transport layer * fmt * woop * self reviewW * doctest whining * oh cmon * addressing PR comments * 0-indexed rows in examples --- Cargo.lock | 1 + crates/re_data_store/examples/memory_usage.rs | 52 +- crates/re_data_store/src/lib.rs | 7 +- crates/re_data_store/src/log_db.rs | 25 +- crates/re_log_types/Cargo.toml | 7 +- .../benches/msg_encode_benchmark.rs | 116 +++- crates/re_log_types/src/arrow_msg.rs | 111 ++-- .../src/component_types/msg_id.rs | 5 + crates/re_log_types/src/data_cell.rs | 2 +- crates/re_log_types/src/data_row.rs | 86 ++- crates/re_log_types/src/data_table.rs | 590 +++++++++++++++--- crates/re_log_types/src/lib.rs | 6 +- crates/re_log_types/src/msg_bundle.rs | 314 ---------- crates/re_log_types/src/path/entity_path.rs | 64 ++ crates/re_log_types/src/time_point/mod.rs | 17 + crates/re_sdk/src/lib.rs | 2 +- crates/re_sdk/src/msg_sender.rs | 185 ++---- crates/re_sdk_comms/src/server.rs | 11 +- crates/re_tuid/src/lib.rs | 5 + crates/re_viewer/src/ui/data_ui/log_msg.rs | 43 +- crates/re_viewer/src/ui/event_log_view.rs | 71 ++- rerun_py/src/arrow.rs | 11 +- rerun_py/src/python_bridge.rs | 46 +- 23 files changed, 992 insertions(+), 785 deletions(-) delete mode 100644 crates/re_log_types/src/msg_bundle.rs diff --git a/Cargo.lock b/Cargo.lock index 1c295a1473d1..00e8f03c1294 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3926,6 +3926,7 @@ dependencies = [ "serde", "serde_bytes", "serde_test", + "smallvec", "thiserror", "time 0.3.20", "typenum", diff --git a/crates/re_data_store/examples/memory_usage.rs b/crates/re_data_store/examples/memory_usage.rs index 9b023d420366..105d7e1a0014 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -48,7 +48,7 @@ fn live_bytes() -> usize { // ---------------------------------------------------------------------------- -use re_log_types::{entity_path, DataRow, DataTable, MsgId}; +use re_log_types::{entity_path, DataRow, MsgId}; fn main() { log_messages(); @@ -105,23 +105,20 @@ fn log_messages() { { let used_bytes_start = live_bytes(); - let msg_bundle = Box::new( - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells1( - MsgId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - 1, - build_some_point2d(1), - )], + let table = Box::new( + DataRow::from_cells1( + MsgId::random(), + entity_path!("points"), + [build_frame_nr(0.into())], + 1, + build_some_point2d(1), ) - .into_msg_bundle(), + .into_table(), ); - let msg_bundle_bytes = live_bytes() - used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap())); + let table_bytes = live_bytes() - used_bytes_start; + let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap())); let log_msg_bytes = live_bytes() - used_bytes_start; - println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM"); + println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); let encoded = encode_log_msg(&log_msg); println!( "Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded", @@ -131,23 +128,20 @@ fn log_messages() { { let used_bytes_start = live_bytes(); - let msg_bundle = Box::new( - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells1( - MsgId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - NUM_POINTS as _, - build_some_point2d(NUM_POINTS), - )], + let table = Box::new( + DataRow::from_cells1( + MsgId::random(), + entity_path!("points"), + [build_frame_nr(0.into())], + NUM_POINTS as _, + build_some_point2d(NUM_POINTS), ) - .into_msg_bundle(), + .into_table(), ); - let msg_bundle_bytes = live_bytes() - used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap())); + let table_bytes = live_bytes() - used_bytes_start; + let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap())); let log_msg_bytes = live_bytes() - used_bytes_start; - println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM"); + println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); let encoded = encode_log_msg(&log_msg); println!( "Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded", diff --git a/crates/re_data_store/src/lib.rs b/crates/re_data_store/src/lib.rs index 510799043016..9a0fb73a12ae 100644 --- a/crates/re_data_store/src/lib.rs +++ b/crates/re_data_store/src/lib.rs @@ -16,10 +16,9 @@ pub use entity_tree::*; pub use instance_path::*; pub use log_db::LogDb; -use re_log_types::msg_bundle; - #[cfg(feature = "serde")] pub use editable_auto_value::EditableAutoValue; +use re_log_types::DataTableError; pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt, Timeline}; // ---------------------------------------------------------------------------- @@ -30,8 +29,8 @@ pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt /// or how the logging SDK is being used (PEBKAC). #[derive(thiserror::Error, Debug)] pub enum Error { - #[error(transparent)] - MsgBundleError(#[from] msg_bundle::MsgBundleError), + #[error("Error with one the underlying data table")] + DataTable(#[from] DataTableError), #[error(transparent)] WriteError(#[from] re_arrow_store::WriteError), diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index 4c4ff7d72db3..50a5ce57a703 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -3,10 +3,10 @@ use nohash_hasher::IntMap; use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt}; use re_log_types::{ component_types::InstanceKey, - external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle, - ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, - EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, - TimePoint, Timeline, + external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, ArrowMsg, + BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, EntityPath, + EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint, + Timeline, }; use crate::{Error, TimesPerTimeline}; @@ -76,9 +76,8 @@ impl EntityDb { .or_insert_with(|| entity_path.clone()); } - fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> { - let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?; - let table = DataTable::from_msg_bundle(msg_bundle); + fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> { + let table: DataTable = msg.try_into()?; // TODO(#1619): batch all of this for row in table.as_rows() { @@ -95,7 +94,7 @@ impl EntityDb { self.register_entity_path(&row.entity_path); - for cell in row.cells() { + for cell in row.cells().iter() { let component_path = ComponentPath::new(row.entity_path().clone(), cell.component_name()); if cell.component_name() == MsgId::name() { @@ -233,6 +232,7 @@ impl LogDb { pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> { crate::profile_function!(); + match &msg { LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg), LogMsg::EntityPathOpMsg(msg) => { @@ -243,13 +243,16 @@ impl LogDb { } = msg; self.entity_db.add_path_op(*msg_id, time_point, path_op); } - LogMsg::ArrowMsg(msg) => { - self.entity_db.try_add_arrow_data_msg(msg)?; - } + LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?, LogMsg::Goodbye(_) => {} } + + // TODO(#1619): the following only makes sense because, while we support sending and + // receiving batches, we don't actually do so yet. + // We need to stop storing raw `LogMsg`s before we can benefit from our batching. self.chronological_message_ids.push(msg.id()); self.log_messages.insert(msg.id(), msg); + Ok(()) } diff --git a/crates/re_log_types/Cargo.toml b/crates/re_log_types/Cargo.toml index 59bb931e3712..12ae7e9e5463 100644 --- a/crates/re_log_types/Cargo.toml +++ b/crates/re_log_types/Cargo.toml @@ -60,7 +60,11 @@ re_tuid.workspace = true # External ahash.workspace = true array-init = "2.1.0" -arrow2 = { workspace = true, features = ["io_ipc", "io_print"] } +arrow2 = { workspace = true, features = [ + "io_ipc", + "io_print", + "compute_concatenate", +] } arrow2_convert.workspace = true bytemuck = "1.11" document-features = "0.2" @@ -72,6 +76,7 @@ ndarray.workspace = true nohash-hasher = "0.2" num-derive = "0.3" num-traits = "0.2" +smallvec = "1.10" thiserror.workspace = true time = { workspace = true, default-features = false, features = [ "formatting", diff --git a/crates/re_log_types/benches/msg_encode_benchmark.rs b/crates/re_log_types/benches/msg_encode_benchmark.rs index 34492617de0b..d9131ef9f9f9 100644 --- a/crates/re_log_types/benches/msg_encode_benchmark.rs +++ b/crates/re_log_types/benches/msg_encode_benchmark.rs @@ -6,9 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use re_log_types::{ datagen::{build_frame_nr, build_some_colors, build_some_point2d}, - entity_path, - msg_bundle::MsgBundle, - ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, + entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -20,7 +18,12 @@ const NUM_POINTS: usize = 10_000; #[cfg(debug_assertions)] const NUM_POINTS: usize = 1; -criterion_group!(benches, mono_points_arrow, batch_points_arrow,); +criterion_group!( + benches, + mono_points_arrow, + mono_points_arrow_batched, + batch_points_arrow, +); criterion_main!(benches); fn encode_log_msgs(messages: &[LogMsg]) -> Vec { @@ -39,19 +42,19 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec { messages } -fn generate_messages(bundles: &[MsgBundle]) -> Vec { - bundles +fn generate_messages(tables: &[DataTable]) -> Vec { + tables .iter() - .map(|bundle| LogMsg::ArrowMsg(ArrowMsg::try_from(bundle.clone()).unwrap())) + .map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table).unwrap())) .collect() } -fn decode_message_bundles(messages: &[LogMsg]) -> Vec { +fn decode_tables(messages: &[LogMsg]) -> Vec { messages .iter() .map(|log_msg| { if let LogMsg::ArrowMsg(arrow_msg) = log_msg { - MsgBundle::try_from(arrow_msg).unwrap() + DataTable::try_from(arrow_msg).unwrap() } else { unreachable!() } @@ -60,7 +63,7 @@ fn decode_message_bundles(messages: &[LogMsg]) -> Vec { } fn mono_points_arrow(c: &mut Criterion) { - fn generate_message_bundles() -> Vec { + fn generate_tables() -> Vec { (0..NUM_POINTS) .map(|i| { DataTable::from_rows( @@ -73,7 +76,6 @@ fn mono_points_arrow(c: &mut Criterion) { (build_some_point2d(1), build_some_colors(1)), )], ) - .into_msg_bundle() }) .collect() } @@ -82,18 +84,18 @@ fn mono_points_arrow(c: &mut Criterion) { let mut group = c.benchmark_group("mono_points_arrow"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); group.bench_function("generate_message_bundles", |b| { - b.iter(generate_message_bundles); + b.iter(generate_tables); }); - let bundles = generate_message_bundles(); + let tables = generate_tables(); group.bench_function("generate_messages", |b| { - b.iter(|| generate_messages(&bundles)); + b.iter(|| generate_messages(&tables)); }); - let messages = generate_messages(&bundles); + let messages = generate_messages(&tables); group.bench_function("encode_log_msg", |b| { b.iter(|| encode_log_msgs(&messages)); }); group.bench_function("encode_total", |b| { - b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles()))); + b.iter(|| encode_log_msgs(&generate_messages(&generate_tables()))); }); let encoded = encode_log_msgs(&messages); @@ -106,19 +108,74 @@ fn mono_points_arrow(c: &mut Criterion) { }); group.bench_function("decode_message_bundles", |b| { b.iter(|| { - let bundles = decode_message_bundles(&messages); + let tables = decode_tables(&messages); + assert_eq!(tables.len(), messages.len()); + tables + }); + }); + group.bench_function("decode_total", |b| { + b.iter(|| decode_tables(&decode_log_msgs(&encoded))); + }); + } +} + +fn mono_points_arrow_batched(c: &mut Criterion) { + fn generate_table() -> DataTable { + DataTable::from_rows( + MsgId::ZERO, + (0..NUM_POINTS).map(|i| { + DataRow::from_cells2( + MsgId::ZERO, + entity_path!("points", Index::Sequence(i as _)), + [build_frame_nr(0.into())], + 1, + (build_some_point2d(1), build_some_colors(1)), + ) + }), + ) + } + + { + let mut group = c.benchmark_group("mono_points_arrow_batched"); + group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); + group.bench_function("generate_message_bundles", |b| { + b.iter(generate_table); + }); + let tables = [generate_table()]; + group.bench_function("generate_messages", |b| { + b.iter(|| generate_messages(&tables)); + }); + let messages = generate_messages(&tables); + group.bench_function("encode_log_msg", |b| { + b.iter(|| encode_log_msgs(&messages)); + }); + group.bench_function("encode_total", |b| { + b.iter(|| encode_log_msgs(&generate_messages(&[generate_table()]))); + }); + + let encoded = encode_log_msgs(&messages); + group.bench_function("decode_log_msg", |b| { + b.iter(|| { + let decoded = decode_log_msgs(&encoded); + assert_eq!(decoded.len(), messages.len()); + decoded + }); + }); + group.bench_function("decode_message_bundles", |b| { + b.iter(|| { + let bundles = decode_tables(&messages); assert_eq!(bundles.len(), messages.len()); bundles }); }); group.bench_function("decode_total", |b| { - b.iter(|| decode_message_bundles(&decode_log_msgs(&encoded))); + b.iter(|| decode_tables(&decode_log_msgs(&encoded))); }); } } fn batch_points_arrow(c: &mut Criterion) { - fn generate_message_bundles() -> Vec { + fn generate_tables() -> Vec { vec![DataTable::from_rows( MsgId::ZERO, [DataRow::from_cells2( @@ -131,26 +188,25 @@ fn batch_points_arrow(c: &mut Criterion) { build_some_colors(NUM_POINTS), ), )], - ) - .into_msg_bundle()] + )] } { let mut group = c.benchmark_group("batch_points_arrow"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); group.bench_function("generate_message_bundles", |b| { - b.iter(generate_message_bundles); + b.iter(generate_tables); }); - let bundles = generate_message_bundles(); + let tables = generate_tables(); group.bench_function("generate_messages", |b| { - b.iter(|| generate_messages(&bundles)); + b.iter(|| generate_messages(&tables)); }); - let messages = generate_messages(&bundles); + let messages = generate_messages(&tables); group.bench_function("encode_log_msg", |b| { b.iter(|| encode_log_msgs(&messages)); }); group.bench_function("encode_total", |b| { - b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles()))); + b.iter(|| encode_log_msgs(&generate_messages(&generate_tables()))); }); let encoded = encode_log_msgs(&messages); @@ -163,13 +219,13 @@ fn batch_points_arrow(c: &mut Criterion) { }); group.bench_function("decode_message_bundles", |b| { b.iter(|| { - let bundles = decode_message_bundles(&messages); - assert_eq!(bundles.len(), messages.len()); - bundles + let tables = decode_tables(&messages); + assert_eq!(tables.len(), messages.len()); + tables }); }); group.bench_function("decode_total", |b| { - b.iter(|| decode_message_bundles(&decode_log_msgs(&encoded))); + b.iter(|| decode_tables(&decode_log_msgs(&encoded))); }); } } diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index 90d5322531e6..81f48c032057 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -10,22 +10,25 @@ use arrow2::{array::Array, chunk::Chunk, datatypes::Schema}; #[must_use] #[derive(Clone, Debug, PartialEq)] pub struct ArrowMsg { - /// A unique id per [`crate::LogMsg`]. - pub msg_id: MsgId, - - /// Arrow schema + /// Unique identifier for the [`crate::DataTable`] in this message. + /// + /// NOTE(#1619): While we're in the process of transitioning towards end-to-end batching, the + /// `table_id` is always the same as the `row_id` as the first and only row. + pub table_id: MsgId, + + /// The maximum values for all timelines across the entire batch of data. + /// + /// Used to timestamp the batch as a whole for e.g. latency measurements without having to + /// deserialize the arrow payload. + pub timepoint_max: TimePoint, + + /// Schema for all control & data columns. pub schema: Schema, - /// Arrow chunk + /// Data for all control & data columns. pub chunk: Chunk>, } -impl ArrowMsg { - pub fn time_point(&self) -> Result { - crate::msg_bundle::extract_timelines(&self.schema, &self.chunk) - } -} - #[cfg(feature = "serde")] impl serde::Serialize for ArrowMsg { fn serialize(&self, serializer: S) -> Result @@ -47,8 +50,9 @@ impl serde::Serialize for ArrowMsg { .finish() .map_err(|e| serde::ser::Error::custom(e.to_string()))?; - let mut inner = serializer.serialize_tuple(2)?; - inner.serialize_element(&self.msg_id)?; + let mut inner = serializer.serialize_tuple(3)?; + inner.serialize_element(&self.table_id)?; + inner.serialize_element(&self.timepoint_max)?; inner.serialize_element(&serde_bytes::ByteBuf::from(buf))?; inner.end() } @@ -68,17 +72,20 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { type Value = ArrowMsg; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("Tuple Data") + formatter.write_str("(table_id, timepoint, buf)") } fn visit_seq(self, mut seq: A) -> Result where A: serde::de::SeqAccess<'de>, { - let msg_id: Option = seq.next_element()?; + let table_id: Option = seq.next_element()?; + let timepoint_min: Option = seq.next_element()?; let buf: Option = seq.next_element()?; - if let (Some(msg_id), Some(buf)) = (msg_id, buf) { + if let (Some(table_id), Some(timepoint_min), Some(buf)) = + (table_id, timepoint_min, buf) + { let mut cursor = std::io::Cursor::new(buf); let metadata = read_stream_metadata(&mut cursor).unwrap(); let mut stream = StreamReader::new(cursor, metadata, None); @@ -93,17 +100,20 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { .ok_or_else(|| serde::de::Error::custom("No Chunk found in stream"))?; Ok(ArrowMsg { - msg_id, + table_id, + timepoint_max: timepoint_min, schema: stream.metadata().schema.clone(), chunk, }) } else { - Err(serde::de::Error::custom("Expected msg_id and buf")) + Err(serde::de::Error::custom( + "Expected (table_id, timepoint, buf)", + )) } } } - deserializer.deserialize_tuple(2, FieldVisitor) + deserializer.deserialize_tuple(3, FieldVisitor) } } @@ -112,75 +122,30 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { #[cfg(test)] #[cfg(feature = "serde")] mod tests { - use serde_test::{assert_tokens, Token}; + use super::*; - use super::{ArrowMsg, Chunk, MsgId, Schema}; use crate::{ datagen::{build_frame_nr, build_some_point2d, build_some_rects}, - DataRow, + DataRow, DataTable, MsgId, }; #[test] - fn test_serialized_tokens() { - let schema = Schema::default(); - let chunk = Chunk::new(vec![]); - let msg = ArrowMsg { - msg_id: MsgId::ZERO, - schema, - chunk, - }; - - assert_tokens( - &msg, - &[ - Token::Tuple { len: 2 }, - // MsgId portion - Token::NewtypeStruct { name: "MsgId" }, - Token::Struct { - name: "Tuid", - len: 2, - }, - Token::Str("time_ns"), - Token::U64(0), - Token::Str("inc"), - Token::U64(0), - Token::StructEnd, - // Arrow buffer portion. This is flatbuffers encoded schema+chunk. - Token::Bytes(&[ - 255, 255, 255, 255, 48, 0, 0, 0, 4, 0, 0, 0, 242, 255, 255, 255, 20, 0, 0, 0, - 4, 0, 1, 0, 0, 0, 10, 0, 11, 0, 8, 0, 10, 0, 4, 0, 248, 255, 255, 255, 12, 0, - 0, 0, 8, 0, 8, 0, 0, 0, 4, 0, 0, 0, 0, 0, 255, 255, 255, 255, 72, 0, 0, 0, 8, - 0, 0, 0, 0, 0, 0, 0, 242, 255, 255, 255, 20, 0, 0, 0, 4, 0, 3, 0, 0, 0, 10, 0, - 11, 0, 8, 0, 10, 0, 4, 0, 242, 255, 255, 255, 28, 0, 0, 0, 16, 0, 0, 0, 0, 0, - 10, 0, 12, 0, 0, 0, 4, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 255, 255, 255, 255, 0, 0, 0, 0, - ]), - Token::TupleEnd, - ], - ); - } - - #[test] - fn test_roundtrip_payload() { + fn arrow_msg_roundtrip() { let row = DataRow::from_cells2( - MsgId::ZERO, + MsgId::random(), "world/rects", [build_frame_nr(0.into())], 1, (build_some_point2d(1), build_some_rects(1)), ); - let msg_bundle = row - .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - - // TODO(#1619): test the full roundtrip: - // cell -> row -> table_in -> msg_in -> msg_out -> table_out - // => msg_in == msg_out - // => table_in == table_out - let msg_in: ArrowMsg = msg_bundle.try_into().unwrap(); + let table_in = row.into_table(); + let msg_in: ArrowMsg = (&table_in).try_into().unwrap(); let buf = rmp_serde::to_vec(&msg_in).unwrap(); let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap(); + let table_out: DataTable = (&msg_out).try_into().unwrap(); + assert_eq!(msg_in, msg_out); + assert_eq!(table_in, table_out); } } diff --git a/crates/re_log_types/src/component_types/msg_id.rs b/crates/re_log_types/src/component_types/msg_id.rs index 9e28c8180692..104f444cac09 100644 --- a/crates/re_log_types/src/component_types/msg_id.rs +++ b/crates/re_log_types/src/component_types/msg_id.rs @@ -55,6 +55,11 @@ impl MsgId { self.0.as_u128() } + #[inline] + pub fn nanoseconds_since_epoch(&self) -> u64 { + self.0.nanoseconds_since_epoch() + } + /// A shortened string representation of the message id. #[inline] pub fn short_string(&self) -> String { diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index 7c8254e2bbd4..a0fdbdcbaf95 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -89,7 +89,7 @@ pub type DataCellResult = ::std::result::Result; /// # assert_eq!(points, cell.as_native().collect_vec().as_slice()); /// ``` /// -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct DataCell { /// Name of the component type used in this cell. // diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs index 4d9b5635e7fa..96a9c438f897 100644 --- a/crates/re_log_types/src/data_row.rs +++ b/crates/re_log_types/src/data_row.rs @@ -1,10 +1,8 @@ use ahash::HashSetExt; -use itertools::Itertools as _; use nohash_hasher::IntSet; +use smallvec::SmallVec; -use crate::{ - Component, ComponentName, DataCell, DataCellError, DataTable, EntityPath, MsgId, TimePoint, -}; +use crate::{ComponentName, DataCell, DataCellError, DataTable, EntityPath, MsgId, TimePoint}; // --- @@ -31,10 +29,10 @@ pub enum DataRowError { component: ComponentName, }, - #[error("Error with one or more the underlying data cells")] + #[error("Error with one or more the underlying data cells: {0}")] DataCell(#[from] DataCellError), - #[error("Could not serialize/deserialize data to/from Arrow")] + #[error("Could not serialize/deserialize data to/from Arrow: {0}")] Arrow(#[from] arrow2::error::Error), // Needed to handle TryFrom -> T @@ -46,6 +44,49 @@ pub type DataRowResult = ::std::result::Result; // --- +type DataCellVec = SmallVec<[DataCell; 4]>; + +/// A row's worth of [`DataCell`]s: a collection of independent [`DataCell`]s with different +/// underlying datatypes and pointing to different parts of the heap. +/// +/// Each cell in the row corresponds to a different column of the same row. +#[derive(Debug, Clone, PartialEq)] +pub struct DataCellRow(pub DataCellVec); + +impl std::ops::Deref for DataCellRow { + type Target = [DataCell]; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for DataCellRow { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl std::ops::Index for DataCellRow { + type Output = DataCell; + + #[inline] + fn index(&self, index: usize) -> &Self::Output { + &self.0[index] + } +} + +impl std::ops::IndexMut for DataCellRow { + #[inline] + fn index_mut(&mut self, index: usize) -> &mut Self::Output { + &mut self.0[index] + } +} + +// --- + /// A row's worth of data, i.e. an event: a list of [`DataCell`]s associated with an auto-generated /// `RowId`, a user-specified [`TimePoint`] and [`EntityPath`], and an expected number of /// instances. @@ -100,7 +141,7 @@ pub type DataRowResult = ::std::result::Result; /// # let row_id = MsgId::ZERO; /// # let timepoint = [ /// # (Timeline::new_sequence("frame_nr"), 42.into()), // -/// # (Timeline::new_sequence("pouet"), 666.into()), // +/// # (Timeline::new_sequence("clock"), 666.into()), // /// # ]; /// # /// let num_instances = 2; @@ -139,7 +180,7 @@ pub struct DataRow { pub num_instances: u32, /// The actual cells (== columns, == components). - pub cells: Vec, + pub cells: DataCellRow, } impl DataRow { @@ -155,13 +196,13 @@ impl DataRow { num_instances: u32, cells: impl IntoIterator, ) -> DataRowResult { - let cells = cells.into_iter().collect_vec(); + let cells = DataCellRow(cells.into_iter().collect()); let entity_path = entity_path.into(); let timepoint = timepoint.into(); let mut components = IntSet::with_capacity(cells.len()); - for cell in &cells { + for cell in cells.iter() { let component = cell.component_name(); if !components.insert(component) { @@ -195,10 +236,12 @@ impl DataRow { // TODO(cmc): Since we don't yet support mixing splatted data within instanced rows, // we need to craft an array of `MsgId`s that matches the length of the other components. - // TODO(#1619): This goes away with batching & al + // TODO(#1619): This goes away once the store supports the new control columns + use crate::Component as _; if !components.contains(&MsgId::name()) { - this.cells.push(DataCell::from_native( - vec![row_id; this.num_instances() as _].iter(), + let num_instances = this.num_instances(); + this.cells.0.push(DataCell::from_native( + vec![row_id; num_instances as _].iter(), )); } @@ -222,9 +265,13 @@ impl DataRow { Self::try_from_cells(row_id, timepoint, entity_path, num_instances, cells).unwrap() } + /// Turns the `DataRow` into a single-row [`DataTable`] that carries the same ID. + /// + /// This only makes sense as part of our transition to batching. See #1619. + #[doc(hidden)] #[inline] - pub fn into_table(self, table_id: MsgId) -> DataTable { - DataTable::from_rows(table_id, [self]) + pub fn into_table(self) -> DataTable { + DataTable::from_rows(self.row_id, [self]) } } @@ -260,12 +307,12 @@ impl DataRow { } #[inline] - pub fn cells(&self) -> &[DataCell] { + pub fn cells(&self) -> &DataCellRow { &self.cells } #[inline] - pub fn into_cells(self) -> Vec { + pub fn into_cells(self) -> DataCellRow { self.cells } @@ -448,7 +495,10 @@ impl std::fmt::Display for DataRow { mod tests { use super::*; - use crate::component_types::{ColorRGBA, Label, Point2D}; + use crate::{ + component_types::{ColorRGBA, Label, Point2D}, + Component as _, + }; #[test] fn data_row_error_num_instances() { diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index 5cb606e5510f..6183c31f1a86 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -1,16 +1,30 @@ +use ahash::HashMap; use itertools::Itertools as _; use nohash_hasher::{IntMap, IntSet}; +use smallvec::SmallVec; -use crate::{ComponentName, DataCell, DataRow, DataRowError, EntityPath, MsgId, TimePoint}; +use crate::{ + ArrowMsg, ComponentName, DataCell, DataCellError, DataRow, DataRowError, EntityPath, MsgId, + TimePoint, +}; // --- #[derive(thiserror::Error, Debug)] pub enum DataTableError { - #[error("Error with one or more the underlying data rows")] + #[error("Trying to deserialize data that is missing a column present in the schema: {0:?}")] + MissingColumn(String), + + #[error("Trying to deserialize column data that doesn't contain any ListArrays: {0:?}")] + NotAColumn(String), + + #[error("Error with one or more the underlying data rows: {0}")] DataRow(#[from] DataRowError), - #[error("Could not serialize/deserialize component instances to/from Arrow")] + #[error("Error with one or more the underlying data cells: {0}")] + DataCell(#[from] DataCellError), + + #[error("Could not serialize/deserialize component instances to/from Arrow: {0}")] Arrow(#[from] arrow2::error::Error), // Needed to handle TryFrom -> T @@ -22,6 +36,53 @@ pub type DataTableResult = ::std::result::Result; // --- +type RowIdVec = SmallVec<[MsgId; 4]>; +type TimePointVec = SmallVec<[TimePoint; 4]>; +type EntityPathVec = SmallVec<[EntityPath; 4]>; +type NumInstancesVec = SmallVec<[u32; 4]>; +type DataCellOptVec = SmallVec<[Option; 4]>; + +/// A column's worth of [`DataCell`]s: a sparse collection of [`DataCell`]s that share the same +/// underlying type and likely point to shared, contiguous memory. +/// +/// Each cell in the column corresponds to a different row of the same column. +#[derive(Debug, Clone, PartialEq)] +pub struct DataCellColumn(pub DataCellOptVec); + +impl std::ops::Deref for DataCellColumn { + type Target = [Option]; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for DataCellColumn { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl std::ops::Index for DataCellColumn { + type Output = Option; + + #[inline] + fn index(&self, index: usize) -> &Self::Output { + &self.0[index] + } +} + +impl std::ops::IndexMut for DataCellColumn { + #[inline] + fn index_mut(&mut self, index: usize) -> &mut Self::Output { + &mut self.0[index] + } +} + +// --- + /// A sparse table's worth of data, i.e. a batch of events: a collection of [`DataRow`]s. /// This is the top-level layer in our data model. /// @@ -53,7 +114,44 @@ pub type DataTableResult = ::std::result::Result; /// ] /// ``` /// -/// TODO(#1619): practical demo once we have support for table serialization (next PR) +/// Consider this example: +/// ```ignore +/// let row0 = { +/// let num_instances = 2; +/// let points: &[Point2D] = &[[10.0, 10.0].into(), [20.0, 20.0].into()]; +/// let colors: &[_] = &[ColorRGBA::from_rgb(128, 128, 128)]; +/// let labels: &[Label] = &[]; +/// DataRow::from_cells3(MsgId::random(), "a", timepoint(1, 1), num_instances, (points, colors, labels)) +/// }; +/// let row1 = { +/// let num_instances = 0; +/// let colors: &[ColorRGBA] = &[]; +/// DataRow::from_cells1(MsgId::random(), "b", timepoint(1, 2), num_instances, colors) +/// }; +/// let row2 = { +/// let num_instances = 1; +/// let colors: &[_] = &[ColorRGBA::from_rgb(255, 255, 255)]; +/// let labels: &[_] = &[Label("hey".into())]; +/// DataRow::from_cells2(MsgId::random(), "c", timepoint(2, 1), num_instances, (colors, labels)) +/// }; +/// let table = DataTable::from_rows(table_id, [row0, row1, row2]); +/// ``` +/// +/// A table has no arrow representation nor datatype of its own, as it is merely a collection of +/// independent rows. +/// +/// The table above translates to the following, where each column is contiguous in memory: +/// ```text +/// ┌───────────────────────┬───────────────────────────────────┬────────────────────┬─────────────────────┬─────────────┬──────────────────────────────────┬─────────────────┐ +/// │ rerun.row_id ┆ rerun.timepoint ┆ rerun.entity_path ┆ rerun.num_instances ┆ rerun.label ┆ rerun.point2d ┆ rerun.colorrgba │ +/// ╞═══════════════════════╪═══════════════════════════════════╪════════════════════╪═════════════════════╪═════════════╪══════════════════════════════════╪═════════════════╡ +/// │ {167967218, 54449486} ┆ [{frame_nr, 1, 1}, {clock, 1, 1}] ┆ a ┆ 2 ┆ [] ┆ [{x: 10, y: 10}, {x: 20, y: 20}] ┆ [2155905279] │ +/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +/// │ {167967218, 54449486} ┆ [{frame_nr, 1, 1}, {clock, 1, 2}] ┆ b ┆ 0 ┆ - ┆ - ┆ [] │ +/// ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +/// │ {167967218, 54449486} ┆ [{frame_nr, 1, 2}, {clock, 1, 1}] ┆ c ┆ 1 ┆ [hey] ┆ - ┆ [4294967295] │ +/// └───────────────────────┴───────────────────────────────────┴────────────────────┴─────────────────────┴─────────────┴──────────────────────────────────┴─────────────────┘ +/// ``` /// /// ## Example /// @@ -65,14 +163,14 @@ pub type DataTableResult = ::std::result::Result; /// # /// # let table_id = MsgId::ZERO; // not used (yet) /// # -/// # let timepoint = |frame_nr: i64, pouet: i64| { +/// # let timepoint = |frame_nr: i64, clock: i64| { /// # TimePoint::from([ /// # (Timeline::new_sequence("frame_nr"), frame_nr.into()), -/// # (Timeline::new_sequence("pouet"), pouet.into()), +/// # (Timeline::new_sequence("clock"), clock.into()), /// # ]) /// # }; /// # -/// let row1 = { +/// let row0 = { /// let num_instances = 2; /// let points: &[Point2D] = &[[10.0, 10.0].into(), [20.0, 20.0].into()]; /// let colors: &[_] = &[ColorRGBA::from_rgb(128, 128, 128)]; @@ -87,16 +185,16 @@ pub type DataTableResult = ::std::result::Result; /// ) /// }; /// -/// let row2 = { +/// let row1 = { /// let num_instances = 0; /// let colors: &[ColorRGBA] = &[]; /// /// DataRow::from_cells1(MsgId::random(), "b", timepoint(1, 2), num_instances, colors) /// }; /// -/// let row3 = { +/// let row2 = { /// let num_instances = 1; -/// let colors: &[_] = &[ColorRGBA::from_rgb(128, 128, 128)]; +/// let colors: &[_] = &[ColorRGBA::from_rgb(255, 255, 255)]; /// let labels: &[_] = &[Label("hey".into())]; /// /// DataRow::from_cells2( @@ -108,11 +206,20 @@ pub type DataTableResult = ::std::result::Result; /// ) /// }; /// -/// let table = DataTable::from_rows(table_id, [row1, row2, row3]); -/// eprintln!("{table}"); +/// let table_in = DataTable::from_rows(table_id, [row0, row1, row2]); +/// eprintln!("Table in:\n{table_in}"); +/// +/// let (schema, columns) = table_in.serialize().unwrap(); +/// // eprintln!("{schema:#?}"); +/// eprintln!("Wired chunk:\n{columns:#?}"); +/// +/// let table_out = DataTable::deserialize(table_id, &schema, &columns).unwrap(); +/// eprintln!("Table out:\n{table_out}"); +/// # +/// # assert_eq!(table_in, table_out); /// ``` // TODO(#1619): introduce RowId & TableId -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct DataTable { /// Auto-generated `TUID`, uniquely identifying this batch of data and keeping track of the /// client's wall-clock. @@ -120,25 +227,37 @@ pub struct DataTable { pub table_id: MsgId, /// The entire column of `RowId`s. - pub row_id: Vec, + pub row_id: RowIdVec, /// The entire column of [`TimePoint`]s. - pub timepoint: Vec, + pub timepoint: TimePointVec, /// The entire column of [`EntityPath`]s. - pub entity_path: Vec, + pub entity_path: EntityPathVec, /// The entire column of `num_instances`. - pub num_instances: Vec, + pub num_instances: NumInstancesVec, /// All the rows for all the component columns. /// /// The cells are optional since not all rows will have data for every single component /// (i.e. the table is sparse). - pub table: IntMap>>, + pub columns: IntMap, } impl DataTable { + /// Creates a new empty table with the given ID. + pub fn new(table_id: MsgId) -> Self { + Self { + table_id, + row_id: Default::default(), + timepoint: Default::default(), + entity_path: Default::default(), + num_instances: Default::default(), + columns: Default::default(), + } + } + /// Builds a new `DataTable` from an iterable of [`DataRow`]s. pub fn from_rows(table_id: MsgId, rows: impl IntoIterator) -> Self { crate::profile_function!(); @@ -147,11 +266,12 @@ impl DataTable { // Explode all rows into columns, and keep track of which components are involved. let mut components = IntSet::default(); - let (row_id, timepoint, entity_path, num_instances, rows): ( - Vec<_>, - Vec<_>, - Vec<_>, - Vec<_>, + #[allow(clippy::type_complexity)] + let (row_id, timepoint, entity_path, num_instances, column): ( + RowIdVec, + TimePointVec, + EntityPathVec, + NumInstancesVec, Vec<_>, ) = rows .map(|row| { @@ -168,27 +288,36 @@ impl DataTable { .multiunzip(); // Pre-allocate all columns (one per component). - let mut table = IntMap::default(); + let mut columns = IntMap::default(); for component in components { - table.insert(component, vec![None; rows.len()]); + columns.insert( + component, + DataCellColumn(smallvec::smallvec![None; column.len()]), + ); } // Fill all columns (where possible: data is likely sparse). - for (i, row) in rows.into_iter().enumerate() { - for cell in row { + for (i, cells) in column.into_iter().enumerate() { + for cell in cells.0 { let component = cell.component_name(); // NOTE: unwrap cannot fail, all arrays pre-allocated above. - table.get_mut(&component).unwrap()[i] = Some(cell); + columns.get_mut(&component).unwrap()[i] = Some(cell); } } + if row_id.len() > 1 { + re_log::warn_once!( + "batching features are not ready for use, use single-row data tables instead!" + ); + } + Self { table_id, row_id, timepoint, entity_path, num_instances, - table, + columns, } } } @@ -198,16 +327,8 @@ impl DataTable { pub fn num_rows(&self) -> u32 { self.row_id.len() as _ } -} -// --- - -// TODO(#1619): Temporary stuff while we're transitioning away from ComponentBundle/MsgBundle and -// single-row payloads. Will go away asap. - -use crate::msg_bundle::MsgBundle; - -impl DataTable { + #[inline] pub fn as_rows(&self) -> impl ExactSizeIterator + '_ { let num_rows = self.num_rows() as usize; @@ -217,11 +338,11 @@ impl DataTable { timepoint, entity_path, num_instances, - table, + columns, } = self; (0..num_rows).map(move |i| { - let cells = table + let cells = columns .values() .filter_map(|rows| rows[i].clone() /* shallow */); @@ -235,53 +356,378 @@ impl DataTable { }) } - pub fn from_msg_bundle(msg_bundle: MsgBundle) -> Self { - let MsgBundle { - msg_id, - entity_path, - time_point, - cells, - } = msg_bundle; - - Self::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells( - msg_id, - time_point, - entity_path, - cells.first().map_or(0, |cell| cell.num_instances()), - cells, - )], - ) + /// Computes the maximum value for each and every timeline present across this entire table, + /// and returns the corresponding [`TimePoint`]. + #[inline] + pub fn timepoint_max(&self) -> TimePoint { + self.timepoint + .iter() + .fold(TimePoint::timeless(), |acc, tp| acc.union_max(tp)) + } +} + +// --- Serialization --- + +use arrow2::{ + array::{Array, ListArray}, + bitmap::Bitmap, + chunk::Chunk, + datatypes::{DataType, Field, Schema}, + offset::Offsets, +}; +use arrow2_convert::{ + deserialize::TryIntoCollection, field::ArrowField, serialize::ArrowSerialize, + serialize::TryIntoArrow, +}; + +// TODO(#1696): Those names should come from the datatypes themselves. + +pub const COLUMN_ROW_ID: &str = "rerun.row_id"; +pub const COLUMN_TIMEPOINT: &str = "rerun.timepoint"; +pub const COLUMN_ENTITY_PATH: &str = "rerun.entity_path"; +pub const COLUMN_NUM_INSTANCES: &str = "rerun.num_instances"; + +pub const METADATA_KIND: &str = "rerun.kind"; +pub const METADATA_KIND_DATA: &str = "data"; +pub const METADATA_KIND_CONTROL: &str = "control"; +pub const METADATA_TABLE_ID: &str = "rerun.table_id"; + +impl DataTable { + /// Serializes the entire table into an arrow payload and schema. + /// + /// A serialized `DataTable` contains two kinds of columns: control & data. + /// + /// * Control columns are those that drive the behavior of the storage systems. + /// They are always present, always dense, and always deserialized upon reception by the + /// server. + /// * Data columns are the one that hold component data. + /// They are optional, potentially sparse, and never deserialized on the server-side (not by + /// the storage systems, at least). + pub fn serialize(&self) -> DataTableResult<(Schema, Chunk>)> { + crate::profile_function!(); + + let mut schema = Schema::default(); + let mut columns = Vec::new(); + + { + let (control_schema, control_columns) = self.serialize_control_columns()?; + schema.fields.extend(control_schema.fields); + schema.metadata.extend(control_schema.metadata); + columns.extend(control_columns.into_iter()); + } + + { + let (data_schema, data_columns) = self.serialize_data_columns()?; + schema.fields.extend(data_schema.fields); + schema.metadata.extend(data_schema.metadata); + columns.extend(data_columns.into_iter()); + } + + Ok((schema, Chunk::new(columns))) } - pub fn into_msg_bundle(self) -> MsgBundle { - let mut rows = self.as_rows(); - assert!(rows.len() == 1, "must have 1 row, got {}", rows.len()); - let row = rows.next().unwrap(); + /// Serializes all controls columns into an arrow payload and schema. + /// + /// Control columns are those that drive the behavior of the storage systems. + /// They are always present, always dense, and always deserialized upon reception by the + /// server. + fn serialize_control_columns(&self) -> DataTableResult<(Schema, Vec>)> { + crate::profile_function!(); + + /// Serializes an iterable of dense arrow-like data. + fn serialize_dense_column + 'static>( + name: &str, + values: &[C], + ) -> DataTableResult<(Field, Box)> { + let data: Box = values.try_into_arrow()?; + // let data = unit_values_to_unit_lists(data); + + let mut field = Field::new(name, data.data_type().clone(), false).with_metadata( + [(METADATA_KIND.to_owned(), METADATA_KIND_CONTROL.to_owned())].into(), + ); + + // TODO(cmc): why do we have to do this manually on the way out, but it's done + // automatically on our behalf on the way in...? + if let DataType::Extension(name, _, _) = data.data_type() { + field + .metadata + .extend([("ARROW:extension:name".to_owned(), name.clone())]); + } + + Ok((field, data)) + } + + /// Transforms an array of unit values into a list of unit arrays. + /// + /// * Before: `[C, C, C, C, C, ...]` + /// * After: `ListArray[ [C], [C], [C], [C], [C], ... ]` + // NOTE: keeping that one around, just in case. + #[allow(dead_code)] + fn unit_values_to_unit_lists(array: Box) -> Box { + let datatype = array.data_type().clone(); + let datatype = ListArray::::default_datatype(datatype); + let offsets = Offsets::try_from_lengths(std::iter::repeat(1).take(array.len())) + .unwrap() + .into(); + let validity = None; + ListArray::::new(datatype, offsets, array, validity).boxed() + } - let DataRow { + let Self { + table_id, row_id, timepoint, entity_path, + num_instances, + columns: _, + } = self; + + let mut schema = Schema::default(); + let mut columns = Vec::new(); + + let (row_id_field, row_id_column) = serialize_dense_column(COLUMN_ROW_ID, row_id)?; + schema.fields.push(row_id_field); + columns.push(row_id_column); + + let (timepoint_field, timepoint_column) = + serialize_dense_column(COLUMN_TIMEPOINT, timepoint)?; + schema.fields.push(timepoint_field); + columns.push(timepoint_column); + + let (entity_path_field, entity_path_column) = + serialize_dense_column(COLUMN_ENTITY_PATH, entity_path)?; + schema.fields.push(entity_path_field); + columns.push(entity_path_column); + + // TODO(#1712): This is unnecessarily slow... + let (num_instances_field, num_instances_column) = + serialize_dense_column(COLUMN_NUM_INSTANCES, num_instances)?; + schema.fields.push(num_instances_field); + columns.push(num_instances_column); + + schema.metadata = [(METADATA_TABLE_ID.into(), table_id.to_string())].into(); + + Ok((schema, columns)) + } + + /// Serializes all data columns into an arrow payload and schema. + /// + /// They are optional, potentially sparse, and never deserialized on the server-side (not by + /// the storage systems, at least). + fn serialize_data_columns(&self) -> DataTableResult<(Schema, Vec>)> { + crate::profile_function!(); + + let Self { + table_id: _, + row_id: _, + timepoint: _, + entity_path: _, num_instances: _, - cells, - } = row; + columns: table, + } = self; - let table_id = row_id; // ! + let mut schema = Schema::default(); + let mut columns = Vec::new(); + + fn serialize_sparse_column( + name: &str, + column: &[Option], + ) -> DataTableResult<(Field, Box)> { + // TODO(cmc): All we're doing here is allocating and filling a nice contiguous array so + // our `ListArray`s can compute their indices and for the serializer to work with... + // In a far enough future, we could imagine having much finer grain control over the + // serializer and doing all of this at once, bypassing all the mem copies and + // allocations. + + let cell_refs = column + .iter() + .flatten() + .map(|cell| cell.as_arrow_ref()) + .collect_vec(); + + // NOTE: Avoid paying for the cost of the concatenation machinery if there's a single + // row in the column. + let data = if cell_refs.len() == 1 { + data_to_lists(column, cell_refs[0].to_boxed()) + } else { + // NOTE: This is a column of cells, it shouldn't ever fail to concatenate since + // they share the same underlying type. + let data = arrow2::compute::concatenate::concatenate(cell_refs.as_slice())?; + data_to_lists(column, data) + }; + + let field = Field::new(name, data.data_type().clone(), false) + .with_metadata([(METADATA_KIND.to_owned(), METADATA_KIND_DATA.to_owned())].into()); + + Ok((field, data)) + } + + /// Create a list-array out of a flattened array of cell values. + /// + /// * Before: `[C, C, C, C, C, C, C, ...]` + /// * After: `ListArray[ [[C, C], [C, C, C], None, [C], [C], ...] ]` + fn data_to_lists(column: &[Option], data: Box) -> Box { + let datatype = data.data_type().clone(); + + let datatype = ListArray::::default_datatype(datatype); + let offsets = Offsets::try_from_lengths(column.iter().map(|cell| { + cell.as_ref() + .map_or(0, |cell| cell.num_instances() as usize) + })) + // NOTE: cannot fail, `data` has as many instances as `column` + .unwrap() + .into(); + + #[allow(clippy::from_iter_instead_of_collect)] + let validity = Bitmap::from_iter(column.iter().map(|cell| cell.is_some())); + + ListArray::::new(datatype, offsets, data, validity.into()).boxed() + } + + for (component, rows) in table { + let (field, column) = serialize_sparse_column(component.as_str(), rows)?; + schema.fields.push(field); + columns.push(column); + } + + Ok((schema, columns)) + } +} + +impl DataTable { + /// Deserializes an entire table from an arrow payload and schema. + pub fn deserialize( + table_id: MsgId, + schema: &Schema, + chunk: &Chunk>, + ) -> DataTableResult { + crate::profile_function!(); + + let control_indices: HashMap<&str, usize> = schema + .fields + .iter() + .enumerate() + .filter_map(|(i, field)| { + field.metadata.get(METADATA_KIND).and_then(|kind| { + (kind == METADATA_KIND_CONTROL).then_some((field.name.as_str(), i)) + }) + }) + .collect(); + let control_index = move |name: &str| { + control_indices + .get(name) + .copied() + .ok_or(DataTableError::MissingColumn(name.into())) + }; + + // NOTE: the unwrappings cannot fail since control_index() makes sure the index is valid + let row_id = + (&**chunk.get(control_index(COLUMN_ROW_ID)?).unwrap()).try_into_collection()?; + let timepoint = + (&**chunk.get(control_index(COLUMN_TIMEPOINT)?).unwrap()).try_into_collection()?; + let entity_path = + (&**chunk.get(control_index(COLUMN_ENTITY_PATH)?).unwrap()).try_into_collection()?; + // TODO(#1712): This is unnecessarily slow... + let num_instances = + (&**chunk.get(control_index(COLUMN_NUM_INSTANCES)?).unwrap()).try_into_collection()?; + + let columns: DataTableResult<_> = schema + .fields + .iter() + .enumerate() + .filter_map(|(i, field)| { + field.metadata.get(METADATA_KIND).and_then(|kind| { + (kind == METADATA_KIND_DATA).then_some((field.name.as_str(), i)) + }) + }) + .map(|(name, index)| { + let component: ComponentName = name.into(); + chunk + .get(index) + .ok_or(DataTableError::MissingColumn(name.to_owned())) + .and_then(|column| { + Self::deserialize_data_column(component, &**column) + .map(|data| (component, data)) + }) + }) + .collect(); + let columns = columns?; + + Ok(Self { + table_id, + row_id, + timepoint, + entity_path, + num_instances, + columns, + }) + } + + /// Deserializes a sparse data column. + fn deserialize_data_column( + component: ComponentName, + column: &dyn Array, + ) -> DataTableResult { + Ok(DataCellColumn( + column + .as_any() + .downcast_ref::>() + .ok_or(DataTableError::NotAColumn(component.to_string()))? + .iter() + .map(|array| array.map(|values| DataCell::from_arrow(component, values))) + .collect(), + )) + } +} + +// --- + +impl TryFrom<&ArrowMsg> for DataTable { + type Error = DataTableError; + + #[inline] + fn try_from(msg: &ArrowMsg) -> DataTableResult { + let ArrowMsg { + table_id, + timepoint_max: _, + schema, + chunk, + } = msg; + + Self::deserialize(*table_id, schema, chunk) + } +} - MsgBundle::new(table_id, entity_path, timepoint, cells) +impl TryFrom<&DataTable> for ArrowMsg { + type Error = DataTableError; + + #[inline] + fn try_from(table: &DataTable) -> DataTableResult { + let timepoint_max = table.timepoint_max(); + let (schema, chunk) = table.serialize()?; + + Ok(ArrowMsg { + table_id: table.table_id, + timepoint_max, + schema, + chunk, + }) } } // --- -// TODO(#1619): real display impl once we have serialization support impl std::fmt::Display for DataTable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for row in self.as_rows() { - writeln!(f, "{row}")?; - } - Ok(()) + let (schema, columns) = self.serialize().map_err(|err| { + re_log::error_once!("couldn't display data table: {err}"); + std::fmt::Error + })?; + writeln!(f, "DataTable({}):", self.table_id)?; + re_format::arrow::format_table( + columns.columns(), + schema.fields.iter().map(|field| field.name.as_str()), + ) + .fmt(f) } } diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index 0b6f1f83e3b6..61b0aa817f67 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -19,7 +19,6 @@ mod data_row; mod data_table; pub mod hash; mod index; -pub mod msg_bundle; pub mod path; mod time; pub mod time_point; @@ -188,8 +187,11 @@ impl LogMsg { match self { Self::BeginRecordingMsg(msg) => msg.msg_id, Self::EntityPathOpMsg(msg) => msg.msg_id, - Self::ArrowMsg(msg) => msg.msg_id, Self::Goodbye(msg_id) => *msg_id, + // TODO(#1619): the following only makes sense because, while we support sending and + // receiving batches, we don't actually do so yet. + // We need to stop storing raw `LogMsg`s before we can benefit from our batching. + Self::ArrowMsg(msg) => msg.table_id, } } } diff --git a/crates/re_log_types/src/msg_bundle.rs b/crates/re_log_types/src/msg_bundle.rs deleted file mode 100644 index 1d0b8b81c90a..000000000000 --- a/crates/re_log_types/src/msg_bundle.rs +++ /dev/null @@ -1,314 +0,0 @@ -//! Structs and functions used for framing and de-framing a Rerun log message in Arrow. -//! -//! An example main message (outer) schema: -//! ```text -//! +---------------------------------------------+-----------------------------------------------------+ -//! | timelines | components | -//! +---------------------------------------------+-----------------------------------------------------+ -//! | [{timeline: log_time, type: 0, time: 1234}] | {rect: [{x: 0, y: 0, w: 0, h: 0}], color_rgba: [0]} | -//! +---------------------------------------------+-----------------------------------------------------+ -//! ``` -//! -//! The outer schema has precisely 2 columns: `timelines`, `components` -//! (TODO(john) do we want to add `MsgId`?) -//! -//! The `timelines` schema is *fixed* and is defined by the [`ArrowField`] implementation on -//! [`TimePoint`]. -//! -//! The `components` schema is semi-flexible: it should be a [`StructArray`] with one column per -//! component. Each component schema is defined in [`crate::component_types`]. - -use std::collections::BTreeMap; - -use arrow2::{ - array::{Array, ListArray, StructArray}, - chunk::Chunk, - datatypes::{DataType, Field, Schema}, -}; -use arrow2_convert::{field::ArrowField, serialize::TryIntoArrow}; - -use crate::{ - parse_entity_path, ArrowMsg, ComponentName, DataCell, DataCellError, EntityPath, MsgId, - PathParseError, TimePoint, -}; - -// --- - -// TODO(cmc): can probably make that one pub(crate) already -/// The errors that can occur when trying to convert between Arrow and `MessageBundle` types -#[derive(thiserror::Error, Debug)] -pub enum MsgBundleError { - #[error("Could not find entity path in Arrow Schema")] - MissingEntityPath, - - #[error("Expect top-level `timelines` field`")] - MissingTimelinesField, - - #[error("Expect top-level `components` field`")] - MissingComponentsField, - - #[error("No rows in timelines")] - NoRowsInTimeline, - - #[error("Expected component values to be `StructArray`s")] - BadComponentValues, - - #[error("Expect a single TimePoint, but found more than one")] - MultipleTimepoints, - - #[error(transparent)] - PathParseError(#[from] PathParseError), - - #[error("Could not serialize components to Arrow")] - ArrowSerializationError(#[from] arrow2::error::Error), - - #[error("Error with one or more the underlying data cells")] - DataCell(#[from] DataCellError), - - // Needed to handle TryFrom -> T - #[error("Infallible")] - Unreachable(#[from] std::convert::Infallible), -} - -pub type Result = std::result::Result; - -// --- - -//TODO(john) get rid of this eventually -const ENTITY_PATH_KEY: &str = "RERUN:entity_path"; - -const COL_COMPONENTS: &str = "components"; -const COL_TIMELINES: &str = "timelines"; - -// --- - -/// A `MsgBundle` holds data necessary for composing a single log message. -#[derive(Clone, Debug)] -pub struct MsgBundle { - /// A unique id per [`crate::LogMsg`]. - pub msg_id: MsgId, - pub entity_path: EntityPath, - pub time_point: TimePoint, - pub cells: Vec, -} - -impl MsgBundle { - /// Create a new `MsgBundle` with a pre-built Vec of [`DataCell`] components. - /// - /// The `MsgId` will automatically be appended as a component to the given `bundles`, allowing - /// the backend to keep track of the origin of any row of data. - pub(crate) fn new( - msg_id: MsgId, - entity_path: EntityPath, - time_point: TimePoint, - cells: Vec, - ) -> Self { - Self { - msg_id, - entity_path, - time_point, - cells, - } - } - - /// Returns the number of component collections in this bundle, i.e. the length of the bundle - /// itself. - #[inline] - pub fn num_components(&self) -> usize { - self.cells.len() - } - - /// Returns the number of _instances_ for a given `row` in the bundle, i.e. the length of a - /// specific row within the bundle. - /// - /// Since we don't yet support batch insertions and all components within a single row must - /// have the same number of instances, we simply pick the value for the first component - /// collection. - #[inline] - pub fn num_instances(&self) -> usize { - self.cells - .first() - .map_or(0, |cell| cell.num_instances() as _) - } - - /// Returns the index of `component` in the bundle, if it exists. - /// - /// This is `O(n)`. - #[inline] - pub fn find_component(&self, component: &ComponentName) -> Option { - self.cells - .iter() - .map(|cell| cell.component_name()) - .position(|name| name == *component) - } -} - -impl std::fmt::Display for MsgBundle { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let values = self.cells.iter().map(|cell| cell.as_arrow_ref()); - let names = self.cells.iter().map(|cell| cell.component_name().as_str()); - let table = re_format::arrow::format_table(values, names); - f.write_fmt(format_args!( - "MsgBundle '{}' @ {:?}:\n{table}", - self.entity_path, self.time_point - )) - } -} - -/// Pack the passed iterator of [`DataCell`] into a `(Schema, StructArray)` tuple. -#[inline] -fn pack_components(cells: impl Iterator) -> (Schema, StructArray) { - let (component_fields, component_cols): (Vec, Vec>) = cells - .map(|cell| { - // NOTE: wrap in a ListArray to emulate the presence of rows, this'll go away with - // batching. - let data = cell.as_arrow_monolist(); - ( - Field::new( - cell.component_name().as_str(), - data.data_type().clone(), - false, - ), - data, - ) - }) - .unzip(); - - let data_type = DataType::Struct(component_fields); - let packed = StructArray::new(data_type, component_cols, None); - - let schema = Schema { - fields: [ - Field::new(COL_COMPONENTS, packed.data_type().clone(), false), // - ] - .to_vec(), - ..Default::default() - }; - - (schema, packed) -} - -impl TryFrom<&ArrowMsg> for MsgBundle { - type Error = MsgBundleError; - - /// Extract a `MsgBundle` from an `ArrowMsg`. - fn try_from(msg: &ArrowMsg) -> Result { - let ArrowMsg { - msg_id, - schema, - chunk, - } = msg; - - let entity_path_cmp = schema - .metadata - .get(ENTITY_PATH_KEY) - .ok_or(MsgBundleError::MissingEntityPath) - .and_then(|path| { - parse_entity_path(path.as_str()).map_err(MsgBundleError::PathParseError) - })?; - - let time_point = extract_timelines(schema, chunk)?; - let components = extract_components(schema, chunk)?; - - Ok(Self { - msg_id: *msg_id, - entity_path: entity_path_cmp.into(), - time_point, - cells: components, - }) - } -} - -impl TryFrom for ArrowMsg { - type Error = MsgBundleError; - - /// Build a single Arrow log message tuple from this `MsgBundle`. See the documentation on - /// [`MsgBundle`] for details. - fn try_from(bundle: MsgBundle) -> Result { - let mut schema = Schema::default(); - let mut cols: Vec> = Vec::new(); - - schema.metadata = - BTreeMap::from([(ENTITY_PATH_KEY.into(), bundle.entity_path.to_string())]); - - // Build & pack timelines - let timelines_field = Field::new(COL_TIMELINES, TimePoint::data_type(), false); - let timelines_col = [bundle.time_point].try_into_arrow()?; - - schema.fields.push(timelines_field); - cols.push(timelines_col); - - // Build & pack components - let (components_schema, components_data) = pack_components(bundle.cells.into_iter()); - - schema.fields.extend(components_schema.fields); - schema.metadata.extend(components_schema.metadata); - cols.push(components_data.boxed()); - - Ok(ArrowMsg { - msg_id: bundle.msg_id, - schema, - chunk: Chunk::new(cols), - }) - } -} - -/// Extract a [`TimePoint`] from the "timelines" column. This function finds the "timelines" field -/// in `chunk` and deserializes the values into a `TimePoint` using the -/// [`arrow2_convert::deserialize::ArrowDeserialize`] trait. -pub fn extract_timelines(schema: &Schema, chunk: &Chunk>) -> Result { - use arrow2_convert::deserialize::arrow_array_deserialize_iterator; - - let timelines = schema - .fields - .iter() - .position(|f| f.name == COL_TIMELINES) - .and_then(|idx| chunk.columns().get(idx)) - .ok_or(MsgBundleError::MissingTimelinesField)?; - - let mut timepoints_iter = arrow_array_deserialize_iterator::(timelines.as_ref())?; - - // We take only the first result of the iterator because at this time we only support *single* - // row messages. At some point in the future we can support batching with this. - let timepoint = timepoints_iter - .next() - .ok_or(MsgBundleError::NoRowsInTimeline)?; - - if timepoints_iter.next().is_some() { - return Err(MsgBundleError::MultipleTimepoints); - } - - Ok(timepoint) -} - -/// Extract a vector of `DataCell` from the message. This is necessary since the -/// "components" schema is flexible. -fn extract_components(schema: &Schema, msg: &Chunk>) -> Result> { - let components = schema - .fields - .iter() - .position(|f| f.name == COL_COMPONENTS) - .and_then(|idx| msg.columns().get(idx)) - .ok_or(MsgBundleError::MissingComponentsField)?; - - let components = components - .as_any() - .downcast_ref::() - .ok_or(MsgBundleError::BadComponentValues)?; - - Ok(components - .fields() - .iter() - .zip(components.values()) - .map(|(field, component)| { - // NOTE: unwrap the ListArray layer that we added during packing in order to emulate - // the presence of rows, this'll go away with batching. - let component = component - .as_any() - .downcast_ref::>() - .unwrap() - .values(); - DataCell::from_arrow(ComponentName::from(field.name.as_str()), component.clone()) - }) - .collect()) -} diff --git a/crates/re_log_types/src/path/entity_path.rs b/crates/re_log_types/src/path/entity_path.rs index 421d2c7c55db..657141df634b 100644 --- a/crates/re_log_types/src/path/entity_path.rs +++ b/crates/re_log_types/src/path/entity_path.rs @@ -63,6 +63,16 @@ impl std::fmt::Debug for EntityPathHash { /// Cheap to clone. /// /// Implements [`nohash_hasher::IsEnabled`]. +/// +/// ``` +/// # use re_log_types::EntityPath; +/// # use arrow2_convert::field::ArrowField; +/// # use arrow2::datatypes::{DataType, Field}; +/// assert_eq!( +/// EntityPath::data_type(), +/// DataType::Extension("rerun.entity_path".into(), Box::new(DataType::Utf8), None), +/// ); +/// ``` #[derive(Clone, Eq)] pub struct EntityPath { /// precomputed hash @@ -198,6 +208,60 @@ impl From for String { // ---------------------------------------------------------------------------- +use arrow2::{ + array::{MutableUtf8ValuesArray, TryPush, Utf8Array}, + datatypes::DataType, + offset::Offsets, +}; +use arrow2_convert::{deserialize::ArrowDeserialize, field::ArrowField, serialize::ArrowSerialize}; + +arrow2_convert::arrow_enable_vec_for_type!(EntityPath); + +impl ArrowField for EntityPath { + type Type = Self; + + #[inline] + fn data_type() -> DataType { + DataType::Extension( + "rerun.entity_path".to_owned(), + Box::new(DataType::Utf8), + None, + ) + } +} + +impl ArrowSerialize for EntityPath { + type MutableArrayType = MutableUtf8ValuesArray; + + #[inline] + fn new_array() -> Self::MutableArrayType { + MutableUtf8ValuesArray::::try_new( + ::data_type(), + Offsets::new(), + Vec::::new(), + ) + .unwrap() // literally cannot fail + } + + fn arrow_serialize( + v: &::Type, + array: &mut Self::MutableArrayType, + ) -> arrow2::error::Result<()> { + array.try_push(v.to_string()) + } +} + +impl ArrowDeserialize for EntityPath { + type ArrayType = Utf8Array; + + #[inline] + fn arrow_deserialize(v: Option<&str>) -> Option { + v.map(Into::into) + } +} + +// ---------------------------------------------------------------------------- + #[cfg(feature = "serde")] impl serde::Serialize for EntityPath { #[inline] diff --git a/crates/re_log_types/src/time_point/mod.rs b/crates/re_log_types/src/time_point/mod.rs index 5ea4ea8715e6..bec628a8c954 100644 --- a/crates/re_log_types/src/time_point/mod.rs +++ b/crates/re_log_types/src/time_point/mod.rs @@ -65,6 +65,23 @@ impl TimePoint { pub fn iter(&self) -> impl ExactSizeIterator { self.0.iter() } + + /// Computes the union of two `TimePoint`s, keeping the maximum time value in case of + /// conflicts. + pub fn union_max(mut self, rhs: &Self) -> Self { + for (&timeline, &time) in rhs { + match self.0.entry(timeline) { + btree_map::Entry::Vacant(entry) => { + entry.insert(time); + } + btree_map::Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + *entry = TimeInt::max(*entry, time); + } + } + } + self + } } // ---------------------------------------------------------------------------- diff --git a/crates/re_sdk/src/lib.rs b/crates/re_sdk/src/lib.rs index 75a4cbc17d85..03d329a6435e 100644 --- a/crates/re_sdk/src/lib.rs +++ b/crates/re_sdk/src/lib.rs @@ -53,7 +53,7 @@ pub mod sink { /// Things directly related to logging. pub mod log { - pub use re_log_types::{msg_bundle::MsgBundle, DataCell, LogMsg, MsgId, PathOp}; + pub use re_log_types::{DataCell, DataRow, DataTable, LogMsg, MsgId, PathOp}; } /// Time-related types. diff --git a/crates/re_sdk/src/msg_sender.rs b/crates/re_sdk/src/msg_sender.rs index 5213a6ba457a..790a06e71bd4 100644 --- a/crates/re_sdk/src/msg_sender.rs +++ b/crates/re_sdk/src/msg_sender.rs @@ -1,37 +1,21 @@ -use re_log_types::{component_types::InstanceKey, msg_bundle::MsgBundleError, DataRow, DataTable}; - -use nohash_hasher::IntMap; +use re_log_types::{component_types::InstanceKey, DataRow, DataTableError}; use crate::{ components::Transform, - log::{DataCell, LogMsg, MsgBundle, MsgId}, + log::{DataCell, LogMsg, MsgId}, sink::LogSink, time::{Time, TimeInt, TimePoint, Timeline}, - Component, ComponentName, EntityPath, SerializableComponent, + Component, EntityPath, SerializableComponent, }; +// TODO(#1619): Rust SDK batching + // --- /// Errors that can occur when constructing or sending messages /// using [`MsgSender`]. #[derive(thiserror::Error, Debug)] pub enum MsgSenderError { - /// The same component were put in the same log message multiple times. - /// E.g. `with_component()` was called multiple times for `Point3D`. - /// We don't support that yet. - #[error( - "All component collections must have exactly one row (i.e. no batching), got {0:?} instead. Perhaps with_component() was called multiple times with the same component type?" - )] - MoreThanOneRow(Vec<(ComponentName, usize)>), - - /// Some components had more or less instances than some other. - /// For example, there were `10` positions and `8` colors. - #[error( - "All component collections must share the same number of instances (i.e. row length) \ - for a given row, got {0:?} instead" - )] - MismatchedRowLengths(Vec<(ComponentName, u32)>), - /// Instance keys cannot be splatted #[error("Instance keys cannot be splatted")] SplattedInstanceKeys, @@ -40,9 +24,9 @@ pub enum MsgSenderError { #[error("InstanceKey(u64::MAX) is reserved for Rerun internals")] IllegalInstanceKey, - /// A message during packing. See [`MsgBundleError`]. + /// A message during packing. See [`DataTableError`]. #[error(transparent)] - PackingError(#[from] MsgBundleError), + PackingError(#[from] DataTableError), } /// Facilitates building and sending component payloads with the Rerun SDK. @@ -64,7 +48,7 @@ pub enum MsgSenderError { /// .map_err(Into::into) /// } /// ``` -// TODO(#1619): this should embed a DataTable soon. +// TODO(#1619): this whole thing needs to be rethought to incorporate batching and datatables. pub struct MsgSender { // TODO(cmc): At the moment, a `MsgBundle` can only contain data for a single entity, so // this must be known as soon as we spawn the builder. @@ -186,7 +170,7 @@ impl MsgSender { mut self, data: impl IntoIterator, ) -> Result { - let cell = DataCell::try_from_native(data).map_err(MsgBundleError::from)?; + let cell = DataCell::try_from_native(data).map_err(DataTableError::from)?; let num_instances = cell.num_instances(); @@ -196,17 +180,6 @@ impl MsgSender { self.num_instances = Some(num_instances); } - // Detect mismatched row-lengths early on... unless it's a Transform cell: transforms - // behave differently and will be sent in their own message! - if C::name() != Transform::name() && self.num_instances.unwrap() != num_instances { - let collections = self - .instanced - .into_iter() - .map(|cell| (cell.component_name(), cell.num_instances())) - .collect(); - return Err(MsgSenderError::MismatchedRowLengths(collections)); - } - // TODO(cmc): if this is an InstanceKey and it contains u64::MAX, fire IllegalInstanceKey. self.instanced.push(cell); @@ -233,7 +206,7 @@ impl MsgSender { } self.splatted - .push(DataCell::try_from_native(&[data]).map_err(MsgBundleError::from)?); + .push(DataCell::try_from_native(&[data]).map_err(DataTableError::from)?); Ok(self) } @@ -256,35 +229,35 @@ impl MsgSender { /// Consumes, packs, sanity checks and finally sends the message to the currently configured /// target of the SDK. - pub fn send(self, sink: &impl std::borrow::Borrow) -> Result<(), MsgSenderError> { + pub fn send(self, sink: &impl std::borrow::Borrow) -> Result<(), DataTableError> { self.send_to_sink(sink.borrow()) } /// Consumes, packs, sanity checks and finally sends the message to the currently configured /// target of the SDK. - fn send_to_sink(self, sink: &dyn LogSink) -> Result<(), MsgSenderError> { + fn send_to_sink(self, sink: &dyn LogSink) -> Result<(), DataTableError> { if !sink.is_enabled() { return Ok(()); // silently drop the message } - let [msg_standard, msg_transforms, msg_splats] = self.into_messages()?; + let [row_standard, row_transforms, row_splats] = self.into_rows(); - if let Some(msg_transforms) = msg_transforms { - sink.send(LogMsg::ArrowMsg(msg_transforms.try_into()?)); + if let Some(row_transforms) = row_transforms { + sink.send(LogMsg::ArrowMsg((&row_transforms.into_table()).try_into()?)); } - if let Some(msg_splats) = msg_splats { - sink.send(LogMsg::ArrowMsg(msg_splats.try_into()?)); + if let Some(row_splats) = row_splats { + sink.send(LogMsg::ArrowMsg((&row_splats.into_table()).try_into()?)); } - // Always the primary component last so range-based queries will include the other data. See(#1215) - // Since the primary component can't be splatted it must be in msg_standard - if let Some(msg_standard) = msg_standard { - sink.send(LogMsg::ArrowMsg(msg_standard.try_into()?)); + // Always the primary component last so range-based queries will include the other data. + // Since the primary component can't be splatted it must be in msg_standard, see(#1215). + if let Some(row_standard) = row_standard { + sink.send(LogMsg::ArrowMsg((&row_standard.into_table()).try_into()?)); } Ok(()) } - fn into_messages(self) -> Result<[Option; 3], MsgSenderError> { + fn into_rows(self) -> [Option; 3] { let Self { entity_path, timepoint, @@ -319,26 +292,6 @@ impl MsgSender { .collect(); debug_assert!(all_cells.into_iter().all(|cell| cell.is_none())); - // TODO(cmc): The sanity checks we do in here can (and probably should) be done in - // `MsgBundle` instead so that the python SDK benefits from them too... but one step at a - // time. - // TODO(#1619): All of this disappears once DataRow lands. - - // sanity check: no row-level batching - let mut rows_per_comptype: IntMap = IntMap::default(); - for cell in standard_cells - .iter() - .chain(&transform_cells) - .chain(&splatted) - { - *rows_per_comptype.entry(cell.component_name()).or_default() += 1; - } - if rows_per_comptype.values().any(|num_rows| *num_rows > 1) { - return Err(MsgSenderError::MoreThanOneRow( - rows_per_comptype.into_iter().collect(), - )); - } - // sanity check: transforms can't handle multiple instances let num_transform_instances = transform_cells .get(0) @@ -347,56 +300,38 @@ impl MsgSender { re_log::warn!("detected Transform component with multiple instances"); } - let mut msgs = [(); 3].map(|_| None); + let mut rows = [(); 3].map(|_| None); // Standard - msgs[0] = (!standard_cells.is_empty()).then(|| { - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells( - MsgId::random(), - timepoint.clone(), - entity_path.clone(), - num_instances.unwrap_or(0), - standard_cells, - )], + rows[0] = (!standard_cells.is_empty()).then(|| { + DataRow::from_cells( + MsgId::random(), + timepoint.clone(), + entity_path.clone(), + num_instances.unwrap_or(0), + standard_cells, ) - .into_msg_bundle() }); // Transforms - msgs[1] = (!transform_cells.is_empty()).then(|| { - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells( - MsgId::random(), - timepoint.clone(), - entity_path.clone(), - num_transform_instances, - transform_cells, - )], + rows[1] = (!transform_cells.is_empty()).then(|| { + DataRow::from_cells( + MsgId::random(), + timepoint.clone(), + entity_path.clone(), + num_transform_instances, + transform_cells, ) - .into_msg_bundle() }); // Splats // TODO(cmc): unsplit splats once new data cells are in - msgs[2] = (!splatted.is_empty()).then(|| { + rows[2] = (!splatted.is_empty()).then(|| { splatted.push(DataCell::from_native(&[InstanceKey::SPLAT])); - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells( - MsgId::random(), - timepoint, - entity_path, - 1, - splatted, - )], - ) - .into_msg_bundle() + DataRow::from_cells(MsgId::random(), timepoint, entity_path, 1, splatted) }); - Ok(msgs) + rows } } @@ -408,7 +343,7 @@ mod tests { #[test] fn empty() { - let [standard, transforms, splats] = MsgSender::new("some/path").into_messages().unwrap(); + let [standard, transforms, splats] = MsgSender::new("some/path").into_rows(); assert!(standard.is_none()); assert!(transforms.is_none()); assert!(splats.is_none()); @@ -427,12 +362,11 @@ mod tests { .with_component(&labels)? .with_component(&transform)? .with_splat(color)? - .into_messages() - .unwrap(); + .into_rows(); { let standard = standard.unwrap(); - let idx = standard.find_component(&components::Label::name()).unwrap(); + let idx = standard.find_cell(&components::Label::name()).unwrap(); let cell = &standard.cells[idx]; assert!(cell.num_instances() == 2); } @@ -440,7 +374,7 @@ mod tests { { let transforms = transforms.unwrap(); let idx = transforms - .find_component(&components::Transform::name()) + .find_cell(&components::Transform::name()) .unwrap(); let cell = &transforms.cells[idx]; assert!(cell.num_instances() == 1); @@ -448,9 +382,7 @@ mod tests { { let splats = splats.unwrap(); - let idx = splats - .find_component(&components::ColorRGBA::name()) - .unwrap(); + let idx = splats.find_cell(&components::ColorRGBA::name()).unwrap(); let cell = &splats.cells[idx]; assert!(cell.num_instances() == 1); } @@ -477,25 +409,12 @@ mod tests { let sender = MsgSender::new("some/path") .with_timeless(true) - .with_component(&vec![components::Label("label1".into())])? + .with_component([components::Label("label1".into())].as_slice())? .with_time(my_timeline, 2); assert!(!sender.timepoint.is_empty()); // not yet - let [standard, _, _] = sender.into_messages().unwrap(); - assert!(standard.unwrap().time_point.is_empty()); - - Ok(()) - } - - #[test] - fn attempted_batch() -> Result<(), MsgSenderError> { - let res = MsgSender::new("some/path") - .with_component(&vec![components::Label("label1".into())])? - .with_component(&vec![components::Label("label2".into())])? - .into_messages(); - - let Err(MsgSenderError::MoreThanOneRow(err)) = res else { panic!() }; - assert_eq!([(components::Label::name(), 2)].to_vec(), err); + let [standard, _, _] = sender.into_rows(); + assert!(standard.unwrap().timepoint.is_empty()); Ok(()) } @@ -503,9 +422,9 @@ mod tests { #[test] fn illegal_instance_key() -> Result<(), MsgSenderError> { let _ = MsgSender::new("some/path") - .with_component(&vec![components::Label("label1".into())])? - .with_component(&vec![components::InstanceKey(u64::MAX)])? - .into_messages()?; + .with_component([components::Label("label1".into())].as_slice())? + .with_component([components::InstanceKey(u64::MAX)].as_slice())? + .into_rows(); // TODO(cmc): This is not detected as of today, but it probably should. @@ -515,7 +434,7 @@ mod tests { #[test] fn splatted_instance_key() -> Result<(), MsgSenderError> { let res = MsgSender::new("some/path") - .with_component(&vec![components::Label("label1".into())])? + .with_component([components::Label("label1".into())].as_slice())? .with_splat(components::InstanceKey(42)); assert!(matches!(res, Err(MsgSenderError::SplattedInstanceKeys))); diff --git a/crates/re_sdk_comms/src/server.rs b/crates/re_sdk_comms/src/server.rs index fbbe3944b8d2..71c7c786f763 100644 --- a/crates/re_sdk_comms/src/server.rs +++ b/crates/re_sdk_comms/src/server.rs @@ -208,15 +208,10 @@ impl CongestionManager { #[allow(clippy::match_same_arms)] match msg { - LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true, // we don't want to drop any of these + // we don't want to drop any of these + LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true, - LogMsg::ArrowMsg(arrow_msg) => match arrow_msg.time_point() { - Ok(time_point) => self.should_send_time_point(&time_point), - Err(err) => { - re_log::error_once!("Failed to parse an Arrow Message - dropping this message, and maybe more. {err}"); - false - } - }, + LogMsg::ArrowMsg(arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max), } } diff --git a/crates/re_tuid/src/lib.rs b/crates/re_tuid/src/lib.rs index 18a173289642..49c0840dad3d 100644 --- a/crates/re_tuid/src/lib.rs +++ b/crates/re_tuid/src/lib.rs @@ -74,6 +74,11 @@ impl Tuid { pub fn as_u128(&self) -> u128 { ((self.time_ns as u128) << 64) | (self.inc as u128) } + + #[inline] + pub fn nanoseconds_since_epoch(&self) -> u64 { + self.time_ns + } } /// Returns a high-precision, monotonically increasing count that approximates nanoseconds since unix epoch. diff --git a/crates/re_viewer/src/ui/data_ui/log_msg.rs b/crates/re_viewer/src/ui/data_ui/log_msg.rs index 82adebb258fc..b536284f6eeb 100644 --- a/crates/re_viewer/src/ui/data_ui/log_msg.rs +++ b/crates/re_viewer/src/ui/data_ui/log_msg.rs @@ -1,5 +1,5 @@ use re_log_types::{ - msg_bundle::MsgBundle, ArrowMsg, BeginRecordingMsg, EntityPathOpMsg, LogMsg, RecordingInfo, + ArrowMsg, BeginRecordingMsg, DataTable, EntityPathOpMsg, LogMsg, RecordingInfo, }; use crate::{misc::ViewerContext, ui::UiVerbosity}; @@ -101,33 +101,32 @@ impl DataUi for ArrowMsg { verbosity: UiVerbosity, query: &re_arrow_store::LatestAtQuery, ) { - match self.try_into() { - Ok(MsgBundle { - msg_id: _, - entity_path, - time_point, - cells: components, - }) => { - egui::Grid::new("fields").num_columns(2).show(ui, |ui| { - ui.monospace("entity_path:"); - ctx.entity_path_button(ui, None, &entity_path); - ui.end_row(); - - ui.monospace("time_point:"); - time_point.data_ui(ctx, ui, verbosity, query); - ui.end_row(); - - ui.monospace("components:"); - components.as_slice().data_ui(ctx, ui, verbosity, query); - ui.end_row(); - }); - } + let table: DataTable = match self.try_into() { + Ok(table) => table, Err(err) => { ui.label( ctx.re_ui .error_text(format!("Error parsing ArrowMsg: {err}")), ); + return; } + }; + + // TODO(cmc): Come up with something a bit nicer once data tables become a common sight. + for row in table.as_rows() { + egui::Grid::new("fields").num_columns(2).show(ui, |ui| { + ui.monospace("entity_path:"); + ctx.entity_path_button(ui, None, row.entity_path()); + ui.end_row(); + + ui.monospace("time_point:"); + row.timepoint().data_ui(ctx, ui, verbosity, query); + ui.end_row(); + + ui.monospace("components:"); + row.cells().data_ui(ctx, ui, verbosity, query); + ui.end_row(); + }); } } } diff --git a/crates/re_viewer/src/ui/event_log_view.rs b/crates/re_viewer/src/ui/event_log_view.rs index ce00ca27cd1f..94c96edddfe8 100644 --- a/crates/re_viewer/src/ui/event_log_view.rs +++ b/crates/re_viewer/src/ui/event_log_view.rs @@ -2,9 +2,7 @@ use itertools::Itertools as _; use re_arrow_store::{LatestAtQuery, TimeInt}; use re_format::format_number; -use re_log_types::{ - msg_bundle::MsgBundle, BeginRecordingMsg, EntityPathOpMsg, LogMsg, RecordingInfo, -}; +use re_log_types::{BeginRecordingMsg, DataTable, EntityPathOpMsg, LogMsg, RecordingInfo}; use crate::{UiVerbosity, ViewerContext}; @@ -175,38 +173,47 @@ fn table_row( path_op.data_ui(ctx, ui, UiVerbosity::All, &query); }); } - LogMsg::ArrowMsg(msg) => match MsgBundle::try_from(msg) { - Ok(MsgBundle { - msg_id, - entity_path, - time_point, - cells: components, - }) => { - row.col(|ui| { - ctx.msg_id_button(ui, msg_id); - }); - row.col(|ui| { - ui.monospace("ArrowMsg"); - }); - for timeline in ctx.log_db.timelines() { + // NOTE: This really only makes sense because we don't yet have batches with more than a + // single row at the moment... and by the time we do, the event log view will have + // disappeared entirely. + LogMsg::ArrowMsg(msg) => match DataTable::try_from(msg) { + Ok(table) => { + for datarow in table.as_rows() { row.col(|ui| { - if let Some(value) = time_point.get(timeline) { - ctx.time_button(ui, timeline, *value); - } + ctx.msg_id_button(ui, datarow.row_id()); + }); + row.col(|ui| { + ui.monospace("ArrowMsg"); + }); + for timeline in ctx.log_db.timelines() { + row.col(|ui| { + if let Some(value) = datarow.timepoint().get(timeline) { + ctx.time_button(ui, timeline, *value); + } + }); + } + row.col(|ui| { + ctx.entity_path_button(ui, None, datarow.entity_path()); }); - } - row.col(|ui| { - ctx.entity_path_button(ui, None, &entity_path); - }); - row.col(|ui| { - let timeline = *ctx.rec_cfg.time_ctrl.timeline(); - let query = LatestAtQuery::new( - timeline, - time_point.get(&timeline).copied().unwrap_or(TimeInt::MAX), - ); - components.data_ui(ctx, ui, UiVerbosity::MaxHeight(row_height), &query); - }); + row.col(|ui| { + let timeline = *ctx.rec_cfg.time_ctrl.timeline(); + let query = LatestAtQuery::new( + timeline, + datarow + .timepoint() + .get(&timeline) + .copied() + .unwrap_or(TimeInt::MAX), + ); + datarow.cells().data_ui( + ctx, + ui, + UiVerbosity::MaxHeight(row_height), + &query, + ); + }); + } } Err(err) => { re_log::error_once!("Bad arrow payload: {err}",); diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 35ac4e5554b9..8e75cc60bc4d 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -10,8 +10,7 @@ use pyo3::{ PyAny, PyResult, }; use re_log_types::{ - component_types, msg_bundle::MsgBundleError, DataCell, DataRow, EntityPath, LogMsg, MsgId, - TimePoint, + component_types, DataCell, DataRow, DataTableError, EntityPath, LogMsg, MsgId, TimePoint, }; /// Perform conversion between a pyarrow array to arrow2 types. @@ -113,13 +112,9 @@ pub fn build_chunk_from_components( cells, ); - let msg_bundle = row - .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - - let msg = msg_bundle + let msg = (&row.into_table()) .try_into() - .map_err(|e: MsgBundleError| PyValueError::new_err(e.to_string()))?; + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; Ok(LogMsg::ArrowMsg(msg)) } diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 3c38e8e55936..0437757f6b3f 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -6,12 +6,12 @@ use std::{borrow::Cow, io::Cursor, path::PathBuf}; use itertools::izip; use pyo3::{ - exceptions::{PyRuntimeError, PyTypeError}, + exceptions::{PyRuntimeError, PyTypeError, PyValueError}, prelude::*, types::PyDict, }; -use re_log_types::DataRow; +use re_log_types::{DataRow, DataTableError}; use rerun::{ log::{LogMsg, MsgId, PathOp}, time::{Time, TimeInt, TimePoint, TimeType, Timeline}, @@ -472,10 +472,9 @@ fn log_transform( [transform].as_slice(), ); - let msg_bundle = row - .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + let msg = (&row.into_table()) + .try_into() + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -557,10 +556,9 @@ fn log_view_coordinates( [coordinates].as_slice(), ); - let msg_bundle = row - .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + let msg = (&row.into_table()) + .try_into() + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -692,10 +690,9 @@ fn log_meshes( meshes, ); - let msg_bundle = row - .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + let msg = (&row.into_table()) + .try_into() + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -774,10 +771,9 @@ fn log_mesh_file( [mesh3d].as_slice(), ); - let msg_bundle = row - .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + let msg = (&row.into_table()) + .try_into() + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -867,10 +863,9 @@ fn log_image_file( [tensor].as_slice(), ); - let msg_bundle = row - .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + let msg = (&row.into_table()) + .try_into() + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -947,10 +942,9 @@ fn log_annotation_context( [annotation_context].as_slice(), ); - let msg_bundle = row - .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + let msg = (&row.into_table()) + .try_into() + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg));