Skip to content

Commit

Permalink
refactor: split async/sync work in stages (paradigmxyz#4636)
Browse files Browse the repository at this point in the history
Co-authored-by: Roman Krasiuk <[email protected]>
  • Loading branch information
onbjerg and rkrasiuk authored Nov 17, 2023
1 parent 7f9ce6f commit db5d01e
Show file tree
Hide file tree
Showing 39 changed files with 769 additions and 675 deletions.
17 changes: 7 additions & 10 deletions bin/reth/src/chain/import.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::{
args::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
DatabaseArgs,
},
dirs::{DataDirPath, MaybePlatformPath},
init::init_genesis,
node::events::{handle_events, NodeEvent},
Expand All @@ -8,12 +12,6 @@ use clap::Parser;
use eyre::Context;
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_provider::{ProviderFactory, StageCheckpointReader};

use crate::args::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
DatabaseArgs,
};
use reth_config::Config;
use reth_db::{database::Database, init_db};
use reth_downloaders::{
Expand All @@ -22,12 +20,10 @@ use reth_downloaders::{
};
use reth_interfaces::consensus::Consensus;
use reth_primitives::{stage::StageId, ChainSpec, B256};
use reth_provider::{HeaderSyncMode, ProviderFactory, StageCheckpointReader};
use reth_stages::{
prelude::*,
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage,
},
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage, TotalDifficultyStage},
};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::watch;
Expand Down Expand Up @@ -164,6 +160,7 @@ impl ImportCommand {
.with_max_block(max_block)
.add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.chain.clone()),
HeaderSyncMode::Tip(tip_rx),
consensus.clone(),
header_downloader,
Expand Down
8 changes: 3 additions & 5 deletions bin/reth/src/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ use reth_interfaces::{
use reth_network::{NetworkEvents, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_primitives::{fs, stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, B256};
use reth_provider::{BlockExecutionWriter, ProviderFactory, StageCheckpointReader};
use reth_provider::{BlockExecutionWriter, HeaderSyncMode, ProviderFactory, StageCheckpointReader};
use reth_stages::{
sets::DefaultStages,
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage,
},
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage, TotalDifficultyStage},
Pipeline, StageSet,
};
use reth_tasks::TaskExecutor;
Expand Down Expand Up @@ -118,6 +115,7 @@ impl Command {
.with_tip_sender(tip_tx)
.add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.chain.clone()),
header_mode,
Arc::clone(&consensus),
header_downloader,
Expand Down
57 changes: 23 additions & 34 deletions bin/reth/src/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,53 +222,42 @@ impl Command {
None
};

execution_stage
.execute(
execution_stage.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: block.checked_sub(1).map(StageCheckpoint::new),
},
)?;

let mut account_hashing_done = false;
while !account_hashing_done {
let output = account_hashing_stage.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: block.checked_sub(1).map(StageCheckpoint::new),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;

let mut account_hashing_done = false;
while !account_hashing_done {
let output = account_hashing_stage
.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
)?;
account_hashing_done = output.done;
}

let mut storage_hashing_done = false;
while !storage_hashing_done {
let output = storage_hashing_stage
.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
storage_hashing_done = output.done;
}

let incremental_result = merkle_stage
.execute(
let output = storage_hashing_stage.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await;
)?;
storage_hashing_done = output.done;
}

let incremental_result = merkle_stage.execute(
&provider_rw,
ExecInput { target: Some(block), checkpoint: progress.map(StageCheckpoint::new) },
);

