Skip to content

Commit

Permalink
feat: canonical state for local engine (paradigmxyz#11245)
Browse files Browse the repository at this point in the history
  • Loading branch information
greged93 authored Sep 27, 2024
1 parent 5706e03 commit 136a822
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 49 deletions.
1 change: 1 addition & 0 deletions crates/engine/local/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ exclude.workspace = true
[dependencies]
# reth
reth-beacon-consensus.workspace = true
reth-chain-state.workspace = true
reth-engine-tree.workspace = true
reth-node-types.workspace = true
reth-payload-builder.workspace = true
Expand Down
162 changes: 113 additions & 49 deletions crates/engine/local/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
//! building at a fixed interval.
use crate::miner::MiningMode;
use alloy_primitives::B256;
use eyre::eyre;
use reth_beacon_consensus::EngineNodeTypes;
use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain};
use reth_engine_tree::persistence::PersistenceHandle;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
Expand All @@ -17,12 +18,12 @@ use reth_payload_primitives::{
use reth_provider::ProviderFactory;
use reth_prune::PrunerWithFactory;
use reth_stages_api::MetricEventsSender;
use std::fmt::Formatter;
use tokio::sync::oneshot;
use tracing::debug;

/// Provides a local dev service engine that can be used to drive the
/// chain forward.
#[derive(Debug)]
pub struct LocalEngineService<N, B>
where
N: EngineNodeTypes,
Expand All @@ -32,30 +33,14 @@ where
payload_builder: PayloadBuilderHandle<N::Engine>,
/// The payload attribute builder for the engine
payload_attributes_builder: B,
/// Keep track of the Canonical chain state that isn't persisted on disk yet
canonical_in_memory_state: CanonicalInMemoryState,
/// A handle to the persistence layer
persistence_handle: PersistenceHandle,
/// The hash of the current head
head: B256,
/// The mining mode for the engine
mode: MiningMode,
}

impl<N, B> std::fmt::Debug for LocalEngineService<N, B>
where
N: EngineNodeTypes,
B: PayloadAttributesBuilder<PayloadAttributes = <N::Engine as PayloadTypes>::PayloadAttributes>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LocalEngineService")
.field("payload_builder", &self.payload_builder)
.field("payload_attributes_builder", &self.payload_attributes_builder)
.field("persistence_handle", &self.persistence_handle)
.field("head", &self.head)
.field("mode", &self.mode)
.finish()
}
}

impl<N, B> LocalEngineService<N, B>
where
N: EngineNodeTypes,
Expand All @@ -67,14 +52,20 @@ where
payload_attributes_builder: B,
provider: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
canonical_in_memory_state: CanonicalInMemoryState,
sync_metrics_tx: MetricEventsSender,
head: B256,
mode: MiningMode,
) -> Self {
let persistence_handle =
PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx);

Self { payload_builder, payload_attributes_builder, persistence_handle, head, mode }
Self {
payload_builder,
payload_attributes_builder,
canonical_in_memory_state,
persistence_handle,
mode,
}
}

/// Spawn the [`LocalEngineService`] on a tokio green thread. The service will poll the payload
Expand All @@ -86,17 +77,17 @@ where
payload_attributes_builder: B,
provider: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
canonical_in_memory_state: CanonicalInMemoryState,
sync_metrics_tx: MetricEventsSender,
head: B256,
mode: MiningMode,
) {
let engine = Self::new(
payload_builder,
payload_attributes_builder,
provider,
pruner,
canonical_in_memory_state,
sync_metrics_tx,
head,
mode,
);

Expand All @@ -112,26 +103,29 @@ where
(&mut self.mode).await;

// Start a new payload building job
let new_head = self.build_and_save_payload().await;
let executed_block = self.build_and_save_payload().await;

if new_head.is_err() {
debug!(target: "local_engine", err = ?new_head.unwrap_err(), "failed payload building");
if executed_block.is_err() {
debug!(target: "local_engine", err = ?executed_block.unwrap_err(), "failed payload building");
continue
}
let block = executed_block.expect("not error");

// Update the head
self.head = new_head.expect("not error");
let res = self.update_canonical_in_memory_state(block);
if res.is_err() {
debug!(target: "local_engine", err = ?res.unwrap_err(), "failed canonical state update");
}
}
}

