Implement support for fully asynchronous QueryHandles (rerun-io#7964)
This adds non-blocking methods to all our new shiny storage handles, and
uses these new non-blocking primitives to implement asynchronous helpers
(read: `Future`s) in our `QueryHandle`.
These helpers are then used on the other side to implement a proper
`Stream` (see the sketch below).

In particular, all read locks are now recursive, always.
Recursive locks are mandatory for two reasons:
* As we start parallelizing the viewer further, and especially when
using work-stealing schedulers, nested read locks will become a common
occurrence.
* In asynchronous contexts, everything is designed around work stealing.
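
A minimal sketch of how these pieces are meant to compose, assuming the `futures` crate; the import paths are assumptions, but `next_row_batch_async` is the helper added in this commit:

```rust
use futures::stream::{self, Stream};
use re_dataframe::{QueryHandle, RecordBatch, StorageEngineLike};

/// Turns the async row helper into a proper `Stream` of record batches.
fn record_batches<E: StorageEngineLike>(
    handle: QueryHandle<E>,
) -> impl Stream<Item = RecordBatch> {
    // `unfold` keeps awaiting the next batch until the handle yields `None`.
    stream::unfold(handle, |handle| async move {
        let batch = handle.next_row_batch_async().await?;
        Some((batch, handle))
    })
}
```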
teh-cmc authored Oct 31, 2024
1 parent 6624898 commit 5cf8451
Showing 5 changed files with 171 additions and 69 deletions.
28 changes: 26 additions & 2 deletions crates/store/re_chunk_store/src/store.rs
@@ -341,17 +341,34 @@ impl ChunkStoreHandle
impl ChunkStoreHandle {
#[inline]
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, ChunkStore> {
- self.0.read()
+ self.0.read_recursive()
}

#[inline]
pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<'_, ChunkStore>> {
self.0.try_read_recursive()
}

#[inline]
pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, ChunkStore> {
self.0.write()
}

#[inline]
pub fn try_write(&self) -> Option<parking_lot::RwLockWriteGuard<'_, ChunkStore>> {
self.0.try_write()
}

#[inline]
pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore> {
- parking_lot::RwLock::read_arc(&self.0)
+ parking_lot::RwLock::read_arc_recursive(&self.0)
}

#[inline]
pub fn try_read_arc(
&self,
) -> Option<parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore>> {
parking_lot::RwLock::try_read_recursive_arc(&self.0)
}

#[inline]
@@ -360,6 +377,13 @@ impl ChunkStoreHandle
) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore> {
parking_lot::RwLock::write_arc(&self.0)
}

#[inline]
pub fn try_write_arc(
&self,
) -> Option<parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore>> {
parking_lot::RwLock::try_write_arc(&self.0)
}
}

/// A complete chunk store: covers all timelines, all entities, everything.
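For illustration (not from the commit), the failure mode that the recursive reads guard against: parking_lot's `read()` is fair and blocks new readers as soon as a writer is queued, so a nested read taken while a writer waits can deadlock, whereas `read_recursive()` ignores the writer queue:

```rust
use parking_lot::RwLock;

// A nested read, as happens under work-stealing schedulers. With a plain
// `lock.read()`, a writer queueing up between the two acquisitions would
// deadlock us; `read_recursive` always succeeds while a read is already held.
fn nested_reads(lock: &RwLock<u64>) -> u64 {
    let outer = lock.read_recursive();
    // ...imagine the scheduler re-enters this code path right here while
    // another thread is parked in `lock.write()`...
    let inner = lock.read_recursive();
    *outer + *inner
}
```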
6 changes: 3 additions & 3 deletions crates/store/re_dataframe/src/engine.rs
@@ -81,7 +81,7 @@ impl<E: StorageEngineLike + Clone> QueryEngine<E> {
/// * second, the component columns in lexical order (`Color`, `Radius`, ...).
#[inline]
pub fn schema(&self) -> Vec<ColumnDescriptor> {
- self.engine.with_store(|store| store.schema())
+ self.engine.with(|store, _cache| store.schema())
}

/// Returns the filtered schema for the given [`QueryExpression`].
@@ -92,7 +92,7 @@ impl<E: StorageEngineLike + Clone> QueryEngine<E> {
#[inline]
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
self.engine
- .with_store(|store| store.schema_for_query(query))
+ .with(|store, _cache| store.schema_for_query(query))
}

/// Starts a new query by instantiating a [`QueryHandle`].
@@ -107,7 +107,7 @@
&self,
filter: &'a EntityPathFilter,
) -> impl Iterator<Item = EntityPath> + 'a {
- self.engine.with_store(|store| {
+ self.engine.with(|store, _cache| {
store
.all_entities()
.into_iter()
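Throughout this diff, the old `with_store`/`with_cache` pair gives way to a single `with` closure that receives both the store and the cache under one lock scope, plus a fallible `try_with` used further down. The trait definition itself is not part of this diff; a rough sketch of the shape implied by the call sites:

```rust
use re_chunk_store::ChunkStore;
use re_query::QueryCache;

// Assumed shape of `StorageEngineLike`, inferred from the call sites in this
// commit; the real definition lives elsewhere in the codebase.
pub trait StorageEngineLike {
    /// Runs `f` with read access to both the store and the cache.
    fn with<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&ChunkStore, &QueryCache) -> R;

    /// Non-blocking flavor: returns `None` if either lock is currently held.
    fn try_with<F, R>(&self, f: F) -> Option<R>
    where
        F: FnOnce(&ChunkStore, &QueryCache) -> R;
}
```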
89 changes: 74 additions & 15 deletions crates/store/re_dataframe/src/query.rs
@@ -157,10 +157,8 @@ impl<E: StorageEngineLike> QueryHandle<E> {
///
/// It is important that query handles stay cheap to create.
fn init(&self) -> &QueryHandleState {
- self.state.get_or_init(|| {
-     self.engine
-         .with_store(|store| self.engine.with_cache(|cache| self.init_(store, cache)))
- })
+ self.engine
+     .with(|store, cache| self.state.get_or_init(|| self.init_(store, cache)))
}

// NOTE: This is split in its own method otherwise it completely breaks `rustfmt`.
@@ -216,15 +214,16 @@ impl<E: StorageEngineLike> QueryHandle<E> {
.keep_extra_timelines(true) // we want all the timelines we can get!
.keep_extra_components(false)
};
- let (view_pov_chunks_idx, mut view_chunks) = self.fetch_view_chunks(&query, &view_contents);
+ let (view_pov_chunks_idx, mut view_chunks) =
+     self.fetch_view_chunks(store, cache, &query, &view_contents);

// 5. Collect all relevant clear chunks and update the view accordingly.
//
// We'll turn the clears into actual empty arrays of the expected component type.
{
re_tracing::profile_scope!("clear_chunks");

- let clear_chunks = self.fetch_clear_chunks(&query, &view_contents);
+ let clear_chunks = self.fetch_clear_chunks(store, cache, &query, &view_contents);
for (view_idx, chunks) in view_chunks.iter_mut().enumerate() {
let Some(ColumnDescriptor::Component(descr)) = view_contents.get(view_idx) else {
continue;
@@ -443,6 +442,8 @@ impl<E: StorageEngineLike> QueryHandle<E> {

fn fetch_view_chunks(
&self,
store: &ChunkStore,
cache: &QueryCache,
query: &RangeQuery,
view_contents: &[ColumnDescriptor],
) -> (Option<usize>, Vec<Vec<(AtomicU64, Chunk)>>) {
@@ -456,7 +457,13 @@ impl<E: StorageEngineLike> QueryHandle<E> {

ColumnDescriptor::Component(column) => {
let chunks = self
- .fetch_chunks(query, &column.entity_path, [column.component_name])
+ .fetch_chunks(
+     store,
+     cache,
+     query,
+     &column.entity_path,
+     [column.component_name],
+ )
.unwrap_or_default();

if let Some(pov) = self.query.filtered_is_not_null.as_ref() {
@@ -481,6 +488,8 @@ impl<E: StorageEngineLike> QueryHandle<E> {
/// The component data is stripped out, only the indices are left.
fn fetch_clear_chunks(
&self,
store: &ChunkStore,
cache: &QueryCache,
query: &RangeQuery,
view_contents: &[ColumnDescriptor],
) -> IntMap<EntityPath, Vec<Chunk>> {
@@ -544,7 +553,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
// For the entity itself, any chunk that contains clear data is relevant, recursive or not.
// Just fetch everything we find.
let flat_chunks = self
- .fetch_chunks(query, entity_path, component_names)
+ .fetch_chunks(store, cache, query, entity_path, component_names)
.map(|chunks| {
chunks
.into_iter()
@@ -555,7 +564,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {

let recursive_chunks =
entity_path_ancestors(entity_path).flat_map(|ancestor_path| {
- self.fetch_chunks(query, &ancestor_path, component_names)
+ self.fetch_chunks(store, cache, query, &ancestor_path, component_names)
.into_iter() // option
.flat_map(|chunks| chunks.into_iter().map(|(_cursor, chunk)| chunk))
// NOTE: Ancestors' chunks are only relevant for the rows where `ClearIsRecursive=true`.
@@ -577,6 +586,8 @@ impl<E: StorageEngineLike> QueryHandle<E> {

fn fetch_chunks<const N: usize>(
&self,
_store: &ChunkStore,
cache: &QueryCache,
query: &RangeQuery,
entity_path: &EntityPath,
component_names: [ComponentName; N],
@@ -587,9 +598,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
//
// TODO(cmc): Going through the cache is very useful in a Viewer context, but
// not so much in an SDK context. Make it configurable.
- let results = self
-     .engine
-     .with_cache(|cache| cache.range(query, entity_path, component_names));
+ let results = cache.range(query, entity_path, component_names);

debug_assert!(
results.components.len() <= 1,
@@ -783,10 +792,43 @@ impl<E: StorageEngineLike> QueryHandle<E> {
/// ```
#[inline]
pub fn next_row(&self) -> Option<Vec<Box<dyn ArrowArray>>> {
- self.engine.with_cache(|cache| self._next_row(cache))
+ self.engine
+     .with(|store, cache| self._next_row(store, cache))
}

/// Asynchronously returns the next row's worth of data.
///
/// The returned vector of Arrow arrays strictly follows the schema specified by [`Self::schema`].
/// Columns that do not yield any data will still be present in the results, filled with null values.
///
/// Each cell in the result corresponds to the latest _locally_ known value at that particular point in
/// the index, for each respective `ColumnDescriptor`.
/// See [`QueryExpression::sparse_fill_strategy`] to go beyond local resolution.
///
/// Example:
/// ```ignore
/// while let Some(row) = query_handle.next_row_async().await {
/// // …
/// }
/// ```
pub fn next_row_async(
&self,
) -> impl std::future::Future<Output = Option<Vec<Box<dyn ArrowArray>>>> {
let res: Option<Option<_>> = self
.engine
.try_with(|store, cache| self._next_row(store, cache));

std::future::poll_fn(move |_cx| match &res {
Some(row) => std::task::Poll::Ready(row.clone()),
None => std::task::Poll::Pending,
})
}
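
Note that `try_with` runs once, when the future is created: the returned future is immediately ready if the locks were free at that instant, and stays pending otherwise. A variant that retries the non-blocking acquisition on every poll might look like this (a sketch, not the commit's code):

```rust
// (inside the same `impl<E: StorageEngineLike> QueryHandle<E>` block)
pub fn next_row_async(
    &self,
) -> impl std::future::Future<Output = Option<Vec<Box<dyn ArrowArray>>>> + '_ {
    std::future::poll_fn(move |cx| {
        // Retry the non-blocking lock acquisition on every poll.
        match self
            .engine
            .try_with(|store, cache| self._next_row(store, cache))
        {
            Some(row) => std::task::Poll::Ready(row),
            None => {
                // The locks are busy: ask the executor to poll us again soon.
                cx.waker().wake_by_ref();
                std::task::Poll::Pending
            }
        }
    })
}
```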

- pub fn _next_row(&self, cache: &QueryCache) -> Option<Vec<Box<dyn ArrowArray>>> {
+ pub fn _next_row(
+     &self,
+     store: &ChunkStore,
+     cache: &QueryCache,
+ ) -> Option<Vec<Box<dyn ArrowArray>>> {
re_tracing::profile_function!();

/// Temporary state used to resolve the streaming join for the current iteration.
@@ -816,7 +858,10 @@ impl<E: StorageEngineLike> QueryHandle<E> {
Retrofilled(UnitChunkShared),
}

- let state = self.init();
+ // Although that's a synchronous lock, we probably don't need to worry about it until
+ // there is proof to the contrary: we are in a specific `QueryHandle` after all, there's
+ // really no good reason to be contending here in the first place.
+ let state = self.state.get_or_init(move || self.init_(store, cache));

let row_idx = state.cur_row.fetch_add(1, Ordering::Relaxed);
let cur_index_value = state.unique_index_values.get(row_idx as usize)?;
@@ -1160,6 +1205,20 @@ impl<E: StorageEngineLike> QueryHandle<E> {
data: ArrowChunk::new(self.next_row()?),
})
}

#[inline]
pub async fn next_row_batch_async(&self) -> Option<RecordBatch> {
let row = self.next_row_async().await?;

// If we managed to get a row, then the state must be initialized already.
#[allow(clippy::unwrap_used)]
let schema = self.state.get().unwrap().arrow_schema.clone();

Some(RecordBatch {
schema,
data: ArrowChunk::new(row),
})
}
}
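
A hypothetical consumer of the batch-level helper, for illustration (the function name and async context are not from the commit):

```rust
// Drains a query handle into memory from an async context, batch by batch,
// mirroring what the blocking `next_row_batch` loop would do.
async fn collect_batches<E: StorageEngineLike>(handle: &QueryHandle<E>) -> Vec<RecordBatch> {
    let mut batches = Vec::new();
    while let Some(batch) = handle.next_row_batch_async().await {
        batches.push(batch);
    }
    batches
}
```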

impl<E: StorageEngineLike> QueryHandle<E> {
28 changes: 26 additions & 2 deletions crates/store/re_query/src/cache.rs
@@ -93,17 +93,34 @@ impl QueryCacheHandle
impl QueryCacheHandle {
#[inline]
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, QueryCache> {
- self.0.read()
+ self.0.read_recursive()
}

#[inline]
pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<'_, QueryCache>> {
self.0.try_read_recursive()
}

#[inline]
pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, QueryCache> {
self.0.write()
}

#[inline]
pub fn try_write(&self) -> Option<parking_lot::RwLockWriteGuard<'_, QueryCache>> {
self.0.try_write()
}

#[inline]
pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, QueryCache> {
- parking_lot::RwLock::read_arc(&self.0)
+ parking_lot::RwLock::read_arc_recursive(&self.0)
}

#[inline]
pub fn try_read_arc(
&self,
) -> Option<parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, QueryCache>> {
parking_lot::RwLock::try_read_recursive_arc(&self.0)
}

#[inline]
@@ -112,6 +129,13 @@ impl QueryCacheHandle
) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, QueryCache> {
parking_lot::RwLock::write_arc(&self.0)
}

#[inline]
pub fn try_write_arc(
&self,
) -> Option<parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, QueryCache>> {
parking_lot::RwLock::try_write_arc(&self.0)
}
}

pub struct QueryCache {
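With both handles now exposing the same non-blocking surface, the engine's fallible accessor can be composed directly from them. A sketch, under the assumption that the engine simply owns one handle of each kind:

```rust
use re_chunk_store::{ChunkStore, ChunkStoreHandle};
use re_query::{QueryCache, QueryCacheHandle};

// Hypothetical composition of the new `try_read` primitives into a
// `try_with`-style accessor; the real `StorageEngine` is not shown in this diff.
fn try_with<R>(
    store: &ChunkStoreHandle,
    cache: &QueryCacheHandle,
    f: impl FnOnce(&ChunkStore, &QueryCache) -> R,
) -> Option<R> {
    // If either lock is busy, bail out immediately instead of blocking.
    let store = store.try_read()?;
    let cache = cache.try_read()?;
    Some(f(&store, &cache))
}
```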