From 96e39d29b9c03fe5f4e8b470ef9c3a9295a9bb43 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 26 Mar 2024 17:48:35 +0100 Subject: [PATCH] feat: add `ETL` to Hashing Stages (#7030) --- bin/reth/src/commands/stage/dump/merkle.rs | 21 +- bin/reth/src/commands/stage/run.rs | 4 +- crates/node-core/src/node_config.rs | 2 + crates/stages/src/stages/hashing_account.rs | 266 +++++----------- crates/stages/src/stages/hashing_storage.rs | 335 +++++--------------- 5 files changed, 188 insertions(+), 440 deletions(-) diff --git a/bin/reth/src/commands/stage/dump/merkle.rs b/bin/reth/src/commands/stage/dump/merkle.rs index 3e6d2e6352ce..32f828b32410 100644 --- a/bin/reth/src/commands/stage/dump/merkle.rs +++ b/bin/reth/src/commands/stage/dump/merkle.rs @@ -1,6 +1,7 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; +use reth_config::config::EtlConfig; use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv}; use reth_node_core::dirs::{ChainPath, DataDirPath}; use reth_node_ethereum::EthEvmConfig; @@ -106,12 +107,20 @@ async fn unwind_and_copy( )?; // Bring hashes to TO - AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX } - .execute(&provider, execute_input) - .unwrap(); - StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX } - .execute(&provider, execute_input) - .unwrap(); + AccountHashingStage { + clean_threshold: u64::MAX, + commit_threshold: u64::MAX, + etl_config: EtlConfig::default(), + } + .execute(&provider, execute_input) + .unwrap(); + StorageHashingStage { + clean_threshold: u64::MAX, + commit_threshold: u64::MAX, + etl_config: EtlConfig::default(), + } + .execute(&provider, execute_input) + .unwrap(); let unwind_inner_tx = provider.into_tx(); diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index f015cd599ec0..a055f5a4e0dc 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -247,10 +247,10 @@ impl Command { (Box::new(TransactionLookupStage::new(batch_size, etl_config, None)), None) } StageEnum::AccountHashing => { - (Box::new(AccountHashingStage::new(1, batch_size)), None) + (Box::new(AccountHashingStage::new(1, batch_size, etl_config)), None) } StageEnum::StorageHashing => { - (Box::new(StorageHashingStage::new(1, batch_size)), None) + (Box::new(StorageHashingStage::new(1, batch_size, etl_config)), None) } StageEnum::Merkle => ( Box::new(MerkleStage::default_execution()), diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs index 97a1be1749a1..319128732d9a 100644 --- a/crates/node-core/src/node_config.rs +++ b/crates/node-core/src/node_config.rs @@ -859,10 +859,12 @@ impl NodeConfig { .set(AccountHashingStage::new( stage_config.account_hashing.clean_threshold, stage_config.account_hashing.commit_threshold, + stage_config.etl.clone(), )) .set(StorageHashingStage::new( stage_config.storage_hashing.clean_threshold, stage_config.storage_hashing.commit_threshold, + stage_config.etl.clone(), )) .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) .set(TransactionLookupStage::new( diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 91de1562a15c..6dea11e9bdf7 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -1,30 +1,34 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use itertools::Itertools; 
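// --- Illustrative sketch (editorial, not part of this patch) -----------------
// The call sites above now thread an `EtlConfig` into both hashing stages. A
// minimal sketch of that wiring follows; the `reth_stages::stages::*` import
// path is an assumption, while the `new(clean_threshold, commit_threshold,
// etl_config)` signature and the 500_000 / 100_000 defaults come from this diff.
use reth_config::config::EtlConfig;
use reth_stages::stages::{AccountHashingStage, StorageHashingStage};

fn build_hashing_stages(etl_config: EtlConfig) -> (AccountHashingStage, StorageHashingStage) {
    // Both stages share the same ETL settings, mirroring `node_config.rs`,
    // which clones `stage_config.etl` for each stage.
    let account_hashing = AccountHashingStage::new(500_000, 100_000, etl_config.clone());
    let storage_hashing = StorageHashingStage::new(500_000, 100_000, etl_config);
    (account_hashing, storage_hashing)
}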
-use rayon::slice::ParallelSliceMut; +use reth_config::config::EtlConfig; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, tables, transaction::{DbTx, DbTxMut}, - RawKey, RawTable, + RawKey, RawTable, RawValue, }; +use reth_etl::Collector; use reth_interfaces::provider::ProviderResult; use reth_primitives::{ keccak256, - stage::{ - AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, - StageId, - }, + stage::{AccountHashingCheckpoint, EntitiesCheckpoint, StageCheckpoint, StageId}, + Account, B256, }; use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter, StatsReader}; use std::{ - cmp::max, fmt::Debug, ops::{Range, RangeInclusive}, - sync::mpsc, + sync::mpsc::{self, Receiver}, }; use tracing::*; +/// Maximum number of channels that can exist in memory. +const MAXIMUM_CHANNELS: usize = 10_000; + +/// Maximum number of accounts to hash per rayon worker job. +const WORKER_CHUNK_SIZE: usize = 100; + /// Account hashing stage hashes plain account. /// This is preparation before generating intermediate hashes and calculating Merkle tree root. #[derive(Clone, Debug)] @@ -32,20 +36,26 @@ pub struct AccountHashingStage { /// The threshold (in number of blocks) for switching between incremental /// hashing and full storage hashing. pub clean_threshold: u64, - /// The maximum number of accounts to process before committing. + /// The maximum number of accounts to process before committing during unwind. pub commit_threshold: u64, + /// ETL configuration + pub etl_config: EtlConfig, } impl AccountHashingStage { /// Create new instance of [AccountHashingStage]. - pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self { - Self { clean_threshold, commit_threshold } + pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self { + Self { clean_threshold, commit_threshold, etl_config } } } impl Default for AccountHashingStage { fn default() -> Self { - Self { clean_threshold: 500_000, commit_threshold: 100_000 } + Self { + clean_threshold: 500_000, + commit_threshold: 100_000, + etl_config: EtlConfig::default(), + } } } @@ -87,7 +97,7 @@ impl AccountHashingStage { generators, generators::{random_block_range, random_eoa_accounts}, }; - use reth_primitives::{Account, B256, U256}; + use reth_primitives::U256; use reth_provider::providers::StaticFileWriter; let mut rng = generators::rng(); @@ -155,96 +165,45 @@ impl Stage for AccountHashingStage { // genesis accounts are not in changeset. if to_block - from_block > self.clean_threshold || from_block == 1 { let tx = provider.tx_ref(); - let stage_checkpoint = input - .checkpoint - .and_then(|checkpoint| checkpoint.account_hashing_stage_checkpoint()); - - let start_address = match stage_checkpoint { - Some(AccountHashingCheckpoint { address: address @ Some(_), block_range: CheckpointBlockRange { from, to }, .. }) - // Checkpoint is only valid if the range of transitions didn't change. - // An already hashed account may have been changed with the new range, - // and therefore should be hashed again. 
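// --- Illustrative sketch (editorial, not part of this patch) -----------------
// The two constants above bound how much hashed-but-unflushed data sits in
// memory: input is split into WORKER_CHUNK_SIZE pieces, each piece is hashed on
// the global rayon pool, and results come back over one mpsc channel per chunk,
// drained whenever MAXIMUM_CHANNELS receivers have accumulated. The stand-alone
// sketch below uses only std + rayon with a placeholder hash; none of these
// names are reth APIs.
use std::sync::mpsc;

const MAXIMUM_CHANNELS: usize = 10_000;
const WORKER_CHUNK_SIZE: usize = 100;

fn hash_all(keys: Vec<Vec<u8>>) -> Vec<(u64, Vec<u8>)> {
    let mut channels = Vec::new();
    let mut out = Vec::with_capacity(keys.len());

    for chunk in keys.chunks(WORKER_CHUNK_SIZE) {
        // An _unordered_ channel per chunk, as in the stage itself.
        let (tx, rx) = mpsc::channel();
        channels.push(rx);

        let chunk = chunk.to_vec();
        rayon::spawn(move || {
            for key in chunk {
                let hashed = fake_hash(&key); // stand-in for keccak256
                let _ = tx.send((hashed, key));
            }
        });

        // Periodic flush keeps memory bounded; the real stage flushes into the
        // ETL collector instead of a Vec.
        if channels.len() == MAXIMUM_CHANNELS {
            drain(&mut channels, &mut out);
        }
    }
    drain(&mut channels, &mut out);
    out
}

fn drain(channels: &mut Vec<mpsc::Receiver<(u64, Vec<u8>)>>, out: &mut Vec<(u64, Vec<u8>)>) {
    for rx in channels.iter() {
        // `recv` returns Err once the producing job has finished and dropped
        // its sender, which ends the inner loop.
        while let Ok(item) = rx.recv() {
            out.push(item);
        }
    }
    channels.clear();
}

fn fake_hash(bytes: &[u8]) -> u64 {
    use std::hash::{Hash, Hasher};
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    bytes.hash(&mut hasher);
    hasher.finish()
}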
- if from == from_block && to == to_block => - { - debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner account hashing checkpoint"); - - address - } - _ => { - // clear table, load all accounts and hash it - tx.clear::()?; - None + // clear table, load all accounts and hash it + tx.clear::()?; + + let mut accounts_cursor = tx.cursor_read::>()?; + let mut collector = + Collector::new(self.etl_config.file_size, self.etl_config.dir.clone()); + let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS); + + // channels used to return result of account hashing + for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) { + // An _unordered_ channel to receive results from a rayon job + let (tx, rx) = mpsc::channel(); + channels.push(rx); + + let chunk = chunk.collect::, _>>()?; + // Spawn the hashing task onto the global rayon pool + rayon::spawn(move || { + for (address, account) in chunk.into_iter() { + let address = address.key().unwrap(); + let _ = tx.send((RawKey::new(keccak256(address)), account)); + } + }); + + // Flush to ETL when channels length reaches MAXIMUM_CHANNELS + if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 { + collect(&mut channels, &mut collector)?; } } - .take() - .map(RawKey::new); - - let next_address = { - let mut accounts_cursor = - tx.cursor_read::>()?; - - // channels used to return result of account hashing - let mut channels = Vec::new(); - for chunk in &accounts_cursor - .walk(start_address.clone())? - .take(self.commit_threshold as usize) - .chunks( - max(self.commit_threshold as usize, rayon::current_num_threads()) / - rayon::current_num_threads(), - ) - { - // An _unordered_ channel to receive results from a rayon job - let (tx, rx) = mpsc::channel(); - channels.push(rx); - - let chunk = chunk.collect::, _>>()?; - // Spawn the hashing task onto the global rayon pool - rayon::spawn(move || { - for (address, account) in chunk.into_iter() { - let address = address.key().unwrap(); - let _ = tx.send((RawKey::new(keccak256(address)), account)); - } - }); - } - let mut hashed_batch = Vec::with_capacity(self.commit_threshold as usize); - // Iterate over channels and append the hashed accounts. - for channel in channels { - while let Ok(hashed) = channel.recv() { - hashed_batch.push(hashed); - } - } - // sort it all in parallel - hashed_batch.par_sort_unstable_by(|a, b| a.0.cmp(&b.0)); - - let mut hashed_account_cursor = - tx.cursor_write::>()?; - - // iterate and put presorted hashed accounts - if start_address.is_none() { - hashed_batch - .into_iter() - .try_for_each(|(k, v)| hashed_account_cursor.append(k, v))?; - } else { - hashed_batch - .into_iter() - .try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?; - } - // next key of iterator - accounts_cursor.next()? - }; - - if let Some((next_address, _)) = &next_address { - let checkpoint = input.checkpoint().with_account_hashing_stage_checkpoint( - AccountHashingCheckpoint { - address: Some(next_address.key().unwrap()), - block_range: CheckpointBlockRange { from: from_block, to: to_block }, - progress: stage_checkpoint_progress(provider)?, - }, - ); - - return Ok(ExecOutput { checkpoint, done: false }) + collect(&mut channels, &mut collector)?; + + let mut hashed_account_cursor = + tx.cursor_write::>()?; + + for item in collector.iter()? 
{ + let (key, value) = item?; + hashed_account_cursor + .append(RawKey::::from_vec(key), RawValue::::from_vec(value))?; } } else { // Aggregate all transition changesets and make a list of accounts that have been @@ -293,6 +252,21 @@ impl Stage for AccountHashingStage { } } +/// Flushes channels hashes to ETL collector. +fn collect( + channels: &mut Vec, RawValue)>>, + collector: &mut Collector, RawValue>, +) -> Result<(), StageError> { + for channel in channels.iter_mut() { + while let Ok((key, v)) = channel.recv() { + collector.insert(key, v)?; + } + } + debug!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len()); + channels.clear(); + Ok(()) +} + fn stage_checkpoint_progress( provider: &DatabaseProviderRW, ) -> ProviderResult { @@ -356,91 +330,6 @@ mod tests { assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); } - #[tokio::test] - async fn execute_clean_account_hashing_with_commit_threshold() { - let (previous_stage, stage_progress) = (20, 10); - // Set up the runner - let mut runner = AccountHashingTestRunner::default(); - runner.set_clean_threshold(1); - runner.set_commit_threshold(5); - - let mut input = ExecInput { - target: Some(previous_stage), - checkpoint: Some(StageCheckpoint::new(stage_progress)), - }; - - runner.seed_execution(input).expect("failed to seed execution"); - - // first run, hash first five accounts. - let rx = runner.execute(input); - let result = rx.await.unwrap(); - - let fifth_address = runner - .db - .query(|tx| { - let (address, _) = tx - .cursor_read::()? - .walk(None)? - .nth(5) - .unwrap() - .unwrap(); - Ok(address) - }) - .unwrap(); - - assert_matches!( - result, - Ok(ExecOutput { - checkpoint: StageCheckpoint { - block_number: 10, - stage_checkpoint: Some(StageUnitCheckpoint::Account( - AccountHashingCheckpoint { - address: Some(address), - block_range: CheckpointBlockRange { - from: 11, - to: 20, - }, - progress: EntitiesCheckpoint { processed: 5, total } - } - )) - }, - done: false - }) if address == fifth_address && - total == runner.db.table::().unwrap().len() as u64 - ); - assert_eq!(runner.db.table::().unwrap().len(), 5); - - // second run, hash next five accounts. 
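// --- Illustrative sketch (editorial, not part of this patch) -----------------
// The execute path above can use a plain `cursor.append(..)` only because
// `reth_etl::Collector` hands entries back sorted by key, regardless of the
// order they were inserted in. The in-memory stand-in below (an assumption,
// not the real collector) shows that insert/iter contract; the real collector
// additionally spills sorted runs to `etl_config.dir` once `etl_config.file_size`
// is reached.
use std::collections::BTreeMap;

struct SortedCollector {
    entries: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl SortedCollector {
    fn new() -> Self {
        Self { entries: BTreeMap::new() }
    }

    // Entries may arrive in any order (hashed-address order is effectively random).
    fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.entries.insert(key, value);
    }

    fn len(&self) -> usize {
        self.entries.len()
    }

    // Keys come back ascending, so a downstream append-only cursor never has
    // to seek backwards.
    fn iter(&self) -> impl Iterator<Item = (&Vec<u8>, &Vec<u8>)> + '_ {
        self.entries.iter()
    }
}

fn demo_collector() {
    let mut collector = SortedCollector::new();
    collector.insert(vec![0xbb], vec![2]);
    collector.insert(vec![0xaa], vec![1]);
    assert_eq!(collector.len(), 2);

    let keys: Vec<Vec<u8>> = collector.iter().map(|(k, _)| k.clone()).collect();
    assert_eq!(keys, vec![vec![0xaa], vec![0xbb]]);
}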
- input.checkpoint = Some(result.unwrap().checkpoint); - let rx = runner.execute(input); - let result = rx.await.unwrap(); - - assert_matches!( - result, - Ok(ExecOutput { - checkpoint: StageCheckpoint { - block_number: 20, - stage_checkpoint: Some(StageUnitCheckpoint::Account( - AccountHashingCheckpoint { - address: None, - block_range: CheckpointBlockRange { - from: 0, - to: 0, - }, - progress: EntitiesCheckpoint { processed, total } - } - )) - }, - done: true - }) if processed == total && - total == runner.db.table::().unwrap().len() as u64 - ); - assert_eq!(runner.db.table::().unwrap().len(), 10); - - // Validate the stage execution - assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); - } - mod test_utils { use super::*; use crate::test_utils::TestStageDB; @@ -450,6 +339,7 @@ mod tests { pub(crate) db: TestStageDB, commit_threshold: u64, clean_threshold: u64, + etl_config: EtlConfig, } impl AccountHashingTestRunner { @@ -509,7 +399,12 @@ mod tests { impl Default for AccountHashingTestRunner { fn default() -> Self { - Self { db: TestStageDB::default(), commit_threshold: 1000, clean_threshold: 1000 } + Self { + db: TestStageDB::default(), + commit_threshold: 1000, + clean_threshold: 1000, + etl_config: EtlConfig::default(), + } } } @@ -524,6 +419,7 @@ mod tests { Self::S { commit_threshold: self.commit_threshold, clean_threshold: self.clean_threshold, + etl_config: self.etl_config.clone(), } } } diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 298389f6c44e..16784e127924 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -1,25 +1,35 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use num_traits::Zero; +use itertools::Itertools; +use reth_config::config::EtlConfig; use reth_db::{ - cursor::DbDupCursorRO, + codecs::CompactU256, + cursor::{DbCursorRO, DbDupCursorRW}, database::Database, models::BlockNumberAddress, + table::Decompress, tables, transaction::{DbTx, DbTxMut}, }; +use reth_etl::Collector; use reth_interfaces::provider::ProviderResult; use reth_primitives::{ keccak256, - stage::{ - CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, StageId, - StorageHashingCheckpoint, - }, - StorageEntry, + stage::{EntitiesCheckpoint, StageCheckpoint, StageId, StorageHashingCheckpoint}, + BufMut, StorageEntry, B256, }; use reth_provider::{DatabaseProviderRW, HashingWriter, StatsReader, StorageReader}; -use std::{collections::BTreeMap, fmt::Debug}; +use std::{ + fmt::Debug, + sync::mpsc::{self, Receiver}, +}; use tracing::*; +/// Maximum number of channels that can exist in memory. +const MAXIMUM_CHANNELS: usize = 10_000; + +/// Maximum number of storage entries to hash per rayon worker job. +const WORKER_CHUNK_SIZE: usize = 100; + /// Storage hashing stage hashes plain storage. /// This is preparation before generating intermediate hashes and calculating Merkle tree root. #[derive(Debug)] @@ -27,20 +37,26 @@ pub struct StorageHashingStage { /// The threshold (in number of blocks) for switching between incremental /// hashing and full storage hashing. pub clean_threshold: u64, - /// The maximum number of slots to process before committing. + /// The maximum number of slots to process before committing during unwind. pub commit_threshold: u64, + /// ETL configuration + pub etl_config: EtlConfig, } impl StorageHashingStage { /// Create new instance of [StorageHashingStage]. 
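// --- Illustrative sketch (editorial, not part of this patch) -----------------
// The storage stage below collects entries under a 64-byte composite key:
// keccak256(address) followed by keccak256(slot). Sorting those byte keys
// groups every slot under its hashed account and orders slots within it, and
// the write phase later splits the key back apart with `B256::from_slice`.
// The helpers here are stand-alone; `fake_keccak256` is a placeholder for
// `reth_primitives::keccak256`.
fn composite_key(address: &[u8], slot: &[u8]) -> Vec<u8> {
    let mut key = Vec::with_capacity(64);
    key.extend_from_slice(&fake_keccak256(address));
    key.extend_from_slice(&fake_keccak256(slot));
    key
}

// First 32 bytes -> hashed address (table key), last 32 bytes -> hashed slot
// (dup-sort subkey), mirroring the read side of the collector output.
fn split_key(key: &[u8]) -> ([u8; 32], [u8; 32]) {
    let mut addr = [0u8; 32];
    let mut slot = [0u8; 32];
    addr.copy_from_slice(&key[..32]);
    slot.copy_from_slice(&key[32..64]);
    (addr, slot)
}

fn fake_keccak256(data: &[u8]) -> [u8; 32] {
    // Deterministic placeholder so the sketch stays dependency-free.
    let mut out = [0u8; 32];
    for (i, byte) in data.iter().enumerate() {
        out[i % 32] ^= *byte;
    }
    out
}

fn demo_composite_key() {
    let key = composite_key(b"address", b"slot");
    assert_eq!(key.len(), 64);
    let (addr, slot) = split_key(&key);
    assert_eq!(addr, fake_keccak256(b"address"));
    assert_eq!(slot, fake_keccak256(b"slot"));
}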
- pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self { - Self { clean_threshold, commit_threshold } + pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self { + Self { clean_threshold, commit_threshold, etl_config } } } impl Default for StorageHashingStage { fn default() -> Self { - Self { clean_threshold: 500_000, commit_threshold: 100_000 } + Self { + clean_threshold: 500_000, + commit_threshold: 100_000, + etl_config: EtlConfig::default(), + } } } @@ -68,104 +84,48 @@ impl Stage for StorageHashingStage { // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as // genesis accounts are not in changeset, along with their storages. if to_block - from_block > self.clean_threshold || from_block == 1 { - let stage_checkpoint = input - .checkpoint - .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint()); - - let (mut current_key, mut current_subkey) = match stage_checkpoint { - Some(StorageHashingCheckpoint { - address: address @ Some(_), - storage, - block_range: CheckpointBlockRange { from, to }, - .. - }) - // Checkpoint is only valid if the range of transitions didn't change. - // An already hashed storage may have been changed with the new range, - // and therefore should be hashed again. - if from == from_block && to == to_block => - { - debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner storage hashing checkpoint"); - - (address, storage) + // clear table, load all accounts and hash it + tx.clear::()?; + + let mut storage_cursor = tx.cursor_read::()?; + let mut collector = + Collector::new(self.etl_config.file_size, self.etl_config.dir.clone()); + let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS); + + for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) { + // An _unordered_ channel to receive results from a rayon job + let (tx, rx) = mpsc::channel(); + channels.push(rx); + + let chunk = chunk.collect::, _>>()?; + // Spawn the hashing task onto the global rayon pool + rayon::spawn(move || { + for (address, slot) in chunk.into_iter() { + let mut addr_key = Vec::with_capacity(64); + addr_key.put_slice(keccak256(address).as_slice()); + addr_key.put_slice(keccak256(slot.key).as_slice()); + let _ = tx.send((addr_key, CompactU256::from(slot.value))); } - _ => { - // clear table, load all accounts and hash it - tx.clear::()?; + }); - (None, None) - } - }; - - let mut keccak_address = None; - - let mut hashed_batch = BTreeMap::new(); - let mut remaining = self.commit_threshold as usize; - { - let mut storage = tx.cursor_dup_read::()?; - while !remaining.is_zero() { - hashed_batch.extend( - storage - .walk_dup(current_key, current_subkey)? - .take(remaining) - .map(|res| { - res.map(|(address, slot)| { - // Address caching for the first iteration when current_key - // is None - let keccak_address = - if let Some(keccak_address) = keccak_address { - keccak_address - } else { - keccak256(address) - }; - - // TODO cache map keccak256(slot.key) ? - ((keccak_address, keccak256(slot.key)), slot.value) - }) - }) - .collect::, _>>()?, - ); - - remaining = self.commit_threshold as usize - hashed_batch.len(); - - if let Some((address, slot)) = storage.next_dup()? 
{ - // There's still some remaining elements on this key, so we need to save - // the cursor position for the next - // iteration - (current_key, current_subkey) = (Some(address), Some(slot.key)); - } else { - // Go to the next key - (current_key, current_subkey) = storage - .next_no_dup()? - .map(|(key, storage_entry)| (key, storage_entry.key)) - .unzip(); - - // Cache keccak256(address) for the next key if it exists - if let Some(address) = current_key { - keccak_address = Some(keccak256(address)); - } else { - // We have reached the end of table - break - } - } + // Flush to ETL when channels length reaches MAXIMUM_CHANNELS + if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 { + collect(&mut channels, &mut collector)?; } } - // iterate and put presorted hashed slots - hashed_batch.into_iter().try_for_each(|((addr, key), value)| { - tx.put::(addr, StorageEntry { key, value }) - })?; + collect(&mut channels, &mut collector)?; - if current_key.is_some() { - let checkpoint = input.checkpoint().with_storage_hashing_stage_checkpoint( - StorageHashingCheckpoint { - address: current_key, - storage: current_subkey, - block_range: CheckpointBlockRange { from: from_block, to: to_block }, - progress: stage_checkpoint_progress(provider)?, + let mut cursor = tx.cursor_dup_write::()?; + for item in collector.iter()? { + let (addr_key, value) = item?; + cursor.append_dup( + B256::from_slice(&addr_key[..32]), + StorageEntry { + key: B256::from_slice(&addr_key[32..]), + value: CompactU256::decompress(value)?.into(), }, - ); - - return Ok(ExecOutput { checkpoint, done: false }) + )?; } } else { // Aggregate all changesets and make list of storages that have been @@ -212,6 +172,21 @@ impl Stage for StorageHashingStage { } } +/// Flushes channels hashes to ETL collector. +fn collect( + channels: &mut Vec, CompactU256)>>, + collector: &mut Collector, CompactU256>, +) -> Result<(), StageError> { + for channel in channels.iter_mut() { + while let Ok((key, v)) = channel.recv() { + collector.insert(key, v)?; + } + } + debug!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len()); + channels.clear(); + Ok(()) +} + fn stage_checkpoint_progress( provider: &DatabaseProviderRW, ) -> ProviderResult { @@ -231,14 +206,14 @@ mod tests { use assert_matches::assert_matches; use rand::Rng; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW}, + cursor::{DbCursorRW, DbDupCursorRO}, models::StoredBlockBodyIndices, }; use reth_interfaces::test_utils::{ generators, generators::{random_block_range, random_contract_account_range}, }; - use reth_primitives::{stage::StageUnitCheckpoint, Address, SealedBlock, B256, U256}; + use reth_primitives::{Address, SealedBlock, U256}; use reth_provider::providers::StaticFileWriter; stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing); @@ -310,156 +285,21 @@ mod tests { } } - #[tokio::test] - async fn execute_clean_storage_hashing_with_commit_threshold() { - let (previous_stage, stage_progress) = (500, 100); - // Set up the runner - let mut runner = StorageHashingTestRunner::default(); - runner.set_clean_threshold(1); - runner.set_commit_threshold(500); - - let mut input = ExecInput { - target: Some(previous_stage), - checkpoint: Some(StageCheckpoint::new(stage_progress)), - }; - - runner.seed_execution(input).expect("failed to seed execution"); - - // first run, hash first half of storages. 
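// --- Illustrative sketch (editorial, not part of this patch) -----------------
// The write phase above can use `append_dup` only because the collector yields
// composite keys in ascending order, i.e. (hashed address, hashed slot) never
// goes backwards. The toy writer below (not a reth/MDBX API) makes that
// ordering requirement explicit.
struct DupSortWriter {
    last: Option<([u8; 32], [u8; 32])>,
}

impl DupSortWriter {
    fn new() -> Self {
        Self { last: None }
    }

    fn append_dup(&mut self, key: [u8; 32], subkey: [u8; 32]) -> Result<(), String> {
        if let Some((prev_key, prev_sub)) = &self.last {
            let in_order = *prev_key < key || (*prev_key == key && *prev_sub <= subkey);
            if !in_order {
                return Err("out-of-order append into dup-sorted table".to_string());
            }
        }
        self.last = Some((key, subkey));
        Ok(())
    }
}

fn demo_append_dup() {
    let mut writer = DupSortWriter::new();
    let a = [0u8; 32];
    let mut b = [0u8; 32];
    b[0] = 1;

    assert!(writer.append_dup(a, a).is_ok());
    assert!(writer.append_dup(a, b).is_ok()); // same key, larger subkey
    assert!(writer.append_dup(a, a).is_err()); // subkey went backwards
}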
- let rx = runner.execute(input); - let result = rx.await.unwrap(); - - let (progress_address, progress_key) = runner - .db - .query(|tx| { - let (address, entry) = tx - .cursor_read::()? - .walk(None)? - .nth(500) - .unwrap() - .unwrap(); - Ok((address, entry.key)) - }) - .unwrap(); - - assert_matches!( - result, - Ok(ExecOutput { - checkpoint: StageCheckpoint { - block_number: 100, - stage_checkpoint: Some(StageUnitCheckpoint::Storage(StorageHashingCheckpoint { - address: Some(address), - storage: Some(storage), - block_range: CheckpointBlockRange { - from: 101, - to: 500, - }, - progress: EntitiesCheckpoint { - processed: 500, - total - } - })) - }, - done: false - }) if address == progress_address && storage == progress_key && - total == runner.db.table::().unwrap().len() as u64 - ); - assert_eq!(runner.db.table::().unwrap().len(), 500); - - // second run with commit threshold of 2 to check if subkey is set. - runner.set_commit_threshold(2); - let result = result.unwrap(); - input.checkpoint = Some(result.checkpoint); - let rx = runner.execute(input); - let result = rx.await.unwrap(); - - let (progress_address, progress_key) = runner - .db - .query(|tx| { - let (address, entry) = tx - .cursor_read::()? - .walk(None)? - .nth(502) - .unwrap() - .unwrap(); - Ok((address, entry.key)) - }) - .unwrap(); - - assert_matches!( - result, - Ok(ExecOutput { - checkpoint: StageCheckpoint { - block_number: 100, - stage_checkpoint: Some(StageUnitCheckpoint::Storage( - StorageHashingCheckpoint { - address: Some(address), - storage: Some(storage), - block_range: CheckpointBlockRange { - from: 101, - to: 500, - }, - progress: EntitiesCheckpoint { - processed: 502, - total - } - } - )) - }, - done: false - }) if address == progress_address && storage == progress_key && - total == runner.db.table::().unwrap().len() as u64 - ); - assert_eq!(runner.db.table::().unwrap().len(), 502); - - // third last run, hash rest of storages. - runner.set_commit_threshold(1000); - input.checkpoint = Some(result.unwrap().checkpoint); - let rx = runner.execute(input); - let result = rx.await.unwrap(); - - assert_matches!( - result, - Ok(ExecOutput { - checkpoint: StageCheckpoint { - block_number: 500, - stage_checkpoint: Some(StageUnitCheckpoint::Storage( - StorageHashingCheckpoint { - address: None, - storage: None, - block_range: CheckpointBlockRange { - from: 0, - to: 0, - }, - progress: EntitiesCheckpoint { - processed, - total - } - } - )) - }, - done: true - }) if processed == total && - total == runner.db.table::().unwrap().len() as u64 - ); - assert_eq!( - runner.db.table::().unwrap().len(), - runner.db.table::().unwrap().len() - ); - - // Validate the stage execution - assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); - } - struct StorageHashingTestRunner { db: TestStageDB, commit_threshold: u64, clean_threshold: u64, + etl_config: EtlConfig, } impl Default for StorageHashingTestRunner { fn default() -> Self { - Self { db: TestStageDB::default(), commit_threshold: 1000, clean_threshold: 1000 } + Self { + db: TestStageDB::default(), + commit_threshold: 1000, + clean_threshold: 1000, + etl_config: EtlConfig::default(), + } } } @@ -474,6 +314,7 @@ mod tests { Self::S { commit_threshold: self.commit_threshold, clean_threshold: self.clean_threshold, + etl_config: self.etl_config.clone(), } } }
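// --- Illustrative sketch (editorial, not part of this patch) -----------------
// Conceptually, what `etl_config.file_size` controls in both stages: hashed
// entries are buffered, spilled as sorted runs once the buffer grows past the
// limit, and merged back in key order when the stage writes to the database.
// The std-only sketch below keeps the "runs" in memory purely for brevity; the
// real `reth_etl::Collector` writes them to `etl_config.dir`, and its exact
// internals are an assumption here.
fn etl_sort(entries: Vec<(Vec<u8>, Vec<u8>)>, max_run_len: usize) -> Vec<(Vec<u8>, Vec<u8>)> {
    // Phase 1: build bounded, individually sorted runs.
    let mut runs: Vec<Vec<(Vec<u8>, Vec<u8>)>> = Vec::new();
    let mut buffer: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    for entry in entries {
        buffer.push(entry);
        if buffer.len() >= max_run_len {
            buffer.sort_by(|a, b| a.0.cmp(&b.0));
            runs.push(std::mem::take(&mut buffer));
        }
    }
    if !buffer.is_empty() {
        buffer.sort_by(|a, b| a.0.cmp(&b.0));
        runs.push(buffer);
    }

    // Phase 2: naive k-way merge of the sorted runs, yielding a globally
    // sorted stream suitable for append-only cursor writes.
    let mut cursors = vec![0usize; runs.len()];
    let mut merged = Vec::new();
    loop {
        let mut next: Option<usize> = None;
        for (i, run) in runs.iter().enumerate() {
            if cursors[i] < run.len() {
                let smaller = match next {
                    Some(j) => run[cursors[i]].0 < runs[j][cursors[j]].0,
                    None => true,
                };
                if smaller {
                    next = Some(i);
                }
            }
        }
        match next {
            Some(i) => {
                merged.push(runs[i][cursors[i]].clone());
                cursors[i] += 1;
            }
            None => break,
        }
    }
    merged
}

fn demo_etl_sort() {
    let entries = vec![
        (vec![3u8], vec![30u8]),
        (vec![1u8], vec![10u8]),
        (vec![2u8], vec![20u8]),
    ];
    let sorted = etl_sort(entries, 2);
    let keys: Vec<u8> = sorted.iter().map(|(k, _)| k[0]).collect();
    assert_eq!(keys, vec![1, 2, 3]);
}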