/// Builds a payload by initiating a new payload job via the [`PayloadBuilderHandle`],
/// saving the execution outcome to persistence and returning the current head of the
/// chain.
async fn build_and_save_payload(&self) -> eyre::Result<B256> {
/// saving the execution outcome to persistence and returning the executed block.
async fn build_and_save_payload(&self) -> eyre::Result<ExecutedBlock> {
let payload_attributes = self.payload_attributes_builder.build()?;
let parent = self.canonical_in_memory_state.get_canonical_head().hash();
let payload_builder_attributes =
<N::Engine as PayloadTypes>::PayloadBuilderAttributes::try_new(
self.head,
parent,
payload_attributes,
)
.map_err(|_| eyre::eyre!("failed to fetch payload attributes"))?;
Expand All @@ -142,22 +136,38 @@ where
.await?
.await?;

let block = payload.executed_block().map(|block| vec![block]).unwrap_or_default();
let executed_block =
payload.executed_block().ok_or_else(|| eyre!("missing executed block"))?;
let (tx, rx) = oneshot::channel();

let _ = self.persistence_handle.save_blocks(block, tx);
let _ = self.persistence_handle.save_blocks(vec![executed_block.clone()], tx);

// Wait for the persistence_handle to complete
let new_head = rx.await?.ok_or_else(|| eyre::eyre!("missing new head"))?;
let _ = rx.await?.ok_or_else(|| eyre!("missing new head"))?;

Ok(executed_block)
}

/// Update the canonical in memory state and send notification for a new canon state to
/// all the listeners.
fn update_canonical_in_memory_state(&self, executed_block: ExecutedBlock) -> eyre::Result<()> {
let chain = NewCanonicalChain::Commit { new: vec![executed_block] };
let tip = chain.tip().header.clone();
let notification = chain.to_chain_notification();

Ok(new_head.hash)
// Update the tracked in-memory state with the new chain
self.canonical_in_memory_state.update_chain(chain);
self.canonical_in_memory_state.set_canonical_head(tip);

// Sends an event to all active listeners about the new canonical chain
self.canonical_in_memory_state.notify_canon_state(notification);
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::B256;
use reth_chainspec::MAINNET;
use reth_config::PruneConfig;
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
Expand Down Expand Up @@ -201,34 +211,38 @@ mod tests {
let provider = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
StaticFileProvider::read_write(static_dir_path)?,
);
let pruner = PrunerBuilder::new(PruneConfig::default())
.build_with_provider_factory(provider.clone());

// Create an empty canonical in memory state
let canonical_in_memory_state = CanonicalInMemoryState::empty();

// Start the payload builder service
let payload_handle = spawn_test_payload_service::<EthEngineTypes>();

// Sync metric channel
let (sync_metrics_tx, _) = unbounded_channel();

// Get the attributes for start of block building
let genesis_hash = B256::random();

// Launch the LocalEngineService in interval mode
let period = Duration::from_secs(1);
LocalEngineService::spawn_new(
payload_handle,
TestPayloadAttributesBuilder,
provider.clone(),
pruner,
canonical_in_memory_state,
sync_metrics_tx,
genesis_hash,
MiningMode::interval(period),
);

// Check that we have no block for now
let block = provider.block_by_number(0)?;
assert!(block.is_none());

// Wait 4 intervals
tokio::time::sleep(4 * period).await;
tokio::time::sleep(2 * period).await;

// Assert a block has been build
let block = provider.block_by_number(0)?;
Expand All @@ -246,11 +260,14 @@ mod tests {
let provider = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
StaticFileProvider::read_write(static_dir_path)?,
);
let pruner = PrunerBuilder::new(PruneConfig::default())
.build_with_provider_factory(provider.clone());

// Create an empty canonical in memory state
let canonical_in_memory_state = CanonicalInMemoryState::empty();

// Start the payload builder service
let payload_handle = spawn_test_payload_service::<EthEngineTypes>();

Expand All @@ -260,17 +277,14 @@ mod tests {
// Sync metric channel
let (sync_metrics_tx, _) = unbounded_channel();

// Get the attributes for start of block building
let genesis_hash = B256::random();

// Launch the LocalEngineService in instant mode
LocalEngineService::spawn_new(
payload_handle,
TestPayloadAttributesBuilder,
provider.clone(),
pruner,
canonical_in_memory_state,
sync_metrics_tx,
genesis_hash,
MiningMode::instant(pool.clone()),
);

Expand All @@ -295,4 +309,54 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_canonical_chain_subscription() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Start the provider and the pruner
let (_, static_dir_path) = create_test_static_files_dir();
let provider = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path)?,
);
let pruner = PrunerBuilder::new(PruneConfig::default())
.build_with_provider_factory(provider.clone());

// Create an empty canonical in memory state
let canonical_in_memory_state = CanonicalInMemoryState::empty();
let mut notifications = canonical_in_memory_state.subscribe_canon_state();

// Start the payload builder service
let payload_handle = spawn_test_payload_service::<EthEngineTypes>();

// Start a transaction pool
let pool = testing_pool();

// Sync metric channel
let (sync_metrics_tx, _) = unbounded_channel();

// Launch the LocalEngineService in instant mode
LocalEngineService::spawn_new(
payload_handle,
TestPayloadAttributesBuilder,
provider.clone(),
pruner,
canonical_in_memory_state,
sync_metrics_tx,
MiningMode::instant(pool.clone()),
);

// Add a transaction to the pool
let transaction = MockTransaction::legacy().with_gas_price(10);
pool.add_transaction(Default::default(), transaction).await?;

// Check a notification is received for block 0
let res = notifications.recv().await?;

assert_eq!(res.tip().number, 0);

Ok(())
}
}

0 comments on commit 136a822

Please sign in to comment.