Skip to content

Commit

Permalink
feat: share decompressor buffer (paradigmxyz#5777)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored Dec 18, 2023
1 parent 43f29fe commit 1d25829
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 35 deletions.
45 changes: 37 additions & 8 deletions crates/primitives/src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ thread_local! {
);

/// Thread Transaction decompressor.
pub static TRANSACTION_DECOMPRESSOR: RefCell<Decompressor<'static>> = RefCell::new(
Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
.expect("Failed to initialize decompressor."),
);
pub static TRANSACTION_DECOMPRESSOR: RefCell<ReusableDecompressor> = RefCell::new(
ReusableDecompressor::new(Decompressor::with_dictionary(TRANSACTION_DICTIONARY).expect("Failed to initialize decompressor."))
);

/// Thread receipt compressor.
pub static RECEIPT_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
Expand All @@ -28,8 +27,38 @@ thread_local! {
);

/// Thread receipt decompressor.
pub static RECEIPT_DECOMPRESSOR: RefCell<Decompressor<'static>> = RefCell::new(
Decompressor::with_dictionary(RECEIPT_DICTIONARY)
.expect("Failed to initialize decompressor."),
);
pub static RECEIPT_DECOMPRESSOR: RefCell<ReusableDecompressor> = RefCell::new(
ReusableDecompressor::new(Decompressor::with_dictionary(RECEIPT_DICTIONARY).expect("Failed to initialize decompressor."))
);
}

/// Reusable decompressor that uses its own internal buffer.
#[allow(missing_debug_implementations)]
pub struct ReusableDecompressor {
/// zstd decompressor
decompressor: Decompressor<'static>,
/// buffer to decompress to.
buf: Vec<u8>,
}

impl ReusableDecompressor {
fn new(decompressor: Decompressor<'static>) -> Self {
Self { decompressor, buf: Vec::with_capacity(4096) }
}

/// Decompresses `src` reusing the decompressor and its internal buffer.
pub fn decompress(&mut self, src: &[u8]) -> &[u8] {
// `decompress_to_buffer` will return an error if the output buffer doesn't have
// enough capacity. However we don't actually have information on the required
// length. So we hope for the best, and keep trying again with a fairly bigger size
// if it fails.
while let Err(err) = self.decompressor.decompress_to_buffer(src, &mut self.buf) {
let err = err.to_string();
if !err.contains("Destination buffer is too small") {
panic!("Failed to decompress: {}", err);
}
self.buf.reserve(self.buf.capacity() + 24_000);
}
&self.buf
}
}
18 changes: 3 additions & 15 deletions crates/primitives/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,25 +897,13 @@ impl Compact for TransactionSignedNoHash {
let zstd_bit = bitflags >> 3;
let (transaction, buf) = if zstd_bit != 0 {
TRANSACTION_DECOMPRESSOR.with(|decompressor| {
let mut decompressor = decompressor.borrow_mut();
let mut tmp: Vec<u8> = Vec::with_capacity(200);

// `decompress_to_buffer` will return an error if the output buffer doesn't have
// enough capacity. However we don't actually have information on the required
// length. So we hope for the best, and keep trying again with a fairly bigger size
// if it fails.
while let Err(err) = decompressor.decompress_to_buffer(buf, &mut tmp) {
let err = err.to_string();
if !err.contains("Destination buffer is too small") {
panic!("Failed to decompress: {}", err);
}
tmp.reserve(tmp.capacity() + 24_000);
}
let decompressor = &mut decompressor.borrow_mut();

// TODO: enforce that zstd is only present at a "top" level type

let transaction_type = (bitflags & 0b110) >> 1;
let (transaction, _) = Transaction::from_compact(tmp.as_slice(), transaction_type);
let (transaction, _) =
Transaction::from_compact(decompressor.decompress(buf), transaction_type);

(transaction, buf)
})
Expand Down
15 changes: 3 additions & 12 deletions crates/storage/codecs/derive/src/compact/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,11 @@ fn generate_from_compact(fields: &FieldList, ident: &Ident, is_zstd: bool) -> To
quote! {
if flags.__zstd() != 0 {
#decompressor.with(|decompressor| {
let mut decompressor = decompressor.borrow_mut();

let mut tmp: Vec<u8> = Vec::with_capacity(300);

while let Err(err) = decompressor.decompress_to_buffer(&buf[..], &mut tmp) {
let err = err.to_string();
if !err.contains("Destination buffer is too small") {
panic!("Failed to decompress: {}", err);
}
tmp.reserve(tmp.capacity() + 10_000);
}
let decompressor = &mut decompressor.borrow_mut();
let decompressed = decompressor.decompress(buf);
let mut original_buf = buf;

let mut buf: &[u8] = tmp.as_slice();
let mut buf: &[u8] = decompressed;
#(#lines)*
(obj, original_buf)
})
Expand Down

0 comments on commit 1d25829

Please sign in to comment.