Dataframe API v2 rerun-io#2: MVP implementation (rerun-io#7560)
A first implementation of the new dataframe APIs.
The name is now very misleading, though: there isn't anything dataframe-y
left in here; it is a row-based iterator with Rerun semantics baked in,
driven by a sorted streaming join.
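
For intuition, here is a minimal, self-contained sketch of the "sorted streaming join" idea: a k-way merge over index-sorted per-entity streams, joining rows that land on the same index value. All names, types, and payloads below are illustrative stand-ins, not the actual implementation:
```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// One row of a per-entity stream: (index value, payload).
type Row = (i64, &'static str);

/// Merges any number of index-sorted streams into one stream of
/// (index value, payloads), ascending by index, joining rows that
/// share the same index value.
fn sorted_streaming_join(streams: Vec<Vec<Row>>) -> Vec<(i64, Vec<&'static str>)> {
    // Min-heap of (index value, stream id, cursor): always pops the
    // stream whose next row has the smallest index value.
    let mut heap = BinaryHeap::new();
    for (stream_id, stream) in streams.iter().enumerate() {
        if let Some(&(index, _)) = stream.first() {
            heap.push(Reverse((index, stream_id, 0)));
        }
    }

    let mut joined: Vec<(i64, Vec<&'static str>)> = Vec::new();
    while let Some(Reverse((index, stream_id, cursor))) = heap.pop() {
        // Re-arm this stream's cursor, if it has more rows.
        if let Some(&(next_index, _)) = streams[stream_id].get(cursor + 1) {
            heap.push(Reverse((next_index, stream_id, cursor + 1)));
        }

        // Rows landing on the same index value are joined into one output row.
        let payload = streams[stream_id][cursor].1;
        match joined.last_mut() {
            Some((last, payloads)) if *last == index => payloads.push(payload),
            _ => joined.push((index, vec![payload])),
        }
    }
    joined
}

fn main() {
    let points = vec![(10, "points@10"), (20, "points@20")];
    let colors = vec![(10, "colors@10"), (30, "colors@30")];
    for (index, payloads) in sorted_streaming_join(vec![points, colors]) {
        println!("t={index}: {payloads:?}");
    }
}
```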

It is rather slow (related:
rerun-io#7558 (comment)),
lacks many features and is full of edge cases, but it works.
It does support dedupe-latest semantics (slowly), view contents and
selections, chunk overlaps, and pagination (horribly, by virtue of
implementing `Iterator`).
It does _not_ support `Clear`s, nor `latest-at` sparse-filling, nor
PoVs, nor index sampling. Yet.
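
For reference, "dedupe-latest" means that when several rows land on the same index value, only the most recent one (highest `RowId`) survives. A minimal sketch of that rule, with plain integers standing in for index values and `RowId`s (the real types differ):
```rust
use std::collections::BTreeMap;

/// Keeps, per index value, only the row with the highest row-id.
/// `(i64, u64)` pairs stand in for (index value, RowId).
fn dedupe_latest(rows: impl IntoIterator<Item = (i64, u64)>) -> Vec<(i64, u64)> {
    let mut latest = BTreeMap::new();
    for (index, row_id) in rows {
        let entry = latest.entry(index).or_insert(row_id);
        *entry = (*entry).max(row_id);
    }
    latest.into_iter().collect() // ascending index order
}

fn main() {
    // Two rows at t=10: only the one with the higher row-id survives.
    assert_eq!(dedupe_latest([(10, 1), (10, 3), (20, 2)]), vec![(10, 3), (20, 2)]);
}
```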

Upcoming PRs will be all about fixing these shortcomings one by one.

It should look somewhat familiar:
```rust
let query_cache = QueryCache::new(store);
let query_engine = QueryEngine {
    store,
    cache: &query_cache,
};

let mut query = QueryExpression2::new(timeline);
query.view_contents = Some(
    query_engine
        .iter_entity_paths(&entity_path_filter)
        .map(|entity_path| (entity_path, None))
        .collect(),
);
query.filtered_index_range = Some(ResolvedTimeRange::new(time_from, time_to));
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
// eprintln!("{:#?}", query_handle.selected_contents());
for batch in query_handle.into_batch_iter().skip(offset).take(len) {
    eprintln!("{batch}");
}
```

No tests until we have the guarantee that these are the semantics we
will commit to.

* Part of rerun-io#7495 
* Requires rerun-io#7559
teh-cmc authored Oct 2, 2024
1 parent c692760 commit aab3ed9
Showing 11 changed files with 1,002 additions and 3 deletions.
1 change: 1 addition & 0 deletions ARCHITECTURE.md
@@ -164,6 +164,7 @@ Update instructions:
| re_entity_db | In-memory storage of Rerun entities |
| re_query | Querying data in the re_chunk_store |
| re_dataframe | The Rerun public data APIs. |
| re_dataframe2 | The Rerun public data APIs. |
| re_types | The built-in Rerun data types, component types, and archetypes. |
| re_types_blueprint | The core traits and types that power Rerun's Blueprint sub-system. |
| re_log_encoding | Helpers for encoding and transporting Rerun log messages |
27 changes: 27 additions & 0 deletions Cargo.lock
@@ -5037,6 +5037,33 @@ dependencies = [
"thiserror",
]

[[package]]
name = "re_dataframe2"
version = "0.19.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
"backtrace",
"indent",
"itertools 0.13.0",
"nohash-hasher",
"parking_lot",
"paste",
"re_arrow2",
"re_chunk",
"re_chunk_store",
"re_error",
"re_format",
"re_log",
"re_log_types",
"re_query",
"re_tracing",
"re_types",
"re_types_core",
"seq-macro",
"thiserror",
]

[[package]]
name = "re_dev_tools"
version = "0.19.0-alpha.1+dev"
1 change: 1 addition & 0 deletions Cargo.toml
@@ -45,6 +45,7 @@ re_chunk_store = { path = "crates/store/re_chunk_store", version = "=0.19.0-alph
re_data_loader = { path = "crates/store/re_data_loader", version = "=0.19.0-alpha.1", default-features = false }
re_data_source = { path = "crates/store/re_data_source", version = "=0.19.0-alpha.1", default-features = false }
re_dataframe = { path = "crates/store/re_dataframe", version = "=0.19.0-alpha.1", default-features = false }
re_dataframe2 = { path = "crates/store/re_dataframe2", version = "=0.19.0-alpha.1", default-features = false }
re_entity_db = { path = "crates/store/re_entity_db", version = "=0.19.0-alpha.1", default-features = false }
re_format_arrow = { path = "crates/store/re_format_arrow", version = "=0.19.0-alpha.1", default-features = false }
re_log_encoding = { path = "crates/store/re_log_encoding", version = "=0.19.0-alpha.1", default-features = false }
31 changes: 29 additions & 2 deletions crates/store/re_chunk_store/src/dataframe.rs
@@ -748,7 +748,7 @@ impl std::fmt::Display for SparseFillStrategy {
///
// TODO(cmc): we need to be able to build that easily in a command-line context, otherwise it's just
// very annoying. E.g. `--with /world/points:[rr.Position3D, rr.Radius] --with /cam:[rr.Pinhole]`.
pub type ViewContents = BTreeMap<EntityPath, Option<BTreeSet<ComponentName>>>;
pub type ViewContentsSelector = BTreeMap<EntityPath, Option<BTreeSet<ComponentName>>>;

// TODO(cmc): Ultimately, this shouldn't be hardcoded to `Timeline`, but to a generic `I: Index`.
// `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
@@ -801,7 +801,7 @@ pub struct QueryExpression2 {
/// "metrics": [rr.Scalar]
/// }
/// ```
pub view_contents: Option<ViewContents>,
pub view_contents: Option<ViewContentsSelector>,

/// The index used to filter out _rows_ from the view contents.
///
@@ -1153,4 +1153,31 @@ impl ChunkStore {
.filter(|descr| !filtered_out.contains(descr))
.collect()
}

/// Returns the filtered schema for the given [`ViewContentsSelector`].
///
/// The columns are guaranteed to be in a specific order:
/// * first, the control columns in lexical order (`RowId`);
/// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
/// * third, the component columns in lexical order (`Color`, `Radius`, ...).
pub fn schema_for_view_contents(
&self,
view_contents: &ViewContentsSelector,
) -> Vec<ColumnDescriptor> {
re_tracing::profile_function!();

self.schema()
.into_iter()
.filter(|column| match column {
ColumnDescriptor::Control(_) | ColumnDescriptor::Time(_) => true,
ColumnDescriptor::Component(column) => view_contents
.get(&column.entity_path)
.map_or(false, |components| {
components.as_ref().map_or(true, |components| {
components.contains(&column.component_name)
})
}),
})
.collect()
}
}
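
A hypothetical usage sketch of the new `schema_for_view_contents` (the entity paths and component name are made up, and the `From<&str>` conversions and `Debug` output are assumed):
```rust
use std::collections::BTreeSet;

use re_chunk_store::{ChunkStore, ViewContentsSelector};
use re_log_types::EntityPath;

fn example(store: &ChunkStore) {
    // Select every component of `/world/points`, but only `Color` on `/world/boxes`.
    let view_contents: ViewContentsSelector = [
        (EntityPath::from("/world/points"), None),
        (
            EntityPath::from("/world/boxes"),
            Some(BTreeSet::from(["rerun.components.Color".into()])),
        ),
    ]
    .into_iter()
    .collect();

    // Control columns first, then time columns, then the selected component columns.
    let columns = store.schema_for_view_contents(&view_contents);
    for column in &columns {
        eprintln!("{column:?}");
    }
}
```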
2 changes: 1 addition & 1 deletion crates/store/re_chunk_store/src/lib.rs
@@ -27,7 +27,7 @@ pub use self::dataframe::{
ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
ControlColumnDescriptor, ControlColumnSelector, Index, IndexRange, IndexValue, JoinEncoding,
LatestAtQueryExpression, QueryExpression, QueryExpression2, RangeQueryExpression,
SparseFillStrategy, TimeColumnDescriptor, TimeColumnSelector,
SparseFillStrategy, TimeColumnDescriptor, TimeColumnSelector, ViewContentsSelector,
};
pub use self::events::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent};
pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget};
52 changes: 52 additions & 0 deletions crates/store/re_dataframe2/Cargo.toml
@@ -0,0 +1,52 @@
[package]
name = "re_dataframe2"
authors.workspace = true
description = "High-level query APIs"
edition.workspace = true
homepage.workspace = true
include.workspace = true
license.workspace = true
publish = true
readme = "README.md"
repository.workspace = true
rust-version.workspace = true
version.workspace = true

[lints]
workspace = true

[package.metadata.docs.rs]
all-features = true


[features]
default = []


[dependencies]
# Rerun dependencies:
re_chunk.workspace = true
re_chunk_store.workspace = true
re_error.workspace = true
re_format.workspace = true
re_log.workspace = true
re_log_types.workspace = true
re_query.workspace = true
re_tracing.workspace = true
re_types_core.workspace = true

# External dependencies:
ahash.workspace = true
anyhow.workspace = true
arrow2.workspace = true
backtrace.workspace = true
indent.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
parking_lot.workspace = true
paste.workspace = true
seq-macro.workspace = true
thiserror.workspace = true

[dev-dependencies]
re_types.workspace = true
10 changes: 10 additions & 0 deletions crates/store/re_dataframe2/README.md
@@ -0,0 +1,10 @@
# re_dataframe2

Part of the [`rerun`](https://github.com/rerun-io/rerun) family of crates.

[![Latest version](https://img.shields.io/crates/v/re_dataframe2.svg)](https://crates.io/crates/re_dataframe2)
[![Documentation](https://docs.rs/re_dataframe2/badge.svg)](https://docs.rs/re_dataframe2)
![MIT](https://img.shields.io/badge/license-MIT-blue.svg)
![Apache](https://img.shields.io/badge/license-Apache-blue.svg)

The Rerun public data APIs. Get dataframes back from your Rerun datastore.
78 changes: 78 additions & 0 deletions crates/store/re_dataframe2/examples/query.rs
@@ -0,0 +1,78 @@
#![allow(clippy::unwrap_used, clippy::match_same_arms)]

use itertools::Itertools;

use re_chunk::TimeInt;
use re_chunk_store::{ChunkStore, ChunkStoreConfig, QueryExpression2, Timeline, VersionPolicy};
use re_dataframe2::{QueryCache, QueryEngine};
use re_log_types::{EntityPathFilter, ResolvedTimeRange, StoreKind};

fn main() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();

let get_arg = |i| {
let Some(value) = args.get(i) else {
eprintln!(
"Usage: {} <path_to_rrd> [timeline] [from] [to] [entity_path_filter]",
args.first().map_or("$BIN", |s| s.as_str())
);
std::process::exit(1);
};
value
};

let path_to_rrd = get_arg(1);
let timeline_name = args.get(2).map_or("log_time", |s| s.as_str());
let time_from = args.get(3).map_or(TimeInt::MIN, |s| {
TimeInt::new_temporal(s.parse::<i64>().unwrap())
});
let time_to = args.get(4).map_or(TimeInt::MAX, |s| {
TimeInt::new_temporal(s.parse::<i64>().unwrap())
});
let entity_path_filter = EntityPathFilter::try_from(args.get(5).map_or("/**", |s| s.as_str()))?;

// TODO(cmc): We need to take a selector, not a Timeline.
let timeline = match timeline_name {
"log_time" => Timeline::new_temporal(timeline_name),
"log_tick" => Timeline::new_sequence(timeline_name),
"frame" => Timeline::new_sequence(timeline_name),
"frame_nr" => Timeline::new_sequence(timeline_name),
_ => Timeline::new_temporal(timeline_name),
};

let stores = ChunkStore::from_rrd_filepath(
&ChunkStoreConfig::DEFAULT,
path_to_rrd,
VersionPolicy::Warn,
)?;

for (store_id, store) in &stores {
if store_id.kind != StoreKind::Recording {
continue;
}

let query_cache = QueryCache::new(store);
let query_engine = QueryEngine {
store,
cache: &query_cache,
};

let mut query = QueryExpression2::new(timeline);
query.view_contents = Some(
query_engine
.iter_entity_paths(&entity_path_filter)
.map(|entity_path| (entity_path, None))
.collect(),
);
query.filtered_index_range = Some(ResolvedTimeRange::new(time_from, time_to));
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
// eprintln!("{:#?}", query_handle.selected_contents());
for batch in query_handle.into_batch_iter() {
eprintln!("{batch}");
}
}

Ok(())
}
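
To try this example against a recording, something like the following should work (the `.rrd` path is a placeholder): `cargo run -p re_dataframe2 --example query -- /path/to/recording.rrd log_time`.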
96 changes: 96 additions & 0 deletions crates/store/re_dataframe2/src/engine.rs
@@ -0,0 +1,96 @@
use re_chunk::{EntityPath, TransportChunk};
use re_chunk_store::{
ChunkStore, ColumnDescriptor, QueryExpression, QueryExpression2, ViewContentsSelector,
};
use re_log_types::EntityPathFilter;
use re_query::Caches;

use crate::QueryHandle;

// Used all over in docstrings.
#[allow(unused_imports)]
use re_chunk_store::ComponentColumnDescriptor;

// ---

// TODO(#3741): `arrow2` has no concept of a `RecordBatch`, so for now we just use our trustworthy
// `TransportChunk` type until we migrate to `arrow-rs`.
// `TransportChunk` maps 1:1 to `RecordBatch` so the switch (and the compatibility layer in the meantime)
// will be trivial.
// TODO(cmc): add an `arrow` feature to `TransportChunk` in a follow-up PR and call it a day.
pub type RecordBatch = TransportChunk;

// --- Queries ---

/// A handle to our user-facing query engine.
///
/// See the following methods:
/// * [`QueryEngine::schema`]: get the complete schema of the recording.
/// * [`QueryEngine::query`]: execute a [`QueryExpression2`] on the recording.
//
// TODO(cmc): This needs to be a refcounted type that can easily be passed around: the ref has
// got to go. But for that we need to generally introduce `ChunkStoreHandle` and `QueryCacheHandle`
// first, and this is not as straightforward as it seems.
pub struct QueryEngine<'a> {
pub store: &'a ChunkStore,
pub cache: &'a Caches,
}

impl QueryEngine<'_> {
/// Returns the full schema of the store.
///
/// This will include a column descriptor for every timeline and every component on every
/// entity that has been written to the store so far.
///
/// The columns are guaranteed to be in a specific order:
/// * first, the control columns in lexical order (`RowId`);
/// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
/// * third, the component columns in lexical order (`Color`, `Radius`, ...).
#[inline]
pub fn schema(&self) -> Vec<ColumnDescriptor> {
self.store.schema()
}

/// Returns the filtered schema for the given query expression.
///
/// This will only include columns which may contain non-empty values from the perspective of
/// the query semantics.
///
/// The columns are guaranteed to be in a specific order:
/// * first, the control columns in lexical order (`RowId`);
/// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
/// * third, the component columns in lexical order (`Color`, `Radius`, ...).
///
/// This does not run a full-blown query; it merely inspects `Chunk`-level metadata,
/// which can lead to false positives but makes this check very cheap to compute.
#[inline]
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
self.store.schema_for_query(query)
}

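/// Returns the filtered schema for the given [`ViewContentsSelector`].
///
/// A thin convenience wrapper around [`ChunkStore::schema_for_view_contents`].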
#[inline]
pub fn schema_for_view_contents(
&self,
view_contents: &ViewContentsSelector,
) -> Vec<ColumnDescriptor> {
self.store.schema_for_view_contents(view_contents)
}

/// Starts a new query by instantiating a [`QueryHandle`].
#[inline]
pub fn query(&self, query: QueryExpression2) -> QueryHandle<'_> {
QueryHandle::new(self, query)
}

/// Returns an iterator over all the [`EntityPath`]s present in the database.
#[inline]
pub fn iter_entity_paths<'a>(
&self,
filter: &'a EntityPathFilter,
) -> impl Iterator<Item = EntityPath> + 'a {
self.store
.all_entities()
.into_iter()
.filter(|entity_path| filter.matches(entity_path))
}
}
20 changes: 20 additions & 0 deletions crates/store/re_dataframe2/src/lib.rs
@@ -0,0 +1,20 @@
//! The Rerun public data APIs. Get dataframes back from your Rerun datastore.
mod engine;
mod query;

pub use self::engine::{QueryEngine, RecordBatch};
pub use self::query::QueryHandle;

#[doc(no_inline)]
pub use self::external::arrow2::chunk::Chunk as ArrowChunk;
#[doc(no_inline)]
pub use self::external::re_query::Caches as QueryCache;

pub mod external {
pub use re_chunk;
pub use re_chunk_store;
pub use re_query;

pub use arrow2;
}