Skip to content

Commit

Permalink
Dataframe queries 2: latest-at (rerun-io#7340)
Browse files Browse the repository at this point in the history
Implements the latest-api dataframe API.

Examples:
```
cargo r --all-features -p re_dataframe --example latest_at -- /tmp/helix.rrd
cargo r --all-features -p re_dataframe --example latest_at -- /tmp/helix.rrd /helix/structure/scaffolding/**
```

```rust
use itertools::Itertools as _;

use re_chunk::{TimeInt, Timeline};
use re_chunk_store::{ChunkStore, ChunkStoreConfig, LatestAtQueryExpression, VersionPolicy};
use re_dataframe::QueryEngine;
use re_log_types::StoreKind;

fn main() -> anyhow::Result<()> {
    let args = std::env::args().collect_vec();

    let get_arg = |i| {
        let Some(value) = args.get(i) else {
            eprintln!(
                "Usage: {} <path_to_rrd> <entity_path_expr>",
                args.first().map_or("$BIN", |s| s.as_str())
            );
            std::process::exit(1);
        };
        value
    };

    let path_to_rrd = get_arg(1);
    let entity_path_expr = args.get(2).map_or("/**", |s| s.as_str());

    let stores = ChunkStore::from_rrd_filepath(
        &ChunkStoreConfig::DEFAULT,
        path_to_rrd,
        VersionPolicy::Warn,
    )?;

    for (store_id, store) in &stores {
        if store_id.kind != StoreKind::Recording {
            continue;
        }

        let cache = re_dataframe::external::re_query::Caches::new(store);
        let engine = QueryEngine {
            store,
            cache: &cache,
        };

        let query = LatestAtQueryExpression {
            entity_path_expr: entity_path_expr.into(),
            timeline: Timeline::log_time(),
            at: TimeInt::MAX,
        };

        let query_handle = engine.latest_at(&query, None /* columns */);
        let batch = query_handle.get();

        eprintln!("{query}:\n{batch}");
    }

    Ok(())
}
```

* Part of rerun-io#7284 

---

Dataframe APIs PR series:
- rerun-io#7338
- rerun-io#7339
- rerun-io#7340
- rerun-io#7341
- rerun-io#7345
  • Loading branch information
teh-cmc authored Sep 4, 2024
1 parent d6f2375 commit caa0654
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 10 deletions.
55 changes: 55 additions & 0 deletions crates/store/re_dataframe/examples/latest_at.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use itertools::Itertools as _;

use re_chunk::{TimeInt, Timeline};
use re_chunk_store::{ChunkStore, ChunkStoreConfig, LatestAtQueryExpression, VersionPolicy};
use re_dataframe::QueryEngine;
use re_log_types::StoreKind;

fn main() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();

let get_arg = |i| {
let Some(value) = args.get(i) else {
eprintln!(
"Usage: {} <path_to_rrd> <entity_path_expr>",
args.first().map_or("$BIN", |s| s.as_str())
);
std::process::exit(1);
};
value
};

let path_to_rrd = get_arg(1);
let entity_path_expr = args.get(2).map_or("/**", |s| s.as_str());

let stores = ChunkStore::from_rrd_filepath(
&ChunkStoreConfig::DEFAULT,
path_to_rrd,
VersionPolicy::Warn,
)?;

for (store_id, store) in &stores {
if store_id.kind != StoreKind::Recording {
continue;
}

let cache = re_dataframe::external::re_query::Caches::new(store);
let engine = QueryEngine {
store,
cache: &cache,
};

let query = LatestAtQueryExpression {
entity_path_expr: entity_path_expr.into(),
timeline: Timeline::log_time(),
at: TimeInt::MAX,
};

let query_handle = engine.latest_at(&query, None /* columns */);
let batch = query_handle.get();

eprintln!("{query}:\n{batch}");
}

Ok(())
}
17 changes: 7 additions & 10 deletions crates/store/re_dataframe/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use re_chunk_store::{
};
use re_query::Caches;

use crate::LatestAtQueryHandle;

// ---

