Skip to content

Commit

Permalink
feat(exex): commit only notifications with unfinalized blocks to WAL (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Oct 10, 2024
1 parent 8a11830 commit 90cb362
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 37 deletions.
124 changes: 95 additions & 29 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
/// or 17 minutes of 1-second blocks.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;

/// The source of the notification.
///
/// This distinguishment is needed to not commit any pipeline notificatations to [WAL](`Wal`),
/// because they are already finalized.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExExNotificationSource {
/// The notification was sent from the pipeline.
Pipeline,
/// The notification was sent from the blockchain tree.
BlockchainTree,
}

/// Metrics for an `ExEx`.
#[derive(Metrics)]
#[metrics(scope = "exex")]
Expand Down Expand Up @@ -197,7 +209,7 @@ pub struct ExExManager<P> {
exex_handles: Vec<ExExHandle>,

/// [`ExExNotification`] channel from the [`ExExManagerHandle`]s.
handle_rx: UnboundedReceiver<ExExNotification>,
handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification)>,

/// The minimum notification ID currently present in the buffer.
min_id: usize,
Expand Down Expand Up @@ -429,14 +441,23 @@ where

// Drain handle notifications
while this.buffer.len() < this.max_capacity {
if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) {
debug!(
target: "exex::manager",
committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
"Received new notification"
);
this.wal.commit(&notification)?;
if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
let committed_tip = notification.committed_chain().map(|chain| chain.tip().number);
let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number);
debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");

// Commit to WAL only notifications from blockchain tree. Pipeline notifications
// always contain only finalized blocks.
match source {
ExExNotificationSource::BlockchainTree => {
debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
this.wal.commit(&notification)?;
}
ExExNotificationSource::Pipeline => {
debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
}
}

this.push_notification(notification);
continue
}
Expand Down Expand Up @@ -491,7 +512,7 @@ where
#[derive(Debug)]
pub struct ExExManagerHandle {
/// Channel to send notifications to the `ExEx` manager.
exex_tx: UnboundedSender<ExExNotification>,
exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification)>,
/// The number of `ExEx`'s running on the node.
num_exexs: usize,
/// A watch channel denoting whether the manager is ready for new notifications or not.
Expand Down Expand Up @@ -533,8 +554,12 @@ impl ExExManagerHandle {
/// Synchronously send a notification over the channel to all execution extensions.
///
/// Senders should call [`Self::has_capacity`] first.
pub fn send(&self, notification: ExExNotification) -> Result<(), SendError<ExExNotification>> {
self.exex_tx.send(notification)
pub fn send(
&self,
source: ExExNotificationSource,
notification: ExExNotification,
) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
self.exex_tx.send((source, notification))
}

/// Asynchronously send a notification over the channel to all execution extensions.
Expand All @@ -543,10 +568,11 @@ impl ExExManagerHandle {
/// capacity in the channel, the future will wait.
pub async fn send_async(
&mut self,
source: ExExNotificationSource,
notification: ExExNotification,
) -> Result<(), SendError<ExExNotification>> {
) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
self.ready().await;
self.exex_tx.send(notification)
self.exex_tx.send((source, notification))
}

/// Get the current capacity of the `ExEx` manager's internal notification buffer.
Expand Down Expand Up @@ -610,16 +636,16 @@ impl Clone for ExExManagerHandle {
mod tests {
use super::*;
use alloy_primitives::B256;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use rand::Rng;
use reth_db_common::init::init_genesis;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
Chain, TransactionVariant,
BlockWriter, Chain, DatabaseProviderFactory, TransactionVariant,
};
use reth_testing_utils::generators;
use reth_testing_utils::generators::{self, random_block, BlockParams};

fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
let (tx, rx) = watch::channel(None);
Expand Down Expand Up @@ -959,9 +985,21 @@ mod tests {
};

// Send notifications to go over the max capacity
exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
exex_manager.handle.exex_tx.send(notification).unwrap();
exex_manager
.handle
.exex_tx
.send((ExExNotificationSource::BlockchainTree, notification.clone()))
.unwrap();
exex_manager
.handle
.exex_tx
.send((ExExNotificationSource::BlockchainTree, notification.clone()))
.unwrap();
exex_manager
.handle
.exex_tx
.send((ExExNotificationSource::BlockchainTree, notification))
.unwrap();

// Pin the ExExManager to call the poll method
let mut pinned_manager = std::pin::pin!(exex_manager);
Expand Down Expand Up @@ -1177,6 +1215,18 @@ mod tests {
.sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
.unwrap()
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;

let block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), ..Default::default() },
)
.seal_with_senders()
.unwrap();
let provider_rw = provider_factory.database_provider_rw().unwrap();
provider_rw.insert_block(block.clone()).unwrap();
provider_rw.commit().unwrap();

