Skip to content

Commit

Permalink
refactor: give Pipeline a database (paradigmxyz#2558)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored May 4, 2023
1 parent acbbd67 commit 3dd2778
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 135 deletions.
18 changes: 10 additions & 8 deletions bin/reth/src/chain/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use clap::{crate_version, Parser};
use eyre::Context;
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_db::mdbx::{Env, WriteMap};

use reth_db::database::Database;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient,
Expand Down Expand Up @@ -104,7 +105,7 @@ impl ImportCommand {
info!(target: "reth::cli", "Chain file imported");

let (mut pipeline, events) =
self.build_import_pipeline(config, db.clone(), &consensus, file_client).await?;
self.build_import_pipeline(config, db, &consensus, file_client).await?;

// override the tip
pipeline.set_tip(tip);
Expand All @@ -115,22 +116,23 @@ impl ImportCommand {
// Run pipeline
info!(target: "reth::cli", "Starting sync pipeline");
tokio::select! {
res = pipeline.run(db.clone()) => res?,
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
};

info!(target: "reth::cli", "Finishing up");
Ok(())
}

async fn build_import_pipeline<C>(
async fn build_import_pipeline<DB, C>(
&self,
config: Config,
db: Arc<Env<WriteMap>>,
db: DB,
consensus: &Arc<C>,
file_client: Arc<FileClient>,
) -> eyre::Result<(Pipeline<Env<WriteMap>>, impl Stream<Item = NodeEvent>)>
) -> eyre::Result<(Pipeline<DB>, impl Stream<Item = NodeEvent>)>
where
DB: Database + Clone + Unpin + 'static,
C: Consensus + 'static,
{
if !file_client.has_canonical_blocks() {
Expand All @@ -142,7 +144,7 @@ impl ImportCommand {
.into_task();

let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
.build(file_client.clone(), consensus.clone(), db)
.build(file_client.clone(), consensus.clone(), db.clone())
.into_task();

let (tip_tx, tip_rx) = watch::channel(H256::zero());
Expand Down Expand Up @@ -171,7 +173,7 @@ impl ImportCommand {
})
.set(ExecutionStage::new(factory, config.stages.execution.commit_threshold)),
)
.build();
.build(db);

let events = pipeline.events().map(Into::into);

Expand Down
28 changes: 17 additions & 11 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,22 +412,23 @@ impl Command {
}

/// Constructs a [Pipeline] that's wired to the network
async fn build_networked_pipeline<Client>(
async fn build_networked_pipeline<DB, Client>(
&self,
config: &mut Config,
network: NetworkHandle,
client: Client,
consensus: Arc<dyn Consensus>,
db: Arc<Env<WriteMap>>,
db: DB,
task_executor: &TaskExecutor,
) -> eyre::Result<Pipeline<Env<WriteMap>>>
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Unpin + Clone + 'static,
Client: HeadersClient + BodiesClient + Clone + 'static,
{
let max_block = if let Some(block) = self.debug.max_block {
Some(block)
} else if let Some(tip) = self.debug.tip {
Some(self.lookup_or_fetch_tip(db.clone(), &client, tip).await?)
Some(self.lookup_or_fetch_tip(&db, &client, tip).await?)
} else {
None
};
Expand All @@ -443,6 +444,7 @@ impl Command {

let pipeline = self
.build_pipeline(
db,
config,
header_downloader,
body_downloader,
Expand Down Expand Up @@ -541,13 +543,14 @@ impl Command {
/// If it doesn't exist, download the header and return the block number.
///
/// NOTE: The download is attempted with infinite retries.
async fn lookup_or_fetch_tip<Client>(
async fn lookup_or_fetch_tip<DB, Client>(
&self,
db: Arc<Env<WriteMap>>,
db: &DB,
client: Client,
tip: H256,
) -> Result<u64, reth_interfaces::Error>
where
DB: Database,
Client: HeadersClient,
{
Ok(self.fetch_tip(db, client, BlockHashOrNumber::Hash(tip)).await?.number)
Expand All @@ -556,13 +559,14 @@ impl Command {
/// Attempt to look up the block with the given number and return the header.
///
/// NOTE: The download is attempted with infinite retries.
async fn fetch_tip<Client>(
async fn fetch_tip<DB, Client>(
&self,
db: Arc<Env<WriteMap>>,
db: &DB,
client: Client,
tip: BlockHashOrNumber,
) -> Result<SealedHeader, reth_interfaces::Error>
where
DB: Database,
Client: HeadersClient,
{
let header = db.view(|tx| -> Result<Option<Header>, reth_db::Error> {
Expand Down Expand Up @@ -619,17 +623,19 @@ impl Command {
}

#[allow(clippy::too_many_arguments)]
async fn build_pipeline<H, B, U>(
async fn build_pipeline<DB, H, B, U>(
&self,
db: DB,
config: &Config,
header_downloader: H,
body_downloader: B,
updater: U,
consensus: Arc<dyn Consensus>,
max_block: Option<u64>,
continuous: bool,
) -> eyre::Result<Pipeline<Env<WriteMap>>>
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Clone + 'static,
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
U: SyncStateUpdater + StatusUpdater + Clone + 'static,
Expand Down Expand Up @@ -687,7 +693,7 @@ impl Command {
.disable_if(MERKLE_UNWIND, || self.auto_mine)
.disable_if(MERKLE_EXECUTION, || self.auto_mine),
)
.build();
.build(db);

Ok(pipeline)
}
Expand Down
24 changes: 11 additions & 13 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use reth_tasks::TaskSpawner;
use schnellru::{ByLength, LruMap};
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{
Expand Down Expand Up @@ -141,7 +140,7 @@ where
BT: BlockchainTreeEngine,
{
/// The database handle.
db: Arc<DB>,
db: DB,
/// Task spawner for spawning the pipeline.
task_spawner: TS,
/// The current state of the pipeline.
Expand Down Expand Up @@ -184,7 +183,7 @@ where
{
/// Create a new instance of the [BeaconConsensusEngine].
pub fn new(
db: Arc<DB>,
db: DB,
task_spawner: TS,
pipeline: Pipeline<DB>,
blockchain_tree: BT,
Expand All @@ -210,7 +209,7 @@ where
/// the [BeaconEngineMessage] communication channel.
#[allow(clippy::too_many_arguments)]
pub fn with_channel(
db: Arc<DB>,
db: DB,
task_spawner: TS,
pipeline: Pipeline<DB>,
blockchain_tree: BT,
Expand Down Expand Up @@ -570,11 +569,10 @@ where
self.metrics.pipeline_runs.increment(1);
trace!(target: "consensus::engine", ?tip, continuous = tip.is_none(), "Starting the pipeline");
let (tx, rx) = oneshot::channel();
let db = self.db.clone();
self.task_spawner.spawn_critical_blocking(
"pipeline",
Box::pin(async move {
let result = pipeline.run_as_fut(db, tip).await;
let result = pipeline.run_as_fut(tip).await;
let _ = tx.send(result);
}),
);
Expand Down Expand Up @@ -768,7 +766,7 @@ enum PipelineTarget {
Safe,
}

/// Keeps track of invalid headerst.
/// Keeps track of invalid headers.
struct InvalidHeaderCache {
headers: LruMap<H256, Header>,
}
Expand Down Expand Up @@ -807,20 +805,20 @@ mod tests {
use reth_provider::{test_utils::TestExecutorFactory, Transaction};
use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError};
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, time::Duration};
use std::{collections::VecDeque, sync::Arc, time::Duration};
use tokio::sync::{
oneshot::{self, error::TryRecvError},
watch,
};

type TestBeaconConsensusEngine = BeaconConsensusEngine<
Env<WriteMap>,
Arc<Env<WriteMap>>,
TokioTaskExecutor,
ShareableBlockchainTree<Arc<Env<WriteMap>>, TestConsensus, TestExecutorFactory>,
>;

struct TestEnv<DB> {
db: Arc<DB>,
db: DB,
// Keep the tip receiver around, so it's not dropped.
#[allow(dead_code)]
tip_rx: watch::Receiver<H256>,
Expand All @@ -829,7 +827,7 @@ mod tests {

impl<DB> TestEnv<DB> {
fn new(
db: Arc<DB>,
db: DB,
tip_rx: watch::Receiver<H256>,
engine_handle: BeaconConsensusEngineHandle,
) -> Self {
Expand Down Expand Up @@ -883,7 +881,7 @@ mod tests {
chain_spec: Arc<ChainSpec>,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<PostState>,
) -> (TestBeaconConsensusEngine, TestEnv<Env<WriteMap>>) {
) -> (TestBeaconConsensusEngine, TestEnv<Arc<Env<WriteMap>>>) {
reth_tracing::init_test_tracing();
let db = create_test_rw_db();
let consensus = TestConsensus::default();
Expand All @@ -897,7 +895,7 @@ mod tests {
let pipeline = Pipeline::builder()
.add_stages(TestStages::new(pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx)
.build();
.build(db.clone());

// Setup blockchain tree
let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec);
Expand Down
2 changes: 1 addition & 1 deletion crates/interfaces/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub trait SyncStateProvider: Send + Sync {
/// Eventually the node reaches the `Finish` stage and will transition to [`SyncState::Idle`], it
/// which point the node is considered fully synced.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait SyncStateUpdater: Send + Sync + 'static {
pub trait SyncStateUpdater: std::fmt::Debug + Send + Sync + 'static {
/// Notifies about an [SyncState] update.
fn update_sync_state(&self, state: SyncState);
}
Expand Down
12 changes: 6 additions & 6 deletions crates/net/downloaders/src/bodies/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct BodiesDownloader<B: BodiesClient, DB> {
consensus: Arc<dyn Consensus>,
// TODO: make this a [HeaderProvider]
/// The database handle
db: Arc<DB>,
db: DB,
/// The maximum number of non-empty blocks per one request
request_limit: u64,
/// The maximum number of block bodies returned at once from the stream
Expand All @@ -76,7 +76,7 @@ pub struct BodiesDownloader<B: BodiesClient, DB> {
impl<B, DB> BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
DB: Database + Unpin + 'static,
{
/// Returns the next contiguous request.
fn next_headers_request(&mut self) -> DownloadResult<Option<Vec<SealedHeader>>> {
Expand Down Expand Up @@ -249,7 +249,7 @@ where
impl<B, DB> BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
DB: Database + Unpin + 'static,
Self: BodyDownloader + 'static,
{
/// Spawns the downloader task via [tokio::task::spawn]
Expand All @@ -270,7 +270,7 @@ where
impl<B, DB> BodyDownloader for BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
DB: Database + Unpin + 'static,
{
/// Set a new download range (exclusive).
///
Expand Down Expand Up @@ -315,7 +315,7 @@ where
impl<B, DB> Stream for BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
DB: Database + Unpin + 'static,
{
type Item = BodyDownloaderResult;

Expand Down Expand Up @@ -497,7 +497,7 @@ impl BodiesDownloaderBuilder {
self,
client: B,
consensus: Arc<dyn Consensus>,
db: Arc<DB>,
db: DB,
) -> BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
Expand Down
8 changes: 4 additions & 4 deletions crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
//! ```
//! # use std::sync::Arc;
//! # use reth_db::mdbx::test_utils::create_test_rw_db;
//! # use reth_db::mdbx::{Env, WriteMap};
//! # use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
//! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder;
//! # use reth_interfaces::consensus::Consensus;
Expand All @@ -37,22 +36,23 @@
//! # Arc::new(TestHeadersClient::default()),
//! # consensus.clone()
//! # );
//! # let db = create_test_rw_db();
//! # let bodies_downloader = BodiesDownloaderBuilder::default().build(
//! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::zero(), vec![]).into()) }),
//! # consensus.clone(),
//! # create_test_rw_db()
//! # db.clone()
//! # );
//! # let (tip_tx, tip_rx) = watch::channel(H256::default());
//! # let factory = Factory::new(Arc::new(MAINNET.clone()));
//! # let (status_updater, _) = TestStatusUpdater::new();
//! // Create a pipeline that can fully sync
//! # let pipeline: Pipeline<Env<WriteMap>> =
//! # let pipeline =
//! Pipeline::builder()
//! .with_tip_sender(tip_tx)
//! .add_stages(
//! DefaultStages::new(HeaderSyncMode::Tip(tip_rx), consensus, headers_downloader, bodies_downloader, status_updater, factory)
//! )
//! .build();
//! .build(db);
//! ```
mod error;
mod id;
Expand Down
Loading

0 comments on commit 3dd2778

Please sign in to comment.