Skip to content

Commit

Permalink
feat: make downloaders and clients generic over block parts (paradigm…
Browse files Browse the repository at this point in the history
…xyz#12469)

Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
klkvr and mattsse authored Nov 12, 2024
1 parent 3a337cd commit aece53a
Show file tree
Hide file tree
Showing 60 changed files with 631 additions and 409 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ reth-ethereum-cli.workspace = true
reth-chainspec.workspace = true
reth-config.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-fs-util.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-db-api.workspace = true
Expand Down
11 changes: 7 additions & 4 deletions bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use reth_downloaders::{
use reth_exex::ExExManagerHandle;
use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_network_p2p::{headers::client::HeadersClient, BlockClient};
use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient};
use reth_node_api::{NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine};
use reth_node_ethereum::EthExecutorProvider;
use reth_provider::{
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
) -> eyre::Result<Pipeline<N>>
where
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
{
// building network downloaders using the fetch client
let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
Expand Down Expand Up @@ -137,11 +137,14 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
Ok(network)
}

async fn fetch_block_hash<Client: HeadersClient>(
async fn fetch_block_hash<Client>(
&self,
client: Client,
block: BlockNumber,
) -> eyre::Result<B256> {
) -> eyre::Result<B256>
where
Client: HeadersClient<Header: reth_primitives_traits::BlockHeader>,
{
info!(target: "reth::cli", ?block, "Fetching block from the network.");
loop {
match get_single_header(&client, BlockHashOrNumber::Number(block)).await {
Expand Down
4 changes: 3 additions & 1 deletion bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
use alloy_eips::BlockHashOrNumber;
use backon::{ConstantBuilder, Retryable};
use clap::Parser;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_chainspec::ChainSpec;
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_commands::common::{AccessRights, Environment, EnvironmentArgs};
Expand Down Expand Up @@ -124,7 +125,8 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {

let client = fetch_client.clone();
let chain = provider_factory.chain_spec();
let block = (move || get_single_body(client.clone(), Arc::clone(&chain), header.clone()))
let consensus = Arc::new(EthBeaconConsensus::new(chain.clone()));
let block = (move || get_single_body(client.clone(), header.clone(), consensus.clone()))
.retry(backoff)
.notify(
|err, _| warn!(target: "reth::cli", "Error requesting body: {err}. Retrying..."),
Expand Down
8 changes: 4 additions & 4 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes, PayloadTypes}
use reth_errors::{BlockValidationError, ProviderResult, RethError, RethResult};
use reth_network_p2p::{
sync::{NetworkSyncUpdater, SyncState},
BlockClient,
EthBlockClient,
};
use reth_node_types::NodeTypesWithEngine;
use reth_payload_builder::PayloadBuilderHandle;
Expand Down Expand Up @@ -174,7 +174,7 @@ type PendingForkchoiceUpdate<PayloadAttributes> =
pub struct BeaconConsensusEngine<N, BT, Client>
where
N: EngineNodeTypes,
Client: BlockClient,
Client: EthBlockClient,
BT: BlockchainTreeEngine
+ BlockReader
+ BlockIdReader
Expand Down Expand Up @@ -237,7 +237,7 @@ where
+ StageCheckpointReader
+ ChainSpecProvider<ChainSpec = N::ChainSpec>
+ 'static,
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
{
/// Create a new instance of the [`BeaconConsensusEngine`].
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -1799,7 +1799,7 @@ where
impl<N, BT, Client> Future for BeaconConsensusEngine<N, BT, Client>
where
N: EngineNodeTypes,
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
BT: BlockchainTreeEngine
+ BlockReader
+ BlockIdReader
Expand Down
8 changes: 4 additions & 4 deletions crates/consensus/beacon/src/engine/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use alloy_primitives::{BlockNumber, B256};
use futures::FutureExt;
use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
BlockClient,
EthBlockClient,
};
use reth_primitives::SealedBlock;
use reth_provider::providers::ProviderNodeTypes;
Expand All @@ -34,7 +34,7 @@ use tracing::trace;
pub(crate) struct EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient,
Client: EthBlockClient,
{
/// A downloader that can download full blocks from the network.
full_block_client: FullBlockClient<Client>,
Expand Down Expand Up @@ -64,7 +64,7 @@ where
impl<N, Client> EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
{
/// Create a new instance
pub(crate) fn new(
Expand Down Expand Up @@ -522,7 +522,7 @@ mod tests {
) -> EngineSyncController<N, Either<Client, TestFullBlockClient>>
where
N: ProviderNodeTypes,
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
{
let client = self
.client
Expand Down
12 changes: 7 additions & 5 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm::{either::Either, test_utils::MockExecutorProvider};
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_exex_types::FinishedExExHeight;
use reth_network_p2p::{sync::NoopSyncStateUpdater, test_utils::NoopFullBlockClient, BlockClient};
use reth_network_p2p::{
sync::NoopSyncStateUpdater, test_utils::NoopFullBlockClient, EthBlockClient,
};
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::SealedHeader;
use reth_provider::{
Expand Down Expand Up @@ -237,7 +239,7 @@ impl TestConsensusEngineBuilder {
client: Client,
) -> NetworkedTestConsensusEngineBuilder<Client>
where
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
{
NetworkedTestConsensusEngineBuilder { base_config: self, client: Some(client) }
}
Expand All @@ -264,7 +266,7 @@ pub struct NetworkedTestConsensusEngineBuilder<Client> {

impl<Client> NetworkedTestConsensusEngineBuilder<Client>
where
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
{
/// Set the pipeline execution outputs to use for the test consensus engine.
#[allow(dead_code)]
Expand Down Expand Up @@ -319,7 +321,7 @@ where
client: ClientType,
) -> NetworkedTestConsensusEngineBuilder<ClientType>
where
ClientType: BlockClient + 'static,
ClientType: EthBlockClient + 'static,
{
NetworkedTestConsensusEngineBuilder { base_config: self.base_config, client: Some(client) }
}
Expand Down Expand Up @@ -450,7 +452,7 @@ pub fn spawn_consensus_engine<Client>(
engine: TestBeaconConsensusEngine<Client>,
) -> oneshot::Receiver<Result<(), BeaconConsensusEngineError>>
where
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
{
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
Expand Down
8 changes: 4 additions & 4 deletions crates/engine/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use reth_engine_tree::{
engine::EngineApiEvent,
};
use reth_evm::execute::BlockExecutorProvider;
use reth_network_p2p::BlockClient;
use reth_network_p2p::EthBlockClient;
use reth_node_types::NodeTypesWithEngine;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_validator::ExecutionPayloadValidator;
Expand Down Expand Up @@ -49,7 +49,7 @@ type EngineServiceType<N, Client> = ChainOrchestrator<
pub struct EngineService<N, Client, E>
where
N: EngineNodeTypes,
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
E: BlockExecutorProvider + 'static,
{
orchestrator: EngineServiceType<N, Client>,
Expand All @@ -59,7 +59,7 @@ where
impl<N, Client, E> EngineService<N, Client, E>
where
N: EngineNodeTypes,
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
E: BlockExecutorProvider + 'static,
{
/// Constructor for `EngineService`.
Expand Down Expand Up @@ -124,7 +124,7 @@ where
impl<N, Client, E> Stream for EngineService<N, Client, E>
where
N: EngineNodeTypes,
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
E: BlockExecutorProvider + 'static,
{
type Item = ChainEvent<BeaconConsensusEngineEvent>;
Expand Down
12 changes: 8 additions & 4 deletions crates/engine/tree/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use futures::FutureExt;
use reth_consensus::Consensus;
use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
BlockClient,
BlockClient, EthBlockClient,
};
use reth_primitives::{SealedBlock, SealedBlockWithSenders};
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
fmt::Debug,
sync::Arc,
task::{Context, Poll},
};
Expand Down Expand Up @@ -72,10 +73,13 @@ where

impl<Client> BasicBlockDownloader<Client>
where
Client: BlockClient + 'static,
Client: EthBlockClient + 'static,
{
/// Create a new instance
pub fn new(client: Client, consensus: Arc<dyn Consensus>) -> Self {
pub fn new(
client: Client,
consensus: Arc<dyn Consensus<Client::Header, Client::Body>>,
) -> Self {
Self {
full_block_client: FullBlockClient::new(client, consensus),
inflight_full_block_requests: Vec::new(),
Expand Down Expand Up @@ -182,7 +186,7 @@ where

impl<Client> BlockDownloader for BasicBlockDownloader<Client>
where
Client: BlockClient + 'static,
Client: EthBlockClient,
{
/// Handles incoming download actions.
fn on_action(&mut self, action: DownloadAction) {
Expand Down
1 change: 1 addition & 0 deletions crates/net/downloaders/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ reth-db-api = { workspace = true, optional = true }
reth-testing-utils = { workspace = true, optional = true }

# ethereum
alloy-consensus.workspace = true
alloy-eips.workspace = true
alloy-primitives.workspace = true
alloy-rlp.workspace = true
Expand Down
Loading

0 comments on commit aece53a

Please sign in to comment.