
Dataframe queries: support sparsely encoded dictionary arrays (rerun-io#7361)

And also clean up the `datatype` situation: I didn't know what should be
in charge of specifying all the wrappers in the final datatype. Now I
do.
teh-cmc authored Sep 5, 2024
1 parent fed60d4 commit 3645c10
Showing 4 changed files with 138 additions and 36 deletions.
97 changes: 93 additions & 4 deletions crates/store/re_chunk/src/util.rs
@@ -1,10 +1,14 @@
use arrow2::{
array::{Array as ArrowArray, BooleanArray as ArrowBooleanArray, ListArray as ArrowListArray},
array::{
Array as ArrowArray, BooleanArray as ArrowBooleanArray,
DictionaryArray as ArrowDictionaryArray, ListArray as ArrowListArray,
PrimitiveArray as ArrowPrimitiveArray,
},
bitmap::Bitmap as ArrowBitmap,
datatypes::DataType as ArrowDataType,
datatypes::DataType as ArrowDatatype,
offset::Offsets as ArrowOffsets,
};
use itertools::Itertools as _;
use itertools::Itertools;

// ---

@@ -29,7 +33,7 @@ pub fn arrays_to_list_array_opt(arrays: &[Option<&dyn ArrowArray>]) -> Option<Ar
///
/// Returns an empty list if `arrays` is empty.
pub fn arrays_to_list_array(
array_datatype: ArrowDataType,
array_datatype: ArrowDatatype,
arrays: &[Option<&dyn ArrowArray>],
) -> Option<ArrowListArray<i32>> {
let arrays_dense = arrays.iter().flatten().copied().collect_vec();
@@ -66,6 +70,91 @@ pub fn arrays_to_list_array(
))
}

/// Create a sparse dictionary-array out of an array of (potentially) duplicated arrays.
///
/// The `Idx` is used as primary key to drive the deduplication process.
/// Returns `None` if any of the specified `arrays` doesn't match the given `array_datatype`.
///
/// Returns an empty dictionary if `arrays` is empty.
//
// TODO(cmc): Ideally I would prefer to just use the array's underlying pointer as primary key, but
// this has proved extremely brittle in practice. Maybe once we move to arrow-rs.
// TODO(cmc): A possible improvement would be to pick the smallest key datatype possible based
// on the cardinality of the input arrays.
pub fn arrays_to_dictionary<Idx: Copy + Eq>(
array_datatype: ArrowDatatype,
arrays: &[Option<(Idx, &dyn ArrowArray)>],
) -> Option<ArrowDictionaryArray<u32>> {
// Dedupe the input arrays based on the given primary key.
let arrays_dense_deduped = arrays
.iter()
.flatten()
.copied()
.dedup_by(|(lhs_index, _), (rhs_index, _)| lhs_index == rhs_index)
.map(|(_index, array)| array)
.collect_vec();

// Compute the keys for the final dictionary, using that same primary key.
let keys = {
let mut cur_key = 0u32;
arrays
.iter()
.dedup_by_with_count(|lhs, rhs| {
lhs.map(|(index, _)| index) == rhs.map(|(index, _)| index)
})
.flat_map(|(count, value)| {
if value.is_some() {
let keys = std::iter::repeat(Some(cur_key)).take(count);
cur_key += 1;
keys
} else {
std::iter::repeat(None).take(count)
}
})
.collect_vec()
};

// Concatenate the underlying data as usual, except only the _unique_ values!
let data = if arrays_dense_deduped.is_empty() {
arrow2::array::new_empty_array(array_datatype.clone())
} else {
arrow2::compute::concatenate::concatenate(&arrays_dense_deduped)
.map_err(|err| {
re_log::warn_once!("failed to concatenate arrays: {err}");
err
})
.ok()?
};

// We still need the underlying data to be a list-array, so the dictionary's keys can index
// into this list-array.
let data = {
let datatype = ArrowListArray::<i32>::default_datatype(array_datatype);

#[allow(clippy::unwrap_used)] // yes, these are indeed lengths
let offsets =
ArrowOffsets::try_from_lengths(arrays_dense_deduped.iter().map(|array| array.len()))
.unwrap();

ArrowListArray::<i32>::new(datatype, offsets.into(), data, None)
};

let datatype = ArrowDatatype::Dictionary(
arrow2::datatypes::IntegerType::UInt32,
std::sync::Arc::new(data.data_type().clone()),
true, // is_sorted
);

// And finally we build our dictionary, which indexes into our concatenated list-array of
// unique values.
ArrowDictionaryArray::try_new(
datatype,
ArrowPrimitiveArray::<u32>::from(keys),
data.to_boxed(),
)
.ok()
}
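
A minimal usage sketch (my addition, not part of the commit), assuming plain `Int32` batches and a `u64` primary key: three data rows where the first two share the same key, and therefore the same dictionary entry, plus one null row.

use arrow2::{
    array::{Array, Int32Array},
    datatypes::DataType,
};

fn dictionary_sketch() -> Option<()> {
    let a = Int32Array::from_slice([1, 2, 3]);
    let b = Int32Array::from_slice([4, 5]);

    // (primary key, batch) pairs: rows 0 and 1 carry the same key and thus the same data.
    let arrays: Vec<Option<(u64, &dyn Array)>> = vec![
        Some((0, &a as &dyn Array)), // row 0
        Some((0, &a as &dyn Array)), // row 1 -- deduplicated against row 0
        None,                        // row 2 -- no data for this column
        Some((1, &b as &dyn Array)), // row 3
    ];

    let dict = re_chunk::util::arrays_to_dictionary(DataType::Int32, &arrays)?;

    assert_eq!(dict.len(), 4); // one key per input row...
    assert_eq!(dict.values().len(), 2); // ...but only two unique list entries stored
    Some(())
}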

/// Given a sparse `ArrowListArray` (i.e. an array with a validity bitmap that contains at least
/// one falsy value), returns a dense `ArrowListArray` that only contains the non-null values from
/// the original list.
16 changes: 8 additions & 8 deletions crates/store/re_chunk_store/src/dataframe.rs
@@ -55,11 +55,11 @@ impl ColumnDescriptor {
}

#[inline]
pub fn to_arrow_field(&self) -> ArrowField {
pub fn to_arrow_field(&self, datatype: Option<ArrowDatatype>) -> ArrowField {
match self {
Self::Control(descr) => descr.to_arrow_field(),
Self::Time(descr) => descr.to_arrow_field(),
Self::Component(descr) => descr.to_arrow_field(),
Self::Component(descr) => descr.to_arrow_field(datatype),
}
}
}
@@ -268,7 +268,7 @@ impl ComponentColumnDescriptor {
}

#[inline]
pub fn to_arrow_field(&self) -> ArrowField {
pub fn to_arrow_field(&self, wrapped_datatype: Option<ArrowDatatype>) -> ArrowField {
let Self {
entity_path,
archetype_name,
@@ -278,14 +278,14 @@ impl ComponentColumnDescriptor {
is_static,
} = self;

// NOTE: Only the system doing the actual packing knows the final datatype with all of
// its wrappers (is it a component array? is it a list? is it a dict?).
let datatype = wrapped_datatype.unwrap_or_else(|| datatype.clone());

// TODO(cmc): figure out who's in charge of adding the outer list layer.
ArrowField::new(
component_name.short_name().to_owned(),
ArrowDatatype::List(std::sync::Arc::new(ArrowField::new(
"item",
datatype.clone(),
true, /* nullable */
))),
datatype,
false, /* nullable */
)
// TODO(#6889): This needs some proper sorbetization -- I just threw these names randomly.
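
A short caller-side sketch (my illustration, mirroring what the query handles below do): only the code that packed the final column array knows its real outer datatype (list, dictionary, ...), so it passes that datatype in, while `None` falls back to the descriptor's stored component datatype.

use arrow2::{array::Array as ArrowArray, datatypes::Field as ArrowField};
use re_chunk_store::ComponentColumnDescriptor;

// Hypothetical helper: derive the schema field from an already-packed column array.
fn field_for_packed_column(
    descr: &ComponentColumnDescriptor,
    packed: &dyn ArrowArray,
) -> ArrowField {
    // The packer is in charge of the final datatype, wrappers and all.
    descr.to_arrow_field(Some(packed.data_type().clone()))
}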
21 changes: 9 additions & 12 deletions crates/store/re_dataframe/src/latest_at.rs
@@ -97,16 +97,6 @@ impl LatestAtQueryHandle<'_> {

let columns = self.schema();

let schema = ArrowSchema {
fields: columns
.iter()
.map(ColumnDescriptor::to_arrow_field)
.collect(),

// TODO(#6889): properly some sorbet stuff we want to get in there at some point.
metadata: Default::default(),
};

let all_units: HashMap<&ComponentColumnDescriptor, UnitChunkShared> = {
re_tracing::profile_scope!("queries");

@@ -201,11 +191,18 @@ impl LatestAtQueryHandle<'_> {
),
),
})
.collect()
.collect_vec()
};

RecordBatch {
schema,
schema: ArrowSchema {
fields: columns
.iter()
.zip(packed_arrays.iter())
.map(|(descr, arr)| descr.to_arrow_field(Some(arr.data_type().clone())))
.collect(),
metadata: Default::default(),
},
data: ArrowChunk::new(packed_arrays),
}
}
40 changes: 28 additions & 12 deletions crates/store/re_dataframe/src/range.rs
@@ -2,13 +2,13 @@ use std::sync::{atomic::AtomicU64, OnceLock};

use ahash::HashMap;
use arrow2::{
array::{Array as ArrowArray, ListArray as ArrowListArray},
array::{Array as ArrowArray, DictionaryArray as ArrowDictionaryArray},
chunk::Chunk as ArrowChunk,
datatypes::Schema as ArrowSchema,
};
use itertools::Itertools;

use re_chunk::{Chunk, LatestAtQuery, RangeQuery};
use re_chunk::{Chunk, LatestAtQuery, RangeQuery, RowId, TimeInt};
use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, RangeQueryExpression};

use crate::{QueryEngine, RecordBatch};
@@ -253,7 +253,7 @@ impl RangeQueryHandle<'_> {
// see if this ever becomes an issue before going down this road.
//
// TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
let list_arrays: HashMap<&ComponentColumnDescriptor, ArrowListArray<i32>> = {
let dict_arrays: HashMap<&ComponentColumnDescriptor, ArrowDictionaryArray<u32>> = {
re_tracing::profile_scope!("queries");

columns
@@ -279,24 +279,39 @@ impl RangeQueryHandle<'_> {
.components
.get(&descr.component_name)
.and_then(|unit| {
unit.component_batch_raw(&descr.component_name).clone()
unit.component_batch_raw(&descr.component_name).clone().map(
|array| {
(
unit.index(&query.timeline())
// NOTE: technically cannot happen, but better than unwrapping.
.unwrap_or((TimeInt::STATIC, RowId::ZERO)),
array,
)
},
)
})
})
.collect_vec();
let arrays = arrays
.iter()
.map(|array| array.as_ref().map(|array| &**array as &dyn ArrowArray))
.map(|array| {
array
.as_ref()
.map(|(index, array)| (index, &**array as &dyn ArrowArray))
})
.collect_vec();

let list_array =
re_chunk::util::arrays_to_list_array(descr.datatype.clone(), &arrays);
let dict_array = {
re_tracing::profile_scope!("concat");
re_chunk::util::arrays_to_dictionary(descr.datatype.clone(), &arrays)
};

if cfg!(debug_assertions) {
#[allow(clippy::unwrap_used)] // want to crash in dev
Some((descr, list_array.unwrap()))
Some((descr, dict_array.unwrap()))
} else {
// NOTE: Technically cannot ever happen, but I'd rather that than an unwrap.
list_array.map(|list_array| (descr, list_array))
dict_array.map(|dict_array| (descr, dict_array))
}
})
.collect()
@@ -324,14 +339,14 @@ impl RangeQueryHandle<'_> {
)
}

ColumnDescriptor::Component(descr) => list_arrays.get(descr).map_or_else(
ColumnDescriptor::Component(descr) => dict_arrays.get(descr).map_or_else(
|| {
arrow2::array::new_null_array(
descr.datatype.clone(),
pov_time_column.num_rows(),
)
},
|list_array| list_array.to_boxed(),
|dict_array| dict_array.to_boxed(),
),
})
.collect_vec()
@@ -341,7 +356,8 @@ impl RangeQueryHandle<'_> {
schema: ArrowSchema {
fields: columns
.iter()
.map(ColumnDescriptor::to_arrow_field)
.zip(packed_arrays.iter())
.map(|(descr, arr)| descr.to_arrow_field(Some(arr.data_type().clone())))
.collect(),
metadata: Default::default(),
},
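
One detail worth spelling out (my note, not part of the commit): the `(TimeInt, RowId)` pair returned by `unit.index(..)` above is the `Idx` primary key handed to `arrays_to_dictionary`, so consecutive query rows that resolve to the same unit chunk collapse into a single dictionary value. A tiny sketch of that deduplication, using `TimeInt::MAX` as a stand-in index for a second chunk:

use itertools::Itertools as _;
use re_chunk::{RowId, TimeInt};

fn dedup_key_sketch() {
    // Three consecutive rows; the first two resolve to the same (time, row-id) index,
    // so the dictionary stores two unique values for three keys.
    let indices: [(TimeInt, RowId); 3] = [
        (TimeInt::STATIC, RowId::ZERO),
        (TimeInt::STATIC, RowId::ZERO),
        (TimeInt::MAX, RowId::ZERO), // hypothetical index of a different chunk
    ];
    assert_eq!(indices.iter().dedup().count(), 2);
}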