// TODO(#3741): `arrow2` has no concept of a `RecordBatch`, so for now we just use our trustworthy
Expand All @@ -13,7 +15,6 @@ use re_query::Caches;
// TODO(cmc): add an `arrow` feature to transportchunk in a follow-up pr and call it a day.
pub type RecordBatch = TransportChunk;

pub struct LatestAtQueryHandle<'a>(&'a ());
pub struct RangeQueryHandle<'a>(&'a ());

/// A generic handle to a query that is ready to be executed.
Expand Down Expand Up @@ -98,10 +99,10 @@ impl QueryEngine<'_> {
query: &QueryExpression,
columns: Option<Vec<ColumnDescriptor>>,
) -> QueryHandle<'_> {
_ = self;
_ = query;
_ = columns;
unimplemented!("TODO(cmc)")
match query {
QueryExpression::LatestAt(query) => self.latest_at(query, columns).into(),
QueryExpression::Range(query) => self.range(query, columns).into(),
}
}

/// Creates a new [`LatestAtQueryHandle`], which can be used to perform a latest-at query.
Expand All @@ -120,16 +121,12 @@ impl QueryEngine<'_> {
/// for a range of data on the `frame` timeline, but still include the `log_time` timeline in
/// the result.
#[inline]
#[allow(clippy::unimplemented, clippy::needless_pass_by_value)]
pub fn latest_at(
&self,
query: &LatestAtQueryExpression,
columns: Option<Vec<ColumnDescriptor>>,
) -> LatestAtQueryHandle<'_> {
_ = self;
_ = query;
_ = columns;
unimplemented!("TODO(cmc)")
LatestAtQueryHandle::new(self, query.clone(), columns)
}

/// Creates a new [`RangeQueryHandle`], which can be used to perform a range query.
Expand Down
227 changes: 227 additions & 0 deletions crates/store/re_dataframe/src/latest_at.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
use std::sync::OnceLock;

use ahash::HashMap;
use arrow2::{
array::Array as ArrowArray, chunk::Chunk as ArrowChunk, datatypes::Schema as ArrowSchema,
};
use itertools::Itertools;

use re_chunk::{LatestAtQuery, TimeInt, Timeline, UnitChunkShared};
use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, LatestAtQueryExpression};

use crate::{QueryEngine, RecordBatch};

// ---

/// A handle to a latest-at query, ready to be executed.
///
/// Cheaply created via [`QueryEngine::latest_at`].
///
/// See [`LatestAtQueryHandle::get`].
pub struct LatestAtQueryHandle<'a> {
/// Handle to the [`QueryEngine`].
engine: &'a QueryEngine<'a>,

/// The original query expression used to instantiate this handle.
query: LatestAtQueryExpression,

/// The user-specified schema that describes any data returned through this handle, if any.
user_columns: Option<Vec<ColumnDescriptor>>,

/// Internal private state. Lazily computed.
///
/// It is important that handles stay cheap to create.
state: OnceLock<LatestAtQueryHandleState>,
}

/// Internal private state. Lazily computed.
struct LatestAtQueryHandleState {
/// The final schema.
columns: Vec<ColumnDescriptor>,
}

impl<'a> LatestAtQueryHandle<'a> {
pub(crate) fn new(
engine: &'a QueryEngine<'a>,
query: LatestAtQueryExpression,
user_columns: Option<Vec<ColumnDescriptor>>,
) -> Self {
Self {
engine,
query,
user_columns,
state: Default::default(),
}
}
}

impl LatestAtQueryHandle<'_> {
/// All results returned by this handle will strictly follow this schema.
///
/// Columns that do not yield any data will still be present in the results, filled with null values.
pub fn schema(&self) -> &[ColumnDescriptor] {
let state = self.state.get_or_init(|| {
let columns = {
re_tracing::profile_scope!("compute schema");

self.user_columns
.clone()
.unwrap_or_else(|| {
self.engine
.store
.schema_for_query(&self.query.clone().into())
})
.into_iter()
// NOTE: We drop `RowId`, as it doesn't make any sense in a compound row like the
// one we are returning.
.filter(|descr| !matches!(descr, ColumnDescriptor::Control(_)))
.collect()
};

LatestAtQueryHandleState { columns }
});

&state.columns
}

