Skip to content

Commit

Permalink
Invalidate hub-wide caches on deletions and overwrites (rerun-io#7525)
Browse files Browse the repository at this point in the history
Hub-wide caches now subscribe to store events and invalidate accordingly
in the face of deletions and overwrites.

This is a crutch to compensate for the lack of secondary caching, but a
much needed crutch: the Rerun Viewer can now effectively be used as a
soft realtime telemetry system.



https://github.com/user-attachments/assets/f1136f9d-e1fd-4e6b-87c6-422d5f3345e8


* Fixes rerun-io#7404 

---

## Checklist

### `EncodedImage`

```python
for _ in range(0, 100):
    rr.log("image", rr.EncodedImage(path=image_file_path), static=True)
    time.sleep(0.01) # give time for the viewer to query and cache it
```

Before: 🟥 
After: 🟢 

### `Mesh3D`

```python
for _ in range(0, 100):
    rr.log(
        "triangle",
        rr.Mesh3D(
            vertex_positions=np.tile(np.array([[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]), (33333, 1)),
            vertex_normals=[0.0, 0.0, 1.0],
            vertex_colors=[255, 0, 0],
        ),
        static=True,
    )
    time.sleep(0.01) # give time for the viewer to query and cache it
```

Before: 🟥 
After: 🟢  

### `Asset3D`

```python
for _ in range(0, 100):
    rr.log("world/asset", rr.Asset3D(path=sys.argv[1]), static=True)
    time.sleep(0.01) # give time for the viewer to query and cache it
```

Before: 🟥 
After: 🟢 

### `TensorData`

```python
for _ in range(0, 1000):
    rr.log("tensor", rr.Tensor(tensor, dim_names=("width", "height", "channel", "batch")), static=True)
    time.sleep(0.01) # give time for the viewer to query and cache it
```

Before: 🟥 
After: 🟢 


### `AssetVideo`

```python
frame_timestamps_ns = video_asset.read_frame_timestamps_ns()
rr.send_columns(
    "video",
    times=[rr.TimeNanosColumn("video_time", frame_timestamps_ns)],
    components=[rr.VideoFrameReference.indicator(), rr.components.VideoTimestamp.nanoseconds(frame_timestamps_ns)],
)

for _ in range(0, 100):
    rr.log("video", video_asset, static=True)
    time.sleep(0.01) # give time for the viewer to query and cache it
```

Before: 🟥 
After: 🟢

---------

Co-authored-by: Andreas Reich <[email protected]>
  • Loading branch information
teh-cmc and Wumpf authored Sep 27, 2024
1 parent 6b8ad66 commit f5aa0a0
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 107 deletions.
42 changes: 26 additions & 16 deletions crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,31 +321,35 @@ impl EntityDb {
self.entity_path_from_hash.contains_key(&entity_path.hash())
}

pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> {
pub fn add(&mut self, msg: &LogMsg) -> Result<Vec<ChunkStoreEvent>, Error> {
re_tracing::profile_function!();

debug_assert_eq!(msg.store_id(), self.store_id());

match &msg {
LogMsg::SetStoreInfo(msg) => self.set_store_info(msg.clone()),
let store_events = match &msg {
LogMsg::SetStoreInfo(msg) => {
self.set_store_info(msg.clone());
vec![]
}

LogMsg::ArrowMsg(_, arrow_msg) => {
self.last_modified_at = web_time::Instant::now();

let mut chunk = re_chunk::Chunk::from_arrow_msg(arrow_msg)?;
chunk.sort_if_unsorted();
self.add_chunk(&Arc::new(chunk))?;
self.add_chunk(&Arc::new(chunk))?
}

LogMsg::BlueprintActivationCommand(_) => {
// Not for us to handle
vec![]
}
}
};

Ok(())
Ok(store_events)
}

pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<(), Error> {
pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<Vec<ChunkStoreEvent>, Error> {
let store_events = self.data_store.insert_chunk(chunk)?;

self.register_entity_path(chunk.entity_path());
Expand All @@ -370,7 +374,7 @@ impl EntityDb {
self.stats.on_events(&store_events);
}

Ok(())
Ok(store_events)
}

fn register_entity_path(&mut self, entity_path: &EntityPath) {
Expand All @@ -383,36 +387,42 @@ impl EntityDb {
self.set_store_info = Some(store_info);
}

pub fn gc_everything_but_the_latest_row_on_non_default_timelines(&mut self) {
pub fn gc_everything_but_the_latest_row_on_non_default_timelines(
&mut self,
) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

self.gc(&GarbageCollectionOptions {
target: GarbageCollectionTarget::Everything,
protect_latest: 1, // TODO(jleibs): Bump this after we have an undo buffer
time_budget: DEFAULT_GC_TIME_BUDGET,
});
})
}

/// Free up some RAM by forgetting the older parts of all timelines.
pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

assert!((0.0..=1.0).contains(&fraction_to_purge));
if !self.gc(&GarbageCollectionOptions {

let store_events = self.gc(&GarbageCollectionOptions {
target: GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _),
protect_latest: 1,
time_budget: DEFAULT_GC_TIME_BUDGET,
}) {
});

if store_events.is_empty() {
// If we weren't able to collect any data, then we need to GC the cache itself in order
// to regain some space.
// See <https://github.com/rerun-io/rerun/issues/7369#issuecomment-2335164098> for the
// complete rationale.
self.query_caches.purge_fraction_of_ram(fraction_to_purge);
}

store_events
}

/// Returns `true` if anything at all was actually GC'd.
pub fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> bool {
fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

let (store_events, stats_diff) = self.data_store.gc(gc_options);
Expand All @@ -425,7 +435,7 @@ impl EntityDb {

self.on_store_deletions(&store_events);

!store_events.is_empty()
store_events
}

/// Unconditionally drops all the data for a given [`EntityPath`] .
Expand Down
44 changes: 42 additions & 2 deletions crates/viewer/re_space_view_spatial/src/mesh_cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::sync::Arc;

use ahash::{HashMap, HashSet};

use itertools::Either;
use re_chunk_store::{ChunkStoreEvent, RowId};
use re_entity_db::VersionedInstancePathHash;
use re_log_types::hash::Hash64;
use re_renderer::RenderContext;
use re_types::components::MediaType;
use re_types::{components::MediaType, Loggable as _};
use re_viewer_context::Cache;

use crate::mesh_loader::LoadedMesh;
Expand All @@ -15,6 +19,7 @@ use crate::mesh_loader::LoadedMesh;
/// Note that this is more complex than most other caches,
/// since the cache key is not only used for mesh file blobs,
/// but also for manually logged meshes.
//
// TODO(andreas): Maybe these should be different concerns?
// Blobs need costly unpacking/reading/parsing, regular meshes don't.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
Expand All @@ -26,7 +31,7 @@ pub struct MeshCacheKey {

/// Caches meshes based on their [`MeshCacheKey`].
#[derive(Default)]
pub struct MeshCache(ahash::HashMap<MeshCacheKey, Option<Arc<LoadedMesh>>>);
pub struct MeshCache(HashMap<RowId, HashMap<MeshCacheKey, Option<Arc<LoadedMesh>>>>);

/// Either a [`re_types::archetypes::Asset3D`] or [`re_types::archetypes::Mesh3D`] to be cached.
#[derive(Debug, Clone, Copy)]
Expand All @@ -52,6 +57,8 @@ impl MeshCache {
re_tracing::profile_function!();

self.0
.entry(key.versioned_instance_path_hash.row_id)
.or_default()
.entry(key)
.or_insert_with(|| {
re_log::debug!("Loading CPU mesh {name:?}…");
Expand All @@ -75,6 +82,39 @@ impl Cache for MeshCache {
self.0.clear();
}

fn on_store_events(&mut self, events: &[ChunkStoreEvent]) {
re_tracing::profile_function!();

let row_ids_removed: HashSet<RowId> = events
.iter()
.flat_map(|event| {
let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion;
let contains_mesh_data = || {
let contains_asset_blob = event
.chunk
.components()
.contains_key(&re_types::components::Blob::name());

let contains_vertex_positions = event
.chunk
.components()
.contains_key(&re_types::components::Position3D::name());

contains_asset_blob || contains_vertex_positions
};

if is_deletion() && contains_mesh_data() {
Either::Left(event.chunk.row_ids())
} else {
Either::Right(std::iter::empty())
}
})
.collect();

self.0
.retain(|row_id, _per_key| !row_ids_removed.contains(row_id));
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
Expand Down
29 changes: 22 additions & 7 deletions crates/viewer/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ impl App {
let blueprint_db = store_hub.entity_db_mut(&blueprint_id);
for chunk in updates {
match blueprint_db.add_chunk(&Arc::new(chunk)) {
Ok(()) => {}
Ok(_store_events) => {}
Err(err) => {
re_log::warn_once!("Failed to store blueprint delta: {err}");
}
Expand Down Expand Up @@ -1086,15 +1086,30 @@ impl App {
re_log::warn_once!("Loading a blueprint {store_id} that is active. See https://github.com/rerun-io/rerun/issues/5514 for details.");
}

let entity_db = store_hub.entity_db_mut(store_id);
// TODO(cmc): we have to keep grabbing and releasing entity_db because everything references
// everything and some of it is mutable and some not… it's really not pretty, but it
// does the job for now.

if entity_db.data_source.is_none() {
entity_db.data_source = Some((*channel_source).clone());
{
let entity_db = store_hub.entity_db_mut(store_id);
if entity_db.data_source.is_none() {
entity_db.data_source = Some((*channel_source).clone());
}
}

if let Err(err) = entity_db.add(&msg) {
re_log::error_once!("Failed to add incoming msg: {err}");
};
match store_hub.entity_db_mut(store_id).add(&msg) {
Ok(store_events) => {
if let Some(caches) = store_hub.active_caches() {
caches.on_store_events(&store_events);
}
}

Err(err) => {
re_log::error_once!("Failed to add incoming msg: {err}");
}
}

let entity_db = store_hub.entity_db_mut(store_id);

match &msg {
LogMsg::SetStoreInfo(_) => {
Expand Down
19 changes: 19 additions & 0 deletions crates/viewer/re_viewer_context/src/cache/caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::any::{Any, TypeId};

use ahash::HashMap;
use parking_lot::Mutex;
use re_chunk_store::ChunkStoreEvent;

/// Does memoization of different objects for the immediate mode UI.
#[derive(Default)]
Expand All @@ -26,6 +27,17 @@ impl Caches {
}
}

/// React to the chunk store's changelog, if needed.
///
/// Useful to e.g. invalidate unreachable data.
pub fn on_store_events(&self, events: &[ChunkStoreEvent]) {
re_tracing::profile_function!();

for cache in self.0.lock().values_mut() {
cache.on_store_events(events);
}
}

/// Accesses a cache for reading and writing.
///
/// Adds the cache lazily if it wasn't already there.
Expand All @@ -52,6 +64,13 @@ pub trait Cache: std::any::Any + Send + Sync {
/// Attempt to free up memory.
fn purge_memory(&mut self);

/// React to the chunk store's changelog, if needed.
///
/// Useful to e.g. invalidate unreachable data.
fn on_store_events(&mut self, events: &[ChunkStoreEvent]) {
_ = events;
}

// TODO(andreas): Track bytes used for each cache and show in the memory panel!
//fn bytes_used(&self) -> usize;

Expand Down
Loading

0 comments on commit f5aa0a0

Please sign in to comment.