Skip to content

Commit

Permalink
Compression Profiles now supported
Browse files Browse the repository at this point in the history
  • Loading branch information
jguhlin committed Apr 28, 2024
1 parent 9507e9c commit 97659c2
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 165 deletions.
16 changes: 13 additions & 3 deletions compression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,20 @@ impl CompressionConfig
self
}

pub const fn with_compression_level(mut self, compression_level: i8)
-> Self
pub fn with_compression_level(mut self, compression_level: i8) -> Self
{
self.compression_level = compression_level;
match self.check_compression_level() {
Ok(_) => {}
Err(x) => {
log::warn!(
"Compression level {} is out of range for {:?}. Setting to {}",
compression_level,
self.compression_type,
x
);
self.compression_level = x;
}
}
self
}

Expand Down
116 changes: 41 additions & 75 deletions libsfasta/src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,13 @@ use libfractaltree::FractalTreeDisk;
/// ```
pub struct Converter
{
masking: bool,
index: bool,
threads: usize,
pub block_size: u64,
quality_scores: bool,
compression_type: CompressionType,
compression_level: Option<i8>,
dict: bool,
dict_samples: u64,
dict_size: u64,
compression_profile: CompressionProfile,
}

/// Default settings for Converter
Expand All @@ -60,13 +57,10 @@ impl Default for Converter
threads: 8,
block_size: 512 * 1024, // 512kb
index: true,
masking: false,
quality_scores: false,
compression_type: CompressionType::ZSTD,
dict: false,
dict_samples: 100,
dict_size: 110 * 1024,
compression_level: None,
compression_profile: CompressionProfile::default(),
}
}
}
Expand All @@ -75,7 +69,8 @@ impl Converter
{
// Builder configuration functions...
/// Specify a dictionary to use for compression. Untested.
pub fn with_dict(mut self, dict_samples: u64, dict_size: u64) -> Self
pub fn with_dict(&mut self, dict_samples: u64, dict_size: u64)
-> &mut Self
{
self.dict = true;
self.dict_samples = dict_samples;
Expand All @@ -84,49 +79,28 @@ impl Converter
}

/// Disable dictionary
pub fn without_dict(mut self) -> Self
pub fn without_dict(&mut self) -> &mut Self
{
self.dict = false;
self
}

/// Enable masking
pub fn with_masking(mut self) -> Self
{
self.masking = true;
self
}

/// Enable seq index
pub fn with_index(mut self) -> Self
pub fn with_index(&mut self) -> &mut Self
{
self.index = true;
self
}

/// Disable index
pub fn without_index(mut self) -> Self
pub fn without_index(&mut self) -> &mut Self
{
self.index = false;
self
}

/// Enable quality scores
pub fn with_scores(mut self) -> Self
{
self.quality_scores = true;
self
}

/// Disable quality scores
pub fn without_scores(mut self) -> Self
{
self.quality_scores = false;
self
}

/// Set the number of threads to use
pub fn with_threads(mut self, threads: usize) -> Self
pub fn with_threads(&mut self, threads: usize) -> &mut Self
{
assert!(
threads < u16::MAX as usize,
Expand All @@ -137,7 +111,7 @@ impl Converter
}

/// Set the block size for the sequence blocks
pub fn with_block_size(mut self, block_size: usize) -> Self
pub fn with_block_size(&mut self, block_size: usize) -> &mut Self
{
assert!(
block_size < u32::MAX as usize,
Expand All @@ -149,23 +123,15 @@ impl Converter
}

/// Set the compression type
pub fn with_compression_type(mut self, ct: CompressionType) -> Self
pub fn with_compression(
&mut self,
ct: CompressionType,
level: i8,
) -> &mut Self
{
self.compression_type = ct;
self
}

/// Set the compression level
pub fn with_compression_level(mut self, level: i8) -> Self
{
self.compression_level = Some(level);
self
}

/// Reset compression level to default
pub fn with_default_compression_level(mut self) -> Self
{
self.compression_level = None;
log::info!("Setting compression to {:?} at level {}", ct, level);
log::info!("Custom compression profiles often perform better...");
self.compression_profile = CompressionProfile::set_global(ct, level);
self
}

Expand Down Expand Up @@ -244,12 +210,7 @@ impl Converter
// Compression seems to take care of the size. bitvec! and vec! seem
// to have similar performance and on-disk storage
// requirements
if self.masking {
sfasta = sfasta.with_masking();
}

sfasta.parameters.as_mut().unwrap().compression_type =
self.compression_type;
sfasta.parameters.as_mut().unwrap().block_size = self.block_size as u32;

// Set dummy values for the directory
Expand Down Expand Up @@ -337,11 +298,9 @@ impl Converter
let start = out_buffer_thread.stream_position().unwrap();

let mut index: FractalTreeDisk<u32, u32> = index.into();
index.set_compression(CompressionConfig {
compression_type: CompressionType::ZSTD,
compression_level: 1,
compression_dict: None,
});
// todo can the index be made into a dict?
index
.set_compression(self.compression_profile.id_index.clone());

fractaltree_pos = index
.write_to_buffer(&mut *out_buffer_thread)
Expand Down Expand Up @@ -467,46 +426,53 @@ impl Converter

let compression_workers_thread = Arc::clone(&compression_workers);

let compression_config = CompressionConfig {
compression_type: self.compression_type,
compression_level: self.compression_level.unwrap_or(9),
compression_dict: None,
};

let mut seqlocs = SeqLocsStoreBuilder::default();
// let seqlocs = SeqLocsThreadBuilder::new(seqlocs);
seqlocs =
seqlocs.with_compression(self.compression_profile.seqlocs.clone());

// todo lots of clones below...

let mut headers = StringBlockStoreBuilder::default()
.with_block_size(block_size as usize)
.with_compression(compression_config.clone())
.with_compression(self.compression_profile.data.headers.clone())
.with_tree_compression(
self.compression_profile.index.headers.clone(),
)
.with_compression_worker(Arc::clone(&compression_workers_thread));

// let headers = ThreadBuilder::new(headers);

let mut ids = StringBlockStoreBuilder::default()
.with_block_size(block_size as usize)
.with_compression(compression_config.clone())
.with_compression(self.compression_profile.data.ids.clone())
.with_tree_compression(self.compression_profile.index.ids.clone())
.with_compression_worker(Arc::clone(&compression_workers_thread));

// let ids = ThreadBuilder::new(ids);

let mut masking = MaskingStoreBuilder::default()
.with_compression_worker(Arc::clone(&compression_workers_thread))
.with_compression(compression_config.clone())
.with_compression(self.compression_profile.data.masking.clone())
.with_tree_compression(
self.compression_profile.index.masking.clone(),
)
.with_block_size(block_size as usize);

// let masking = ThreadBuilder::new(masking);

let mut sequences = BytesBlockStoreBuilder::default()
.with_block_size(block_size as usize)
.with_compression(compression_config.clone())
.with_compression(self.compression_profile.data.sequence.clone())
.with_compression_worker(Arc::clone(&compression_workers_thread));

// let sequences = ThreadBuilder::new(sequences);

let mut scores = BytesBlockStoreBuilder::default()
.with_block_size(block_size as usize)
.with_compression(compression_config.clone())
.with_compression(self.compression_profile.data.quality.clone())
.with_tree_compression(
self.compression_profile.index.quality.clone(),
)
.with_compression_worker(Arc::clone(&compression_workers_thread));

// let scores = ThreadBuilder::new(scores);
Expand Down Expand Up @@ -789,8 +755,8 @@ mod tests
.expect("Unable to open testing file"),
);

let converter =
Converter::default().with_threads(6).with_block_size(8192);
let mut converter = Converter::default();
converter.with_threads(6).with_block_size(8192);

let mut out_buf = converter.convert(&mut in_buf, out_buf);

Expand Down
55 changes: 50 additions & 5 deletions libsfasta/src/datatypes/compression_profile.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

use libcompression::CompressionConfig;
use libcompression::{CompressionConfig, CompressionType};

impl CompressionProfile
{
Expand Down Expand Up @@ -43,6 +43,19 @@ impl CompressionProfile
))
.unwrap()
}

