Skip to content

Commit

Permalink
feat: create SparseRepoData from byte slices (conda#624)
Browse files Browse the repository at this point in the history
Fixes conda#619

---------

Co-authored-by: Bas Zalmstra <[email protected]>
  • Loading branch information
aochagavia and baszalmstra authored Apr 29, 2024
1 parent cac9d14 commit 526cd23
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 32 deletions.
3 changes: 2 additions & 1 deletion crates/rattler_repodata_gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ readme.workspace = true
[dependencies]
async-compression = { workspace = true, features = ["gzip", "tokio", "bzip2", "zstd"] }
blake2 = { workspace = true }
bytes = { workspace = true, optional = true }
cache_control = { workspace = true }
chrono = { workspace = true, features = ["std", "serde", "alloc", "clock"] }
humansize = { workspace = true }
Expand Down Expand Up @@ -61,4 +62,4 @@ tracing-test = { workspace = true }
default = ['native-tls']
native-tls = ['reqwest/native-tls']
rustls-tls = ['reqwest/rustls-tls']
sparse = ["rattler_conda_types", "memmap2", "ouroboros", "superslice", "itertools", "serde_json/raw_value"]
sparse = ["rattler_conda_types", "memmap2", "ouroboros", "superslice", "itertools", "serde_json/raw_value", "bytes"]
157 changes: 126 additions & 31 deletions crates/rattler_repodata_gateway/src/sparse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![allow(clippy::mem_forget)]

use bytes::Bytes;
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use rattler_conda_types::{
Expand All @@ -24,8 +25,7 @@ use superslice::Ext;
/// A struct to enable loading records from a `repodata.json` file on demand. Since most of the time you
/// don't need all the records from the `repodata.json` this can help provide some significant speedups.
pub struct SparseRepoData {
/// Data structure that holds a memory mapped repodata.json file and an index into the the records
/// store in that data.
/// Data structure that holds an index into the the records stored in a repo data.
inner: SparseRepoDataInner,

/// The channel from which this data was downloaded.
Expand All @@ -39,11 +39,26 @@ pub struct SparseRepoData {
patch_record_fn: Option<fn(&mut PackageRecord)>,
}

enum SparseRepoDataInner {
/// The repo data is stored as a memory mapped file
Memmapped(MemmappedSparseRepoDataInner),
/// The repo data is stored as `Bytes`
Bytes(BytesSparseRepoDataInner),
}

impl SparseRepoDataInner {
fn borrow_repo_data(&self) -> &LazyRepoData<'_> {
match self {
SparseRepoDataInner::Memmapped(inner) => inner.borrow_repo_data(),
SparseRepoDataInner::Bytes(inner) => inner.borrow_repo_data(),
}
}
}

/// A struct that holds a memory map of a `repodata.json` file and also a self-referential field which
/// indexes the data in the memory map with a sparsely parsed json struct. See [`LazyRepoData`].
#[ouroboros::self_referencing]
struct SparseRepoDataInner {
struct MemmappedSparseRepoDataInner {
/// Memory map of the `repodata.json` file
memory_map: memmap2::Mmap,

Expand All @@ -54,8 +69,23 @@ struct SparseRepoDataInner {
repo_data: LazyRepoData<'this>,
}

/// A struct that holds a reference to the bytes of a `repodata.json` file and also a self-referential
/// field which indexes the data in the `bytes` with a sparsely parsed json struct. See [`LazyRepoData`].
#[ouroboros::self_referencing]
struct BytesSparseRepoDataInner {
/// Bytes of the `repodata.json` file
bytes: Bytes,

/// Sparsely parsed json content of the file's bytes. This data struct holds references into the
/// bytes so we have to use ouroboros to make this legal.
#[borrows(bytes)]
#[covariant]
repo_data: LazyRepoData<'this>,
}

impl SparseRepoData {
/// Construct an instance of self from a file on disk and a [`Channel`].
///
/// The `patch_function` can be used to patch the package record after it has been parsed
/// (e.g. to add `pip` to `python`).
pub fn new(
Expand All @@ -67,17 +97,43 @@ impl SparseRepoData {
let file = std::fs::File::open(path)?;
let memory_map = unsafe { memmap2::Mmap::map(&file) }?;
Ok(SparseRepoData {
inner: SparseRepoDataInnerTryBuilder {
memory_map,
repo_data_builder: |memory_map| serde_json::from_slice(memory_map.as_ref()),
}
.try_build()?,
inner: SparseRepoDataInner::Memmapped(
MemmappedSparseRepoDataInnerTryBuilder {
memory_map,
repo_data_builder: |memory_map| serde_json::from_slice(memory_map.as_ref()),
}
.try_build()?,
),
subdir: subdir.into(),
channel,
patch_record_fn: patch_function,
})
}

/// Construct an instance of self from a bytes and a [`Channel`].
///
/// The `patch_function` can be used to patch the package record after it has been parsed
/// (e.g. to add `pip` to `python`).
pub fn from_bytes(
channel: Channel,
subdir: impl Into<String>,
bytes: Bytes,
patch_function: Option<fn(&mut PackageRecord)>,
) -> Result<Self, serde_json::Error> {
Ok(Self {
inner: SparseRepoDataInner::Bytes(
BytesSparseRepoDataInnerTryBuilder {
bytes,
repo_data_builder: |bytes| serde_json::from_slice(bytes),
}
.try_build()?,
),
channel,
subdir: subdir.into(),
patch_record_fn: patch_function,
})
}

/// Returns an iterator over all package names in this repodata file.
///
/// This works by iterating over all elements in the `packages` and `conda_packages` fields of
Expand Down Expand Up @@ -350,7 +406,7 @@ fn deserialize_filename_and_raw_record<'d, D: Deserializer<'d>>(
//
// Since (in most cases) the repodata is already ordered by filename which does closely resemble
// ordering by package name this sort operation will most likely be very fast.
entries.sort_by(|(a, _), (b, _)| a.package.cmp(b.package));
entries.sort_unstable_by(|(a, _), (b, _)| a.package.cmp(b.package));

Ok(entries)
}
Expand Down Expand Up @@ -386,7 +442,8 @@ impl<'de> TryFrom<&'de str> for PackageFilename<'de> {

#[cfg(test)]
mod test {
use super::{load_repo_data_recursively, PackageFilename};
use super::{load_repo_data_recursively, PackageFilename, SparseRepoData};
use bytes::Bytes;
use rattler_conda_types::{Channel, ChannelConfig, PackageName, RepoData, RepoDataRecord};
use rstest::rstest;
use std::path::{Path, PathBuf};
Expand All @@ -395,23 +452,54 @@ mod test {
Path::new(env!("CARGO_MANIFEST_DIR")).join("../../test-data")
}

fn default_repo_datas() -> Vec<(Channel, &'static str, PathBuf)> {
let channel_config = ChannelConfig::default_with_root_dir(std::env::current_dir().unwrap());
vec![
(
Channel::from_str("conda-forge", &channel_config).unwrap(),
"noarch",
test_dir().join("channels/conda-forge/noarch/repodata.json"),
),
(
Channel::from_str("conda-forge", &channel_config).unwrap(),
"linux-64",
test_dir().join("channels/conda-forge/linux-64/repodata.json"),
),
]
}

fn default_repo_data_bytes() -> Vec<(Channel, &'static str, Bytes)> {
default_repo_datas()
.into_iter()
.map(|(channel, subdir, path)| {
let bytes = std::fs::read(path).unwrap();
(channel, subdir, bytes.into())
})
.collect()
}

fn load_sparse_from_bytes(
repo_datas: &[(Channel, &'static str, Bytes)],
package_names: impl IntoIterator<Item = impl AsRef<str>>,
) -> Vec<Vec<RepoDataRecord>> {
let sparse: Vec<_> = repo_datas
.iter()
.map(|(channel, subdir, bytes)| {
SparseRepoData::from_bytes(channel.clone(), *subdir, bytes.clone(), None).unwrap()
})
.collect();

let package_names = package_names
.into_iter()
.map(|name| PackageName::try_from(name.as_ref()).unwrap());
SparseRepoData::load_records_recursive(&sparse, package_names, None).unwrap()
}

async fn load_sparse(
package_names: impl IntoIterator<Item = impl AsRef<str>>,
) -> Vec<Vec<RepoDataRecord>> {
let channel_config = ChannelConfig::default_with_root_dir(std::env::current_dir().unwrap());
load_repo_data_recursively(
[
(
Channel::from_str("conda-forge", &channel_config).unwrap(),
"noarch",
test_dir().join("channels/conda-forge/noarch/repodata.json"),
),
(
Channel::from_str("conda-forge", &channel_config).unwrap(),
"linux-64",
test_dir().join("channels/conda-forge/linux-64/repodata.json"),
),
],
default_repo_datas(),
package_names
.into_iter()
.map(|name| PackageName::try_from(name.as_ref()).unwrap()),
Expand Down Expand Up @@ -464,7 +552,7 @@ mod test {

#[tokio::test]
async fn test_sparse_numpy_dev() {
let sparse_empty_data = load_sparse([
let package_names = vec![
"python",
"cython",
"compilers",
Expand All @@ -487,13 +575,20 @@ mod test {
"gitpython",
"cffi",
"pytz",
])
.await;
];

let total_records = sparse_empty_data
.iter()
.map(std::vec::Vec::len)
.sum::<usize>();
// Memmapped
let sparse_empty_data = load_sparse(package_names.clone()).await;

let total_records = sparse_empty_data.iter().map(Vec::len).sum::<usize>();

assert_eq!(total_records, 16065);

// Bytes
let repo_datas = default_repo_data_bytes();
let sparse_empty_data = load_sparse_from_bytes(&repo_datas, package_names);

let total_records = sparse_empty_data.iter().map(Vec::len).sum::<usize>();

assert_eq!(total_records, 16065);
}
Expand Down

0 comments on commit 526cd23

Please sign in to comment.