Skip to content

Commit

Permalink
Dataframe queries 4: paginated dense range (rerun-io#7345)
Browse files Browse the repository at this point in the history
Implements the paginated dense range dataframe APIs.

If there's no off-by-one anywhere in there, I will eat my hat.
Getting this into the hands of people is the highest priority though; I'll add
tests later.


![image](https://github.com/user-attachments/assets/e865ba62-21db-41c1-9899-35a0e7aea134)

![image](https://github.com/user-attachments/assets/32934ba8-2673-401a-aafc-409dfbe9b2c5)


* Fixes rerun-io#7284 

---

Dataframe APIs PR series:
- rerun-io#7338
- rerun-io#7339
- rerun-io#7340
- rerun-io#7341
- rerun-io#7345
  • Loading branch information
teh-cmc authored Sep 4, 2024
1 parent 9a994a0 commit 5155636
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 5 deletions.
105 changes: 105 additions & 0 deletions crates/store/re_dataframe/examples/range_paginated.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#![allow(clippy::unwrap_used)]

use itertools::Itertools as _;

use re_chunk_store::{
ChunkStore, ChunkStoreConfig, ComponentColumnDescriptor, RangeQueryExpression, Timeline,
VersionPolicy,
};
use re_dataframe::{QueryEngine, RecordBatch};
use re_log_types::{ResolvedTimeRange, StoreKind};

/// Example entry point.
///
/// Loads every store found in the `.rrd` file given as first argument and, for each
/// recording store, runs a fixed range query on the `log_tick` timeline (ticks 0 to 30)
/// using the `Position3D` component of `<entity_path_pov>` as point-of-view.
/// It then prints the chunk/row counts followed by a few paginated windows of the results.
fn main() -> anyhow::Result<()> {
    let args = std::env::args().collect_vec();

    // Returns the mandatory positional argument at index `i`, or prints the usage string
    // and exits with status 1 if it is missing.
    let get_arg = |i: usize| match args.get(i) {
        Some(value) => value,
        None => {
            eprintln!(
                "Usage: {} <path_to_rrd_with_position3ds> <entity_path_pov> [entity_path_expr]",
                args.first().map_or("$BIN", |s| s.as_str())
            );
            std::process::exit(1);
        }
    };

    let path_to_rrd = get_arg(1);
    let entity_path_pov = get_arg(2).as_str();
    // The entity path expression is optional and defaults to "everything".
    let entity_path_expr = args.get(3).map_or("/**", |s| s.as_str());

    let stores = ChunkStore::from_rrd_filepath(
        &ChunkStoreConfig::ALL_DISABLED,
        path_to_rrd,
        VersionPolicy::Warn,
    )?;

    // The `(offset, len)` windows to fetch and print, in order.
    let windows = [(0, 4), (2, 4), (10, 5), (0, 15)];

    for (store_id, store) in &stores {
        // Only recordings are queried; any other kind of store is ignored.
        if store_id.kind != StoreKind::Recording {
            continue;
        }

        let cache = re_dataframe::external::re_query::Caches::new(store);
        let engine = QueryEngine {
            store,
            cache: &cache,
        };

        let pov = ComponentColumnDescriptor::new::<re_types::components::Position3D>(
            entity_path_pov.into(),
        );
        let query = RangeQueryExpression {
            entity_path_expr: entity_path_expr.into(),
            timeline: Timeline::log_tick(),
            time_range: ResolvedTimeRange::new(0, 30),
            pov,
        };

        let query_handle = engine.range(&query, None /* columns */);
        println!("{query}:");
        println!(
            "num_chunks:{} num_rows:{}",
            query_handle.num_chunks(),
            query_handle.num_rows()
        );

        for (offset, len) in windows {
            println!("offset:{offset} len:{len}");
            concat_and_print(query_handle.get(offset, len));
        }
    }

    Ok(())
}

/// Vertically concatenates all the given batches into a single one and prints it.
///
/// Columns are concatenated pairwise, in order; the resulting batch carries the schema of
/// the last batch that was merged in. Prints `<empty>` if `chunks` is empty.
///
/// Panics if any pair of columns cannot be concatenated.
fn concat_and_print(chunks: Vec<RecordBatch>) {
    use re_chunk::external::arrow2::{
        chunk::Chunk as ArrowChunk, compute::concatenate::concatenate,
    };

    let mut batches = chunks.into_iter();

    // The first batch seeds the accumulator; without it there is nothing to print.
    let Some(mut merged) = batches.next() else {
        println!("<empty>");
        return;
    };

    for batch in batches {
        let columns: Vec<_> = merged
            .data
            .iter()
            .zip(batch.data.iter())
            .map(|(lhs, rhs)| concatenate(&[&**lhs, &**rhs]).unwrap())
            .collect();

        merged = RecordBatch {
            schema: batch.schema.clone(),
            data: ArrowChunk::new(columns),
        };
    }

    println!("{merged}");
}
109 changes: 104 additions & 5 deletions crates/store/re_dataframe/src/range.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::VecDeque, sync::OnceLock};
use std::sync::{atomic::AtomicU64, OnceLock};

use ahash::HashMap;
use arrow2::{
Expand Down Expand Up @@ -47,7 +47,12 @@ struct RangeQuerytHandleState {
/// All the [`Chunk`]s for the active point-of-view.
///
/// These are already sorted and vertically sliced according to the query.
pov_chunks: Option<VecDeque<Chunk>>,
pov_chunks: Option<Vec<Chunk>>,

/// Tracks the current page index. Used for [`RangeQueryHandle::next_page`].
//
// NOTE: The state is behind a `OnceLock`, the atomic just makes some things simpler down the road.
cur_page: AtomicU64,
}

impl<'a> RangeQueryHandle<'a> {
Expand Down Expand Up @@ -103,12 +108,12 @@ impl RangeQueryHandle<'_> {
.find_map(|(component_name, chunks)| {
(component_name == self.query.pov.component_name).then_some(chunks)
})
.map(Into::into)
};

RangeQuerytHandleState {
columns,
pov_chunks,
cur_page: AtomicU64::new(0),
}
})
}
Expand Down Expand Up @@ -144,8 +149,102 @@ impl RangeQueryHandle<'_> {
// Returns the dense batch computed for the PoV chunk at the current page index and then
// advances that index by one; returns `None` when there is no PoV chunk at that index.
pub fn next_page(&mut self) -> Option<RecordBatch> {
re_tracing::profile_function!(format!("{:?}", self.query));

// NOTE(review): the next two lines look like the pre-change side of this diff (the
// `pop_front` call matches the former `VecDeque` field) shown next to their
// replacement below — confirm which version is meant to be live.
_ = self.init();
let pov_chunk = self.state.get_mut()?.pov_chunks.as_mut()?.pop_front()?;
let state = self.init();
// `cur_page` is the index of the PoV chunk to serve on this call.
let cur_page = state.cur_page.load(std::sync::atomic::Ordering::Relaxed);
// Bail out *before* bumping the counter if there are no PoV chunks or the index is out of bounds.
let pov_chunk = state.pov_chunks.as_ref()?.get(cur_page as usize)?;
_ = state
.cur_page
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

self.dense_batch_at_pov(pov_chunk)
}

