Skip to content

Commit

Permalink
chore: move chain state notifications to reth-chain-state crate (para…
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez authored Jul 23, 2024
1 parent 23ff371 commit c5ceee3
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 72 deletions.
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/chain-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@ revm = { workspace = true, optional = true}

# async
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["sync"] }

# tracing
tracing.workspace = true

# misc
auto_impl.workspace = true
derive_more.workspace = true
parking_lot.workspace = true
pin-project.workspace = true
rand = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
7 changes: 7 additions & 0 deletions crates/chain-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ pub use in_memory::*;
mod chain_info;
pub use chain_info::ChainInfoTracker;

mod notifications;
pub use notifications::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream,
ForkChoiceSubscriptions,
};

#[cfg(any(test, feature = "test-utils"))]
/// Common test helpers
pub mod test_utils;
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Canonical chain state notification trait and types.
use crate::{BlockReceipts, Chain};
use auto_impl::auto_impl;
use derive_more::{Deref, DerefMut};
use reth_execution_types::{BlockReceipts, Chain};
use reth_primitives::{SealedBlockWithSenders, SealedHeader};
use std::{
pin::Pin,
Expand Down Expand Up @@ -61,7 +61,7 @@ impl Stream for CanonStateNotificationStream {
}

/// Chain action that is triggered when a new block is imported or old block is reverted.
/// and will return all [`crate::ExecutionOutcome`] and
/// and will return all `ExecutionOutcome` and
/// [`reth_primitives::SealedBlockWithSenders`] of both reverted and committed blocks.
#[derive(Clone, Debug)]
pub enum CanonStateNotification {
Expand Down
44 changes: 41 additions & 3 deletions crates/chain-state/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use crate::in_memory::ExecutedBlock;
use crate::{
in_memory::ExecutedBlock, CanonStateNotification, CanonStateNotifications,
CanonStateSubscriptions,
};
use rand::Rng;
use reth_execution_types::ExecutionOutcome;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::{
Address, Block, BlockNumber, Receipts, Requests, SealedBlockWithSenders, TransactionSigned,
};
use reth_trie::{updates::TrieUpdates, HashedPostState};
use revm::db::BundleState;
use std::{ops::Range, sync::Arc};
use std::{
ops::Range,
sync::{Arc, Mutex},
};
use tokio::sync::broadcast::{self, Sender};

fn get_executed_block(block_number: BlockNumber, receipts: Receipts) -> ExecutedBlock {
let mut block = Block::default();
Expand Down Expand Up @@ -50,3 +57,34 @@ pub fn get_executed_block_with_number(block_number: BlockNumber) -> ExecutedBloc
pub fn get_executed_blocks(range: Range<u64>) -> impl Iterator<Item = ExecutedBlock> {
range.map(get_executed_block_with_number)
}

/// A test `ChainEventSubscriptions`
#[derive(Clone, Debug, Default)]
pub struct TestCanonStateSubscriptions {
canon_notif_tx: Arc<Mutex<Vec<Sender<CanonStateNotification>>>>,
}

impl TestCanonStateSubscriptions {
/// Adds new block commit to the queue that can be consumed with
/// [`TestCanonStateSubscriptions::subscribe_to_canonical_state`]
pub fn add_next_commit(&self, new: Arc<Chain>) {
let event = CanonStateNotification::Commit { new };
self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok())
}

/// Adds reorg to the queue that can be consumed with
/// [`TestCanonStateSubscriptions::subscribe_to_canonical_state`]
pub fn add_next_reorg(&self, old: Arc<Chain>, new: Arc<Chain>) {
let event = CanonStateNotification::Reorg { old, new };
self.canon_notif_tx.lock().as_mut().unwrap().retain(|tx| tx.send(event.clone()).is_ok())
}
}

impl CanonStateSubscriptions for TestCanonStateSubscriptions {
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications {
let (canon_notif_tx, canon_notif_rx) = broadcast::channel(100);
self.canon_notif_tx.lock().as_mut().unwrap().push(canon_notif_tx);

canon_notif_rx
}
}
10 changes: 6 additions & 4 deletions crates/storage/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ revm.workspace = true

# async
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["sync"] }

# tracing
tracing.workspace = true
Expand All @@ -50,9 +49,7 @@ metrics.workspace = true

# misc
auto_impl.workspace = true
derive_more.workspace = true
itertools.workspace = true
pin-project.workspace = true
parking_lot.workspace = true
dashmap = { workspace = true, features = ["inline"] }
strum.workspace = true
Expand All @@ -78,4 +75,9 @@ rand.workspace = true
[features]
optimism = ["reth-primitives/optimism", "reth-execution-types/optimism"]
serde = ["reth-execution-types/serde"]
test-utils = ["alloy-rlp", "reth-db/test-utils", "reth-nippy-jar/test-utils"]
test-utils = [
"alloy-rlp",
"reth-db/test-utils",
"reth-nippy-jar/test-utils",
"reth-chain-state/test-utils"
]
5 changes: 5 additions & 0 deletions crates/storage/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ pub use bundle_state::{OriginalValuesKnown, StateChanges, StateReverts};
/// Writer standalone type.
pub mod writer;

pub use reth_chain_state::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions,
};

