Skip to content

Commit

Permalink
perf(tree): integrate parallel state root (paradigmxyz#7161)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk authored Mar 21, 2024
1 parent aac0b00 commit 56b63ad
Show file tree
Hide file tree
Showing 17 changed files with 184 additions and 144 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/blockchain-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ reth-db.workspace = true
reth-provider.workspace = true
reth-stages.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-trie-parallel = { workspace = true, features = ["parallel"] }

# common
parking_lot.workspace = true
Expand Down
6 changes: 5 additions & 1 deletion crates/blockchain-tree/src/blockchain_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ pub struct BlockchainTree<DB: Database, EVM: ExecutorFactory> {
prune_modes: Option<PruneModes>,
}

impl<DB: Database, EVM: ExecutorFactory> BlockchainTree<DB, EVM> {
impl<DB, EVM> BlockchainTree<DB, EVM>
where
DB: Database + Clone,
EVM: ExecutorFactory,
{
/// Create a new blockchain tree.
pub fn new(
externals: TreeExternals<DB, EVM>,
Expand Down
36 changes: 26 additions & 10 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use reth_primitives::{
BlockHash, BlockNumber, ForkBlock, GotExpected, SealedBlockWithSenders, SealedHeader, U256,
};
use reth_provider::{
providers::BundleStateProvider, BundleStateDataProvider, BundleStateWithReceipts, Chain,
ExecutorFactory, StateRootProvider,
providers::{BundleStateProvider, ConsistentDbView},
BundleStateDataProvider, BundleStateWithReceipts, Chain, ExecutorFactory, ProviderError,
StateRootProvider,
};
use reth_trie::updates::TrieUpdates;
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
collections::BTreeMap,
ops::{Deref, DerefMut},
Expand Down Expand Up @@ -74,7 +76,7 @@ impl AppendableChain {
block_validation_kind: BlockValidationKind,
) -> Result<Self, InsertBlockErrorKind>
where
DB: Database,
DB: Database + Clone,
EF: ExecutorFactory,
{
let state = BundleStateWithReceipts::default();
Expand Down Expand Up @@ -112,7 +114,7 @@ impl AppendableChain {
block_validation_kind: BlockValidationKind,
) -> Result<Self, InsertBlockErrorKind>
where
DB: Database,
DB: Database + Clone,
EF: ExecutorFactory,
{
let parent_number = block.number - 1;
Expand Down Expand Up @@ -174,16 +176,26 @@ impl AppendableChain {
) -> RethResult<(BundleStateWithReceipts, Option<TrieUpdates>)>
where
BSDP: BundleStateDataProvider,
DB: Database,
DB: Database + Clone,
EVM: ExecutorFactory,
{
// some checks are done before blocks comes here.
externals.consensus.validate_header_against_parent(&block, parent_block)?;

// get the state provider.
let canonical_fork = bundle_state_data_provider.canonical_fork();

// SAFETY: For block execution and parallel state root computation below we open multiple
// independent database transactions. Upon opening the database transaction the consistent
// view will check a current tip in the database and throw an error if it doesn't match
// the one recorded during initialization.
// It is safe to use consistent view without any special error handling as long as
// we guarantee that plain state cannot change during processing of new payload.
// The usage has to be re-evaluated if that was ever to change.
let consistent_view =
ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())?;
let state_provider =
externals.provider_factory.history_by_block_number(canonical_fork.number)?;
consistent_view.provider_ro()?.state_provider_by_block_number(canonical_fork.number)?;

let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);

Expand All @@ -199,9 +211,13 @@ impl AppendableChain {
// calculate and check state root
let start = Instant::now();
let (state_root, trie_updates) = if block_attachment.is_canonical() {
provider
.state_root_with_updates(bundle_state.state())
.map(|(root, updates)| (root, Some(updates)))?
let mut state = provider.bundle_state_data_provider.state().clone();
state.extend(bundle_state.clone());
let hashed_state = state.hash_state_slow();
ParallelStateRoot::new(consistent_view, hashed_state)
.incremental_root_with_updates()
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?
} else {
(provider.state_root(bundle_state.state())?, None)
};
Expand Down Expand Up @@ -250,7 +266,7 @@ impl AppendableChain {
block_validation_kind: BlockValidationKind,
) -> Result<(), InsertBlockErrorKind>
where
DB: Database,
DB: Database + Clone,
EF: ExecutorFactory,
{
let parent_block = self.chain.tip();
Expand Down
32 changes: 24 additions & 8 deletions crates/blockchain-tree/src/shareable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,27 @@ use tracing::trace;

/// Shareable blockchain tree that is behind tokio::RwLock
#[derive(Clone, Debug)]
pub struct ShareableBlockchainTree<DB: Database, EF: ExecutorFactory> {
pub struct ShareableBlockchainTree<DB: Database + Clone, EF: ExecutorFactory> {
/// BlockchainTree
pub tree: Arc<RwLock<BlockchainTree<DB, EF>>>,
}

impl<DB: Database, EF: ExecutorFactory> ShareableBlockchainTree<DB, EF> {
impl<DB, EF> ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
/// Create a new shareable database.
pub fn new(tree: BlockchainTree<DB, EF>) -> Self {
Self { tree: Arc::new(RwLock::new(tree)) }
}
}

impl<DB: Database, EF: ExecutorFactory> BlockchainTreeEngine for ShareableBlockchainTree<DB, EF> {
impl<DB, EF> BlockchainTreeEngine for ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> {
let mut tree = self.tree.write();
// Blockchain tree metrics shouldn't be updated here, see
Expand Down Expand Up @@ -103,7 +111,11 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTreeEngine for ShareableBlockc
}
}

impl<DB: Database, EF: ExecutorFactory> BlockchainTreeViewer for ShareableBlockchainTree<DB, EF> {
impl<DB, EF> BlockchainTreeViewer for ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
fn blocks(&self) -> BTreeMap<BlockNumber, HashSet<BlockHash>> {
trace!(target: "blockchain_tree", "Returning all blocks in blockchain tree");
self.tree.read().block_indices().block_number_to_block_hashes().clone()
Expand Down Expand Up @@ -196,8 +208,10 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTreeViewer for ShareableBlockc
}
}

impl<DB: Database, EF: ExecutorFactory> BlockchainTreePendingStateProvider
for ShareableBlockchainTree<DB, EF>
impl<DB, EF> BlockchainTreePendingStateProvider for ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
fn find_pending_state_provider(
&self,
Expand All @@ -209,8 +223,10 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTreePendingStateProvider
}
}

impl<DB: Database, EF: ExecutorFactory> CanonStateSubscriptions
for ShareableBlockchainTree<DB, EF>
impl<DB, EF> CanonStateSubscriptions for ShareableBlockchainTree<DB, EF>
where
DB: Database + Clone,
EF: ExecutorFactory,
{
fn subscribe_to_canonical_state(&self) -> reth_provider::CanonStateNotifications {
trace!(target: "blockchain_tree", "Registered subscriber for canonical state");
Expand Down
14 changes: 10 additions & 4 deletions crates/interfaces/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ pub enum ProviderError {
/// Error encountered when the block number conversion from U256 to u64 causes an overflow.
#[error("failed to convert block number U256 to u64: {0}")]
BlockNumberOverflow(U256),
/// Consistent view error.
#[error("failed to initialize consistent view: {0}")]
ConsistentView(Box<ConsistentViewError>),
}

impl From<reth_primitives::fs::FsPathError> for ProviderError {
Expand All @@ -152,7 +155,7 @@ pub struct RootMismatch {
}

/// Consistent database view error.
#[derive(Error, Debug)]
#[derive(Clone, Debug, Error, PartialEq, Eq)]
pub enum ConsistentViewError {
/// Error thrown on attempt to initialize provider while node is still syncing.
#[error("node is syncing. best block: {0}")]
Expand All @@ -163,7 +166,10 @@ pub enum ConsistentViewError {
/// The tip diff.
tip: GotExpected<Option<B256>>,
},
/// Underlying provider error.
#[error(transparent)]
Provider(#[from] ProviderError),
}

impl From<ConsistentViewError> for ProviderError {
fn from(value: ConsistentViewError) -> Self {
Self::ConsistentView(Box::new(value))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use revm::db::BundleState;
#[derive(Debug)]
pub struct BundleStateProvider<SP: StateProvider, BSDP: BundleStateDataProvider> {
/// The inner state provider.
pub(crate) state_provider: SP,
/// Bundle state data,
pub(crate) bundle_state_data_provider: BSDP,
pub state_provider: SP,
/// Bundle state data.
pub bundle_state_data_provider: BSDP,
}

impl<SP: StateProvider, BSDP: BundleStateDataProvider> BundleStateProvider<SP, BSDP> {
Expand Down
31 changes: 14 additions & 17 deletions crates/storage/provider/src/providers/consistent_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,22 @@ where
Provider: DatabaseProviderFactory<DB>,
{
/// Creates new consistent database view.
pub fn new(provider: Provider) -> Self {
Self { database: PhantomData, provider, tip: None }
pub fn new(provider: Provider, tip: Option<B256>) -> Self {
Self { database: PhantomData, provider, tip }
}

/// Initializes the view with provided tip.
pub fn with_tip(mut self, tip: B256) -> Self {
self.tip = Some(tip);
self
}

/// Initializes the view with latest tip.
pub fn with_latest_tip(mut self) -> ProviderResult<Self> {
let provider = self.provider.database_provider_ro()?;
let tip = provider.tx_ref().cursor_read::<tables::CanonicalHeaders>()?.last()?;
self.tip = tip.map(|(_, hash)| hash);
Ok(self)
/// Creates new consistent database view with latest tip.
pub fn new_with_latest_tip(provider: Provider) -> ProviderResult<Self> {
let tip = provider
.database_provider_ro()?
.tx_ref()
.cursor_read::<tables::CanonicalHeaders>()?
.last()?;
Ok(Self::new(provider, tip.map(|(_, hash)| hash)))
}

/// Creates new read-only provider and performs consistency checks on the current tip.
pub fn provider_ro(&self) -> Result<DatabaseProviderRO<DB>, ConsistentViewError> {
pub fn provider_ro(&self) -> ProviderResult<DatabaseProviderRO<DB>> {
let provider_ro = self.provider.database_provider_ro()?;
let last_entry = provider_ro
.tx_ref()
Expand All @@ -65,12 +61,13 @@ where
if self.tip != tip {
return Err(ConsistentViewError::Inconsistent {
tip: GotExpected { got: tip, expected: self.tip },
})
}
.into())
}

let best_block_number = provider_ro.best_block_number()?;
if last_entry.map(|(number, _)| number).unwrap_or_default() != best_block_number {
return Err(ConsistentViewError::Syncing(best_block_number))
return Err(ConsistentViewError::Syncing(best_block_number).into())
}

Ok(provider_ro)
Expand Down
63 changes: 5 additions & 58 deletions crates/storage/provider/src/providers/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::{
providers::{
state::{historical::HistoricalStateProvider, latest::LatestStateProvider},
StaticFileProvider,
},
providers::{state::latest::LatestStateProvider, StaticFileProvider},
to_range,
traits::{BlockSource, ReceiptProvider},
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
Expand Down Expand Up @@ -127,69 +124,19 @@ impl<DB: Database> ProviderFactory<DB> {
)))
}

/// Storage provider for latest block
/// State provider for latest block
#[track_caller]
pub fn latest(&self) -> ProviderResult<StateProviderBox> {
trace!(target: "providers::db", "Returning latest state provider");
Ok(Box::new(LatestStateProvider::new(self.db.tx()?, self.static_file_provider())))
}

/// Storage provider for state at that given block
fn state_provider_by_block_number(
&self,
provider: DatabaseProviderRO<DB>,
mut block_number: BlockNumber,
) -> ProviderResult<StateProviderBox> {
if block_number == provider.best_block_number().unwrap_or_default() &&
block_number == provider.last_block_number().unwrap_or_default()
{
return Ok(Box::new(LatestStateProvider::new(
provider.into_tx(),
self.static_file_provider(),
)))
}

// +1 as the changeset that we want is the one that was applied after this block.
block_number += 1;

let account_history_prune_checkpoint =
provider.get_prune_checkpoint(PruneSegment::AccountHistory)?;
let storage_history_prune_checkpoint =
provider.get_prune_checkpoint(PruneSegment::StorageHistory)?;

let mut state_provider = HistoricalStateProvider::new(
provider.into_tx(),
block_number,
self.static_file_provider(),
);

// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
if let Some(prune_checkpoint_block_number) =
account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_account_history_block_number(
prune_checkpoint_block_number + 1,
);
}
if let Some(prune_checkpoint_block_number) =
storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_storage_history_block_number(
prune_checkpoint_block_number + 1,
);
}

Ok(Box::new(state_provider))
}

/// Storage provider for state at that given block
pub fn history_by_block_number(
&self,
block_number: BlockNumber,
) -> ProviderResult<StateProviderBox> {
let provider = self.provider()?;
let state_provider = self.state_provider_by_block_number(provider, block_number)?;
let state_provider = self.provider()?.state_provider_by_block_number(block_number)?;
trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
Ok(state_provider)
}
Expand All @@ -202,8 +149,8 @@ impl<DB: Database> ProviderFactory<DB> {
.block_number(block_hash)?
.ok_or(ProviderError::BlockHashNotFound(block_hash))?;

let state_provider = self.state_provider_by_block_number(provider, block_number)?;
trace!(target: "providers::db", ?block_number, "Returning historical state provider for block hash");
let state_provider = self.provider()?.state_provider_by_block_number(block_number)?;
trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
Ok(state_provider)
}
}
Expand Down
Loading

0 comments on commit 56b63ad

Please sign in to comment.