Skip to content

Commit

Permalink
chore(engine): enable engine debug streams in new implementation (par…
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk authored Aug 13, 2024
1 parent dfcd00f commit 8a802da
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 12 deletions.
2 changes: 1 addition & 1 deletion crates/engine/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ reth-tasks.workspace = true
# async
futures.workspace = true
pin-project.workspace = true
tokio-stream.workspace = true

# misc
thiserror.workspace = true
Expand All @@ -46,3 +45,4 @@ reth-primitives.workspace = true
reth-prune-types.workspace = true

tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true
11 changes: 7 additions & 4 deletions crates/engine/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio_stream::wrappers::UnboundedReceiverStream;

/// Alias for consensus engine stream.
type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;

/// Alias for chain orchestrator.
type EngineServiceType<DB, Client, T> = ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<EngineApiRequest<T>>,
UnboundedReceiverStream<BeaconEngineMessage<T>>,
EngineMessageStream<T>,
BasicBlockDownloader<Client>,
>,
PipelineSync<DB>,
Expand Down Expand Up @@ -70,7 +72,7 @@ where
executor_factory: E,
chain_spec: Arc<ChainSpec>,
client: Client,
incoming_requests: UnboundedReceiverStream<BeaconEngineMessage<T>>,
incoming_requests: EngineMessageStream<T>,
pipeline: Pipeline<DB>,
pipeline_task_spawner: Box<dyn TaskSpawner>,
provider: ProviderFactory<DB>,
Expand Down Expand Up @@ -149,6 +151,7 @@ mod tests {
use reth_tasks::TokioTaskExecutor;
use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;

#[test]
fn eth_chain_orchestrator_build() {
Expand Down Expand Up @@ -185,7 +188,7 @@ mod tests {
executor_factory,
chain_spec,
client,
incoming_requests,
Box::pin(incoming_requests),
pipeline,
pipeline_task_spawner,
provider_factory,
Expand Down
13 changes: 7 additions & 6 deletions crates/engine/util/src/reorg.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Stream wrapper that simulates reorgs.
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use itertools::Either;
use reth_beacon_consensus::{BeaconEngineMessage, BeaconOnNewPayloadError, OnForkChoiceUpdated};
use reth_engine_primitives::EngineTypes;
Expand All @@ -26,6 +26,7 @@ use reth_rpc_types_compat::engine::payload::block_to_payload;
use revm_primitives::{BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg};
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
Expand All @@ -43,6 +44,8 @@ type EngineReorgResponse = Result<
oneshot::error::RecvError,
>;

type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;

/// Engine API stream wrapper that simulates reorgs with specified frequency.
#[derive(Debug)]
#[pin_project::pin_project]
Expand All @@ -66,7 +69,7 @@ pub struct EngineReorg<S, Engine: EngineTypes, Provider, Evm> {
/// Last forkchoice state.
last_forkchoice_state: Option<ForkchoiceState>,
/// Pending engine responses to reorg messages.
reorg_responses: FuturesUnordered<BoxFuture<'static, EngineReorgResponse>>,
reorg_responses: FuturesUnordered<ReorgResponseFut>,
}

impl<S, Engine: EngineTypes, Provider, Evm> EngineReorg<S, Engine, Provider, Evm> {
Expand Down Expand Up @@ -181,10 +184,8 @@ where
let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
this.reorg_responses.extend([
Box::pin(reorg_payload_rx.map_ok(Either::Left))
as BoxFuture<'static, EngineReorgResponse>,
Box::pin(reorg_fcu_rx.map_ok(Either::Right))
as BoxFuture<'static, EngineReorgResponse>,
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
]);

*this.state = EngineReorgState::Reorg {
Expand Down
18 changes: 17 additions & 1 deletion crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use reth_engine_tree::{
engine::{EngineApiRequest, EngineRequestHandler},
tree::TreeConfig,
};
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
use reth_network::{NetworkSyncUpdater, SyncState};
use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider};
Expand Down Expand Up @@ -133,6 +134,21 @@ where
let network_client = ctx.components().network().fetch_client().await?;
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();

let node_config = ctx.node_config();
let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
.maybe_skip_fcu(node_config.debug.skip_fcu)
.maybe_skip_new_payload(node_config.debug.skip_new_payload)
.maybe_reorg(
ctx.blockchain_db().clone(),
ctx.components().evm_config().clone(),
reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
node_config.debug.reorg_frequency,
)
// Store messages _after_ skipping so that `replay-engine` command
// would replay only the messages that were observed by the engine
// during this run.
.maybe_store_messages(node_config.debug.engine_api_store.clone());

let max_block = ctx.max_block(network_client.clone()).await?;
let mut hooks = EngineHooks::new();

Expand Down Expand Up @@ -179,7 +195,7 @@ where
ctx.components().block_executor().clone(),
ctx.chain_spec(),
network_client.clone(),
UnboundedReceiverStream::new(consensus_engine_rx),
Box::pin(consensus_engine_stream),
pipeline,
Box::new(ctx.task_executor().clone()),
ctx.provider_factory().clone(),
Expand Down

0 comments on commit 8a802da

Please sign in to comment.