From f5aa0a09456d6471061c33d2cce55d13c9560cab Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 27 Sep 2024 12:06:16 +0200 Subject: [PATCH] Invalidate hub-wide caches on deletions and overwrites (#7525) Hub-wide caches now subscribe to store events and invalidate accordingly in the face of deletions and overwrites. This is a crutch to compensate for the lack of secondary caching, but a much needed crutch: the Rerun Viewer can now effectively be used as a soft realtime telemetry system. https://github.com/user-attachments/assets/f1136f9d-e1fd-4e6b-87c6-422d5f3345e8 * Fixes #7404 --- ## Checklist ### `EncodedImage` ```python for _ in range(0, 100): rr.log("image", rr.EncodedImage(path=image_file_path), static=True) time.sleep(0.01) # give time for the viewer to query and cache it ``` Before: :red_square: After: :green_circle: ### `Mesh3D` ```python for _ in range(0, 100): rr.log( "triangle", rr.Mesh3D( vertex_positions=np.tile(np.array([[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]), (33333, 1)), vertex_normals=[0.0, 0.0, 1.0], vertex_colors=[255, 0, 0], ), static=True, ) time.sleep(0.01) # give time for the viewer to query and cache it ``` Before: :red_square: After: :green_circle: ### `Asset3D` ```python for _ in range(0, 100): rr.log("world/asset", rr.Asset3D(path=sys.argv[1]), static=True) time.sleep(0.01) # give time for the viewer to query and cache it ``` Before: :red_square: After: :green_circle: ### `TensorData` ```python for _ in range(0, 1000): rr.log("tensor", rr.Tensor(tensor, dim_names=("width", "height", "channel", "batch")), static=True) time.sleep(0.01) # give time for the viewer to query and cache it ``` Before: :red_square: After: :green_circle: ### `AssetVideo` ```python frame_timestamps_ns = video_asset.read_frame_timestamps_ns() rr.send_columns( "video", times=[rr.TimeNanosColumn("video_time", frame_timestamps_ns)], components=[rr.VideoFrameReference.indicator(), rr.components.VideoTimestamp.nanoseconds(frame_timestamps_ns)], ) for _ in range(0, 100): rr.log("video", video_asset, static=True) time.sleep(0.01) # give time for the viewer to query and cache it ``` Before: :red_square: After: :green_circle: --------- Co-authored-by: Andreas Reich --- crates/store/re_entity_db/src/entity_db.rs | 42 +++--- .../re_space_view_spatial/src/mesh_cache.rs | 44 +++++- crates/viewer/re_viewer/src/app.rs | 29 +++- .../re_viewer_context/src/cache/caches.rs | 19 +++ .../src/cache/image_decode_cache.rs | 84 ++++++++--- .../src/cache/image_stats_cache.rs | 42 +++++- .../src/cache/tensor_stats_cache.rs | 41 +++++- .../src/cache/video_cache.rs | 81 ++++++++--- .../re_viewer_context/src/image_info.rs | 2 +- .../viewer/re_viewer_context/src/store_hub.rs | 12 +- .../python/face_tracking/face_tracking.py | 130 ++++++++++++++---- 11 files changed, 419 insertions(+), 107 deletions(-) diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index 16444a228eec..76e3636b7c99 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -321,31 +321,35 @@ impl EntityDb { self.entity_path_from_hash.contains_key(&entity_path.hash()) } - pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> { + pub fn add(&mut self, msg: &LogMsg) -> Result, Error> { re_tracing::profile_function!(); debug_assert_eq!(msg.store_id(), self.store_id()); - match &msg { - LogMsg::SetStoreInfo(msg) => self.set_store_info(msg.clone()), + let store_events = match &msg { + LogMsg::SetStoreInfo(msg) => { + self.set_store_info(msg.clone()); + vec![] + } LogMsg::ArrowMsg(_, arrow_msg) => { self.last_modified_at = web_time::Instant::now(); let mut chunk = re_chunk::Chunk::from_arrow_msg(arrow_msg)?; chunk.sort_if_unsorted(); - self.add_chunk(&Arc::new(chunk))?; + self.add_chunk(&Arc::new(chunk))? } LogMsg::BlueprintActivationCommand(_) => { // Not for us to handle + vec![] } - } + }; - Ok(()) + Ok(store_events) } - pub fn add_chunk(&mut self, chunk: &Arc) -> Result<(), Error> { + pub fn add_chunk(&mut self, chunk: &Arc) -> Result, Error> { let store_events = self.data_store.insert_chunk(chunk)?; self.register_entity_path(chunk.entity_path()); @@ -370,7 +374,7 @@ impl EntityDb { self.stats.on_events(&store_events); } - Ok(()) + Ok(store_events) } fn register_entity_path(&mut self, entity_path: &EntityPath) { @@ -383,36 +387,42 @@ impl EntityDb { self.set_store_info = Some(store_info); } - pub fn gc_everything_but_the_latest_row_on_non_default_timelines(&mut self) { + pub fn gc_everything_but_the_latest_row_on_non_default_timelines( + &mut self, + ) -> Vec { re_tracing::profile_function!(); self.gc(&GarbageCollectionOptions { target: GarbageCollectionTarget::Everything, protect_latest: 1, // TODO(jleibs): Bump this after we have an undo buffer time_budget: DEFAULT_GC_TIME_BUDGET, - }); + }) } /// Free up some RAM by forgetting the older parts of all timelines. - pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) { + pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) -> Vec { re_tracing::profile_function!(); assert!((0.0..=1.0).contains(&fraction_to_purge)); - if !self.gc(&GarbageCollectionOptions { + + let store_events = self.gc(&GarbageCollectionOptions { target: GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _), protect_latest: 1, time_budget: DEFAULT_GC_TIME_BUDGET, - }) { + }); + + if store_events.is_empty() { // If we weren't able to collect any data, then we need to GC the cache itself in order // to regain some space. // See for the // complete rationale. self.query_caches.purge_fraction_of_ram(fraction_to_purge); } + + store_events } - /// Returns `true` if anything at all was actually GC'd. - pub fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> bool { + fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> Vec { re_tracing::profile_function!(); let (store_events, stats_diff) = self.data_store.gc(gc_options); @@ -425,7 +435,7 @@ impl EntityDb { self.on_store_deletions(&store_events); - !store_events.is_empty() + store_events } /// Unconditionally drops all the data for a given [`EntityPath`] . diff --git a/crates/viewer/re_space_view_spatial/src/mesh_cache.rs b/crates/viewer/re_space_view_spatial/src/mesh_cache.rs index 50532fe48f60..3aeb3d10d957 100644 --- a/crates/viewer/re_space_view_spatial/src/mesh_cache.rs +++ b/crates/viewer/re_space_view_spatial/src/mesh_cache.rs @@ -1,9 +1,13 @@ use std::sync::Arc; +use ahash::{HashMap, HashSet}; + +use itertools::Either; +use re_chunk_store::{ChunkStoreEvent, RowId}; use re_entity_db::VersionedInstancePathHash; use re_log_types::hash::Hash64; use re_renderer::RenderContext; -use re_types::components::MediaType; +use re_types::{components::MediaType, Loggable as _}; use re_viewer_context::Cache; use crate::mesh_loader::LoadedMesh; @@ -15,6 +19,7 @@ use crate::mesh_loader::LoadedMesh; /// Note that this is more complex than most other caches, /// since the cache key is not only used for mesh file blobs, /// but also for manually logged meshes. +// // TODO(andreas): Maybe these should be different concerns? // Blobs need costly unpacking/reading/parsing, regular meshes don't. #[derive(Debug, PartialEq, Eq, Hash, Clone)] @@ -26,7 +31,7 @@ pub struct MeshCacheKey { /// Caches meshes based on their [`MeshCacheKey`]. #[derive(Default)] -pub struct MeshCache(ahash::HashMap>>); +pub struct MeshCache(HashMap>>>); /// Either a [`re_types::archetypes::Asset3D`] or [`re_types::archetypes::Mesh3D`] to be cached. #[derive(Debug, Clone, Copy)] @@ -52,6 +57,8 @@ impl MeshCache { re_tracing::profile_function!(); self.0 + .entry(key.versioned_instance_path_hash.row_id) + .or_default() .entry(key) .or_insert_with(|| { re_log::debug!("Loading CPU mesh {name:?}…"); @@ -75,6 +82,39 @@ impl Cache for MeshCache { self.0.clear(); } + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_mesh_data = || { + let contains_asset_blob = event + .chunk + .components() + .contains_key(&re_types::components::Blob::name()); + + let contains_vertex_positions = event + .chunk + .components() + .contains_key(&re_types::components::Position3D::name()); + + contains_asset_blob || contains_vertex_positions + }; + + if is_deletion() && contains_mesh_data() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + + self.0 + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); + } + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self } diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs index ac4c5e6e5044..3d5468fdb944 100644 --- a/crates/viewer/re_viewer/src/app.rs +++ b/crates/viewer/re_viewer/src/app.rs @@ -509,7 +509,7 @@ impl App { let blueprint_db = store_hub.entity_db_mut(&blueprint_id); for chunk in updates { match blueprint_db.add_chunk(&Arc::new(chunk)) { - Ok(()) => {} + Ok(_store_events) => {} Err(err) => { re_log::warn_once!("Failed to store blueprint delta: {err}"); } @@ -1086,15 +1086,30 @@ impl App { re_log::warn_once!("Loading a blueprint {store_id} that is active. See https://github.com/rerun-io/rerun/issues/5514 for details."); } - let entity_db = store_hub.entity_db_mut(store_id); + // TODO(cmc): we have to keep grabbing and releasing entity_db because everything references + // everything and some of it is mutable and some not… it's really not pretty, but it + // does the job for now. - if entity_db.data_source.is_none() { - entity_db.data_source = Some((*channel_source).clone()); + { + let entity_db = store_hub.entity_db_mut(store_id); + if entity_db.data_source.is_none() { + entity_db.data_source = Some((*channel_source).clone()); + } } - if let Err(err) = entity_db.add(&msg) { - re_log::error_once!("Failed to add incoming msg: {err}"); - }; + match store_hub.entity_db_mut(store_id).add(&msg) { + Ok(store_events) => { + if let Some(caches) = store_hub.active_caches() { + caches.on_store_events(&store_events); + } + } + + Err(err) => { + re_log::error_once!("Failed to add incoming msg: {err}"); + } + } + + let entity_db = store_hub.entity_db_mut(store_id); match &msg { LogMsg::SetStoreInfo(_) => { diff --git a/crates/viewer/re_viewer_context/src/cache/caches.rs b/crates/viewer/re_viewer_context/src/cache/caches.rs index 2858df38547c..fcd45c9e0530 100644 --- a/crates/viewer/re_viewer_context/src/cache/caches.rs +++ b/crates/viewer/re_viewer_context/src/cache/caches.rs @@ -2,6 +2,7 @@ use std::any::{Any, TypeId}; use ahash::HashMap; use parking_lot::Mutex; +use re_chunk_store::ChunkStoreEvent; /// Does memoization of different objects for the immediate mode UI. #[derive(Default)] @@ -26,6 +27,17 @@ impl Caches { } } + /// React to the chunk store's changelog, if needed. + /// + /// Useful to e.g. invalidate unreachable data. + pub fn on_store_events(&self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + for cache in self.0.lock().values_mut() { + cache.on_store_events(events); + } + } + /// Accesses a cache for reading and writing. /// /// Adds the cache lazily if it wasn't already there. @@ -52,6 +64,13 @@ pub trait Cache: std::any::Any + Send + Sync { /// Attempt to free up memory. fn purge_memory(&mut self); + /// React to the chunk store's changelog, if needed. + /// + /// Useful to e.g. invalidate unreachable data. + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + _ = events; + } + // TODO(andreas): Track bytes used for each cache and show in the memory panel! //fn bytes_used(&self) -> usize; diff --git a/crates/viewer/re_viewer_context/src/cache/image_decode_cache.rs b/crates/viewer/re_viewer_context/src/cache/image_decode_cache.rs index 0698923ec0f8..efbcc865548f 100644 --- a/crates/viewer/re_viewer_context/src/cache/image_decode_cache.rs +++ b/crates/viewer/re_viewer_context/src/cache/image_decode_cache.rs @@ -1,8 +1,13 @@ +use ahash::{HashMap, HashSet}; + +use itertools::Either; use re_chunk::RowId; +use re_chunk_store::ChunkStoreEvent; use re_log_types::hash::Hash64; use re_types::{ archetypes::Image, image::{ImageKind, ImageLoadError}, + Loggable as _, }; use crate::{Cache, ImageInfo}; @@ -21,7 +26,7 @@ struct DecodedImageResult { /// Caches the results of decoding [`re_types::archetypes::EncodedImage`]. #[derive(Default)] pub struct ImageDecodeCache { - cache: ahash::HashMap, + cache: HashMap>, memory_used: u64, generation: u64, } @@ -35,31 +40,36 @@ impl ImageDecodeCache { /// so we don't need the instance id here. pub fn entry( &mut self, - row_id: RowId, + blob_row_id: RowId, image_bytes: &[u8], media_type: Option<&str>, ) -> Result { re_tracing::profile_function!(); - let key = Hash64::hash((row_id, media_type)); - - let lookup = self.cache.entry(key).or_insert_with(|| { - let result = decode_image(row_id, image_bytes, media_type); - let memory_used = result.as_ref().map_or(0, |image| image.buffer.len() as u64); - self.memory_used += memory_used; - DecodedImageResult { - result, - memory_used, - last_use_generation: 0, - } - }); + let inner_key = Hash64::hash(media_type); + + let lookup = self + .cache + .entry(blob_row_id) + .or_default() + .entry(inner_key) + .or_insert_with(|| { + let result = decode_image(blob_row_id, image_bytes, media_type); + let memory_used = result.as_ref().map_or(0, |image| image.buffer.len() as u64); + self.memory_used += memory_used; + DecodedImageResult { + result, + memory_used, + last_use_generation: 0, + } + }); lookup.last_use_generation = self.generation; lookup.result.clone() } } fn decode_image( - row_id: RowId, + blob_row_id: RowId, image_bytes: &[u8], media_type: Option<&str>, ) -> Result { @@ -89,7 +99,7 @@ fn decode_image( let Image { buffer, format, .. } = image_arch; Ok(ImageInfo { - buffer_row_id: row_id, + buffer_row_id: blob_row_id, buffer: buffer.0, format: format.0, kind: ImageKind::Color, @@ -122,12 +132,16 @@ impl Cache for ImageDecodeCache { let before = self.memory_used; - self.cache.retain(|_, ci| { - let retain = ci.last_use_generation == self.generation; - if !retain { - self.memory_used -= ci.memory_used; - } - retain + self.cache.retain(|_row_id, per_key| { + per_key.retain(|_, ci| { + let retain = ci.last_use_generation == self.generation; + if !retain { + self.memory_used -= ci.memory_used; + } + retain + }); + + !per_key.is_empty() }); re_log::trace!( @@ -137,6 +151,32 @@ impl Cache for ImageDecodeCache { ); } + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_image_blob = || { + event + .chunk + .components() + .contains_key(&re_types::components::Blob::name()) + }; + + if is_deletion() && contains_image_blob() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + + self.cache + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); + } + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self } diff --git a/crates/viewer/re_viewer_context/src/cache/image_stats_cache.rs b/crates/viewer/re_viewer_context/src/cache/image_stats_cache.rs index cbd882cd9fad..0ff2ce3ce927 100644 --- a/crates/viewer/re_viewer_context/src/cache/image_stats_cache.rs +++ b/crates/viewer/re_viewer_context/src/cache/image_stats_cache.rs @@ -1,17 +1,25 @@ -use egui::util::hash; +use ahash::{HashMap, HashSet}; +use itertools::Either; + +use re_chunk::RowId; +use re_chunk_store::ChunkStoreEvent; +use re_log_types::hash::Hash64; +use re_types::Loggable as _; use crate::{Cache, ImageInfo, ImageStats}; // Caches image stats using a [`RowId`] #[derive(Default)] -pub struct ImageStatsCache(ahash::HashMap); +pub struct ImageStatsCache(HashMap>); impl ImageStatsCache { pub fn entry(&mut self, image: &ImageInfo) -> ImageStats { - let key = hash((image.buffer_row_id, image.format)); + let inner_key = Hash64::hash(image.format); *self .0 - .entry(key) + .entry(image.buffer_row_id) + .or_default() + .entry(inner_key) .or_insert_with(|| ImageStats::from_image(image)) } } @@ -21,6 +29,32 @@ impl Cache for ImageStatsCache { // Purging the image stats is not worth it - these are very small objects! } + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_image_blob = || { + event + .chunk + .components() + .contains_key(&re_types::components::Blob::name()) + }; + + if is_deletion() && contains_image_blob() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + + self.0 + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); + } + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self } diff --git a/crates/viewer/re_viewer_context/src/cache/tensor_stats_cache.rs b/crates/viewer/re_viewer_context/src/cache/tensor_stats_cache.rs index fdba62d03365..bbc7ed5e30ae 100644 --- a/crates/viewer/re_viewer_context/src/cache/tensor_stats_cache.rs +++ b/crates/viewer/re_viewer_context/src/cache/tensor_stats_cache.rs @@ -1,22 +1,25 @@ +use ahash::{HashMap, HashSet}; +use itertools::Either; + use re_chunk::RowId; -use re_types::datatypes::TensorData; +use re_chunk_store::ChunkStoreEvent; +use re_types::{datatypes::TensorData, Loggable as _}; -use crate::Cache; -use crate::TensorStats; +use crate::{Cache, TensorStats}; /// Caches tensor stats using a [`RowId`], i.e. a specific instance of /// a `TensorData` component #[derive(Default)] -pub struct TensorStatsCache(ahash::HashMap); +pub struct TensorStatsCache(HashMap); impl TensorStatsCache { /// The key should be the `RowId` of the `TensorData`. /// NOTE: `TensorData` is never batched (they are mono-components), /// so we don't need the instance id here. - pub fn entry(&mut self, key: RowId, tensor: &TensorData) -> TensorStats { + pub fn entry(&mut self, tensor_data_row_id: RowId, tensor: &TensorData) -> TensorStats { *self .0 - .entry(key) + .entry(tensor_data_row_id) .or_insert_with(|| TensorStats::from_tensor(tensor)) } } @@ -26,6 +29,32 @@ impl Cache for TensorStatsCache { // Purging the tensor stats is not worth it - these are very small objects! } + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_tensor_data = || { + event + .chunk + .components() + .contains_key(&re_types::components::TensorData::name()) + }; + + if is_deletion() && contains_tensor_data() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + + self.0 + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); + } + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { self } diff --git a/crates/viewer/re_viewer_context/src/cache/video_cache.rs b/crates/viewer/re_viewer_context/src/cache/video_cache.rs index e15e88254a7e..8faa31a76a0f 100644 --- a/crates/viewer/re_viewer_context/src/cache/video_cache.rs +++ b/crates/viewer/re_viewer_context/src/cache/video_cache.rs @@ -1,13 +1,19 @@ -use crate::Cache; -use re_chunk::RowId; -use re_log_types::hash::Hash64; -use re_renderer::{external::re_video::VideoLoadError, video::Video}; - use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; +use ahash::{HashMap, HashSet}; + +use itertools::Either; +use re_chunk::RowId; +use re_chunk_store::ChunkStoreEvent; +use re_log_types::hash::Hash64; +use re_renderer::{external::re_video::VideoLoadError, video::Video}; +use re_types::Loggable as _; + +use crate::Cache; + // ---------------------------------------------------------------------------- struct Entry { @@ -19,7 +25,7 @@ struct Entry { /// Caches meshes based on media type & row id. #[derive(Default)] -pub struct VideoCache(ahash::HashMap); +pub struct VideoCache(HashMap>); impl VideoCache { /// Read in some video data and cache the result. @@ -29,21 +35,26 @@ impl VideoCache { /// so we don't need the instance id here. pub fn entry( &mut self, - row_id: RowId, + blob_row_id: RowId, video_data: &re_types::datatypes::Blob, media_type: Option<&str>, ) -> Arc> { re_tracing::profile_function!(); - let key = Hash64::hash((row_id, media_type)); + let inner_key = Hash64::hash(media_type); - let entry = self.0.entry(key).or_insert_with(|| { - let video = Video::load(video_data, media_type); - Entry { - used_this_frame: AtomicBool::new(true), - video: Arc::new(video), - } - }); + let entry = self + .0 + .entry(blob_row_id) + .or_default() + .entry(inner_key) + .or_insert_with(|| { + let video = Video::load(video_data, media_type); + Entry { + used_this_frame: AtomicBool::new(true), + video: Arc::new(video), + } + }); // Using acquire/release here to be on the safe side and for semantical soundness: // Whatever thread is acquiring the fact that this was used, should also see/acquire @@ -55,17 +66,47 @@ impl VideoCache { impl Cache for VideoCache { fn begin_frame(&mut self, renderer_active_frame_idx: u64) { - for v in self.0.values() { - v.used_this_frame.store(false, Ordering::Release); - if let Ok(video) = v.video.as_ref() { - video.purge_unused_decoders(renderer_active_frame_idx); + for per_key in self.0.values() { + for v in per_key.values() { + v.used_this_frame.store(false, Ordering::Release); + if let Ok(video) = v.video.as_ref() { + video.purge_unused_decoders(renderer_active_frame_idx); + } } } } fn purge_memory(&mut self) { + self.0.retain(|_row_id, per_key| { + per_key.retain(|_, v| v.used_this_frame.load(Ordering::Acquire)); + !per_key.is_empty() + }); + } + + fn on_store_events(&mut self, events: &[ChunkStoreEvent]) { + re_tracing::profile_function!(); + + let row_ids_removed: HashSet = events + .iter() + .flat_map(|event| { + let is_deletion = || event.kind == re_chunk_store::ChunkStoreDiffKind::Deletion; + let contains_video_blob = || { + event + .chunk + .components() + .contains_key(&re_types::components::Blob::name()) + }; + + if is_deletion() && contains_video_blob() { + Either::Left(event.chunk.row_ids()) + } else { + Either::Right(std::iter::empty()) + } + }) + .collect(); + self.0 - .retain(|_, v| v.used_this_frame.load(Ordering::Acquire)); + .retain(|row_id, _per_key| !row_ids_removed.contains(row_id)); } fn as_any_mut(&mut self) -> &mut dyn std::any::Any { diff --git a/crates/viewer/re_viewer_context/src/image_info.rs b/crates/viewer/re_viewer_context/src/image_info.rs index 8359934a9100..b144cf50473e 100644 --- a/crates/viewer/re_viewer_context/src/image_info.rs +++ b/crates/viewer/re_viewer_context/src/image_info.rs @@ -13,7 +13,7 @@ use re_types::{ /// It has enough information to render the image on the screen. #[derive(Clone)] pub struct ImageInfo { - /// The row id that contaoned the blob. + /// The row id that contained the blob. /// /// Can be used instead of hashing [`Self::buffer`]. pub buffer_row_id: RowId, diff --git a/crates/viewer/re_viewer_context/src/store_hub.rs b/crates/viewer/re_viewer_context/src/store_hub.rs index 25ed6d38ac20..00079d04f5df 100644 --- a/crates/viewer/re_viewer_context/src/store_hub.rs +++ b/crates/viewer/re_viewer_context/src/store_hub.rs @@ -617,9 +617,13 @@ impl StoreHub { }; let store_size_before = entity_db.store().stats().total().total_size_bytes; - entity_db.purge_fraction_of_ram(fraction_to_purge); + let store_events = entity_db.purge_fraction_of_ram(fraction_to_purge); let store_size_after = entity_db.store().stats().total().total_size_bytes; + if let Some(caches) = self.caches_per_recording.get_mut(&store_id) { + caches.on_store_events(&store_events); + } + // No point keeping an empty recording around. if entity_db.is_empty() { self.remove(&store_id); @@ -684,7 +688,11 @@ impl StoreHub { // TODO(jleibs): Decide a better tuning for this. Would like to save a // reasonable amount of history, or incremental snapshots. - blueprint.gc_everything_but_the_latest_row_on_non_default_timelines(); + let store_events = + blueprint.gc_everything_but_the_latest_row_on_non_default_timelines(); + if let Some(caches) = self.caches_per_recording.get_mut(blueprint_id) { + caches.on_store_events(&store_events); + } self.blueprint_last_gc .insert(blueprint_id.clone(), blueprint.generation()); diff --git a/examples/python/face_tracking/face_tracking.py b/examples/python/face_tracking/face_tracking.py index 3981c23e1d9b..c5729fde2d36 100755 --- a/examples/python/face_tracking/face_tracking.py +++ b/examples/python/face_tracking/face_tracking.py @@ -20,6 +20,17 @@ import tqdm from mediapipe.tasks.python import vision +# If set, log everything as static. +# +# Generally, the Viewer accumulates data until its set memory budget at which point it will +# remove the oldest data from the recording (see https://rerun.io/docs/howto/limit-ram) +# By instead logging data as static, no data will be accumulated over time since previous +# data is overwritten. +# Naturally, the drawback of this is that there's no history of previous data sent to the viewer, +# as well as no timestamps, making the Viewer's timeline effectively inactive. +global ALL_STATIC +ALL_STATIC: bool = False + EXAMPLE_DIR: Final = Path(os.path.dirname(__file__)) DATASET_DIR: Final = EXAMPLE_DIR / "dataset" MODEL_DIR: Final = EXAMPLE_DIR / "model" @@ -110,7 +121,9 @@ def __init__(self, video_mode: bool = False): ) self._options = vision.FaceDetectorOptions( base_options=self._base_options, - running_mode=mp.tasks.vision.RunningMode.VIDEO if self._video_mode else mp.tasks.vision.RunningMode.IMAGE, + running_mode=mp.tasks.vision.RunningMode.VIDEO + if self._video_mode + else mp.tasks.vision.RunningMode.IMAGE, ) self._detector = vision.FaceDetector.create_from_options(self._options) @@ -118,7 +131,8 @@ def __init__(self, video_mode: bool = False): rr.log( "video/detector", rr.ClassDescription( - info=rr.AnnotationInfo(id=0), keypoint_connections=[(0, 1), (1, 2), (2, 0), (2, 3), (0, 4), (1, 5)] + info=rr.AnnotationInfo(id=0), + keypoint_connections=[(0, 1), (1, 2), (2, 0), (2, 3), (0, 4), (1, 5)], ), static=True, ) @@ -132,7 +146,7 @@ def detect_and_log(self, image: cv2.typing.MatLike, frame_time_nano: int) -> Non if self._video_mode else self._detector.detect(image) ) - rr.log("video/detector/faces", rr.Clear(recursive=True)) + rr.log("video/detector/faces", rr.Clear(recursive=True), static=ALL_STATIC) for i, detection in enumerate(detection_result.detections): # log bounding box bbox = detection.bounding_box @@ -142,16 +156,23 @@ def detect_and_log(self, image: cv2.typing.MatLike, frame_time_nano: int) -> Non rr.log( f"video/detector/faces/{i}/bbox", rr.Boxes2D( - array=[bbox.origin_x, bbox.origin_y, bbox.width, bbox.height], array_format=rr.Box2DFormat.XYWH + array=[bbox.origin_x, bbox.origin_y, bbox.width, bbox.height], + array_format=rr.Box2DFormat.XYWH, ), rr.AnyValues(index=index, score=score), + static=ALL_STATIC, ) # MediaPipe's keypoints are normalized to [0, 1], so we need to scale them to get pixel coordinates. pts = [ - (math.floor(keypoint.x * width), math.floor(keypoint.y * height)) for keypoint in detection.keypoints + (math.floor(keypoint.x * width), math.floor(keypoint.y * height)) + for keypoint in detection.keypoints ] - rr.log(f"video/detector/faces/{i}/keypoints", rr.Points2D(pts, radii=3, keypoint_ids=list(range(6)))) + rr.log( + f"video/detector/faces/{i}/keypoints", + rr.Points2D(pts, radii=3, keypoint_ids=list(range(6))), + static=ALL_STATIC, + ) class FaceLandmarkerLogger: @@ -181,7 +202,9 @@ def __init__(self, video_mode: bool = False, num_faces: int = 1): base_options=self._base_options, output_face_blendshapes=True, num_faces=num_faces, - running_mode=mp.tasks.vision.RunningMode.VIDEO if self._video_mode else mp.tasks.vision.RunningMode.IMAGE, + running_mode=mp.tasks.vision.RunningMode.VIDEO + if self._video_mode + else mp.tasks.vision.RunningMode.IMAGE, ) self._detector = vision.FaceLandmarker.create_from_options(self._options) @@ -203,7 +226,9 @@ def __init__(self, video_mode: bool = False, num_faces: int = 1): mp.solutions.face_mesh.FACEMESH_NOSE, ] - self._class_ids = [0] * mp.solutions.face_mesh.FACEMESH_NUM_LANDMARKS_WITH_IRISES + self._class_ids = [ + 0 + ] * mp.solutions.face_mesh.FACEMESH_NUM_LANDMARKS_WITH_IRISES class_descriptions = [] for i, klass in enumerate(classes): # MediaPipe only provides connections for class, not actual class per keypoint. So we have to extract the @@ -223,7 +248,9 @@ def __init__(self, video_mode: bool = False, num_faces: int = 1): ) ) - rr.log("video/landmarker", rr.AnnotationContext(class_descriptions), static=True) + rr.log( + "video/landmarker", rr.AnnotationContext(class_descriptions), static=True + ) rr.log("reconstruction", rr.AnnotationContext(class_descriptions), static=True) # properly align the 3D face in the viewer @@ -246,26 +273,43 @@ def is_empty(i): # type: ignore[no-untyped-def] except StopIteration: return True - if is_empty(zip(detection_result.face_landmarks, detection_result.face_blendshapes)): - rr.log("video/landmarker/faces", rr.Clear(recursive=True)) - rr.log("reconstruction/faces", rr.Clear(recursive=True)) - rr.log("blendshapes", rr.Clear(recursive=True)) + if is_empty( + zip(detection_result.face_landmarks, detection_result.face_blendshapes) + ): + rr.log( + "video/landmarker/faces", rr.Clear(recursive=True), static=ALL_STATIC + ) + rr.log("reconstruction/faces", rr.Clear(recursive=True), static=ALL_STATIC) + rr.log("blendshapes", rr.Clear(recursive=True), static=ALL_STATIC) for i, (landmark, blendshapes) in enumerate( zip(detection_result.face_landmarks, detection_result.face_blendshapes) ): if len(landmark) == 0 or len(blendshapes) == 0: - rr.log(f"video/landmarker/faces/{i}/landmarks", rr.Clear(recursive=True)) - rr.log(f"reconstruction/faces/{i}", rr.Clear(recursive=True)) - rr.log(f"blendshapes/{i}", rr.Clear(recursive=True)) + rr.log( + f"video/landmarker/faces/{i}/landmarks", + rr.Clear(recursive=True), + static=ALL_STATIC, + ) + rr.log( + f"reconstruction/faces/{i}", + rr.Clear(recursive=True), + static=ALL_STATIC, + ) + rr.log(f"blendshapes/{i}", rr.Clear(recursive=True), static=ALL_STATIC) continue # MediaPipe's keypoints are normalized to [0, 1], so we need to scale them to get pixel coordinates. - pts = [(math.floor(lm.x * width), math.floor(lm.y * height)) for lm in landmark] + pts = [ + (math.floor(lm.x * width), math.floor(lm.y * height)) for lm in landmark + ] keypoint_ids = list(range(len(landmark))) rr.log( f"video/landmarker/faces/{i}/landmarks", - rr.Points2D(pts, radii=3, keypoint_ids=keypoint_ids, class_ids=self._class_ids), + rr.Points2D( + pts, radii=3, keypoint_ids=keypoint_ids, class_ids=self._class_ids + ), + static=ALL_STATIC, ) rr.log( @@ -275,11 +319,16 @@ def is_empty(i): # type: ignore[no-untyped-def] keypoint_ids=keypoint_ids, class_ids=self._class_ids, ), + static=ALL_STATIC, ) for blendshape in blendshapes: if blendshape.category_name in BLENDSHAPES_CATEGORIES: - rr.log(f"blendshapes/{i}/{blendshape.category_name}", rr.Scalar(blendshape.score)) + # NOTE(cmc): That one we still log as temporal, otherwise it's really meh. + rr.log( + f"blendshapes/{i}/{blendshape.category_name}", + rr.Scalar(blendshape.score), + ) # ======================================================================================== @@ -312,7 +361,9 @@ def resize_image(image: cv2.typing.MatLike, max_dim: int | None) -> cv2.typing.M return image -def run_from_video_capture(vid: int | str, max_dim: int | None, max_frame_count: int | None, num_faces: int) -> None: +def run_from_video_capture( + vid: int | str, max_dim: int | None, max_frame_count: int | None, num_faces: int +) -> None: """ Run the face detector on a video stream. @@ -337,7 +388,9 @@ def run_from_video_capture(vid: int | str, max_dim: int | None, max_frame_count: print("Capturing video stream. Press ctrl-c to stop.") try: - it: Iterable[int] = itertools.count() if max_frame_count is None else range(max_frame_count) + it: Iterable[int] = ( + itertools.count() if max_frame_count is None else range(max_frame_count) + ) for frame_idx in tqdm.tqdm(it, desc="Processing frames"): # Capture frame-by-frame @@ -362,7 +415,11 @@ def run_from_video_capture(vid: int | str, max_dim: int | None, max_frame_count: rr.set_time_nanos("frame_time", frame_time_nano) detector.detect_and_log(frame, frame_time_nano) landmarker.detect_and_log(frame, frame_time_nano) - rr.log("video/image", rr.Image(frame, color_model="BGR")) + rr.log( + "video/image", + rr.Image(frame, color_model="BGR"), + static=ALL_STATIC, + ) except KeyboardInterrupt: pass @@ -380,14 +437,20 @@ def run_from_sample_image(path: Path, max_dim: int | None, num_faces: int) -> No landmarker = FaceLandmarkerLogger(video_mode=False, num_faces=num_faces) logger.detect_and_log(image, 0) landmarker.detect_and_log(image, 0) - rr.log("video/image", rr.Image(image, color_model="BGR")) + rr.log( + "video/image", + rr.Image(image, color_model="BGR"), + static=ALL_STATIC, + ) def main() -> None: logging.getLogger().addHandler(logging.StreamHandler()) logging.getLogger().setLevel("INFO") - parser = argparse.ArgumentParser(description="Uses the MediaPipe Face Detection to track a human pose in video.") + parser = argparse.ArgumentParser( + description="Uses the MediaPipe Face Detection to track a human pose in video." + ) parser.add_argument( "--demo-image", action="store_true", @@ -400,7 +463,10 @@ def main() -> None: ) parser.add_argument("--video", type=Path, help="Run on the provided video file.") parser.add_argument( - "--camera", type=int, default=0, help="Run from the camera stream (parameter is the camera ID, usually 0)" + "--camera", + type=int, + default=0, + help="Run from the camera stream (parameter is the camera ID, usually 0)", ) parser.add_argument( "--max-frame", @@ -421,6 +487,9 @@ def main() -> None: "(temporal smoothing is applied only for a value of 1)." ), ) + parser.add_argument( + "--static", action="store_true", help="If set, logs everything as static" + ) rr.script_add_args(parser) @@ -450,6 +519,9 @@ def main() -> None: ), ) + global ALL_STATIC + ALL_STATIC = args.static + if args.demo_image: if not SAMPLE_IMAGE_PATH.exists(): download_file(SAMPLE_IMAGE_URL, SAMPLE_IMAGE_PATH) @@ -458,9 +530,13 @@ def main() -> None: elif args.image is not None: run_from_sample_image(args.image, args.max_dim, args.num_faces) elif args.video is not None: - run_from_video_capture(str(args.video), args.max_dim, args.max_frame, args.num_faces) + run_from_video_capture( + str(args.video), args.max_dim, args.max_frame, args.num_faces + ) else: - run_from_video_capture(args.camera, args.max_dim, args.max_frame, args.num_faces) + run_from_video_capture( + args.camera, args.max_dim, args.max_frame, args.num_faces + ) rr.script_teardown(args)