Skip to content

Commit

Permalink
Dataframe queries 1: schema resolution (rerun-io#7339)
Browse files Browse the repository at this point in the history
The schema resolution logic.

* Part of rerun-io#7284 

---

Dataframe APIs PR series:
- rerun-io#7338
- rerun-io#7339
- rerun-io#7340
- rerun-io#7341
- rerun-io#7345
  • Loading branch information
teh-cmc authored Sep 4, 2024
1 parent e685541 commit d6f2375
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 9 deletions.
154 changes: 152 additions & 2 deletions crates/store/re_chunk_store/src/dataframe.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! All the APIs used specifically for `re_dataframe`.
#![allow(unused_imports)]

use std::collections::BTreeSet;

use ahash::HashSet;
Expand Down Expand Up @@ -502,3 +500,155 @@ impl<S: AsRef<str>> From<S> for EntityPathExpression {
}
}
}

// ---

impl ChunkStore {
/// Returns the full schema of the store.
///
/// This will include a column descriptor for every timeline and every component on every
/// entity that has been written to the store so far.
///
/// The order of the columns is guaranteed to be in a specific order:
/// * first, the control columns in lexical order (`RowId`);
/// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
/// * third, the component columns in lexical order (`Color`, `Radius, ...`).
pub fn schema(&self) -> Vec<ColumnDescriptor> {
re_tracing::profile_function!();

let controls = std::iter::once(ColumnDescriptor::Control(ControlColumnDescriptor {
component_name: RowId::name(),
datatype: RowId::arrow_datatype(),
}));

let timelines = self.all_timelines().into_iter().map(|timeline| {
ColumnDescriptor::Time(TimeColumnDescriptor {
timeline,
datatype: timeline.datatype(),
})
});

let static_components =
self.static_chunk_ids_per_entity
.iter()
.flat_map(|(entity_path, per_component)| {
// TODO(#6889): Fill `archetype_name`/`archetype_field_name` (or whatever their
// final name ends up being) once we generate tags.
per_component.keys().filter_map(|component_name| {
self.lookup_datatype(component_name).map(|datatype| {
ColumnDescriptor::Component(ComponentColumnDescriptor {
entity_path: entity_path.clone(),
archetype_name: None,
archetype_field_name: None,
component_name: *component_name,
datatype: datatype.clone(),
is_static: true,
})
})
})
});

// TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
let temporal_components = self
.temporal_chunk_ids_per_entity_per_component
.iter()
.flat_map(|(entity_path, per_timeline)| {
per_timeline
.iter()
.map(move |(timeline, per_component)| (entity_path, timeline, per_component))
})
.flat_map(|(entity_path, _timeline, per_component)| {
// TODO(#6889): Fill `archetype_name`/`archetype_field_name` (or whatever their
// final name ends up being) once we generate tags.
per_component.keys().filter_map(|component_name| {
self.lookup_datatype(component_name).map(|datatype| {
ColumnDescriptor::Component(ComponentColumnDescriptor {
entity_path: entity_path.clone(),
archetype_name: None,
archetype_field_name: None,
component_name: *component_name,
datatype: datatype.clone(),
// NOTE: This will make it so shadowed temporal data automatically gets
// discarded from the schema.
is_static: self
.static_chunk_ids_per_entity
.get(entity_path)
.map_or(false, |per_component| {
per_component.contains_key(component_name)
}),
})
})
})
});

let components = static_components
.chain(temporal_components)
.collect::<BTreeSet<_>>();

controls.chain(timelines).chain(components).collect()
}

/// Returns the filtered schema for the given query expression.
///
/// This will only include columns which may contain non-empty values from the perspective of
/// the query semantics.
///
/// The order of the columns is guaranteed to be in a specific order:
/// * first, the control columns in lexical order (`RowId`);
/// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
/// * third, the component columns in lexical order (`Color`, `Radius, ...`).
///
/// This does not run a full-blown query, but rather just inspects [`Chunk`]-level metadata,
/// which can lead to false positives, but makes this very cheap to compute.
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
re_tracing::profile_function!(format!("{query:?}"));

// First, grab the full schema and filters out every entity path that isn't covered by the query.
let schema = self
.schema()
.into_iter()
.filter(|descr| {
descr.entity_path().map_or(true, |entity_path| {
query.entity_path_expr().matches(entity_path)
})
})
.collect_vec();

// Then, discard any column descriptor which cannot possibly have data for the given query.
//
// TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
let mut filtered_out = HashSet::default();
for column_descr in &schema {
let ColumnDescriptor::Component(descr) = column_descr else {
continue;
};

match query {
QueryExpression::LatestAt(query) => {
let q = LatestAtQuery::new(query.timeline, query.at);
if self
.latest_at_relevant_chunks(&q, &descr.entity_path, descr.component_name)
.is_empty()
{
filtered_out.insert(column_descr.clone());
}
}

QueryExpression::Range(query) => {
let q = LatestAtQuery::new(query.timeline, query.time_range.max());
if self
.latest_at_relevant_chunks(&q, &descr.entity_path, descr.component_name)
.is_empty()
{
filtered_out.insert(column_descr.clone());
}
}
}
}

schema
.into_iter()
.filter(|descr| !filtered_out.contains(descr))
.collect()
}
}
9 changes: 2 additions & 7 deletions crates/store/re_dataframe/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ impl QueryEngine<'_> {
/// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
/// * third, the component columns in lexical order (`Color`, `Radius, ...`).
#[inline]
#[allow(clippy::unimplemented, clippy::needless_pass_by_value)]
pub fn schema(&self) -> Vec<ColumnDescriptor> {
_ = self;
unimplemented!("TODO(cmc)")
self.store.schema()
}

/// Returns the filtered schema for the given query expression.
Expand All @@ -84,11 +82,8 @@ impl QueryEngine<'_> {
/// This does not run a full-blown query, but rather just inspects [`Chunk`]-level metadata,
/// which can lead to false positives, but makes this very cheap to compute.
#[inline]
#[allow(clippy::unimplemented, clippy::needless_pass_by_value)]
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
_ = self;
_ = query;
unimplemented!("TODO(cmc)")
self.store.schema_for_query(query)
}

/// Creates a new appropriate [`QueryHandle`].
Expand Down

0 comments on commit d6f2375

Please sign in to comment.