let provider = BlockchainProvider2::new(provider_factory).unwrap();

let temp_dir = tempfile::tempdir().unwrap();
Expand All @@ -1190,33 +1240,49 @@ mod tests {
wal.handle(),
);

let notification = ExExNotification::ChainCommitted {
let genesis_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
};
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
};

let (finalized_headers_tx, rx) = watch::channel(None);
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
let finalized_header_stream = ForkChoiceStream::new(rx);

let mut exex_manager = std::pin::pin!(ExExManager::new(
provider,
vec![exex_handle],
1,
2,
wal,
finalized_header_stream
));

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

exex_manager.handle().send(notification.clone())?;
exex_manager
.handle()
.send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;

assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(notifications.next().await.unwrap().unwrap(), notification.clone());
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(genesis_notification))
);
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(notification.clone()))
);
// WAL shouldn't contain the genesis notification, because it's finalized
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
);

finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
assert_eq!(
Expand All @@ -1229,7 +1295,7 @@ mod tests {
.send(ExExEvent::FinishedHeight((rng.gen::<u64>(), rng.gen::<B256>()).into()))
.unwrap();

finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block
Expand All @@ -1239,12 +1305,12 @@ mod tests {
);

// Send a `FinishedHeight` event with a canonical block
events_tx.send(ExExEvent::FinishedHeight(genesis_block.num_hash())).unwrap();
events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();

finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL is finalized
assert!(exex_manager.wal.iter_notifications()?.next().is_none());
assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);

Ok(())
}
Expand Down
6 changes: 4 additions & 2 deletions crates/node/builder/src/launch/exex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use futures::future;
use reth_chain_state::ForkChoiceSubscriptions;
use reth_chainspec::EthChainSpec;
use reth_exex::{
ExExContext, ExExHandle, ExExManager, ExExManagerHandle, Wal, DEFAULT_EXEX_MANAGER_CAPACITY,
ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
DEFAULT_EXEX_MANAGER_CAPACITY,
};
use reth_node_api::{FullNodeComponents, NodeTypes};
use reth_primitives::Head;
Expand Down Expand Up @@ -47,6 +48,7 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
return Ok(None)
}

info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
let exex_wal = Wal::new(
config_container
.config
Expand Down Expand Up @@ -127,7 +129,7 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
async move {
while let Ok(notification) = canon_state_notifications.recv().await {
handle
.send_async(notification.into())
.send_async(ExExNotificationSource::BlockchainTree, notification.into())
.await
.expect("blockchain tree notification could not be sent to exex manager");
}
Expand Down
15 changes: 9 additions & 6 deletions crates/stages/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_evm::{
metrics::ExecutorMetrics,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex::{ExExManagerHandle, ExExNotification};
use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource};
use reth_primitives::{Header, SealedHeader, StaticFileSegment};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
Expand Down Expand Up @@ -389,9 +389,10 @@ where

// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self
.exex_manager_handle
.send(ExExNotification::ChainCommitted { new: Arc::new(chain) });
let _ = self.exex_manager_handle.send(
ExExNotificationSource::Pipeline,
ExExNotification::ChainCommitted { new: Arc::new(chain) },
);

Ok(())
}
Expand Down Expand Up @@ -477,8 +478,10 @@ where

// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ =
self.exex_manager_handle.send(ExExNotification::ChainReverted { old: Arc::new(chain) });
let _ = self.exex_manager_handle.send(
ExExNotificationSource::Pipeline,
ExExNotification::ChainReverted { old: Arc::new(chain) },
);

Ok(())
}
Expand Down

0 comments on commit 90cb362

Please sign in to comment.