if incremental_result.is_err() {
tracing::warn!(target: "reth::cli", block, "Incremental calculation failed, retrying from scratch");
Expand All @@ -285,7 +274,7 @@ impl Command {

let clean_input = ExecInput { target: Some(block), checkpoint: None };
loop {
let clean_result = merkle_stage.execute(&provider_rw, clean_input).await;
let clean_result = merkle_stage.execute(&provider_rw, clean_input);
assert!(clean_result.is_ok(), "Clean state root calculation failed");
if clean_result.unwrap().done {
break
Expand Down
9 changes: 5 additions & 4 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use reth_primitives::{
};
use reth_provider::{
providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions,
HeaderProvider, ProviderFactory, StageCheckpointReader,
HeaderProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader,
};
use reth_prune::{segments::SegmentSet, Pruner};
use reth_revm::Factory;
Expand All @@ -71,9 +71,9 @@ use reth_snapshot::HighestSnapshotsTracker;
use reth_stages::{
prelude::*,
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, HeaderSyncMode,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TotalDifficultyStage, TransactionLookupStage,
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
TotalDifficultyStage, TransactionLookupStage,
},
};
use reth_tasks::TaskExecutor;
Expand Down Expand Up @@ -896,6 +896,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
.with_metrics_tx(metrics_tx.clone())
.add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.chain.clone()),
header_mode,
Arc::clone(&consensus),
header_downloader,
Expand Down
33 changes: 12 additions & 21 deletions bin/reth/src/stage/dump/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,14 @@ async fn unwind_and_copy<DB: Database>(

let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(db_tool.chain.clone()));

exec_stage
.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
exec_stage.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)?;

let unwind_inner_tx = provider.into_tx();

Expand All @@ -131,20 +129,13 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage. [dry-run]");

let factory = ProviderFactory::new(&output_db, chain.clone());
let provider = factory.provider_rw()?;
let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(chain.clone()));

exec_stage
.execute(
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?;
let input =
reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
exec_stage.execute(&factory.provider_rw()?, input)?;

info!(target: "reth::cli", "Success.");
info!(target: "reth::cli", "Success");

Ok(())
}
44 changes: 19 additions & 25 deletions bin/reth/src/stage/dump/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
tx.import_table_with_range::<tables::AccountChangeSet, _>(&db_tool.db.tx()?, Some(from), to)
})??;

unwind_and_copy(db_tool, from, tip_block_number, &output_db).await?;
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;

if should_run {
dry_run(db_tool.chain.clone(), output_db, to, from).await?;
Expand All @@ -32,7 +32,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
}

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
async fn unwind_and_copy<DB: Database>(
fn unwind_and_copy<DB: Database>(
db_tool: &DbTool<'_, DB>,
from: u64,
tip_block_number: u64,
Expand All @@ -42,16 +42,14 @@ async fn unwind_and_copy<DB: Database>(
let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage::default();

exec_stage
.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
exec_stage.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)?;
let unwind_inner_tx = provider.into_tx();

output_db.update(|tx| tx.import_table::<tables::PlainAccountState, _>(&unwind_inner_tx))??;
Expand All @@ -70,23 +68,19 @@ async fn dry_run<DB: Database>(

let factory = ProviderFactory::new(&output_db, chain);
let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage {
let mut stage = AccountHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
};

let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
loop {
let input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
if stage.execute(&provider, input)?.done {
break
}
}

info!(target: "reth::cli", "Success.");
Expand Down
44 changes: 19 additions & 25 deletions bin/reth/src/stage/dump/hashing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
) -> Result<()> {
let (output_db, tip_block_number) = setup(from, to, output_db, db_tool)?;

unwind_and_copy(db_tool, from, tip_block_number, &output_db).await?;
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;

if should_run {
dry_run(db_tool.chain.clone(), output_db, to, from).await?;
Expand All @@ -27,7 +27,7 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
}

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
async fn unwind_and_copy<DB: Database>(
fn unwind_and_copy<DB: Database>(
db_tool: &DbTool<'_, DB>,
from: u64,
tip_block_number: u64,
Expand All @@ -38,16 +38,14 @@ async fn unwind_and_copy<DB: Database>(

let mut exec_stage = StorageHashingStage::default();

exec_stage
.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
exec_stage.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)?;
let unwind_inner_tx = provider.into_tx();

// TODO optimize we can actually just get the entries we need for both these tables
Expand All @@ -69,23 +67,19 @@ async fn dry_run<DB: Database>(

let factory = ProviderFactory::new(&output_db, chain);
let provider = factory.provider_rw()?;
let mut exec_stage = StorageHashingStage {
let mut stage = StorageHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
};

let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
loop {
let input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
if stage.execute(&provider, input)?.done {
break
}
}

info!(target: "reth::cli", "Success.");
Expand Down
Loading

0 comments on commit db5d01e

Please sign in to comment.