Skip to content

Commit

Permalink
Add ChunkStore::drop_time_range (rerun-io#7602)
Browse files Browse the repository at this point in the history
### What
Adds `ChunkStore::drop_time_range` to drop all events within some time
range. Will be used to implement undo.

* Part of rerun-io#3135
* Extracted from rerun-io#7546

---------

Co-authored-by: Clement Rey <[email protected]>
  • Loading branch information
emilk and teh-cmc authored Oct 7, 2024
1 parent 899b17b commit 8702f01
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 0 deletions.
12 changes: 12 additions & 0 deletions crates/store/re_chunk/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ impl PartialEq for Chunk {
}

impl Chunk {
/// Returns a version of us with a new [`ChunkId`].
///
/// Reminder:
/// * The returned [`Chunk`] will re-use the exact same [`RowId`]s as `self`.
/// * Duplicated [`RowId`]s in the `ChunkStore` is undefined behavior.
#[must_use]
#[inline]
pub fn with_id(mut self, id: ChunkId) -> Self {
self.id = id;
self
}

/// Returns `true` is two [`Chunk`]s are similar, although not byte-for-byte equal.
///
/// In particular, this ignores chunks and row IDs, as well as temporal timestamps.
Expand Down
3 changes: 3 additions & 0 deletions crates/store/re_chunk/src/shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ impl Chunk {
/// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
///
/// This is a no-op if the underlying timeline is already sorted appropriately (happy path).
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
pub fn sorted_by_timeline_if_unsorted(&self, timeline: &Timeline) -> Self {
let mut chunk = self.clone();

Expand Down
21 changes: 21 additions & 0 deletions crates/store/re_chunk/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ impl Chunk {
/// This cannot fail nor panic: `index` and `len` will be capped so that they cannot
/// run out of bounds.
/// This can result in an empty [`Chunk`] being returned if the slice is completely OOB.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn row_sliced(&self, index: usize, len: usize) -> Self {
re_tracing::profile_function!();
Expand Down Expand Up @@ -144,6 +147,9 @@ impl Chunk {
///
/// If `timeline` is not found within the [`Chunk`], the end result will be the same as the
/// current chunk but without any timeline column.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn timeline_sliced(&self, timeline: Timeline) -> Self {
let Self {
Expand Down Expand Up @@ -184,6 +190,9 @@ impl Chunk {
///
/// If `component_name` is not found within the [`Chunk`], the end result will be the same as the
/// current chunk but without any component column.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn component_sliced(&self, component_name: ComponentName) -> Self {
let Self {
Expand Down Expand Up @@ -224,6 +233,9 @@ impl Chunk {
///
/// If none of the selected timelines exist in the [`Chunk`], the end result will be the same as the
/// current chunk but without any timeline column.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn timelines_sliced(&self, timelines_to_keep: &IntSet<Timeline>) -> Self {
let Self {
Expand Down Expand Up @@ -264,6 +276,9 @@ impl Chunk {
///
/// If none of the `component_names` exist in the [`Chunk`], the end result will be the same as the
/// current chunk but without any component column.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn components_sliced(&self, component_names: &IntSet<ComponentName>) -> Self {
let Self {
Expand Down Expand Up @@ -306,6 +321,9 @@ impl Chunk {
///
/// If `component_name` doesn't exist in this [`Chunk`], or if it is already dense, this method
/// is a no-op.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn densified(&self, component_name_pov: ComponentName) -> Self {
let Self {
Expand Down Expand Up @@ -400,6 +418,9 @@ impl Chunk {
/// Empties the [`Chunk`] vertically.
///
/// The result is a new [`Chunk`] with the same columns but zero rows.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn emptied(&self) -> Self {
let Self {
Expand Down
114 changes: 114 additions & 0 deletions crates/store/re_chunk_store/src/drop_time_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use re_chunk::{ChunkId, Timeline};
use re_log_types::ResolvedTimeRange;

use crate::{ChunkStore, ChunkStoreEvent};

impl ChunkStore {
/// Drop all events that are in the given range on the given timeline.
///
/// Note that matching events will be dropped from all timelines they appear on.
///
/// Static chunks are unaffected.
///
/// Used to implement undo (erase the last event from the blueprint db).
pub fn drop_time_range(
&mut self,
timeline: &Timeline,
drop_range: ResolvedTimeRange,
) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

if drop_range.max() < drop_range.min() {
return Default::default();
}

// Prepare the changes:

let mut chunk_ids_to_drop = vec![];
let mut new_chunks = vec![];

for (chunk_id, chunk) in &self.chunks_per_chunk_id {
let Some(time_column) = chunk.timelines().get(timeline) else {
// static chunk, or chunk that doesn't overlap this timeline
continue; // keep it
};

let chunk_range = time_column.time_range();

if drop_range.contains_range(chunk_range) {
// The whole chunk should be dropped!
chunk_ids_to_drop.push(*chunk_id);
} else if drop_range.intersects(chunk_range) {
let chunk = chunk.sorted_by_timeline_if_unsorted(timeline);

let num_rows = chunk.num_rows();

// Get the sorted times:
#[allow(clippy::unwrap_used)] // We already know the chunk has the timeline
let time_column = chunk.timelines().get(timeline).unwrap();
let times = time_column.times_raw();

let drop_range_min = drop_range.min().as_i64();
let drop_range_max = drop_range.max().as_i64();

let min_idx = times.partition_point(|&time| time < drop_range_min);
let max_idx = times.partition_point(|&time| time <= drop_range_max);

{
// Sanity check:
debug_assert!(min_idx <= max_idx);
debug_assert!(drop_range_min <= times[min_idx]);
if 0 < min_idx {
debug_assert!(times[min_idx - 1] < drop_range_min);
}
if max_idx < num_rows {
debug_assert!(drop_range_max < times[max_idx]);
if 0 < max_idx {
debug_assert!(times[max_idx - 1] <= drop_range_max);
}
}
}

if min_idx < max_idx {
chunk_ids_to_drop.push(*chunk_id);
if 0 < min_idx {
new_chunks.push(chunk.row_sliced(0, min_idx).with_id(ChunkId::new()));
}
if max_idx < num_rows {
new_chunks.push(
chunk
.row_sliced(max_idx, num_rows - max_idx)
.with_id(ChunkId::new()),
);
}
}
}
}

// ------------------
// Apply the changes:

let generation = self.generation();
let mut events: Vec<ChunkStoreEvent> = vec![];

for chunk_id in chunk_ids_to_drop {
for diff in self.remove_chunk(chunk_id) {
events.push(ChunkStoreEvent {
store_id: self.id.clone(),
store_generation: generation.clone(),
event_id: self
.event_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
diff,
});
}
}
for mut chunk in new_chunks {
chunk.sort_if_unsorted();
#[allow(clippy::unwrap_used)] // The chunk came from the store, so it should be fine
events.append(&mut self.insert_chunk(&chunk.into()).unwrap());
}

events
}
}
1 change: 1 addition & 0 deletions crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//!
mod dataframe;
mod drop_time_range;
mod events;
mod gc;
mod query;
Expand Down
82 changes: 82 additions & 0 deletions crates/store/re_chunk_store/tests/drop_time_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// https://github.com/rust-lang/rust-clippy/issues/10011
#![cfg(test)]

use std::sync::Arc;

use re_chunk::{Chunk, RowId};
use re_chunk_store::{ChunkStore, ChunkStoreConfig};
use re_log_types::{example_components::MyColor, ResolvedTimeRange};
use re_log_types::{EntityPath, TimePoint, Timeline};
use re_types_core::Loggable as _;

#[test]
fn drop_time_range() -> anyhow::Result<()> {
re_log::setup_logging();

let entity_path = EntityPath::from("this/that");
let timeline = Timeline::new_sequence("timeline");
let data = MyColor::from_rgb(255, 0, 0);
let time_point_at = |time: i64| TimePoint::from([(timeline, time)]);

for config in [
ChunkStoreConfig::DEFAULT,
ChunkStoreConfig::COMPACTION_DISABLED,
] {
let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
config,
);

let num_events = |store: &ChunkStore| {
store.num_temporal_events_for_component_on_timeline(
&timeline,
&entity_path,
MyColor::name(),
)
};

store.insert_chunk(&Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batch(RowId::new(), time_point_at(0), &data)
.with_component_batch(RowId::new(), time_point_at(1), &data)
.with_component_batch(RowId::new(), time_point_at(2), &data)
.with_component_batch(RowId::new(), time_point_at(3), &data)
.build()?,
))?;

store.insert_chunk(&Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batch(RowId::new(), time_point_at(2), &data)
.with_component_batch(RowId::new(), time_point_at(3), &data)
.with_component_batch(RowId::new(), time_point_at(4), &data)
.with_component_batch(RowId::new(), time_point_at(5), &data)
.build()?,
))?;

store.insert_chunk(&Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batch(RowId::new(), time_point_at(4), &data)
.with_component_batch(RowId::new(), time_point_at(5), &data)
.with_component_batch(RowId::new(), time_point_at(6), &data)
.with_component_batch(RowId::new(), time_point_at(7), &data)
.build()?,
))?;

assert_eq!(num_events(&store), 12);

// Drop nothing:
store.drop_time_range(&timeline, ResolvedTimeRange::new(10, 100));
store.drop_time_range(&timeline, ResolvedTimeRange::new(-100, -10));
assert_eq!(num_events(&store), 12);

// Drop stuff from the middle of the first chunk, and the start of the second:
store.drop_time_range(&timeline, ResolvedTimeRange::new(1, 2));
assert_eq!(num_events(&store), 9);

// Drop a bunch in the middle (including all of middle chunk):
store.drop_time_range(&timeline, ResolvedTimeRange::new(2, 5));
assert_eq!(num_events(&store), 3);
}

Ok(())
}
13 changes: 13 additions & 0 deletions crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,19 @@ impl EntityDb {
store_events
}

/// Drop all events in the given time range from the given timeline.
///
/// Used to implement undo (erase the last event from the blueprint db).
pub fn drop_time_range(
&mut self,
timeline: &Timeline,
drop_range: ResolvedTimeRange,
) -> Vec<ChunkStoreEvent> {
let store_events = self.data_store.drop_time_range(timeline, drop_range);
self.on_store_deletions(&store_events);
store_events
}

/// Unconditionally drops all the data for a given [`EntityPath`] .
///
/// This is _not_ recursive. Children of this entity will not be affected.
Expand Down
6 changes: 6 additions & 0 deletions crates/store/re_log_types/src/resolved_time_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ impl ResolvedTimeRange {
self.min <= time && time <= self.max
}

/// Does this range fully contain the other?
#[inline]
pub fn contains_range(&self, other: Self) -> bool {
self.min <= other.min && other.max <= self.max
}

#[inline]
pub fn intersects(&self, other: Self) -> bool {
self.min <= other.max && self.max >= other.min
Expand Down
2 changes: 2 additions & 0 deletions crates/store/re_log_types/src/time_point/time_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl TimeInt {

/// Always returns [`Self::STATIC`] for [`Self::STATIC`].
#[inline]
#[must_use]
pub fn inc(&self) -> Self {
match self.0 {
Some(t) => Self::new_temporal(t.get().saturating_add(1)),
Expand All @@ -127,6 +128,7 @@ impl TimeInt {

/// Always returns [`Self::STATIC`] for [`Self::STATIC`].
#[inline]
#[must_use]
pub fn dec(&self) -> Self {
match self.0 {
Some(t) => Self::new_temporal(t.get().saturating_sub(1)),
Expand Down

0 comments on commit 8702f01

Please sign in to comment.