# Dataframe queries: simplified schema wrapping management (rerun-io#7366)
Hotfix for the pending schema mismatch woes in the dataframe APIs.

It greatly simplifies all of the schema-wrapping management logic, and adds
tests for it.
It's still a band-aid; there is a fundamental design issue here (rerun-io#7365),
but this is a much saner foundation to keep iterating on.


* Always advertise every datatype as _at least_ a `List`.
* Always add an extra dict wrapper in the range case (see the sketch after this list).
* Always guarantee `handle.schema() == recordbatch.schema`.
* Complain a lot about rerun-io#7365, which is a massive PITA.
* Fix CI.
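
To make the first two bullets concrete, here is a minimal sketch of the wrapping
rules in plain `arrow2`; the `advertised_datatype` helper is hypothetical,
illustration only, not an API from this PR:

```rust
use arrow2::{
    array::ListArray,
    datatypes::{DataType, IntegerType},
};

// Hypothetical helper spelling out the wrapping rules described above.
fn advertised_datatype(component_datatype: DataType, is_range: bool) -> DataType {
    // Every component column is advertised as at least `List<T>`…
    let list = ListArray::<i32>::default_datatype(component_datatype);
    if is_range {
        // …and range queries wrap that further in `Dictionary<u32, List<T>>`.
        DataType::Dictionary(IntegerType::UInt32, Box::new(list), true)
    } else {
        list
    }
}
```

This mirrors what the descriptor construction in `dataframe.rs` and the handle's
`schema()` in `range.rs` do in the diff below.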

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/7366?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/7366?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!
* [x] I have noted any breaking changes to the log API in
`CHANGELOG.md` and the migration guide

- [PR Build Summary](https://build.rerun.io/pr/7366)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
teh-cmc authored Sep 9, 2024
1 parent 07291e5 commit 45da3a1
Showing 4 changed files with 116 additions and 47 deletions.
31 changes: 17 additions & 14 deletions crates/store/re_chunk_store/src/dataframe.rs
@@ -3,7 +3,10 @@
use std::collections::BTreeSet;

use ahash::HashSet;
use arrow2::datatypes::{DataType as ArrowDatatype, Field as ArrowField};
use arrow2::{
array::ListArray as ArrowListArray,
datatypes::{DataType as ArrowDatatype, Field as ArrowField},
};
use itertools::Itertools as _;

use re_chunk::LatestAtQuery;
@@ -55,11 +58,11 @@ impl ColumnDescriptor {
}

#[inline]
pub fn to_arrow_field(&self, datatype: Option<ArrowDatatype>) -> ArrowField {
pub fn to_arrow_field(&self) -> ArrowField {
match self {
Self::Control(descr) => descr.to_arrow_field(),
Self::Time(descr) => descr.to_arrow_field(),
Self::Component(descr) => descr.to_arrow_field(datatype),
Self::Component(descr) => descr.to_arrow_field(),
}
}
}
@@ -260,15 +263,17 @@ impl ComponentColumnDescriptor {
archetype_name: None,
archetype_field_name: None,
component_name: C::name(),
datatype: C::arrow_datatype(),
// TODO(cmc): one of the many reasons why using `ComponentColumnDescriptor` for this
// gets a bit weird… Good enough for now though.
// NOTE: The data is always at least a list, whether it's latest-at or range.
// It might be wrapped further in e.g. a dict, but at the very least
// it's a list.
// TODO(#7365): user-specified datatypes have got to go.
datatype: ArrowListArray::<i32>::default_datatype(C::arrow_datatype()),
is_static: false,
}
}

#[inline]
pub fn to_arrow_field(&self, wrapped_datatype: Option<ArrowDatatype>) -> ArrowField {
pub fn to_arrow_field(&self) -> ArrowField {
let Self {
entity_path,
archetype_name,
@@ -278,14 +283,9 @@ impl ComponentColumnDescriptor {
is_static,
} = self;

// NOTE: Only the system doing the actual packing knows the final datatype with all of
// its wrappers (is it a component array? is it a list? is it a dict?).
let datatype = wrapped_datatype.unwrap_or_else(|| datatype.clone());

// TODO(cmc): figure out who's in charge of adding the outer list layer.
ArrowField::new(
component_name.short_name().to_owned(),
datatype,
datatype.clone(),
false, /* nullable */
)
// TODO(#6889): This needs some proper sorbetization -- I just threw these names randomly.
@@ -567,7 +567,10 @@ impl ChunkStore {
archetype_name: None,
archetype_field_name: None,
component_name: *component_name,
datatype: datatype.clone(),
// NOTE: The data is always at least a list, whether it's latest-at or range.
// It might be wrapped further in e.g. a dict, but at the very least
// it's a list.
datatype: ArrowListArray::<i32>::default_datatype(datatype.clone()),
// NOTE: This will make it so shadowed temporal data automatically gets
// discarded from the schema.
is_static: self
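
Net effect of the `dataframe.rs` changes above: the descriptor's stored `datatype`
now carries the outer `List` wrapper itself, so `to_arrow_field` no longer needs a
caller-provided override. A minimal sketch of what the stored datatype now looks
like, assuming a hypothetical `Float32` component (plain `arrow2`):

```rust
use arrow2::{
    array::ListArray,
    datatypes::{DataType, Field},
};

fn main() {
    // The descriptor used to store the bare component datatype (here `Float32`);
    // it now stores the list-wrapped datatype that actually appears in record batches.
    let wrapped = ListArray::<i32>::default_datatype(DataType::Float32);
    assert_eq!(
        wrapped,
        DataType::List(Box::new(Field::new("item", DataType::Float32, true))),
    );
}
```
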
22 changes: 22 additions & 0 deletions crates/store/re_dataframe/src/engine.rs
@@ -6,6 +6,10 @@ use re_query::Caches;

use crate::{LatestAtQueryHandle, RangeQueryHandle};

// Used all over in docstrings.
#[allow(unused_imports)]
use re_chunk_store::ComponentColumnDescriptor;

// ---

// TODO(#3741): `arrow2` has no concept of a `RecordBatch`, so for now we just use our trustworthy
@@ -107,6 +111,8 @@ impl QueryEngine<'_> {
/// Creating a handle is very cheap as it doesn't perform any kind of querying.
///
/// If `columns` is specified, the schema of the result will strictly follow this specification.
/// [`ComponentColumnDescriptor::datatype`] and [`ComponentColumnDescriptor::is_static`] are ignored.
///
/// Any provided [`ColumnDescriptor`]s that don't match a column in the result will still be included, but the
/// data will be null for the entire column.
/// If `columns` is left unspecified, the schema of the returned result will correspond to what's returned by
@@ -117,6 +123,13 @@ impl QueryEngine<'_> {
/// are still valid data-columns to include in the result. So a user could, for example, query
/// for a range of data on the `frame` timeline, but still include the `log_time` timeline in
/// the result.
//
// TODO(#7365): We need to stop using `ComponentColumnDescriptor` as input to the dataframe APIs.
// It's fundamentally flawed: there is no way to guarantee that the passed-in schema will match the
// returned schema.
// E.g. what are we supposed to do if the user passes in a non-nullable datatype, and the
// column does not exist? Or its state only starts halfway through the time range? Then we
// need to insert null values but the datatype is non-nullable…
#[inline]
pub fn latest_at(
&self,
@@ -131,6 +144,8 @@ impl QueryEngine<'_> {
/// Creating a handle is very cheap as it doesn't perform any kind of querying.
///
/// If `columns` is specified, the schema of the result will strictly follow this specification.
/// [`ComponentColumnDescriptor::datatype`] and [`ComponentColumnDescriptor::is_static`] are ignored.
///
/// Any provided [`ColumnDescriptor`]s that don't match a column in the result will still be included, but the
/// data will be null for the entire column.
/// If `columns` is left unspecified, the schema of the returned result will correspond to what's returned by
@@ -141,6 +156,13 @@ impl QueryEngine<'_> {
/// are still valid data-columns to include in the result. So a user could, for example, query
/// for a range of data on the `frame` timeline, but still include the `log_time` timeline in
/// the result.
//
// TODO(#7365): We need to stop using `ComponentColumnDescriptor` as input to the dataframe APIs.
// It's fundamentally flawed: there is no way to guarantee that the passed-in schema will match the
// returned schema.
// E.g. what are we supposed to do if the user passes in a non-nullable datatype, and the
// column does not exist? Or its state only starts halfway through the time range? Then we
// need to insert null values but the datatype is non-nullable…
#[inline]
pub fn range(
&self,
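
For context on the TODO(#7365) comments above: if a requested column has no data in
the store, the engine can only null-fill it, which is incompatible with a
user-supplied non-nullable field. A minimal sketch of the conflict in plain
`arrow2` (the column name is made up):

```rust
use arrow2::{
    array::new_null_array,
    datatypes::{DataType, Field},
};

fn main() {
    // Suppose the user requests this non-nullable column, but the store has no data for it…
    let requested = Field::new("my_component", DataType::Float32, /* nullable */ false);

    // …the only thing the engine can synthesize is an all-null column, which
    // directly contradicts the field the user asked for.
    let column = new_null_array(requested.data_type().clone(), 3); // 3 rows, all null
    assert_eq!(column.null_count(), 3);
    assert!(!requested.is_nullable);
}
```
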
19 changes: 11 additions & 8 deletions crates/store/re_dataframe/src/latest_at.rs
@@ -212,9 +212,8 @@ impl LatestAtQueryHandle<'_> {
schema: ArrowSchema {
fields: columns
.iter()
.zip(packed_arrays.iter())
.map(|(descr, arr)| descr.to_arrow_field(Some(arr.data_type().clone())))
.collect(),
.map(|descr| descr.to_arrow_field())
.collect_vec(),
metadata: Default::default(),
},
data: ArrowChunk::new(packed_arrays),
@@ -299,9 +298,11 @@ mod tests {

// The output should be an empty recordbatch with the right schema and empty arrays.
assert_eq!(0, batch.num_rows());
assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field(None) == *field));
assert!(itertools::izip!(columns.iter(), batch.data.iter())
assert!(
itertools::izip!(handle.schema(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field() == *field)
);
assert!(itertools::izip!(handle.schema(), batch.data.iter())
.all(|(descr, array)| descr.datatype() == array.data_type()));
}

@@ -370,7 +371,9 @@
)
.unwrap()
);
assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field(None) == *field));
assert!(
itertools::izip!(handle.schema(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field() == *field)
);
}
}
91 changes: 66 additions & 25 deletions crates/store/re_dataframe/src/range.rs
@@ -4,7 +4,7 @@ use ahash::HashMap;
use arrow2::{
array::{Array as ArrowArray, DictionaryArray as ArrowDictionaryArray},
chunk::Chunk as ArrowChunk,
datatypes::Schema as ArrowSchema,
datatypes::{DataType as ArrowDatatype, Schema as ArrowSchema},
Either,
};
use itertools::Itertools;
@@ -86,6 +86,20 @@ impl RangeQueryHandle<'_> {
self.engine
.store
.schema_for_query(&self.query.clone().into())
.into_iter()
// NOTE: At least for now, range queries always return dictionaries.
.map(|col| match col {
ColumnDescriptor::Component(mut descr) => {
descr.datatype = ArrowDatatype::Dictionary(
arrow2::datatypes::IntegerType::UInt32,
descr.datatype.into(),
true,
);
ColumnDescriptor::Component(descr)
}
_ => col,
})
.collect()
})
};

@@ -162,7 +176,7 @@ impl RangeQueryHandle<'_> {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Some(RecordBatch {
schema: ArrowSchema {
fields: columns.iter().map(|col| col.to_arrow_field(None)).collect(),
fields: columns.iter().map(|col| col.to_arrow_field()).collect(),
metadata: Default::default(),
},
data: ArrowChunk::new(
@@ -211,7 +225,7 @@ impl RangeQueryHandle<'_> {
let columns = self.schema();
return vec![RecordBatch {
schema: ArrowSchema {
fields: columns.iter().map(|col| col.to_arrow_field(None)).collect(),
fields: columns.iter().map(|col| col.to_arrow_field()).collect(),
metadata: Default::default(),
},
data: ArrowChunk::new(
@@ -352,7 +366,17 @@ impl RangeQueryHandle<'_> {

let dict_array = {
re_tracing::profile_scope!("concat");
re_chunk::util::arrays_to_dictionary(descr.datatype.clone(), &arrays)

// Sanitize the input datatype for `arrays_to_dictionary`.
let datatype = match &descr.datatype {
ArrowDatatype::Dictionary(_, inner, _) => match &**inner {
ArrowDatatype::List(field) => field.data_type().clone(),
datatype => datatype.clone(),
},
ArrowDatatype::List(field) => field.data_type().clone(),
datatype => datatype.clone(),
};
re_chunk::util::arrays_to_dictionary(datatype, &arrays)
};

if cfg!(debug_assertions) {
@@ -405,9 +429,8 @@ impl RangeQueryHandle<'_> {
schema: ArrowSchema {
fields: columns
.iter()
.zip(packed_arrays.iter())
.map(|(descr, arr)| descr.to_arrow_field(Some(arr.data_type().clone())))
.collect(),
.map(|descr| descr.to_arrow_field())
.collect_vec(),
metadata: Default::default(),
},
data: ArrowChunk::new(packed_arrays),
@@ -428,6 +451,8 @@ impl<'a> RangeQueryHandle<'a> {
mod tests {
use std::sync::Arc;

use arrow2::array::DictionaryArray as ArrowDictionaryArray;

use re_chunk::{ArrowArray, Chunk, EntityPath, RowId, TimePoint, Timeline};
use re_chunk_store::{
ChunkStore, ChunkStoreConfig, ColumnDescriptor, ComponentColumnDescriptor,
@@ -488,9 +513,11 @@ mod tests {
let batch = handle.next_page().unwrap();
// The output should be an empty recordbatch with the right schema and empty arrays.
assert_eq!(0, batch.num_rows());
assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field(None) == *field));
assert!(itertools::izip!(columns.iter(), batch.data.iter())
assert!(
itertools::izip!(handle.schema(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field() == *field)
);
assert!(itertools::izip!(handle.schema(), batch.data.iter())
.all(|(descr, array)| descr.datatype() == array.data_type()));

let batch = handle.next_page();
@@ -502,9 +529,11 @@ mod tests {
let batch = handle.get(0, 0).pop().unwrap();
// The output should be an empty recordbatch with the right schema and empty arrays.
assert_eq!(0, batch.num_rows());
assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field(None) == *field));
assert!(itertools::izip!(columns.iter(), batch.data.iter())
assert!(
itertools::izip!(handle.schema(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field() == *field)
);
assert!(itertools::izip!(handle.schema(), batch.data.iter())
.all(|(descr, array)| descr.datatype() == array.data_type()));

let _batch = handle.get(0, 1).pop().unwrap();
@@ -576,14 +605,20 @@ mod tests {
assert_eq!(
chunk.components().get(&MyPoint::name()).unwrap().to_boxed(),
itertools::izip!(batch.schema.fields.iter(), batch.data.iter())
.find_map(
|(field, array)| (field.name == MyPoint::name().short_name())
.then_some(array.clone())
)
.find_map(|(field, array)| {
(field.name == MyPoint::name().short_name()).then_some(array.clone())
})
.unwrap()
.as_any()
.downcast_ref::<ArrowDictionaryArray<u32>>()
.unwrap()
.values()
.clone()
);
assert!(
itertools::izip!(handle.schema(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field() == *field)
);
assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field(None) == *field));

let batch = handle.next_page();
assert!(batch.is_none());
@@ -597,14 +632,20 @@ mod tests {
assert_eq!(
chunk.components().get(&MyPoint::name()).unwrap().to_boxed(),
itertools::izip!(batch.schema.fields.iter(), batch.data.iter())
.find_map(
|(field, array)| (field.name == MyPoint::name().short_name())
.then_some(array.clone())
)
.find_map(|(field, array)| {
(field.name == MyPoint::name().short_name()).then_some(array.clone())
})
.unwrap()
.as_any()
.downcast_ref::<ArrowDictionaryArray<u32>>()
.unwrap()
.values()
.clone()
);
assert!(
itertools::izip!(handle.schema(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field() == *field)
);
assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
.all(|(descr, field)| descr.to_arrow_field(None) == *field));

let _batch = handle.get(1, 1).pop().unwrap();

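
One subtlety in the `range.rs` changes worth spelling out: by the time
`arrays_to_dictionary` is called, the descriptor's datatype may carry both the dict
and the list wrappers, so they have to be peeled off first. A standalone sketch of
that unwrapping, mirroring the `match` in the diff above (the helper name is
hypothetical):

```rust
use arrow2::datatypes::DataType;

// Hypothetical helper mirroring the sanitization in `range.rs`: strip the
// advertised `Dictionary` and/or `List` wrappers to recover the component's
// inner datatype before re-encoding the arrays as a dictionary.
fn sanitize_for_dictionary(datatype: &DataType) -> DataType {
    match datatype {
        DataType::Dictionary(_, inner, _) => match &**inner {
            DataType::List(field) => field.data_type().clone(),
            inner => inner.clone(),
        },
        DataType::List(field) => field.data_type().clone(),
        datatype => datatype.clone(),
    }
}
```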
