Skip to content

Commit

Permalink
end-to-end batching 4: retire MsgBundle + batching support in trans…
Browse files Browse the repository at this point in the history
…port layer (rerun-io#1679)

* Batching support at the transport layer

* fmt

* woop

* self reviewW

* doctest whining

* oh cmon

* addressing PR comments

* 0-indexed rows in examples
  • Loading branch information
teh-cmc authored Mar 29, 2023
1 parent 1c12938 commit d3b459f
Show file tree
Hide file tree
Showing 23 changed files with 992 additions and 785 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 23 additions & 29 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn live_bytes() -> usize {

// ----------------------------------------------------------------------------

use re_log_types::{entity_path, DataRow, DataTable, MsgId};
use re_log_types::{entity_path, DataRow, MsgId};

fn main() {
log_messages();
Expand Down Expand Up @@ -105,23 +105,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)],
let table = Box::new(
DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)
.into_msg_bundle(),
.into_table(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand All @@ -131,23 +128,20 @@ fn log_messages() {

{
let used_bytes_start = live_bytes();
let msg_bundle = Box::new(
DataTable::from_rows(
MsgId::ZERO, // not used (yet)
[DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)],
let table = Box::new(
DataRow::from_cells1(
MsgId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)
.into_msg_bundle(),
.into_table(),
);
let msg_bundle_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap()));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM");
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
Expand Down
7 changes: 3 additions & 4 deletions crates/re_data_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ pub use entity_tree::*;
pub use instance_path::*;
pub use log_db::LogDb;

use re_log_types::msg_bundle;

#[cfg(feature = "serde")]
pub use editable_auto_value::EditableAutoValue;
use re_log_types::DataTableError;
pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt, Timeline};

// ----------------------------------------------------------------------------
Expand All @@ -30,8 +29,8 @@ pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt
/// or how the logging SDK is being used (PEBKAC).
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
MsgBundleError(#[from] msg_bundle::MsgBundleError),
#[error("Error with one the underlying data table")]
DataTable(#[from] DataTableError),

#[error(transparent)]
WriteError(#[from] re_arrow_store::WriteError),
Expand Down
25 changes: 14 additions & 11 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use nohash_hasher::IntMap;
use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt};
use re_log_types::{
component_types::InstanceKey,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle,
ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable,
EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo,
TimePoint, Timeline,
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, ArrowMsg,
BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, EntityPath,
EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint,
Timeline,
};

use crate::{Error, TimesPerTimeline};
Expand Down Expand Up @@ -76,9 +76,8 @@ impl EntityDb {
.or_insert_with(|| entity_path.clone());
}

fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?;
let table = DataTable::from_msg_bundle(msg_bundle);
fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
let table: DataTable = msg.try_into()?;

// TODO(#1619): batch all of this
for row in table.as_rows() {
Expand All @@ -95,7 +94,7 @@ impl EntityDb {

self.register_entity_path(&row.entity_path);

for cell in row.cells() {
for cell in row.cells().iter() {
let component_path =
ComponentPath::new(row.entity_path().clone(), cell.component_name());
if cell.component_name() == MsgId::name() {
Expand Down Expand Up @@ -233,6 +232,7 @@ impl LogDb {

pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
crate::profile_function!();

match &msg {
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(msg) => {
Expand All @@ -243,13 +243,16 @@ impl LogDb {
} = msg;
self.entity_db.add_path_op(*msg_id, time_point, path_op);
}
LogMsg::ArrowMsg(msg) => {
self.entity_db.try_add_arrow_data_msg(msg)?;
}
LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::Goodbye(_) => {}
}

// TODO(#1619): the following only makes sense because, while we support sending and
// receiving batches, we don't actually do so yet.
// We need to stop storing raw `LogMsg`s before we can benefit from our batching.
self.chronological_message_ids.push(msg.id());
self.log_messages.insert(msg.id(), msg);

Ok(())
}

Expand Down
7 changes: 6 additions & 1 deletion crates/re_log_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ re_tuid.workspace = true
# External
ahash.workspace = true
array-init = "2.1.0"
arrow2 = { workspace = true, features = ["io_ipc", "io_print"] }
arrow2 = { workspace = true, features = [
"io_ipc",
"io_print",
"compute_concatenate",
] }
arrow2_convert.workspace = true
bytemuck = "1.11"
document-features = "0.2"
Expand All @@ -72,6 +76,7 @@ ndarray.workspace = true
nohash-hasher = "0.2"
num-derive = "0.3"
num-traits = "0.2"
smallvec = "1.10"
thiserror.workspace = true
time = { workspace = true, default-features = false, features = [
"formatting",
Expand Down
Loading

0 comments on commit d3b459f

Please sign in to comment.