Skip to content

Commit

Permalink
Dataframe v2: support for filtered_index_values (rerun-io#7589)
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc authored Oct 7, 2024
1 parent c44c300 commit bed792e
Showing 1 changed file with 83 additions and 20 deletions.
103 changes: 83 additions & 20 deletions crates/store/re_dataframe2/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use nohash_hasher::IntMap;
use re_chunk::{Chunk, RangeQuery, RowId, TimeInt, Timeline};
use re_chunk_store::{
ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
ControlColumnDescriptor, ControlColumnSelector, JoinEncoding, QueryExpression2,
ControlColumnDescriptor, ControlColumnSelector, IndexValue, JoinEncoding, QueryExpression2,
TimeColumnDescriptor, TimeColumnSelector,
};
use re_log_types::ResolvedTimeRange;
Expand Down Expand Up @@ -527,6 +527,13 @@ impl QueryHandle<'_> {
.min_by_key(|streaming_state| streaming_state.index_value)
.map(|streaming_state| streaming_state.index_value)?;

if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() {
if !filtered_index_values.contains(&cur_index_value) {
self.increment_cursors_at_index_value(cur_index_value);
return self.next_row();
}
}

for streaming_state in &mut view_streaming_state {
if streaming_state.as_ref().map(|s| s.index_value) != Some(cur_index_value) {
*streaming_state = None;
Expand Down Expand Up @@ -687,24 +694,7 @@ impl QueryHandle<'_> {
})
.collect_vec();

// We now need to increment cursors.
//
// NOTE: This is trickier than it looks: cursors need to be incremented not only for chunks
// that were used to return data during the current iteration, but also chunks that
// _attempted_ to return data and were preempted for one reason or another (overlap,
// intra-timestamp tie-break, etc).
for view_chunks in &state.view_chunks {
for (cur_cursor, cur_chunk) in view_chunks {
if let Some((index_value, _row_id)) = cur_chunk
.iter_indices(&self.query.filtered_index)
.nth(cur_cursor.load(Ordering::Relaxed) as _)
{
if cur_index_value == index_value {
cur_cursor.fetch_add(1, Ordering::Relaxed);
}
};
}
}
self.increment_cursors_at_index_value(cur_index_value);

debug_assert_eq!(state.arrow_schema.fields.len(), selected_arrays.len());

Expand All @@ -724,6 +714,28 @@ impl QueryHandle<'_> {
data: ArrowChunk::new(self.next_row()?),
})
}

/// Increment cursors for iteration corresponding to `cur_index_value`.
//
// NOTE: This is trickier than it looks: cursors need to be incremented not only for chunks
// that were used to return data during the current iteration, but also chunks that
// _attempted_ to return data and were preempted for one reason or another (overlap,
// intra-timestamp tie-break, etc).
fn increment_cursors_at_index_value(&self, cur_index_value: IndexValue) {
let state = self.init();
for view_chunks in &state.view_chunks {
for (cur_cursor, cur_chunk) in view_chunks {
if let Some((index_value, _row_id)) = cur_chunk
.iter_indices(&self.query.filtered_index)
.nth(cur_cursor.load(Ordering::Relaxed) as _)
{
if cur_index_value == index_value {
cur_cursor.fetch_add(1, Ordering::Relaxed);
}
};
}
}
}
}

impl<'a> QueryHandle<'a> {
Expand Down Expand Up @@ -770,9 +782,9 @@ mod tests {
// In no particular order:
// * [x] filtered_index
// * [x] filtered_index_range
// * [x] filtered_index_values
// * [x] view_contents
// * [x] selection
// * [ ] filtered_index_values
// * [ ] sampled_index_values
// * [ ] filtered_point_of_view
// * [ ] sparse_fill_strategy
Expand Down Expand Up @@ -871,6 +883,57 @@ mod tests {
Ok(())
}

#[test]
fn filtered_index_values() -> anyhow::Result<()> {
re_log::setup_logging();

let store = create_nasty_store()?;
eprintln!("{store}");
let query_cache = QueryCache::new(&store);
let query_engine = QueryEngine {
store: &store,
cache: &query_cache,
};

let timeline = Timeline::new_sequence("frame_nr");
let mut query = QueryExpression2::new(timeline);
query.filtered_index_values = Some(
[0, 3, 6, 9]
.into_iter()
.map(TimeInt::new_temporal)
.chain(std::iter::once(TimeInt::STATIC))
.collect(),
);
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
eprintln!("{dataframe}");

let got = format!(
"{:#?}",
dataframe.data.iter().skip(1 /* RowId */).collect_vec()
);
let expected = unindent::unindent(
"\
[
Int64[None, 3, 6],
Timestamp(Nanosecond, None)[None, None, None],
ListArray[None, [2], None],
ListArray[[c], None, None],
ListArray[None, [{x: 2, y: 2}], [{x: 5, y: 5}]],
]\
",
);

similar_asserts::assert_eq!(expected, got);

Ok(())
}

#[test]
fn view_contents() -> anyhow::Result<()> {
re_log::setup_logging();
Expand Down

0 comments on commit bed792e

Please sign in to comment.