/// Partially executes the range query in order to return the specified range of rows.
///
/// Returns a vector of [`RecordBatch`]es: as many as required to fill the specified range.
/// Each [`RecordBatch`] will correspond to a "natural page" of data, even the first and last
/// batch, although they might be cut off at the edge.
/// Each cell in the result corresponds to the latest known value at that particular point in time
/// for each respective `ColumnDescriptor`.
///
/// The returned vector is empty if the point-of-view has no data, if `offset` is at or past
/// the total number of rows, or if `len` is 0.
///
/// The schema of the returned [`RecordBatch`]es is guaranteed to match the one returned by
/// [`Self::schema`].
/// Columns that do not yield any data will still be present in the results, filled with null values.
///
/// "Natural pages" refers to pages of data that match 1:1 to the underlying storage.
/// The size of each page cannot be known in advance, as it depends on unspecified
/// implementation details.
/// This is the most performant way to iterate over the dataset.
//
// TODO(cmc): This could be turned into an actual lazy iterator at some point.
pub fn get(&self, offset: u64, mut len: u64) -> Vec<RecordBatch> {
    let mut results = Vec::new();

    let state = self.init();
    let Some(pov_chunks) = state.pov_chunks.as_ref() else {
        return results;
    };
    let mut pov_chunks = pov_chunks.iter();

    // Global row index of the first row of `cur_pov_chunk`.
    let mut cur_offset = 0_u64;
    let Some(mut cur_pov_chunk) = pov_chunks.next() else {
        return results;
    };

    // Fast-forward until the first relevant PoV chunk.
    //
    // A chunk covers the half-open row range `[cur_offset, cur_offset + num_rows)`, so it
    // must be skipped whenever its end is `<= offset`. With a strict `<`, an `offset` that
    // falls exactly on a chunk boundary would select the chunk that ends right there and
    // slice it at `index == num_rows`.
    //
    // TODO(cmc): should keep an extra sorted datastructure and use a binsearch instead.
    while cur_offset + cur_pov_chunk.num_rows() as u64 <= offset {
        cur_offset += cur_pov_chunk.num_rows() as u64;

        let Some(next_pov_chunk) = pov_chunks.next() else {
            // `offset` is at or past the end of the data.
            return results;
        };
        cur_pov_chunk = next_pov_chunk;
    }

    // Fast-forward until the first relevant row in the PoV chunk.
    //
    // The loop above maintains `cur_offset <= offset`, so this cannot underflow; the
    // saturating subtraction is purely defensive.
    let mut offset = offset.saturating_sub(cur_offset);

    // Repeatedly compute dense ranges until we've returned `len` rows.
    while len > 0 {
        // NOTE(review): relies on `row_sliced` clamping `len` to the rows actually
        // available in the chunk — confirm against `Chunk::row_sliced`.
        let sliced_pov_chunk = cur_pov_chunk.row_sliced(offset as _, len as _);
        results.extend(self.dense_batch_at_pov(&sliced_pov_chunk));

        offset = 0; // always start at the first row after the first chunk
        len = len.saturating_sub(sliced_pov_chunk.num_rows() as u64);

        let Some(next_pov_chunk) = pov_chunks.next() else {
            break;
        };
        cur_pov_chunk = next_pov_chunk;
    }

    results
}

/// Returns the number of point-of-view chunks (i.e. natural pages of data) held by this
/// query handle, or 0 if there are none.
#[inline]
pub fn num_chunks(&self) -> u64 {
    match self.init().pov_chunks.as_ref() {
        Some(pov_chunks) => pov_chunks.len() as u64,
        None => 0,
    }
}

/// Returns the total number of rows across all point-of-view chunks held by this query
/// handle, or 0 if there are none.
#[inline]
pub fn num_rows(&self) -> u64 {
    let Some(pov_chunks) = self.init().pov_chunks.as_ref() else {
        return 0;
    };

    pov_chunks.iter().map(|chunk| chunk.num_rows() as u64).sum()
}

fn dense_batch_at_pov(&self, pov_chunk: &Chunk) -> Option<RecordBatch> {
let pov_time_column = pov_chunk.timelines().get(&self.query.timeline)?;
let columns = self.schema();

Expand Down

0 comments on commit 5155636

Please sign in to comment.