Skip to content

Commit

Permalink
feat: share mmap handle inbetween snapshot providers & cursors (parad…
Browse files Browse the repository at this point in the history
…igmxyz#5162)

Co-authored-by: Alexey Shekhirin <[email protected]>
  • Loading branch information
joshieDo and shekhirin authored Oct 27, 2023
1 parent 006259b commit a2323c9
Show file tree
Hide file tree
Showing 6 changed files with 613 additions and 521 deletions.
30 changes: 17 additions & 13 deletions crates/storage/nippy-jar/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::{
compression::{Compression, Compressors, Zstd},
InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, RefRow,
InclusionFilter, MmapHandle, NippyJar, NippyJarError, PerfectHashingFunction, RefRow,
};
use memmap2::Mmap;
use serde::{de::Deserialize, ser::Serialize};
use std::{fs::File, ops::Range, sync::Arc};
use std::ops::Range;
use sucds::int_vectors::Access;
use zstd::bulk::Decompressor;

Expand All @@ -14,10 +13,7 @@ pub struct NippyJarCursor<'a, H = ()> {
/// [`NippyJar`] which holds most of the required configuration to read from the file.
jar: &'a NippyJar<H>,
/// Data file.
#[allow(unused)]
file_handle: Arc<File>,
/// Data file.
mmap_handle: Arc<Mmap>,
mmap_handle: MmapHandle,
/// Internal buffer to unload data to without reallocating memory on each retrieval.
internal_buffer: Vec<u8>,
/// Cursor row position.
Expand All @@ -38,16 +34,24 @@ where
H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static,
{
pub fn new(jar: &'a NippyJar<H>) -> Result<Self, NippyJarError> {
let file = File::open(jar.data_path())?;

// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let mmap = unsafe { Mmap::map(&file)? };
let max_row_size = jar.max_row_size;
Ok(NippyJarCursor {
jar,
mmap_handle: jar.open_data()?,
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(max_row_size),
row: 0,
})
}

pub fn with_handle(
jar: &'a NippyJar<H>,
mmap_handle: MmapHandle,
) -> Result<Self, NippyJarError> {
let max_row_size = jar.max_row_size;
Ok(NippyJarCursor {
jar,
file_handle: Arc::new(file),
mmap_handle: Arc::new(mmap),
mmap_handle,
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(max_row_size),
row: 0,
Expand Down
36 changes: 36 additions & 0 deletions crates/storage/nippy-jar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
clone::Clone,
error::Error as StdError,
fs::File,
io::{Seek, Write},
marker::Sync,
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
};
use sucds::{
int_vectors::PrefixSummedEliasFano,
Expand Down Expand Up @@ -247,6 +250,11 @@ where
.join(format!("{}.idx", data_path.file_name().expect("exists").to_string_lossy()))
}

/// Returns a [`MmapHandle`] of the data file
pub fn open_data(&self) -> Result<MmapHandle, NippyJarError> {
MmapHandle::new(self.data_path())
}

/// If required, prepares any compression algorithm to an early pass of the data.
pub fn prepare_compression(
&mut self,
Expand Down Expand Up @@ -487,6 +495,34 @@ where
}
}

/// Holds an `Arc` over a file and its associated mmap handle.
#[derive(Debug, Clone)]
pub struct MmapHandle {
/// File descriptor. Needs to be kept alive as long as the mmap handle.
#[allow(unused)]
file: Arc<File>,
/// Mmap handle.
mmap: Arc<Mmap>,
}

impl MmapHandle {
pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
let file = File::open(path)?;

// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let mmap = unsafe { Mmap::map(&file)? };

Ok(Self { file: Arc::new(file), mmap: Arc::new(mmap) })
}
}

impl Deref for MmapHandle {
type Target = Mmap;
fn deref(&self) -> &Self::Target {
&self.mmap
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit a2323c9

Please sign in to comment.