pub fn set_global(ct: CompressionType, level: i8) -> Self
{
let config = CompressionConfig::new()
.with_compression_type(ct)
.with_compression_level(level);
Self {
data: DataCompressionProfile::splat(config.clone()),
index: IndexCompressionProfile::splat(config.clone()),
seqlocs: config.clone(),
id_index: config,
}
}
}

impl Default for CompressionProfile
Expand All @@ -56,10 +69,10 @@ impl Default for CompressionProfile
#[derive(Serialize, Deserialize)]
pub struct CompressionProfile
{
data: DataCompressionProfile,
index: IndexCompressionProfile,
seqlocs: CompressionConfig,
id_index: CompressionConfig,
pub data: DataCompressionProfile,
pub index: IndexCompressionProfile,
pub seqlocs: CompressionConfig,
pub id_index: CompressionConfig,
}

#[derive(Default, Serialize, Deserialize)]
Expand All @@ -74,6 +87,22 @@ pub struct DataCompressionProfile
pub modifications: CompressionConfig,
}

impl DataCompressionProfile
{
fn splat(cc: CompressionConfig) -> Self
{
Self {
ids: cc.clone(),
headers: cc.clone(),
sequence: cc.clone(),
masking: cc.clone(),
quality: cc.clone(),
signals: cc.clone(),
modifications: cc.clone(),
}
}
}

#[derive(Default, Serialize, Deserialize)]
pub struct IndexCompressionProfile
{
Expand All @@ -86,6 +115,22 @@ pub struct IndexCompressionProfile
pub modifications: CompressionConfig,
}

impl IndexCompressionProfile
{
fn splat(cc: CompressionConfig) -> Self
{
Self {
ids: cc.clone(),
headers: cc.clone(),
sequence: cc.clone(),
masking: cc.clone(),
quality: cc.clone(),
signals: cc.clone(),
modifications: cc.clone(),
}
}
}

#[cfg(test)]

mod tests
Expand Down
12 changes: 12 additions & 0 deletions libsfasta/src/datatypes/seq_loc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ pub struct SeqLocsStoreBuilder
{
pub location: u64,
pub tree: FractalTreeBuild<u32, SeqLoc>,
pub compression: CompressionConfig,
count: usize,
}

Expand All @@ -391,6 +392,11 @@ impl Default for SeqLocsStoreBuilder
SeqLocsStoreBuilder {
location: 0,
tree: FractalTreeBuild::new(2048, 8192),
compression: CompressionConfig {
compression_type: CompressionType::ZSTD,
compression_level: 1,
compression_dict: None,
},
count: 0,
}
}
Expand All @@ -404,6 +410,12 @@ impl SeqLocsStoreBuilder
SeqLocsStoreBuilder::default()
}

pub fn with_compression(mut self, compression: CompressionConfig) -> Self
{
self.compression = compression;
self
}

/// Add a SeqLoc to the store
pub fn add_to_index(&mut self, seqloc: SeqLoc) -> u32
{
Expand Down
Loading

0 comments on commit 97659c2

Please sign in to comment.