Dataframe v2: support for sparse_fill_strategy (rerun-io#7598)
teh-cmc authored Oct 7, 2024
1 parent 4ba9fcc commit 54a77f8
Showing 2 changed files with 177 additions and 46 deletions.
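In short: `QueryExpression2` gains a `sparse_fill_strategy` setting. With `SparseFillStrategy::LatestAtGlobal`, any cell that would otherwise come back null at a given index value is back-filled with that component's latest-at value at that same index. A minimal usage sketch, mirroring the example and test changes below (it assumes a `ChunkStore` has already been loaded; that part is elided here):

use itertools::Itertools as _;

use re_chunk_store::{ChunkStore, QueryExpression2, SparseFillStrategy, Timeline};
use re_dataframe2::{QueryCache, QueryEngine};

/// Runs a range query over `frame_nr`, back-filling null cells with their
/// global latest-at values.
fn dump_with_latest_at_fill(store: &ChunkStore) {
    // Build the engine on top of the store and a fresh query cache.
    let query_cache = QueryCache::new(store);
    let query_engine = QueryEngine {
        store,
        cache: &query_cache,
    };

    // Any column with no data at a given `frame_nr` value gets its
    // latest-at value spliced in instead of a null.
    let timeline = Timeline::new_sequence("frame_nr");
    let mut query = QueryExpression2::new(timeline);
    query.sparse_fill_strategy = SparseFillStrategy::LatestAtGlobal;

    let batches = query_engine.query(query).into_batch_iter().collect_vec();
    eprintln!("{} batch(es)", batches.len());
}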
crates/store/re_dataframe2/examples/query.rs (5 changes: 4 additions & 1 deletion)
@@ -3,7 +3,9 @@
use itertools::Itertools;

use re_chunk::TimeInt;
-use re_chunk_store::{ChunkStore, ChunkStoreConfig, QueryExpression2, Timeline, VersionPolicy};
+use re_chunk_store::{
+ChunkStore, ChunkStoreConfig, QueryExpression2, SparseFillStrategy, Timeline, VersionPolicy,
+};
use re_dataframe2::{QueryCache, QueryEngine};
use re_log_types::{EntityPathFilter, ResolvedTimeRange, StoreKind};

@@ -65,6 +67,7 @@ fn main() -> anyhow::Result<()> {
.collect(),
);
query.filtered_index_range = Some(ResolvedTimeRange::new(time_from, time_to));
+query.sparse_fill_strategy = SparseFillStrategy::LatestAtGlobal;
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
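For reference, the two strategies exercised by this commit, as matched on in `src/query.rs` below. This is only a sketch of the enum's shape; the real definition lives in `re_chunk_store` and is not part of this diff:

/// Sketch only: the actual enum is defined in `re_chunk_store`.
pub enum SparseFillStrategy {
    /// A cell with no data at the current index value stays null
    /// (the behavior before this commit).
    None,
    /// Back-fill each null cell with the component's latest-at value at the
    /// current index value, looking at the whole store ("global").
    LatestAtGlobal,
}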
crates/store/re_dataframe2/src/query.rs (218 changes: 173 additions & 45 deletions)
@@ -10,11 +10,11 @@ use arrow2::{
use itertools::Itertools;

use nohash_hasher::IntMap;
-use re_chunk::{Chunk, RangeQuery, RowId, TimeInt, Timeline};
+use re_chunk::{Chunk, RangeQuery, RowId, TimeInt, Timeline, UnitChunkShared};
use re_chunk_store::{
ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
ControlColumnDescriptor, ControlColumnSelector, IndexValue, JoinEncoding, QueryExpression2,
-TimeColumnDescriptor, TimeColumnSelector,
+SparseFillStrategy, TimeColumnDescriptor, TimeColumnSelector,
};
use re_log_types::ResolvedTimeRange;

@@ -28,7 +28,7 @@ use crate::{QueryEngine, RecordBatch};
// * [x] support for overlaps (slow)
// * [x] pagination (any solution, even a slow one)
// * [x] pov support
-// * [ ] latestat sparse-filling
+// * [x] latestat sparse-filling
// * [ ] sampling support
// * [ ] overlaps (less dumb)
// * [ ] selector-based `filtered_index`
@@ -460,7 +460,7 @@ impl QueryHandle<'_> {

/// Temporary state used to resolve the streaming join for the current iteration.
#[derive(Debug)]
-struct StreamingJoinState<'a> {
+struct StreamingJoinStateEntry<'a> {
/// Which `Chunk` is this?
chunk: &'a Chunk,

@@ -474,6 +474,20 @@ impl QueryHandle<'_> {
row_id: RowId,
}

+/// Temporary state used to resolve the streaming join for the current iteration.
+///
+/// Possibly retrofilled, see [`QueryExpression2::sparse_fill_strategy`].
+#[derive(Debug)]
+enum StreamingJoinState<'a> {
+/// Incoming data for the current iteration.
+StreamingJoinState(StreamingJoinStateEntry<'a>),
+
+/// Data retrofilled through an extra query.
+///
+/// See [`QueryExpression2::sparse_fill_strategy`].
+Retrofilled(UnitChunkShared),
+}
+
let state = self.init();

let _cur_row = state.cur_row.fetch_add(1, Ordering::Relaxed);
@@ -482,7 +496,7 @@
// what is their index value for the current row?
//
// NOTE: Non-component columns don't have a streaming state, hence the optional layer.
-let mut view_streaming_state: Vec<Option<StreamingJoinState<'_>>> =
+let mut view_streaming_state: Vec<Option<StreamingJoinStateEntry<'_>>> =
// NOTE: cannot use vec![], it has limitations with non-cloneable options.
// vec![None; state.view_chunks.len()];
std::iter::repeat(())
@@ -508,7 +522,7 @@
};

if let Some(streaming_state) = streaming_state.as_mut() {
-let StreamingJoinState {
+let StreamingJoinStateEntry {
chunk,
cursor,
index_value,
@@ -530,7 +544,7 @@
*row_id = cur_row_id;
}
} else {
-*streaming_state = Some(StreamingJoinState {
+*streaming_state = Some(StreamingJoinStateEntry {
chunk: cur_chunk,
cursor: cur_cursor_value,
index_value: cur_index_value,
@@ -566,16 +580,69 @@
}
}

-for streaming_state in &mut view_streaming_state {
-if streaming_state.as_ref().map(|s| s.index_value) != Some(cur_index_value) {
-*streaming_state = None;
+let mut view_streaming_state = view_streaming_state
+.into_iter()
+.map(|streaming_state| match streaming_state {
+Some(s) if s.index_value == cur_index_value => {
+Some(StreamingJoinState::StreamingJoinState(s))
}
+_ => None,
+})
+.collect_vec();
+
+match self.query.sparse_fill_strategy {
+SparseFillStrategy::None => {}
+SparseFillStrategy::LatestAtGlobal => {
+// Everything that yielded `null` for the current iteration.
+let null_streaming_states = view_streaming_state
+.iter_mut()
+.enumerate()
+.filter(|(_view_idx, streaming_state)| streaming_state.is_none());
+
+for (view_idx, streaming_state) in null_streaming_states {
+let Some(ColumnDescriptor::Component(descr)) =
+state.view_contents.get(view_idx)
+else {
+continue;
+};
+
+// NOTE: While it would be very tempting to resolve the latest-at state
+// of the entire view contents at `filtered_index_range.start - 1` once
+// during `QueryHandle` initialization, and then bootstrap off of that, that
+// would effectively close the door to efficient pagination forever, since
+// we'd have to iterate over all the pages to compute the right latest-at
+// value at t+n (i.e. no more random access).
+// Therefore, it is better to simply do this the "dumb" way.
+//
+// TODO(cmc): Still, as always, this can be made faster and smarter at
+// the cost of some extra complexity (e.g. caching the result across
+// consecutive nulls etc). Later.
+
+let query =
+re_chunk::LatestAtQuery::new(self.query.filtered_index, cur_index_value);
+
+let results = self.engine.cache.latest_at(
+self.engine.store,
+&query,
+&descr.entity_path,
+[descr.component_name],
+);
+
+*streaming_state = results
+.components
+.get(&descr.component_name)
+.map(|unit| StreamingJoinState::Retrofilled(unit.clone()));
+}
+}
}

// The most recent chunk in the current iteration, according to RowId semantics.
let cur_most_recent_row = view_streaming_state
.iter()
-.flatten()
+.filter_map(|streaming_state| match streaming_state {
+Some(StreamingJoinState::StreamingJoinState(s)) => Some(s),
+_ => None,
+})
.max_by_key(|streaming_state| streaming_state.row_id)?;

// We are stitching a bunch of unrelated cells together in order to create the final row
@@ -625,27 +692,31 @@
.iter()
.flatten()
.flat_map(|streaming_state| {
-streaming_state
-.chunk
-.timelines()
-.values()
-// NOTE: Already took care of that one above.
-.filter(|time_column| *time_column.timeline() != self.query.filtered_index)
-// NOTE: Cannot fail, just want to stay away from unwraps.
-.filter_map(|time_column| {
-let cursor = streaming_state.cursor as usize;
-time_column
-.times_raw()
-.get(cursor)
-.copied()
-.map(TimeInt::new_temporal)
-.map(|time| {
-(
-*time_column.timeline(),
-(time, time_column.times_array().sliced(cursor, 1)),
-)
-})
-})
+match streaming_state {
+StreamingJoinState::StreamingJoinState(s) => s.chunk.timelines(),
+StreamingJoinState::Retrofilled(unit) => unit.timelines(),
+}
+.values()
+// NOTE: Already took care of that one above.
+.filter(|time_column| *time_column.timeline() != self.query.filtered_index)
+// NOTE: Cannot fail, just want to stay away from unwraps.
+.filter_map(move |time_column| {
+let cursor = match streaming_state {
+StreamingJoinState::StreamingJoinState(s) => s.cursor as usize,
+StreamingJoinState::Retrofilled(_) => 0,
+};
+time_column
+.times_raw()
+.get(cursor)
+.copied()
+.map(TimeInt::new_temporal)
+.map(|time| {
+(
+*time_column.timeline(),
+(time, time_column.times_array().sliced(cursor, 1)),
+)
+})
+})
})
.for_each(|(timeline, (time, time_sliced))| {
max_value_per_index
@@ -665,26 +736,38 @@
// TODO(cmc): no point in slicing arrays that are not selected.
let view_sliced_arrays: Vec<Option<_>> = view_streaming_state
.iter()
-.map(|streaming_state| {
+.enumerate()
+.map(|(view_idx, streaming_state)| {
// NOTE: Reminder: the only reason the streaming state could be `None` here is
// because this column does not have data for the current index value (i.e. `null`).
streaming_state.as_ref().and_then(|streaming_state| {
-let cursor = streaming_state.cursor;
+let list_array = match streaming_state {
+StreamingJoinState::StreamingJoinState(s) => {
+debug_assert!(
+s.chunk.components().len() <= 1,
+"cannot possibly get more than one component with this query"
+);
+
+s.chunk
+.components()
+.first_key_value()
+.map(|(_, list_array)| list_array.sliced(s.cursor as usize, 1))

-debug_assert!(
-streaming_state.chunk.components().len() <= 1,
-"cannot possibly get more than one component with this query"
-);
+}
+
+StreamingJoinState::Retrofilled(unit) => {
+let component_name = state.view_contents.get(view_idx).and_then(|col| match col {
+ColumnDescriptor::Component(descr) => Some(descr.component_name),
+_ => None,
+})?;
+unit.components().get(&component_name).map(|list_array| list_array.to_boxed())
+}
+};

-let list_array = streaming_state
-.chunk
-.components()
-.first_key_value()
-.map(|(_, list_array)| list_array.sliced(cursor as usize, 1));

debug_assert!(
list_array.is_some(),
-"This must exist or the chunk wouldn't have been sliced to start with."
+"This must exist or the chunk wouldn't have been sliced/retrofilled to start with."
);

// NOTE: This cannot possibly return None, see assert above.
@@ -818,8 +901,8 @@ mod tests {
// * [x] view_contents
// * [x] selection
// * [x] filtered_point_of_view
+// * [x] sparse_fill_strategy
// * [ ] sampled_index_values
-// * [ ] sparse_fill_strategy
//
// In addition to those, some much needed extras:
// * [ ] timelines returned with selection=none
@@ -874,6 +957,51 @@
Ok(())
}

+#[test]
+fn sparse_fill_strategy_latestatglobal() -> anyhow::Result<()> {
+re_log::setup_logging();
+
+let store = create_nasty_store()?;
+eprintln!("{store}");
+let query_cache = QueryCache::new(&store);
+let query_engine = QueryEngine {
+store: &store,
+cache: &query_cache,
+};
+
+let timeline = Timeline::new_sequence("frame_nr");
+let mut query = QueryExpression2::new(timeline);
+query.sparse_fill_strategy = SparseFillStrategy::LatestAtGlobal;
+eprintln!("{query:#?}:");
+
+let query_handle = query_engine.query(query.clone());
+let dataframe = concatenate_record_batches(
+query_handle.schema().clone(),
+&query_handle.into_batch_iter().collect_vec(),
+);
+eprintln!("{dataframe}");
+
+let got = format!(
+"{:#?}",
+dataframe.data.iter().skip(1 /* RowId */).collect_vec()
+);
+let expected = unindent::unindent(
+"\
+[
+Int64[None, 1, 2, 3, 4, 5, 6, 7],
+Timestamp(Nanosecond, None)[None, 1970-01-01 00:00:00.000000001, None, None, None, 1970-01-01 00:00:00.000000005, None, 1970-01-01 00:00:00.000000007],
+ListArray[None, None, None, [2], [3], [4], [4], [6]],
+ListArray[[c], [c], [c], [c], [c], [c], [c], [c]],
+ListArray[None, [{x: 0, y: 0}], [{x: 1, y: 1}], [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [{x: 5, y: 5}], [{x: 8, y: 8}]],
+]\
+"
+);
+
+similar_asserts::assert_eq!(expected, got);
+
+Ok(())
+}
+
#[test]
fn filtered_index_range() -> anyhow::Result<()> {
re_log::setup_logging();
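To see the retrofill semantics in isolation: a self-contained toy model of the `LatestAtGlobal` pass (all names here are hypothetical; the real pass lives in `QueryHandle` above and goes through `self.engine.cache.latest_at(…)`). Every column slot that the streaming join left empty falls back to a latest-at lookup:

/// Toy model: fill every empty column slot from a fallback lookup.
fn retrofill<T>(slots: &mut [Option<T>], latest_at: impl Fn(usize) -> Option<T>) {
    for (column_idx, slot) in slots.iter_mut().enumerate() {
        if slot.is_none() {
            // Null under `SparseFillStrategy::None`; under `LatestAtGlobal`,
            // fall back to the most recent earlier value, if any.
            *slot = latest_at(column_idx);
        }
    }
}

fn main() {
    // Column 0 streamed data this row, column 1 did not.
    let mut row = vec![Some(42), None];
    retrofill(&mut row, |_column_idx| Some(7)); // pretend latest-at found `7`
    assert_eq!(row, vec![Some(42), Some(7)]);
}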
