
Dataframe v2: support for filtered_point_of_view (rerun-io#7593)
teh-cmc authored Oct 7, 2024
1 parent 8702f01 commit 4ba9fcc
Showing 2 changed files with 196 additions and 22 deletions.
4 changes: 0 additions & 4 deletions crates/store/re_chunk_store/src/dataframe.rs
@@ -829,8 +829,6 @@ pub struct QueryExpression2 {
/// Example: `ResolvedTimeRange(10, 20)`.
pub filtered_index_range: Option<IndexRange>,

- /// TODO(cmc): NOT IMPLEMENTED.
- ///
/// The specific index values used to filter out _rows_ from the view contents.
///
/// Only rows where at least 1 column contains non-null data at these specific values will be kept
@@ -858,8 +856,6 @@ pub struct QueryExpression2 {
// TODO(jleibs): We need an alternative name for sampled.
pub sampled_index_values: Option<Vec<IndexValue>>,

- /// TODO(cmc): NOT IMPLEMENTED.
- ///
/// The component column used to filter out _rows_ from the view contents.
///
/// Only rows where this column contains non-null data will be kept in the final dataset.
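For context, a minimal sketch of how a caller sets this field, modeled directly on the tests added below (the `query_engine`, the entity path, and the `MyPoint` component are assumptions taken from those test fixtures):

    let timeline = Timeline::new_sequence("frame_nr");
    let mut query = QueryExpression2::new(timeline);
    // Drive the rows of the view from this entity's `MyPoint` column.
    query.filtered_point_of_view = Some(ComponentColumnSelector {
        entity_path: "this/that".into(),
        component: MyPoint::name(),
        join_encoding: Default::default(),
    });
    let query_handle = query_engine.query(query);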
214 changes: 196 additions & 18 deletions crates/store/re_dataframe2/src/query.rs
@@ -27,13 +27,13 @@ use crate::{QueryEngine, RecordBatch};
// * [x] custom selection
// * [x] support for overlaps (slow)
// * [x] pagination (any solution, even a slow one)
+ // * [x] pov support
+ // * [ ] latestat sparse-filling
+ // * [ ] sampling support
// * [ ] overlaps (less dumb)
// * [ ] selector-based `filtered_index`
// * [ ] clears
- // * [ ] latestat sparse-filling
// * [ ] pagination (fast)
- // * [ ] pov support
- // * [ ] sampling support
// * [ ] configurable cache bypass
// * [ ] allocate null arrays once
// * [ ] take kernel duplicates all memory
@@ -99,6 +99,13 @@ struct QueryHandleState {
// NOTE: Reminder: we have to query everything in the _view_, regardless of the current selection.
view_chunks: Vec<Vec<(AtomicU64, Chunk)>>,

/// The index in `view_chunks` where the POV chunks are stored.
///
/// * If set to `None`, then the caller didn't request a POV at all.
/// * If set to `Some(usize::MAX)`, then a POV was requested but couldn't be found in the view.
/// * Otherwise, points to the chunks that should be used to drive the iteration.
view_pov_chunks_idx: Option<usize>,

/// Tracks the current row index: the position of the iterator. For [`QueryHandle::next_row`].
///
/// This represents the number of rows that the caller has iterated on: it is completely
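As an illustration only (the match arms below are this sketch's own, not code from the commit), the tri-state sentinel documented above is meant to be consumed along these lines:

    match state.view_pov_chunks_idx {
        // No PoV was requested: merge across all view chunks.
        None => { /* ... */ }
        // A PoV was requested but is absent from the view: no rows can match.
        Some(usize::MAX) => { /* ... */ }
        // The PoV exists in the view: its chunks drive the iteration.
        Some(idx) => {
            let _pov_chunks = &state.view_chunks[idx];
        }
    }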
@@ -275,6 +282,11 @@ impl QueryHandle<'_> {
};

// 4. Perform the query and keep track of all the relevant chunks.
let mut view_pov_chunks_idx = self
.query
.filtered_point_of_view
.as_ref()
.map(|_| usize::MAX);
let view_chunks = {
let index_range = self
.query
@@ -286,11 +298,12 @@
.keep_extra_components(false);

view_contents
- .iter()
- .map(|selected_column| match selected_column {
- ColumnDescriptor::Control(_) | ColumnDescriptor::Time(_) => Vec::new(),
+ .iter()
+ .enumerate()
+ .map(|(idx, selected_column)| match selected_column {
+ ColumnDescriptor::Control(_) | ColumnDescriptor::Time(_) => Vec::new(),

- ColumnDescriptor::Component(column) => {
+ ColumnDescriptor::Component(column) => {
// NOTE: Keep in mind that the range APIs natively make sure that we will
// either get a bunch of relevant _static_ chunks, or a bunch of relevant
// _temporal_ chunks, but never both.
@@ -309,7 +322,7 @@
"cannot possibly get more than one component with this query"
);

- results
+ let chunks = results
.components
.into_iter()
.next()
@@ -335,17 +348,26 @@
})
.collect_vec()
})
- .unwrap_or_default()
+ .unwrap_or_default();
+
+ if let Some(pov) = self.query.filtered_point_of_view.as_ref() {
+ if pov.entity_path == column.entity_path && pov.component == column.component_name {
+ view_pov_chunks_idx = Some(idx);
+ }
+ }
+
+ chunks
},
})
- .collect()
+ .collect()
};

QueryHandleState {
view_contents,
selected_contents,
arrow_schema,
view_chunks,
view_pov_chunks_idx,
cur_row: AtomicU64::new(0),
}
}
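The PoV lookup is inlined in the enumeration above; pulled out as a free function, it amounts to this sketch (the function name is hypothetical, the types are the ones from this diff):

    fn find_pov_chunks_idx(
        view_contents: &[ColumnDescriptor],
        pov: &ComponentColumnSelector,
    ) -> usize {
        view_contents
            .iter()
            .position(|col| match col {
                ColumnDescriptor::Component(column) => {
                    pov.entity_path == column.entity_path
                        && pov.component == column.component_name
                }
                _ => false,
            })
            // The sentinel documented above: a PoV was requested but not found.
            .unwrap_or(usize::MAX)
    }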
@@ -519,13 +541,23 @@ impl QueryHandle<'_> {
}

// What's the index value we're looking for at the current iteration?
- let cur_index_value = view_streaming_state
- .iter()
- .flatten()
- // NOTE: We're purposefully ignoring RowId-related semantics here: we just want to know
- // the value we're looking for on the "main" index (dedupe semantics).
- .min_by_key(|streaming_state| streaming_state.index_value)
- .map(|streaming_state| streaming_state.index_value)?;
+ let cur_index_value = if let Some(view_pov_chunks_idx) = state.view_pov_chunks_idx {
+ // If we do have a set point-of-view, then the current iteration corresponds to the
+ // next available index value across all PoV chunks.
+ view_streaming_state
+ .get(view_pov_chunks_idx)
+ .and_then(|streaming_state| streaming_state.as_ref().map(|s| s.index_value))?
+ } else {
+ // If we do not have a set point-of-view, then the current iteration corresponds to the
+ // smallest available index value across all available chunks.
+ view_streaming_state
+ .iter()
+ .flatten()
+ // NOTE: We're purposefully ignoring RowId-related semantics here: we just want to know
+ // the value we're looking for on the "main" index (dedupe semantics).
+ .min_by_key(|streaming_state| streaming_state.index_value)
+ .map(|streaming_state| streaming_state.index_value)?
+ };

if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() {
if !filtered_index_values.contains(&cur_index_value) {
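A self-contained sketch of the two strategies above, with a simplified `Cursor` standing in for the per-column streaming state (only its index value matters for this purpose):

    struct Cursor {
        index_value: i64, // e.g. a sequence number or a timestamp
    }

    fn next_index_value(cursors: &[Option<Cursor>], pov_idx: Option<usize>) -> Option<i64> {
        match pov_idx {
            // PoV-driven: the next row is whatever the PoV column has next.
            // The `usize::MAX` sentinel falls out of bounds here, so a PoV
            // that is absent from the view naturally yields no rows at all.
            Some(idx) => cursors.get(idx)?.as_ref().map(|c| c.index_value),
            // No PoV: merge all columns; the smallest next index value wins.
            None => cursors.iter().flatten().map(|c| c.index_value).min(),
        }
    }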
@@ -785,9 +817,13 @@ mod tests {
// * [x] filtered_index_values
// * [x] view_contents
// * [x] selection
+ // * [x] filtered_point_of_view
// * [ ] sampled_index_values
- // * [ ] filtered_point_of_view
// * [ ] sparse_fill_strategy
+ //
+ // In addition to those, some much needed extras:
+ // * [ ] timelines returned with selection=none
+ // * [ ] clears

// TODO(cmc): At some point I'd like to stress multi-entity queries too, but that feels less
// urgent considering how things are implemented (each entity lives in its own index, so it's
@@ -934,6 +970,148 @@ mod tests {
Ok(())
}

#[test]
fn filtered_point_of_view() -> anyhow::Result<()> {
re_log::setup_logging();

let store = create_nasty_store()?;
eprintln!("{store}");
let query_cache = QueryCache::new(&store);
let query_engine = QueryEngine {
store: &store,
cache: &query_cache,
};

let timeline = Timeline::new_sequence("frame_nr");
let entity_path: EntityPath = "this/that".into();

// non-existing entity
{
let mut query = QueryExpression2::new(timeline);
query.filtered_point_of_view = Some(ComponentColumnSelector {
entity_path: "no/such/entity".into(),
component: MyPoint::name(),
join_encoding: Default::default(),
});
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
eprintln!("{dataframe}");

let got = format!(
"{:#?}",
dataframe.data.iter().skip(1 /* RowId */).collect_vec()
);
let expected = "[]";

similar_asserts::assert_eq!(expected, got);
}

// non-existing component
{
let mut query = QueryExpression2::new(timeline);
query.filtered_point_of_view = Some(ComponentColumnSelector {
entity_path: entity_path.clone(),
component: "AComponentColumnThatDoesntExist".into(),
join_encoding: Default::default(),
});
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
eprintln!("{dataframe}");

let got = format!(
"{:#?}",
dataframe.data.iter().skip(1 /* RowId */).collect_vec()
);
let expected = "[]";

similar_asserts::assert_eq!(expected, got);
}

// MyPoint
{
let mut query = QueryExpression2::new(timeline);
query.filtered_point_of_view = Some(ComponentColumnSelector {
entity_path: entity_path.clone(),
component: MyPoint::name(),
join_encoding: Default::default(),
});
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
eprintln!("{dataframe}");

let got = format!(
"{:#?}",
dataframe.data.iter().skip(1 /* RowId */).collect_vec()
);
let expected = unindent::unindent(
"\
[
Int64[1, 2, 3, 4, 5, 6, 7],
Timestamp(Nanosecond, None)[1970-01-01 00:00:00.000000001, None, None, None, 1970-01-01 00:00:00.000000005, None, 1970-01-01 00:00:00.000000007],
ListArray[None, None, [2], [3], [4], None, [6]],
ListArray[None, None, None, None, None, None, None],
ListArray[[{x: 0, y: 0}], [{x: 1, y: 1}], [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [{x: 5, y: 5}], [{x: 8, y: 8}]],
]\
"
);

similar_asserts::assert_eq!(expected, got);
}

// MyColor
{
let mut query = QueryExpression2::new(timeline);
query.filtered_point_of_view = Some(ComponentColumnSelector {
entity_path: entity_path.clone(),
component: MyColor::name(),
join_encoding: Default::default(),
});
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
eprintln!("{dataframe}");

let got = format!(
"{:#?}",
dataframe.data.iter().skip(1 /* RowId */).collect_vec()
);
let expected = unindent::unindent(
"\
[
Int64[3, 4, 5, 7],
Timestamp(Nanosecond, None)[None, None, None, None],
ListArray[[2], [3], [4], [6]],
ListArray[None, None, None, None],
ListArray[None, None, None, None],
]\
",
);

similar_asserts::assert_eq!(expected, got);
}

Ok(())
}

#[test]
fn view_contents() -> anyhow::Result<()> {
re_log::setup_logging();
