Skip to content

Commit

Permalink
Dataframe queries: address first wave of bug reports (rerun-io#7362)
Browse files Browse the repository at this point in the history
Fix and test all dataframe API bugs reported so far (I think?).

Namely:
* Fix behavior when there is no data available vs. empty data etc.
* Fix behavior with regards to static data.
* Fix puffin probe names being completely broken.
* Fix `QueryHandle::num_rows` being wrong in some edge cases.

Also introduces the first test suites.

* Fixes rerun-io#7356

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/{{pr.number}}?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/{{pr.number}}?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!
* [x] If have noted any breaking changes to the log API in
`CHANGELOG.md` and the migration guide

- [PR Build Summary](https://build.rerun.io/pr/{{pr.number}})
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
  • Loading branch information
teh-cmc authored Sep 5, 2024
1 parent 3645c10 commit ce83447
Show file tree
Hide file tree
Showing 2 changed files with 406 additions and 13 deletions.
158 changes: 155 additions & 3 deletions crates/store/re_dataframe/src/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl LatestAtQueryHandle<'_> {
/// [`Self::schema`].
/// Columns that do not yield any data will still be present in the results, filled with null values.
pub fn get(&self) -> RecordBatch {
re_tracing::profile_function!(format!("{:?}", self.query));
re_tracing::profile_function!(format!("{}", self.query));

let columns = self.schema();

Expand Down Expand Up @@ -154,6 +154,10 @@ impl LatestAtQueryHandle<'_> {
}
}

// If the query didn't return anything at all, we just want a properly empty Recordbatch with
// the right schema.
let null_array_length = max_time_per_timeline.get(&self.query.timeline).is_some() as usize;

// NOTE: Keep in mind this must match the ordering specified by `Self::schema`.
let packed_arrays = {
re_tracing::profile_scope!("packing");
Expand All @@ -176,7 +180,12 @@ impl LatestAtQueryHandle<'_> {
.and_then(|(_, chunk)| chunk.timelines().get(&descr.timeline).cloned());

Some(time_column.map_or_else(
|| arrow2::array::new_null_array(descr.datatype.clone(), 1),
|| {
arrow2::array::new_null_array(
descr.datatype.clone(),
null_array_length,
)
},
|time_column| time_column.times_array().to_boxed(),
))
}
Expand All @@ -186,7 +195,12 @@ impl LatestAtQueryHandle<'_> {
.get(descr)
.and_then(|chunk| chunk.components().get(&descr.component_name))
.map_or_else(
|| arrow2::array::new_null_array(descr.datatype.clone(), 1),
|| {
arrow2::array::new_null_array(
descr.datatype.clone(),
null_array_length,
)
},
|list_array| list_array.to_boxed(),
),
),
Expand Down Expand Up @@ -222,3 +236,141 @@ impl<'a> LatestAtQueryHandle<'a> {
})
}
}

// ---

#[cfg(test)]
mod tests {
use std::sync::Arc;

use re_chunk::{ArrowArray, Chunk, EntityPath, RowId, TimeInt, TimePoint, Timeline};
use re_chunk_store::{
ChunkStore, ChunkStoreConfig, ColumnDescriptor, ComponentColumnDescriptor,
LatestAtQueryExpression, TimeColumnDescriptor,
};
use re_log_types::{example_components::MyPoint, StoreId, StoreKind};
use re_query::Caches;
use re_types::{
components::{Color, Position3D, Radius},
Loggable,
};

use crate::QueryEngine;

#[test]
fn empty_yields_empty() {
let store = ChunkStore::new(
StoreId::random(StoreKind::Recording),
ChunkStoreConfig::default(),
);
let cache = Caches::new(&store);
let engine = QueryEngine {
store: &store,
cache: &cache,
};

let query = LatestAtQueryExpression {
entity_path_expr: "/**".into(),
timeline: Timeline::log_time(),
at: TimeInt::MAX,
};

let entity_path: EntityPath = "/points".into();
let columns = vec![
ColumnDescriptor::Time(TimeColumnDescriptor {
timeline: Timeline::log_time(),
datatype: Timeline::log_time().datatype(),
}),
ColumnDescriptor::Time(TimeColumnDescriptor {
timeline: Timeline::log_tick(),
datatype: Timeline::log_tick().datatype(),
}),
ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Position3D>(
entity_path.clone(),
)),
ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Radius>(
entity_path.clone(),
)),
ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Color>(entity_path)),
];

let handle = engine.latest_at(&query, Some(columns.clone()));
let batch = handle.get();

// The output should be an empty recordbatch with the right schema and empty arrays.
assert_eq!(0, batch.num_rows());
assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field(None) == *field));
assert!(itertools::izip!(columns.iter(), batch.data.iter())
.all(|(descr, array)| descr.datatype() == array.data_type()));
}

#[test]
fn static_does_yield() {
let mut store = ChunkStore::new(
StoreId::random(StoreKind::Recording),
ChunkStoreConfig::default(),
);

let entity_path: EntityPath = "/points".into();
let chunk = Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batches(
RowId::new(),
TimePoint::default(),
[&[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)] as _],
)
.build()
.unwrap(),
);
_ = store.insert_chunk(&chunk);

eprintln!("{store}");

let cache = Caches::new(&store);
let engine = QueryEngine {
store: &store,
cache: &cache,
};

let query = LatestAtQueryExpression {
entity_path_expr: "/**".into(),
timeline: Timeline::log_time(),
at: TimeInt::MAX,
};

let columns = vec![
ColumnDescriptor::Time(TimeColumnDescriptor {
timeline: Timeline::log_time(),
datatype: Timeline::log_time().datatype(),
}),
ColumnDescriptor::Time(TimeColumnDescriptor {
timeline: Timeline::log_tick(),
datatype: Timeline::log_tick().datatype(),
}),
ColumnDescriptor::Component(ComponentColumnDescriptor::new::<MyPoint>(
entity_path.clone(),
)),
ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Radius>(
entity_path.clone(),
)),
ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Color>(entity_path)),
];

let handle = engine.latest_at(&query, Some(columns.clone()));
let batch = handle.get();

assert_eq!(1, batch.num_rows());
assert_eq!(
chunk.components().get(&MyPoint::name()).unwrap().to_boxed(),
itertools::izip!(batch.schema.fields.iter(), batch.data.iter())
.find_map(
|(field, array)| (field.name == MyPoint::name().short_name())
.then_some(array.clone())
)
.unwrap()
);
assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field(None) == *field));
}
}
Loading

0 comments on commit ce83447

Please sign in to comment.