From d8310c62e3160a2fecadd3f6412a9e62731a8c5c Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 4 Oct 2024 12:08:36 +0200 Subject: [PATCH] Dataframe v2: extensive test suite and associated bug fixes for all existing features (#7587) This implements an extensive test suite that makes sure that all existing features of the dataframe API A) have their behavior fully specified in code (so we can at least agree on how things work _right now_), and B) that said behavior does in fact… behave. This unsurprisingly revealed many bugs, which this PR fixes. The currently existing features are: * filtered_index * filtered_index_range * view_contents * selection Tests will later be added for the following, yet-to-be-implemented features: * filtered_index_values * sampled_index_values * filtered_point_of_view * sparse_fill_strategy --- Cargo.lock | 2 + crates/store/re_chunk/src/transport.rs | 2 +- crates/store/re_dataframe2/Cargo.toml | 3 + crates/store/re_dataframe2/src/query.rs | 777 ++++++++++++++++++++++-- 4 files changed, 743 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2549b3f2eea..bc9ac8dd0e48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5061,7 +5061,9 @@ dependencies = [ "re_types", "re_types_core", "seq-macro", + "similar-asserts", "thiserror", + "unindent", ] [[package]] diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index 388810105990..1c50778b1410 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -30,7 +30,7 @@ use crate::{Chunk, ChunkError, ChunkId, ChunkResult, RowId, TimeColumn}; /// This means we have to be very careful when checking the validity of the data: slipping corrupt /// data into the store could silently break all the index search logic (e.g. think of a chunk /// claiming to be sorted while it is in fact not). -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TransportChunk { /// The schema of the dataframe, and all chunk-level and field-level metadata. /// diff --git a/crates/store/re_dataframe2/Cargo.toml b/crates/store/re_dataframe2/Cargo.toml index 0e6ffa0bbfc6..5ed0aac73bf5 100644 --- a/crates/store/re_dataframe2/Cargo.toml +++ b/crates/store/re_dataframe2/Cargo.toml @@ -50,3 +50,6 @@ thiserror.workspace = true [dev-dependencies] re_types.workspace = true + +similar-asserts.workspace = true +unindent.workspace = true diff --git a/crates/store/re_dataframe2/src/query.rs b/crates/store/re_dataframe2/src/query.rs index 2109d9775608..17fa76e503b6 100644 --- a/crates/store/re_dataframe2/src/query.rs +++ b/crates/store/re_dataframe2/src/query.rs @@ -7,7 +7,7 @@ use ahash::HashSet; use arrow2::{ array::Array as ArrowArray, chunk::Chunk as ArrowChunk, datatypes::Schema as ArrowSchema, }; -use itertools::Itertools as _; +use itertools::Itertools; use nohash_hasher::IntMap; use re_chunk::{Chunk, RangeQuery, RowId, TimeInt, Timeline}; @@ -385,6 +385,8 @@ impl QueryHandle<'_> { // // TODO(cmc): implement this properly, cache the result, etc. pub fn num_rows(&self) -> u64 { + re_tracing::profile_function!(); + let all_unique_timestamps: HashSet<TimeInt> = self .init() .view_chunks @@ -435,12 +437,13 @@ impl QueryHandle<'_> { re_tracing::profile_function!(); /// Temporary state used to resolve the streaming join for the current iteration. + #[derive(Debug)] struct StreamingJoinState<'a> { /// Which `Chunk` is this? chunk: &'a Chunk, /// How far are we into this `Chunk`? - cursor: &'a AtomicU64, + cursor: u64, /// What's the index value at the current cursor?
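+ /// For static chunks, this is `TimeInt::STATIC`.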
index_value: TimeInt, @@ -470,25 +473,14 @@ impl QueryHandle<'_> { for (cur_cursor, cur_chunk) in view_chunks { // NOTE: Too soon to increment the cursor, we cannot know yet which chunks will or // will not be part of the current row. - let cursor_value = cur_cursor.load(Ordering::Relaxed) as usize; + let cur_cursor_value = cur_cursor.load(Ordering::Relaxed); // TODO(cmc): This can easily be optimized by looking ahead and breaking as soon as chunks // stop overlapping. - let Some(cur_row_id) = cur_chunk.row_ids().nth(cursor_value) else { - continue; - }; - - let Some(cur_index_value) = cur_chunk - .timelines() - .get(&self.query.filtered_index) - .map_or(Some(TimeInt::STATIC), |time_column| { - time_column - .times_raw() - .get(cursor_value) - .copied() - .map(TimeInt::new_temporal) - }) + let Some((cur_index_value, cur_row_id)) = cur_chunk + .iter_indices(&self.query.filtered_index) + .nth(cur_cursor_value as _) else { continue; }; @@ -510,15 +502,15 @@ impl QueryHandle<'_> { if cur_chunk_has_smaller_index_value || cur_chunk_has_equal_index_but_higher_rowid { - *chunk = chunk; - *cursor = cursor; + *chunk = cur_chunk; + *cursor = cur_cursor_value; *index_value = cur_index_value; *row_id = cur_row_id; } } else { *streaming_state = Some(StreamingJoinState { chunk: cur_chunk, - cursor: cur_cursor, + cursor: cur_cursor_value, index_value: cur_index_value, row_id: cur_row_id, }); @@ -574,10 +566,9 @@ impl QueryHandle<'_> { .timelines() .get(&self.query.filtered_index) .map(|time_column| { - time_column.times_array().sliced( - cur_most_recent_row.cursor.load(Ordering::Relaxed) as usize, - 1, - ) + time_column + .times_array() + .sliced(cur_most_recent_row.cursor as usize, 1) }); debug_assert!( @@ -603,7 +594,7 @@ impl QueryHandle<'_> { .filter(|time_column| *time_column.timeline() != self.query.filtered_index) // NOTE: Cannot fail, just want to stay away from unwraps. .filter_map(|time_column| { - let cursor = streaming_state.cursor.load(Ordering::Relaxed) as usize; + let cursor = streaming_state.cursor as usize; time_column .times_raw() .get(cursor) @@ -636,8 +627,10 @@ impl QueryHandle<'_> { let view_sliced_arrays: Vec<Option<Box<dyn ArrowArray>>> = view_streaming_state .iter() .map(|streaming_state| { + // NOTE: Reminder: the only reason the streaming state could be `None` here is + // because this column does not have data for the current index value (i.e. `null`). streaming_state.as_ref().and_then(|streaming_state| { - let cursor = streaming_state.cursor.fetch_add(1, Ordering::Relaxed); + let cursor = streaming_state.cursor; debug_assert!( streaming_state.chunk.components().len() <= 1, @@ -664,24 +657,25 @@ impl QueryHandle<'_> { // TODO(cmc): It would likely be worth it to allocate all these possible // null-arrays ahead of time, and just return a pointer to those in the failure // case here. - let arrays = state + let selected_arrays = state .selected_contents .iter() .map(|(view_idx, column)| match column { - ColumnDescriptor::Control(_) => cur_most_recent_row.chunk.row_ids_array().sliced( - cur_most_recent_row - .cursor - .load(Ordering::Relaxed) - // NOTE: We did the cursor increments while computing the final sliced arrays, - // so we need to go back one tick for this.
- .saturating_sub(1) as usize, - 1, - ), + ColumnDescriptor::Control(descr) => { + if descr.component_name == "rerun.controls.RowId" { + cur_most_recent_row + .chunk + .row_ids_array() + .sliced(cur_most_recent_row.cursor as usize, 1) + } else { + arrow2::array::new_null_array(column.datatype(), 1) + } + } ColumnDescriptor::Time(descr) => { - max_value_per_index.remove(&descr.timeline).map_or_else( + max_value_per_index.get(&descr.timeline).map_or_else( || arrow2::array::new_null_array(column.datatype(), 1), - |(_time, time_sliced)| time_sliced, + |(_time, time_sliced)| time_sliced.clone(), ) } @@ -693,9 +687,28 @@ impl QueryHandle<'_> { }) .collect_vec(); - debug_assert_eq!(state.arrow_schema.fields.len(), arrays.len()); + // We now need to increment cursors. + // + // NOTE: This is trickier than it looks: cursors need to be incremented not only for chunks + // that were used to return data during the current iteration, but also chunks that + // _attempted_ to return data and were preempted for one reason or another (overlap, + // intra-timestamp tie-break, etc). + for view_chunks in &state.view_chunks { + for (cur_cursor, cur_chunk) in view_chunks { + if let Some((index_value, _row_id)) = cur_chunk + .iter_indices(&self.query.filtered_index) + .nth(cur_cursor.load(Ordering::Relaxed) as _) + { + if cur_index_value == index_value { + cur_cursor.fetch_add(1, Ordering::Relaxed); + } + }; + } + } + + debug_assert_eq!(state.arrow_schema.fields.len(), selected_arrays.len()); - Some(arrays) + Some(selected_arrays) } /// Calls [`Self::next_row`] and wraps the result in a [`RecordBatch`]. @@ -726,3 +739,687 @@ impl<'a> QueryHandle<'a> { std::iter::from_fn(move || self.next_row_batch()) } } + +// --- + +#[cfg(test)] +#[allow(clippy::iter_on_single_items)] +mod tests { + use std::sync::Arc; + + use re_chunk::{Chunk, ChunkId, RowId, TimePoint}; + use re_chunk_store::{ChunkStore, ChunkStoreConfig, ResolvedTimeRange, TimeInt}; + use re_log_types::{ + build_frame_nr, build_log_time, + example_components::{MyColor, MyLabel, MyPoint}, + EntityPath, Timeline, + }; + use re_types_core::Loggable as _; + + use crate::QueryCache; + + use super::*; + + // NOTE: The best way to understand what these tests are doing is to run them in verbose mode, + // e.g. `cargo t -p re_dataframe2 -- --show-output barebones`. + // Each test will print the state of the store, the query being run, and the results that were + // returned in the usual human-friendly format. + // From there it is generally straightforward to infer what's going on. + + // TODO(cmc): at least one basic test for every feature in `QueryExpression2`. + // In no particular order: + // * [x] filtered_index + // * [x] filtered_index_range + // * [x] view_contents + // * [x] selection + // * [ ] filtered_index_values + // * [ ] sampled_index_values + // * [ ] filtered_point_of_view + // * [ ] sparse_fill_strategy + + // TODO(cmc): At some point I'd like to stress multi-entity queries too, but that feels less + // urgent considering how things are implemented (each entity lives in its own index, so it's + // really just more of the same). + + /// All features disabled. 
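+ /// I.e. a bare `QueryExpression2::new(timeline)`: no index range filter, no view contents, no selection.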
+ #[test] + fn barebones() -> anyhow::Result<()> { + re_log::setup_logging(); + + let store = create_nasty_store()?; + eprintln!("{store}"); + let query_cache = QueryCache::new(&store); + let query_engine = QueryEngine { + store: &store, + cache: &query_cache, + }; + + let timeline = Timeline::new_sequence("frame_nr"); + let query = QueryExpression2::new(timeline); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!( + "{:#?}", + dataframe.data.iter().skip(1 /* RowId */).collect_vec() + ); + let expected = unindent::unindent( + "\ + [ + Int64[None, 1, 2, 3, 4, 5, 6, 7], + Timestamp(Nanosecond, None)[None, 1970-01-01 00:00:00.000000001, None, None, None, 1970-01-01 00:00:00.000000005, None, 1970-01-01 00:00:00.000000007], + ListArray[None, None, None, [2], [3], [4], None, [6]], + ListArray[[c], None, None, None, None, None, None, None], + ListArray[None, [{x: 0, y: 0}], [{x: 1, y: 1}], [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [{x: 5, y: 5}], [{x: 8, y: 8}]], + ]\ + " + ); + + similar_asserts::assert_eq!(expected, got); + + Ok(()) + } + + #[test] + fn filtered_index_range() -> anyhow::Result<()> { + re_log::setup_logging(); + + let store = create_nasty_store()?; + eprintln!("{store}"); + let query_cache = QueryCache::new(&store); + let query_engine = QueryEngine { + store: &store, + cache: &query_cache, + }; + + let timeline = Timeline::new_sequence("frame_nr"); + let mut query = QueryExpression2::new(timeline); + query.filtered_index_range = Some(ResolvedTimeRange::new(3, 6)); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!( + "{:#?}", + dataframe.data.iter().skip(1 /* RowId */).collect_vec() + ); + let expected = unindent::unindent( + "\ + [ + Int64[None, 3, 4, 5, 6], + Timestamp(Nanosecond, None)[None, None, None, 1970-01-01 00:00:00.000000005, None], + ListArray[None, [2], [3], [4], None], + ListArray[[c], None, None, None, None], + ListArray[None, [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [{x: 5, y: 5}]], + ]\ + ", + ); + + similar_asserts::assert_eq!(expected, got); + + Ok(()) + } + + #[test] + fn view_contents() -> anyhow::Result<()> { + re_log::setup_logging(); + + let store = create_nasty_store()?; + eprintln!("{store}"); + let query_cache = QueryCache::new(&store); + let query_engine = QueryEngine { + store: &store, + cache: &query_cache, + }; + + let entity_path: EntityPath = "this/that".into(); + let timeline = Timeline::new_sequence("frame_nr"); + + // empty view + { + let mut query = QueryExpression2::new(timeline); + query.view_contents = Some( + [(entity_path.clone(), Some(Default::default()))] + .into_iter() + .collect(), + ); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!( + "{:#?}", + dataframe.data.iter().skip(1 /* RowId */).collect_vec() + ); + let expected = "[]"; + + similar_asserts::assert_eq!(expected, got); + } + + { + let mut query = QueryExpression2::new(timeline); + query.view_contents = Some( + [( + 
entity_path.clone(), + Some( + [ + MyLabel::name(), + MyColor::name(), + "AColumnThatDoesntEvenExist".into(), + ] + .into_iter() + .collect(), + ), + )] + .into_iter() + .collect(), + ); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!( + "{:#?}", + dataframe.data.iter().skip(1 /* RowId */).collect_vec() + ); + let expected = unindent::unindent( + "\ + [ + Int64[None, 3, 4, 5, 7], + Timestamp(Nanosecond, None)[None, None, None, None, None], + ListArray[None, [2], [3], [4], [6]], + ListArray[[c], None, None, None, None], + ]\ + ", + ); + + similar_asserts::assert_eq!(expected, got); + } + + Ok(()) + } + + #[test] + fn selection() -> anyhow::Result<()> { + re_log::setup_logging(); + + let store = create_nasty_store()?; + eprintln!("{store}"); + let query_cache = QueryCache::new(&store); + let query_engine = QueryEngine { + store: &store, + cache: &query_cache, + }; + + let entity_path: EntityPath = "this/that".into(); + let timeline = Timeline::new_sequence("frame_nr"); + + // empty selection + { + let mut query = QueryExpression2::new(timeline); + query.selection = Some(vec![]); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!( + "{:#?}", + dataframe.data.iter().skip(1 /* RowId */).collect_vec() + ); + let expected = "[]"; + + similar_asserts::assert_eq!(expected, got); + } + + // only controls + { + let mut query = QueryExpression2::new(timeline); + query.selection = Some(vec![ + ColumnSelector::Control(ControlColumnSelector { + component: "rerun.controls.RowId".into(), + }), + ColumnSelector::Control(ControlColumnSelector { + component: "AControlColumnThatDoesntExist".into(), + }), + ]); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!( + "{:#?}", + // NOTE: comparing the rowids themselves is gonna be way too annoying. 
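+ // They are generated anew on every run (`RowId::new()`), hence the `skip(1)` just below.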
+ dataframe.data.iter().skip(1 /* RowId */).collect_vec() + ); + let expected = unindent::unindent( + "\ + [ + NullArray(8), + ]\ + ", + ); + + similar_asserts::assert_eq!(expected, got); + } + + // only indices (+ duplication) + { + let mut query = QueryExpression2::new(timeline); + query.selection = Some(vec![ + ColumnSelector::Time(TimeColumnSelector { + timeline: *timeline.name(), + }), + ColumnSelector::Time(TimeColumnSelector { + timeline: *timeline.name(), + }), + ColumnSelector::Time(TimeColumnSelector { + timeline: "ATimeColumnThatDoesntExist".into(), + }), + ]); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!("{:#?}", dataframe.data.iter().collect_vec()); + let expected = unindent::unindent( + "\ + [ + Int64[None, 1, 2, 3, 4, 5, 6, 7], + Int64[None, 1, 2, 3, 4, 5, 6, 7], + NullArray(8), + ]\ + ", + ); + + similar_asserts::assert_eq!(expected, got); + } + + // only components (+ duplication) + { + let mut query = QueryExpression2::new(timeline); + query.selection = Some(vec![ + ColumnSelector::Component(ComponentColumnSelector { + entity_path: entity_path.clone(), + component: MyColor::name(), + join_encoding: Default::default(), + }), + ColumnSelector::Component(ComponentColumnSelector { + entity_path: entity_path.clone(), + component: MyColor::name(), + join_encoding: Default::default(), + }), + ColumnSelector::Component(ComponentColumnSelector { + entity_path: "non_existing_entity".into(), + component: MyColor::name(), + join_encoding: Default::default(), + }), + ColumnSelector::Component(ComponentColumnSelector { + entity_path: entity_path.clone(), + component: "AComponentColumnThatDoesntExist".into(), + join_encoding: Default::default(), + }), + ]); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!("{:#?}", dataframe.data.iter().collect_vec()); + let expected = unindent::unindent( + "\ + [ + ListArray[None, None, None, [2], [3], [4], None, [6]], + ListArray[None, None, None, [2], [3], [4], None, [6]], + NullArray(8), + NullArray(8), + ]\ + ", + ); + + similar_asserts::assert_eq!(expected, got); + } + + Ok(()) + } + + #[test] + fn view_contents_and_selection() -> anyhow::Result<()> { + re_log::setup_logging(); + + let store = create_nasty_store()?; + eprintln!("{store}"); + let query_cache = QueryCache::new(&store); + let query_engine = QueryEngine { + store: &store, + cache: &query_cache, + }; + + let entity_path: EntityPath = "this/that".into(); + let timeline = Timeline::new_sequence("frame_nr"); + + // only components + { + let mut query = QueryExpression2::new(timeline); + query.view_contents = Some( + [( + entity_path.clone(), + Some([MyColor::name(), MyLabel::name()].into_iter().collect()), + )] + .into_iter() + .collect(), + ); + query.selection = Some(vec![ + ColumnSelector::Time(TimeColumnSelector { + timeline: *timeline.name(), + }), + ColumnSelector::Time(TimeColumnSelector { + timeline: *Timeline::log_time().name(), + }), + ColumnSelector::Time(TimeColumnSelector { + timeline: *Timeline::log_tick().name(), + }), + // + ColumnSelector::Component(ComponentColumnSelector { + entity_path: entity_path.clone(), + component: 
MyPoint::name(), + join_encoding: Default::default(), + }), + ColumnSelector::Component(ComponentColumnSelector { + entity_path: entity_path.clone(), + component: MyColor::name(), + join_encoding: Default::default(), + }), + ColumnSelector::Component(ComponentColumnSelector { + entity_path: entity_path.clone(), + component: MyLabel::name(), + join_encoding: Default::default(), + }), + ]); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!("{:#?}", dataframe.data.iter().collect_vec()); + let expected = unindent::unindent( + "\ + [ + Int64[None, 3, 4, 5, 7], + Timestamp(Nanosecond, None)[None, None, None, None, None], + NullArray(5), + NullArray(5), + ListArray[None, [2], [3], [4], [6]], + ListArray[[c], None, None, None, None], + ]\ + ", + ); + + similar_asserts::assert_eq!(expected, got); + } + + Ok(()) + } + + /// Returns a very nasty [`ChunkStore`] with all kinds of partial updates, chunk overlaps, + /// repeated timestamps, duplicated chunks, partial multi-timelines, etc. + fn create_nasty_store() -> anyhow::Result<ChunkStore> { + let mut store = ChunkStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + ChunkStoreConfig::COMPACTION_DISABLED, + ); + + let entity_path = EntityPath::from("this/that"); + + let frame1 = TimeInt::new_temporal(1); + let frame2 = TimeInt::new_temporal(2); + let frame3 = TimeInt::new_temporal(3); + let frame4 = TimeInt::new_temporal(4); + let frame5 = TimeInt::new_temporal(5); + let frame6 = TimeInt::new_temporal(6); + let frame7 = TimeInt::new_temporal(7); + + let points1 = MyPoint::from_iter(0..1); + let points2 = MyPoint::from_iter(1..2); + let points3 = MyPoint::from_iter(2..3); + let points4 = MyPoint::from_iter(3..4); + let points5 = MyPoint::from_iter(4..5); + let points6 = MyPoint::from_iter(5..6); + let points7_1 = MyPoint::from_iter(6..7); + let points7_2 = MyPoint::from_iter(7..8); + let points7_3 = MyPoint::from_iter(8..9); + + let colors3 = MyColor::from_iter(2..3); + let colors4 = MyColor::from_iter(3..4); + let colors5 = MyColor::from_iter(4..5); + let colors7 = MyColor::from_iter(6..7); + + let labels1 = vec![MyLabel("a".to_owned())]; + let labels2 = vec![MyLabel("b".to_owned())]; + let labels3 = vec![MyLabel("c".to_owned())]; + + let row_id1_1 = RowId::new(); + let row_id1_3 = RowId::new(); + let row_id1_5 = RowId::new(); + let row_id1_7_1 = RowId::new(); + let row_id1_7_2 = RowId::new(); + let row_id1_7_3 = RowId::new(); + let chunk1_1 = Chunk::builder(entity_path.clone()) + .with_sparse_component_batches( + row_id1_1, + [build_frame_nr(frame1), build_log_time(frame1.into())], + [ + (MyPoint::name(), Some(&points1 as _)), + (MyColor::name(), None), + (MyLabel::name(), Some(&labels1 as _)), // shadowed by static + ], + ) + .with_sparse_component_batches( + row_id1_3, + [build_frame_nr(frame3), build_log_time(frame3.into())], + [ + (MyPoint::name(), Some(&points3 as _)), + (MyColor::name(), Some(&colors3 as _)), + ], + ) + .with_sparse_component_batches( + row_id1_5, + [build_frame_nr(frame5), build_log_time(frame5.into())], + [ + (MyPoint::name(), Some(&points5 as _)), + (MyColor::name(), None), + ], + ) + .with_sparse_component_batches( + row_id1_7_1, + [build_frame_nr(frame7), build_log_time(frame7.into())], + [(MyPoint::name(), Some(&points7_1 as _))], + ) + .with_sparse_component_batches( + row_id1_7_2,
[build_frame_nr(frame7), build_log_time(frame7.into())], + [(MyPoint::name(), Some(&points7_2 as _))], + ) + .with_sparse_component_batches( + row_id1_7_3, + [build_frame_nr(frame7), build_log_time(frame7.into())], + [(MyPoint::name(), Some(&points7_3 as _))], + ) + .build()?; + + let chunk1_1 = Arc::new(chunk1_1); + store.insert_chunk(&chunk1_1)?; + let chunk1_2 = Arc::new(chunk1_1.clone_as(ChunkId::new(), RowId::new())); + store.insert_chunk(&chunk1_2)?; // x2 ! + let chunk1_3 = Arc::new(chunk1_1.clone_as(ChunkId::new(), RowId::new())); + store.insert_chunk(&chunk1_3)?; // x3 !! + + let row_id2_2 = RowId::new(); + let row_id2_3 = RowId::new(); + let row_id2_4 = RowId::new(); + let chunk2 = Chunk::builder(entity_path.clone()) + .with_sparse_component_batches( + row_id2_2, + [build_frame_nr(frame2)], + [(MyPoint::name(), Some(&points2 as _))], + ) + .with_sparse_component_batches( + row_id2_3, + [build_frame_nr(frame3)], + [ + (MyPoint::name(), Some(&points3 as _)), + (MyColor::name(), Some(&colors3 as _)), + ], + ) + .with_sparse_component_batches( + row_id2_4, + [build_frame_nr(frame4)], + [(MyPoint::name(), Some(&points4 as _))], + ) + .build()?; + + let chunk2 = Arc::new(chunk2); + store.insert_chunk(&chunk2)?; + + let row_id3_2 = RowId::new(); + let row_id3_4 = RowId::new(); + let row_id3_6 = RowId::new(); + let chunk3 = Chunk::builder(entity_path.clone()) + .with_sparse_component_batches( + row_id3_2, + [build_frame_nr(frame2)], + [(MyPoint::name(), Some(&points2 as _))], + ) + .with_sparse_component_batches( + row_id3_4, + [build_frame_nr(frame4)], + [(MyPoint::name(), Some(&points4 as _))], + ) + .with_sparse_component_batches( + row_id3_6, + [build_frame_nr(frame6)], + [(MyPoint::name(), Some(&points6 as _))], + ) + .build()?; + + let chunk3 = Arc::new(chunk3); + store.insert_chunk(&chunk3)?; + + let row_id4_4 = RowId::new(); + let row_id4_5 = RowId::new(); + let row_id4_7 = RowId::new(); + let chunk4 = Chunk::builder(entity_path.clone()) + .with_sparse_component_batches( + row_id4_4, + [build_frame_nr(frame4)], + [(MyColor::name(), Some(&colors4 as _))], + ) + .with_sparse_component_batches( + row_id4_5, + [build_frame_nr(frame5)], + [(MyColor::name(), Some(&colors5 as _))], + ) + .with_sparse_component_batches( + row_id4_7, + [build_frame_nr(frame7)], + [(MyColor::name(), Some(&colors7 as _))], + ) + .build()?; + + let chunk4 = Arc::new(chunk4); + store.insert_chunk(&chunk4)?; + + let row_id5_1 = RowId::new(); + let chunk5 = Chunk::builder(entity_path.clone()) + .with_sparse_component_batches( + row_id5_1, + TimePoint::default(), + [(MyLabel::name(), Some(&labels2 as _))], + ) + .build()?; + + let chunk5 = Arc::new(chunk5); + store.insert_chunk(&chunk5)?; + + let row_id6_1 = RowId::new(); + let chunk6 = Chunk::builder(entity_path.clone()) + .with_sparse_component_batches( + row_id6_1, + TimePoint::default(), + [(MyLabel::name(), Some(&labels3 as _))], + ) + .build()?; + + let chunk6 = Arc::new(chunk6); + store.insert_chunk(&chunk6)?; + + Ok(store) + } + + fn concatenate_record_batches(schema: ArrowSchema, batches: &[RecordBatch]) -> RecordBatch { + assert!(batches.iter().map(|batch| &batch.schema).all_equal()); + + let mut arrays = Vec::new(); + + if !batches.is_empty() { + for (i, _field) in schema.fields.iter().enumerate() { + let array = arrow2::compute::concatenate::concatenate( + &batches + .iter() + .map(|batch| &*batch.data[i] as &dyn ArrowArray) + .collect_vec(), + ) + .unwrap(); + arrays.push(array); + } + } + + RecordBatch { + schema, + data: ArrowChunk::new(arrays), + 
} + } +}
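
Aside, for readers skimming the test suite: every test above drives the query API through the same few steps. Below is a minimal sketch of that pattern, distilled from the test bodies (the `re_dataframe2` import paths are assumed rather than taken from this diff, and the snippet is not part of the patch):

    use re_chunk_store::{ChunkStore, ResolvedTimeRange};
    use re_log_types::Timeline;
    // Assumed public exports of the crate under test, mirroring `use crate::QueryCache` above.
    use re_dataframe2::{QueryCache, QueryEngine, QueryExpression2};

    fn print_frame_range(store: &ChunkStore) {
        // Cache + engine, exactly as each test sets them up.
        let query_cache = QueryCache::new(store);
        let query_engine = QueryEngine {
            store,
            cache: &query_cache,
        };

        // Index on `frame_nr`, keeping only frames 3 through 6
        // (cf. the `filtered_index_range` test).
        let timeline = Timeline::new_sequence("frame_nr");
        let mut query = QueryExpression2::new(timeline);
        query.filtered_index_range = Some(ResolvedTimeRange::new(3, 6));

        // Stream the resulting record batches.
        let query_handle = query_engine.query(query);
        for batch in query_handle.into_batch_iter() {
            eprintln!("{batch}");
        }
    }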