Skip to content

Commit

Permalink
Dataframe API: remove control columns from public surface (rerun-io#7612
Browse files Browse the repository at this point in the history
)

* Prevents ~hard~annoying-to-solve cross-feature interactions in the new
API implementation.
* Fixes rerun-io#7599
  • Loading branch information
teh-cmc authored Oct 7, 2024
1 parent d3f637f commit 15ee2b9
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 527 deletions.
127 changes: 5 additions & 122 deletions crates/store/re_chunk_store/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use itertools::Itertools as _;
use re_chunk::{LatestAtQuery, TimelineName};
use re_log_types::{EntityPath, TimeInt, Timeline};
use re_log_types::{EntityPathFilter, ResolvedTimeRange};
use re_types_core::{ArchetypeName, ComponentName, Loggable as _};
use re_types_core::{ArchetypeName, ComponentName};

use crate::ChunkStore;

Expand Down Expand Up @@ -70,12 +70,10 @@ pub enum JoinEncoding {
// Describes any kind of column.
//
// See:
// * [`ControlColumnDescriptor`]
// * [`TimeColumnDescriptor`]
// * [`ComponentColumnDescriptor`]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ColumnDescriptor {
Control(ControlColumnDescriptor),
Time(TimeColumnDescriptor),
Component(ComponentColumnDescriptor),
}
Expand All @@ -84,15 +82,14 @@ impl ColumnDescriptor {
#[inline]
pub fn entity_path(&self) -> Option<&EntityPath> {
match self {
Self::Control(_) | Self::Time(_) => None,
Self::Time(_) => None,
Self::Component(descr) => Some(&descr.entity_path),
}
}

#[inline]
pub fn datatype(&self) -> ArrowDatatype {
match self {
Self::Control(descr) => descr.datatype.clone(),
Self::Time(descr) => descr.datatype.clone(),
Self::Component(descr) => descr.returned_datatype(),
}
Expand All @@ -101,7 +98,6 @@ impl ColumnDescriptor {
#[inline]
pub fn to_arrow_field(&self) -> ArrowField {
match self {
Self::Control(descr) => descr.to_arrow_field(),
Self::Time(descr) => descr.to_arrow_field(),
Self::Component(descr) => descr.to_arrow_field(),
}
Expand All @@ -110,59 +106,12 @@ impl ColumnDescriptor {
#[inline]
pub fn short_name(&self) -> String {
match self {
Self::Control(descr) => descr.component_name.short_name().to_owned(),
Self::Time(descr) => descr.timeline.name().to_string(),
Self::Component(descr) => descr.component_name.short_name().to_owned(),
}
}
}

/// Describes a column used to control Rerun's behavior, such as `RowId`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ControlColumnDescriptor {
/// Semantic name associated with this data.
///
/// Example: `RowId::name()`.
pub component_name: ComponentName,

/// The Arrow datatype of the column.
pub datatype: ArrowDatatype,
}

impl PartialOrd for ControlColumnDescriptor {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ControlColumnDescriptor {
#[inline]
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
let Self {
component_name,
datatype: _,
} = self;
component_name.cmp(&other.component_name)
}
}

impl ControlColumnDescriptor {
#[inline]
pub fn to_arrow_field(&self) -> ArrowField {
let Self {
component_name,
datatype,
} = self;

ArrowField::new(
component_name.to_string(),
datatype.clone(),
false, /* nullable */
)
}
}

/// Describes a time column, such as `log_time`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TimeColumnDescriptor {
Expand Down Expand Up @@ -411,7 +360,6 @@ impl ComponentColumnDescriptor {
/// Describes a column selection to return as part of a query.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnSelector {
Control(ControlColumnSelector),
Time(TimeColumnSelector),
Component(ComponentColumnSelector),
//TODO(jleibs): Add support for archetype-based component selection.
Expand All @@ -422,20 +370,12 @@ impl From<ColumnDescriptor> for ColumnSelector {
#[inline]
fn from(desc: ColumnDescriptor) -> Self {
match desc {
ColumnDescriptor::Control(desc) => Self::Control(desc.into()),
ColumnDescriptor::Time(desc) => Self::Time(desc.into()),
ColumnDescriptor::Component(desc) => Self::Component(desc.into()),
}
}
}

impl From<ControlColumnSelector> for ColumnSelector {
#[inline]
fn from(desc: ControlColumnSelector) -> Self {
Self::Control(desc)
}
}

impl From<TimeColumnSelector> for ColumnSelector {
#[inline]
fn from(desc: TimeColumnSelector) -> Self {
Expand All @@ -450,37 +390,6 @@ impl From<ComponentColumnSelector> for ColumnSelector {
}
}

/// Select a control column.
///
/// The only control column currently supported is `rerun.components.RowId`.
//
// TODO(cmc): `RowId` shouldnt be a control column at this point, it should be yet another index.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ControlColumnSelector {
/// Name of the control column.
//
// TODO(cmc): this should be `component_name`.
pub component: ComponentName,
}

impl ControlColumnSelector {
#[inline]
pub fn row_id() -> Self {
Self {
component: RowId::name(),
}
}
}

impl From<ControlColumnDescriptor> for ControlColumnSelector {
#[inline]
fn from(desc: ControlColumnDescriptor) -> Self {
Self {
component: desc.component_name,
}
}
}

/// Select a time column.
//
// TODO(cmc): This shouldn't be specific to time, this should be an `IndexColumnSelector` or smth.
Expand Down Expand Up @@ -917,11 +826,6 @@ impl ChunkStore {
pub fn schema(&self) -> Vec<ColumnDescriptor> {
re_tracing::profile_function!();

let controls = std::iter::once(ColumnDescriptor::Control(ControlColumnDescriptor {
component_name: RowId::name(),
datatype: RowId::arrow_datatype(),
}));

let timelines = self.all_timelines().into_iter().map(|timeline| {
ColumnDescriptor::Time(TimeColumnDescriptor {
timeline,
Expand Down Expand Up @@ -995,26 +899,7 @@ impl ChunkStore {
.chain(temporal_components)
.collect::<BTreeSet<_>>();

controls.chain(timelines).chain(components).collect()
}

/// Given a [`ControlColumnSelector`], returns the corresponding [`ControlColumnDescriptor`].
#[allow(clippy::unused_self)]
pub fn resolve_control_selector(
&self,
selector: &ControlColumnSelector,
) -> ControlColumnDescriptor {
if selector.component == RowId::name() {
ControlColumnDescriptor {
component_name: selector.component,
datatype: RowId::arrow_datatype(),
}
} else {
ControlColumnDescriptor {
component_name: selector.component,
datatype: ArrowDatatype::Null,
}
}
timelines.chain(components).collect()
}

/// Given a [`TimeColumnSelector`], returns the corresponding [`TimeColumnDescriptor`].
Expand Down Expand Up @@ -1076,12 +961,10 @@ impl ChunkStore {
.map(|selector| {
let selector = selector.into();
match selector {
ColumnSelector::Control(selector) => {
ColumnDescriptor::Control(self.resolve_control_selector(&selector))
}
ColumnSelector::Time(selector) => {
ColumnDescriptor::Time(self.resolve_time_selector(&selector))
}

ColumnSelector::Component(selector) => {
ColumnDescriptor::Component(self.resolve_component_selector(&selector))
}
Expand Down Expand Up @@ -1171,7 +1054,7 @@ impl ChunkStore {
self.schema()
.into_iter()
.filter(|column| match column {
ColumnDescriptor::Control(_) | ColumnDescriptor::Time(_) => true,
ColumnDescriptor::Time(_) => true,
ColumnDescriptor::Component(column) => view_contents
.get(&column.entity_path)
.map_or(false, |components| {
Expand Down
8 changes: 4 additions & 4 deletions crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ mod subscribers;
mod writes;

pub use self::dataframe::{
ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
ControlColumnDescriptor, ControlColumnSelector, Index, IndexRange, IndexValue, JoinEncoding,
LatestAtQueryExpression, QueryExpression, QueryExpression2, RangeQueryExpression,
SparseFillStrategy, TimeColumnDescriptor, TimeColumnSelector, ViewContentsSelector,
ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector, Index,
IndexRange, IndexValue, JoinEncoding, LatestAtQueryExpression, QueryExpression,
QueryExpression2, RangeQueryExpression, SparseFillStrategy, TimeColumnDescriptor,
TimeColumnSelector, ViewContentsSelector,
};
pub use self::events::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent};
pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget};
Expand Down
48 changes: 17 additions & 31 deletions crates/store/re_dataframe/src/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ impl LatestAtQueryHandle<'_> {
.schema_for_query(&self.query.clone().into())
})
.into_iter()
// NOTE: We drop `RowId`, as it doesn't make any sense in a compound row like the
// one we are returning.
.filter(|descr| !matches!(descr, ColumnDescriptor::Control(_)))
.collect()
};

Expand Down Expand Up @@ -125,7 +122,7 @@ impl LatestAtQueryHandle<'_> {
.map(|chunk| (col, chunk))
}

_ => None,
ColumnDescriptor::Time(_) => None,
})
.collect()
};
Expand All @@ -138,7 +135,7 @@ impl LatestAtQueryHandle<'_> {
.iter()
.filter_map(|descr| match descr {
ColumnDescriptor::Time(descr) => Some(descr.timeline),
_ => None,
ColumnDescriptor::Component(_) => None,
})
.collect_vec();

Expand Down Expand Up @@ -169,46 +166,35 @@ impl LatestAtQueryHandle<'_> {

columns
.iter()
.filter_map(|col| match col {
ColumnDescriptor::Control(_) => {
if cfg!(debug_assertions) {
unreachable!("filtered out during schema computation");
} else {
// NOTE: Technically cannot ever happen, but I'd rather that than an uwnrap.
None
}
}

.map(|col| match col {
ColumnDescriptor::Time(descr) => {
let time_column = max_time_per_timeline
.remove(&descr.timeline)
.and_then(|(_, chunk)| chunk.timelines().get(&descr.timeline).cloned());

Some(time_column.map_or_else(
time_column.map_or_else(
|| {
arrow2::array::new_null_array(
descr.datatype.clone(),
null_array_length,
)
},
|time_column| time_column.times_array().to_boxed(),
))
)
}

ColumnDescriptor::Component(descr) => Some(
all_units
.get(col)
.and_then(|chunk| chunk.components().get(&descr.component_name))
.map_or_else(
|| {
arrow2::array::new_null_array(
descr.returned_datatype(),
null_array_length,
)
},
|list_array| list_array.to_boxed(),
),
),
ColumnDescriptor::Component(descr) => all_units
.get(col)
.and_then(|chunk| chunk.components().get(&descr.component_name))
.map_or_else(
|| {
arrow2::array::new_null_array(
descr.returned_datatype(),
null_array_length,
)
},
|list_array| list_array.to_boxed(),
),
})
.collect_vec()
};
Expand Down
10 changes: 2 additions & 8 deletions crates/store/re_dataframe/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl RangeQueryHandle<'_> {
JoinEncoding::OverlappingSlice => None,
JoinEncoding::DictionaryEncode => Some(descr),
},
_ => None,
ColumnDescriptor::Time(_) => None,
})
.filter_map(|descr| {
let arrays = pov_time_column
Expand Down Expand Up @@ -426,7 +426,7 @@ impl RangeQueryHandle<'_> {
}
JoinEncoding::DictionaryEncode => None,
},
_ => None,
ColumnDescriptor::Time(_) => None,
})
.map(|descr| {
let arrays = pov_time_column
Expand Down Expand Up @@ -472,8 +472,6 @@ impl RangeQueryHandle<'_> {
columns
.iter()
.map(|descr| match descr {
ColumnDescriptor::Control(_descr) => pov_chunk.row_ids_array().to_boxed(),

ColumnDescriptor::Time(descr) => {
let time_column = pov_chunk.timelines().get(&descr.timeline).cloned();
time_column.map_or_else(
Expand Down Expand Up @@ -530,10 +528,6 @@ impl RangeQueryHandle<'_> {
let packed_arrays = columns
.iter()
.map(|descr| match descr {
ColumnDescriptor::Control(_descr) => {
pov_chunk.row_ids_array().sliced(row, 1).to_boxed()
}

ColumnDescriptor::Time(descr) => {
let time_column = pov_chunk.timelines().get(&descr.timeline);
time_column.map_or_else(
Expand Down
Loading

0 comments on commit 15ee2b9

Please sign in to comment.