/// Performs the latest-at query.
///
/// Returns a single [`RecordBatch`] containing a single row, where each cell corresponds to
/// the latest known value at that particular point in time for each respective `ColumnDescriptor`.
///
/// The schema of the returned [`RecordBatch`] is guaranteed to match the one returned by
/// [`Self::schema`].
/// Columns that do not yield any data will still be present in the results, filled with null values.
pub fn get(&self) -> RecordBatch {
re_tracing::profile_function!(format!("{:?}", self.query));

let columns = self.schema();

let schema = ArrowSchema {
fields: columns
.iter()
.map(ColumnDescriptor::to_arrow_field)
.collect(),

// TODO(#6889): properly some sorbet stuff we want to get in there at some point.
metadata: Default::default(),
};

let all_units: HashMap<&ComponentColumnDescriptor, UnitChunkShared> = {
re_tracing::profile_scope!("queries");

// TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
let query = LatestAtQuery::new(self.query.timeline, self.query.at);
columns
.iter()
.filter_map(|descr| match descr {
ColumnDescriptor::Component(descr) => {
let results = self.engine.cache.latest_at(
self.engine.store,
&query,
&descr.entity_path,
[descr.component_name],
);

results
.components
.get(&descr.component_name)
.cloned()
.map(|chunk| (descr, chunk))
}

_ => None,
})
.collect()
};

let mut max_time_per_timeline = HashMap::<Timeline, (TimeInt, UnitChunkShared)>::default();
{
re_tracing::profile_scope!("compound times");

let timelines = columns
.iter()
.filter_map(|descr| match descr {
ColumnDescriptor::Time(descr) => Some(descr.timeline),
_ => None,
})
.collect_vec();

for unit in all_units.values() {
for &timeline in &timelines {
if let Some((time, _)) = unit.index(&timeline) {
max_time_per_timeline
.entry(timeline)
.and_modify(|(cur_time, cur_unit)| {
if *cur_time < time {
*cur_time = time;
*cur_unit = unit.clone();
}
})
.or_insert_with(|| (time, unit.clone()));
}
}
}
}

// NOTE: Keep in mind this must match the ordering specified by `Self::schema`.
let packed_arrays = {
re_tracing::profile_scope!("packing");

columns
.iter()
.filter_map(|descr| match descr {
ColumnDescriptor::Control(_) => {
if cfg!(debug_assertions) {
unreachable!("filtered out during schema computation");
} else {
// NOTE: Technically cannot ever happen, but I'd rather that than an uwnrap.
None
}
}

ColumnDescriptor::Time(descr) => {
let time_column = max_time_per_timeline
.remove(&descr.timeline)
.and_then(|(_, chunk)| chunk.timelines().get(&descr.timeline).cloned());

Some(time_column.map_or_else(
|| arrow2::array::new_null_array(descr.datatype.clone(), 1),
|time_column| time_column.times_array().to_boxed(),
))
}

ColumnDescriptor::Component(descr) => Some(
all_units
.get(descr)
.and_then(|chunk| chunk.components().get(&descr.component_name))
.map_or_else(
|| arrow2::array::new_null_array(descr.datatype.clone(), 1),
|list_array| list_array.to_boxed(),
),
),
})
.collect()
};

RecordBatch {
schema,
data: ArrowChunk::new(packed_arrays),
}
}
}

impl<'a> LatestAtQueryHandle<'a> {
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn into_iter(self) -> impl Iterator<Item = RecordBatch> + 'a {
let mut yielded = false;
std::iter::from_fn(move || {
if yielded {
None
} else {
yielded = true;
Some(self.get())
}
})
}
}
2 changes: 2 additions & 0 deletions crates/store/re_dataframe/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! The Rerun public data APIs. Get dataframes back from your Rerun datastore.
mod engine;
mod latest_at;

pub use self::engine::{QueryEngine, QueryHandle, RecordBatch};
pub use self::latest_at::LatestAtQueryHandle;

pub mod external {
pub use re_chunk;
Expand Down

0 comments on commit caa0654

Please sign in to comment.