pub(crate) fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
let start = match bounds.start_bound() {
std::ops::Bound::Included(&v) => v,
Expand Down
14 changes: 7 additions & 7 deletions crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use crate::{
AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory,
EvmEnvProvider, FullExecutionDataProvider, HeaderProvider, ProviderError,
PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider,
StageCheckpointReader, StateProviderBox, StateProviderFactory, StaticFileProviderFactory,
TransactionVariant, TransactionsProvider, TreeViewer, WithdrawalsProvider,
BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, ChainSpecProvider,
ChangeSetReader, DatabaseProviderFactory, EvmEnvProvider, FullExecutionDataProvider,
HeaderProvider, ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory,
StaticFileProviderFactory, TransactionVariant, TransactionsProvider, TreeViewer,
WithdrawalsProvider,
};
use reth_blockchain_tree_api::{
error::{CanonicalError, InsertBlockError},
BlockValidationKind, BlockchainTreeEngine, BlockchainTreeViewer, CanonicalOutcome,
InsertPayloadOk,
};
use reth_chain_state::ChainInfoTracker;
use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ChainInfoTracker};
use reth_chainspec::{ChainInfo, ChainSpec};
use reth_db_api::{
database::Database,
Expand Down
35 changes: 0 additions & 35 deletions crates/storage/provider/src/test_utils/events.rs

This file was deleted.

3 changes: 1 addition & 2 deletions crates/storage/provider/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ use reth_db::{
use std::sync::Arc;

pub mod blocks;
mod events;
mod mock;
mod noop;

pub use events::TestCanonStateSubscriptions;
pub use mock::{ExtendedAccount, MockEthProvider};
pub use noop::NoopProvider;
pub use reth_chain_state::test_utils::TestCanonStateSubscriptions;

/// Creates test provider factory with mainnet chain spec.
pub fn create_test_provider_factory() -> ProviderFactory<Arc<TempDatabase<DatabaseEnv>>> {
Expand Down
10 changes: 5 additions & 5 deletions crates/storage/provider/src/test_utils/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
sync::Arc,
};

use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions};
use reth_chainspec::{ChainInfo, ChainSpec, MAINNET};
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_evm::ConfigureEvmEnv;
Expand All @@ -24,11 +25,10 @@ use crate::{
providers::StaticFileProvider,
traits::{BlockSource, ReceiptProvider},
AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
CanonStateNotifications, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader,
EvmEnvProvider, HeaderProvider, PruneCheckpointReader, ReceiptProviderIdExt, RequestsProvider,
StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory,
StateRootProvider, StaticFileProviderFactory, TransactionVariant, TransactionsProvider,
WithdrawalsProvider,
ChainSpecProvider, ChangeSetReader, EvmEnvProvider, HeaderProvider, PruneCheckpointReader,
ReceiptProviderIdExt, RequestsProvider, StageCheckpointReader, StateProvider, StateProviderBox,
StateProviderFactory, StateRootProvider, StaticFileProviderFactory, TransactionVariant,
TransactionsProvider, WithdrawalsProvider,
};

/// Supports various api interfaces for testing purposes.
Expand Down
7 changes: 4 additions & 3 deletions crates/storage/provider/src/traits/full.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Helper provider traits to encapsulate all provider traits for simplicity.
use crate::{
AccountReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader,
DatabaseProviderFactory, EvmEnvProvider, HeaderProvider, StageCheckpointReader,
StateProviderFactory, StaticFileProviderFactory, TransactionsProvider,
AccountReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory,
EvmEnvProvider, HeaderProvider, StageCheckpointReader, StateProviderFactory,
StaticFileProviderFactory, TransactionsProvider,
};
use reth_chain_state::CanonStateSubscriptions;
use reth_db_api::database::Database;

/// Helper trait to unify all provider traits for simplicity.
Expand Down
7 changes: 0 additions & 7 deletions crates/storage/provider/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider};
mod state;
pub use state::StateWriter;

mod chain;
pub use chain::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream,
ForkChoiceSubscriptions,
};

mod spec;
pub use spec::ChainSpecProvider;

Expand Down
3 changes: 2 additions & 1 deletion crates/storage/provider/src/traits/tree_viewer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{BlockchainTreePendingStateProvider, CanonStateSubscriptions};
use crate::BlockchainTreePendingStateProvider;
use reth_blockchain_tree_api::{BlockchainTreeEngine, BlockchainTreeViewer};
use reth_chain_state::CanonStateSubscriptions;

/// Helper trait to combine all the traits we need for the `BlockchainProvider`
///
Expand Down

0 comments on commit c5ceee3

Please sign in to comment.