Skip to content

Commit

Permalink
feat: remove const generics from cursors and add segment masks (parad…
Browse files Browse the repository at this point in the history
…igmxyz#5181)

Co-authored-by: Alexey Shekhirin <[email protected]>
  • Loading branch information
joshieDo and shekhirin authored Oct 30, 2023
1 parent 74a2bf3 commit e73ece9
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 238 deletions.
34 changes: 13 additions & 21 deletions bin/reth/src/db/snapshots/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use super::{
Command,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_db::{database::Database, open_db_read_only, snapshot::HeaderMask};
use reth_interfaces::db::LogLevel;
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
ChainSpec, Header, SnapshotSegment,
BlockHash, ChainSpec, Header, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory,
Expand Down Expand Up @@ -71,14 +71,9 @@ impl Command {
compression,
|| {
for num in row_indexes.iter() {
Header::decompress(
cursor
.row_by_number_with_cols::<0b01, 2>((num - self.from) as usize)?
.ok_or(ProviderError::HeaderNotFound((*num).into()))?[0],
)?;
// TODO: replace with below when eventually SnapshotProvider re-uses cursor
// provider.header_by_number(num as
// u64)?.ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?;
cursor
.get_one::<HeaderMask<Header>>((*num).into())?
.ok_or(ProviderError::HeaderNotFound((*num).into()))?;
}
Ok(())
},
Expand Down Expand Up @@ -106,11 +101,9 @@ impl Command {
filters,
compression,
|| {
Ok(Header::decompress(
cursor
.row_by_number_with_cols::<0b01, 2>((num - self.from) as usize)?
.ok_or(ProviderError::HeaderNotFound((num as u64).into()))?[0],
)?)
Ok(cursor
.get_one::<HeaderMask<Header>>(num.into())?
.ok_or(ProviderError::HeaderNotFound(num.into()))?)
},
|provider| {
Ok(provider
Expand All @@ -136,14 +129,13 @@ impl Command {
filters,
compression,
|| {
let header = Header::decompress(
cursor
.row_by_key_with_cols::<0b01, 2>(header_hash.as_slice())?
.ok_or(ProviderError::HeaderNotFound(header_hash.into()))?[0],
)?;
let (header, hash) = cursor
.get_two::<HeaderMask<Header, BlockHash>>((&header_hash).into())?
.ok_or(ProviderError::HeaderNotFound(header_hash.into()))?;

// Might be a false positive, so in the real world we have to validate it
assert_eq!(header.hash_slow(), header_hash);
assert_eq!(hash, header_hash);

Ok(header)
},
|provider| {
Expand Down
33 changes: 10 additions & 23 deletions bin/reth/src/db/snapshots/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_db::{database::Database, open_db_read_only, snapshot::ReceiptMask};
use reth_interfaces::db::LogLevel;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
Expand Down Expand Up @@ -81,16 +81,9 @@ impl Command {
compression,
|| {
for num in row_indexes.iter() {
Receipt::decompress(
cursor
.row_by_number_with_cols::<0b1, 1>(
(num - tx_range.start()) as usize,
)?
.ok_or(ProviderError::ReceiptNotFound((*num).into()))?[0],
)?;
// TODO: replace with below when eventually SnapshotProvider re-uses cursor
// provider.receipt(num as
// u64)?.ok_or(ProviderError::ReceiptNotFound((*num).into()))?;
cursor
.get_one::<ReceiptMask<Receipt>>((*num).into())?
.ok_or(ProviderError::ReceiptNotFound((*num).into()))?;
}
Ok(())
},
Expand Down Expand Up @@ -118,11 +111,9 @@ impl Command {
filters,
compression,
|| {
Ok(Receipt::decompress(
cursor
.row_by_number_with_cols::<0b1, 1>((num - tx_range.start()) as usize)?
.ok_or(ProviderError::ReceiptNotFound((num as u64).into()))?[0],
)?)
Ok(cursor
.get_one::<ReceiptMask<Receipt>>(num.into())?
.ok_or(ProviderError::ReceiptNotFound(num.into()))?)
},
|provider| {
Ok(provider
Expand All @@ -148,13 +139,9 @@ impl Command {
filters,
compression,
|| {
let receipt = Receipt::decompress(
cursor
.row_by_key_with_cols::<0b1, 1>(tx_hash.as_slice())?
.ok_or(ProviderError::ReceiptNotFound(tx_hash.into()))?[0],
)?;

Ok(receipt)
Ok(cursor
.get_one::<ReceiptMask<Receipt>>((&tx_hash).into())?
.ok_or(ProviderError::ReceiptNotFound(tx_hash.into()))?)
},
|provider| {
Ok(provider
Expand Down
41 changes: 15 additions & 26 deletions bin/reth/src/db/snapshots/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_db::{database::Database, open_db_read_only, snapshot::TransactionMask};
use reth_interfaces::db::LogLevel;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
Expand Down Expand Up @@ -81,17 +81,10 @@ impl Command {
compression,
|| {
for num in row_indexes.iter() {
TransactionSignedNoHash::decompress(
cursor
.row_by_number_with_cols::<0b1, 1>(
(num - tx_range.start()) as usize,
)?
.ok_or(ProviderError::TransactionNotFound((*num).into()))?[0],
)?
.with_hash();
// TODO: replace with below when eventually SnapshotProvider re-uses cursor
// provider.transaction_by_id(num as
// u64)?.ok_or(ProviderError::TransactionNotFound((*num).into()))?;
cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>((*num).into())?
.ok_or(ProviderError::TransactionNotFound((*num).into()))?
.with_hash();
}
Ok(())
},
Expand Down Expand Up @@ -119,12 +112,10 @@ impl Command {
filters,
compression,
|| {
Ok(TransactionSignedNoHash::decompress(
cursor
.row_by_number_with_cols::<0b1, 1>((num - tx_range.start()) as usize)?
.ok_or(ProviderError::TransactionNotFound((num as u64).into()))?[0],
)?
.with_hash())
Ok(cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>(num.into())?
.ok_or(ProviderError::TransactionNotFound(num.into()))?
.with_hash())
},
|provider| {
Ok(provider
Expand All @@ -150,14 +141,12 @@ impl Command {
filters,
compression,
|| {
let transaction = TransactionSignedNoHash::decompress(
cursor
.row_by_key_with_cols::<0b1, 1>(transaction_hash.as_slice())?
.ok_or(ProviderError::TransactionNotFound(transaction_hash.into()))?[0],
)?;

// Might be a false positive, so in the real world we have to validate it
Ok(transaction.with_hash())
Ok(cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>(
(&transaction_hash).into(),
)?
.ok_or(ProviderError::TransactionNotFound(transaction_hash.into()))?
.with_hash())
},
|provider| {
Ok(provider
Expand Down
105 changes: 105 additions & 0 deletions crates/storage/db/src/snapshot/cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use super::mask::{ColumnSelectorOne, ColumnSelectorThree, ColumnSelectorTwo};
use crate::table::Decompress;
use derive_more::{Deref, DerefMut};
use reth_interfaces::{RethError, RethResult};
use reth_nippy_jar::{MmapHandle, NippyJar, NippyJarCursor};
use reth_primitives::{snapshot::SegmentHeader, B256};

/// Cursor of a snapshot segment.
#[derive(Debug, Deref, DerefMut)]
pub struct SnapshotCursor<'a>(NippyJarCursor<'a, SegmentHeader>);

impl<'a> SnapshotCursor<'a> {
/// Returns a new [`SnapshotCursor`].
pub fn new(
jar: &'a NippyJar<SegmentHeader>,
mmap_handle: MmapHandle,
) -> Result<Self, RethError> {
Ok(Self(NippyJarCursor::with_handle(jar, mmap_handle)?))
}

/// Gets a row of values.
pub fn get(
&mut self,
key_or_num: KeyOrNumber<'_>,
mask: usize,
) -> RethResult<Option<Vec<&'_ [u8]>>> {
let row = match key_or_num {
KeyOrNumber::Key(k) => self.row_by_key_with_cols(k, mask),
KeyOrNumber::Number(n) => {
let offset = self.jar().user_header().start();
if offset > n {
return Ok(None)
}
self.row_by_number_with_cols((n - offset) as usize, mask)
}
}?;

Ok(row)
}

/// Gets one column value from a row.
pub fn get_one<M: ColumnSelectorOne>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<M::FIRST>> {
let row = self.get(key_or_num, M::MASK)?;

match row {
Some(row) => Ok(Some(M::FIRST::decompress(row[0])?)),
None => Ok(None),
}
}

/// Gets two column values from a row.
pub fn get_two<M: ColumnSelectorTwo>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<(M::FIRST, M::SECOND)>> {
let row = self.get(key_or_num, M::MASK)?;

match row {
Some(row) => Ok(Some((M::FIRST::decompress(row[0])?, M::SECOND::decompress(row[1])?))),
None => Ok(None),
}
}

/// Gets three column values from a row.
#[allow(clippy::type_complexity)]
pub fn get_three<M: ColumnSelectorThree>(
&mut self,
key_or_num: KeyOrNumber<'_>,
) -> RethResult<Option<(M::FIRST, M::SECOND, M::THIRD)>> {
let row = self.get(key_or_num, M::MASK)?;

match row {
Some(row) => Ok(Some((
M::FIRST::decompress(row[0])?,
M::SECOND::decompress(row[1])?,
M::THIRD::decompress(row[2])?,
))),
None => Ok(None),
}
}
}

/// Either a key _or_ a block/tx number
#[derive(Debug)]
pub enum KeyOrNumber<'a> {
/// A slice used as a key. Usually a block/tx hash
Key(&'a [u8]),
/// A block/tx number
Number(u64),
}

impl<'a> From<&'a B256> for KeyOrNumber<'a> {
fn from(value: &'a B256) -> Self {
KeyOrNumber::Key(value.as_slice())
}
}

impl<'a> From<u64> for KeyOrNumber<'a> {
fn from(value: u64) -> Self {
KeyOrNumber::Number(value)
}
}
Loading

0 comments on commit e73ece9

Please sign in to comment.