Skip to content

Commit

Permalink
feat(transaction-pool): make EthTransactionValidator generic over V…
Browse files Browse the repository at this point in the history
…alidator (paradigmxyz#4258)

Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
tcoratger and mattsse authored Aug 18, 2023
1 parent 5039b3b commit efab153
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 132 deletions.
4 changes: 2 additions & 2 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use reth_stages::{
MetricEventsSender, MetricsListener,
};
use reth_tasks::TaskExecutor;
use reth_transaction_pool::{EthTransactionValidator, TransactionPool};
use reth_transaction_pool::{TransactionPool, TransactionValidationTaskExecutor};
use secp256k1::SecretKey;
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
Expand Down Expand Up @@ -263,7 +263,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?;

let transaction_pool = reth_transaction_pool::Pool::eth_pool(
EthTransactionValidator::with_additional_tasks(
TransactionValidationTaskExecutor::eth_with_additional_tasks(
blockchain_db.clone(),
Arc::clone(&self.chain),
ctx.task_executor.clone(),
Expand Down
26 changes: 14 additions & 12 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@
//! use reth_primitives::MAINNET;
//! use reth_provider::{ChainSpecProvider, StateProviderFactory};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::{EthTransactionValidator, Pool, TransactionPool};
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool, TransactionPool};
//! async fn t<C>(client: C) where C: StateProviderFactory + ChainSpecProvider + Clone + 'static{
//! let pool = Pool::eth_pool(
//! EthTransactionValidator::new(client, MAINNET.clone(), TokioTaskExecutor::default()),
//! TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
//! Default::default(),
//! );
//! let mut transactions = pool.pending_transactions_listener();
Expand All @@ -119,14 +119,14 @@
//! use reth_primitives::MAINNET;
//! use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider, StateProviderFactory};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::{EthTransactionValidator, Pool};
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
//! use reth_transaction_pool::maintain::maintain_transaction_pool_future;
//! async fn t<C, St>(client: C, stream: St)
//! where C: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
//! St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
//! {
//! let pool = Pool::eth_pool(
//! EthTransactionValidator::new(client.clone(), MAINNET.clone(), TokioTaskExecutor::default()),
//! TransactionValidationTaskExecutor::eth(client.clone(), MAINNET.clone(), TokioTaskExecutor::default()),
//! Default::default(),
//! );
//!
Expand Down Expand Up @@ -170,8 +170,8 @@ pub use crate::{
TransactionPoolExt,
},
validate::{
EthTransactionValidator, TransactionValidationOutcome, TransactionValidator,
ValidPoolTransaction,
EthTransactionValidator, TransactionValidationOutcome, TransactionValidationTaskExecutor,
TransactionValidator, ValidPoolTransaction,
},
};

Expand Down Expand Up @@ -263,31 +263,33 @@ where

impl<Client>
Pool<
EthTransactionValidator<Client, EthPooledTransaction>,
TransactionValidationTaskExecutor<EthTransactionValidator<Client, EthPooledTransaction>>,
CoinbaseTipOrdering<EthPooledTransaction>,
>
where
Client: StateProviderFactory + Clone + 'static,
{
/// Returns a new [Pool] that uses the default [EthTransactionValidator] when validating
/// [EthPooledTransaction]s and ords via [CoinbaseTipOrdering]
/// Returns a new [Pool] that uses the default [TransactionValidationTaskExecutor] when
/// validating [EthPooledTransaction]s and ords via [CoinbaseTipOrdering]
///
/// # Example
///
/// ```
/// use reth_provider::StateProviderFactory;
/// use reth_primitives::MAINNET;
/// use reth_tasks::TokioTaskExecutor;
/// use reth_transaction_pool::{EthTransactionValidator, Pool};
/// use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
/// # fn t<C>(client: C) where C: StateProviderFactory + Clone + 'static{
/// let pool = Pool::eth_pool(
/// EthTransactionValidator::new(client, MAINNET.clone(), TokioTaskExecutor::default()),
/// TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
/// Default::default(),
/// );
/// # }
/// ```
pub fn eth_pool(
validator: EthTransactionValidator<Client, EthPooledTransaction>,
validator: TransactionValidationTaskExecutor<
EthTransactionValidator<Client, EthPooledTransaction>,
>,
config: PoolConfig,
) -> Self {
Self::new(validator, CoinbaseTipOrdering::default(), config)
Expand Down
127 changes: 15 additions & 112 deletions crates/transaction-pool/src/validate/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
use crate::{
error::InvalidPoolTransactionError,
traits::{PoolTransaction, TransactionOrigin},
validate::{
task::ValidationJobSender, TransactionValidatorError, ValidTransaction, ValidationTask,
MAX_INIT_CODE_SIZE, TX_MAX_SIZE,
},
TransactionValidationOutcome, TransactionValidator,
validate::{ValidTransaction, ValidationTask, MAX_INIT_CODE_SIZE, TX_MAX_SIZE},
TransactionValidationOutcome, TransactionValidationTaskExecutor, TransactionValidator,
};
use reth_primitives::{
constants::ETHEREUM_BLOCK_GAS_LIMIT, ChainSpec, InvalidTransactionError, EIP1559_TX_TYPE_ID,
Expand All @@ -16,113 +13,16 @@ use reth_primitives::{
use reth_provider::{AccountReader, StateProviderFactory};
use reth_tasks::TaskSpawner;
use std::{marker::PhantomData, sync::Arc};
use tokio::sync::{oneshot, Mutex};
use tokio::sync::Mutex;

/// A [TransactionValidator] implementation that validates ethereum transaction.
///
/// This validator is non-blocking, all validation work is done in a separate task.
#[derive(Debug, Clone)]
/// Validator for Ethereum transactions.
#[derive(Debug)]
pub struct EthTransactionValidator<Client, T> {
/// The type that performs the actual validation.
inner: Arc<EthTransactionValidatorInner<Client, T>>,
/// The sender half to validation tasks that perform the actual validation.
to_validation_task: Arc<Mutex<ValidationJobSender>>,
}

// === impl EthTransactionValidator ===

impl EthTransactionValidator<(), ()> {
/// Convenience method to create a [EthTransactionValidatorBuilder]
pub fn builder(chain_spec: Arc<ChainSpec>) -> EthTransactionValidatorBuilder {
EthTransactionValidatorBuilder::new(chain_spec)
}
pub inner: Arc<EthTransactionValidatorInner<Client, T>>,
}

impl<Client, Tx> EthTransactionValidator<Client, Tx> {
/// Creates a new instance for the given [ChainSpec]
///
/// This will spawn a single validation tasks that performs the actual validation.
/// See [EthTransactionValidator::with_additional_tasks]
pub fn new<T>(client: Client, chain_spec: Arc<ChainSpec>, tasks: T) -> Self
where
T: TaskSpawner,
{
Self::with_additional_tasks(client, chain_spec, tasks, 0)
}

/// Creates a new instance for the given [ChainSpec]
///
/// By default this will enable support for:
/// - shanghai
/// - eip1559
/// - eip2930
///
/// This will always spawn a validation task that performs the actual validation. It will spawn
/// `num_additional_tasks` additional tasks.
pub fn with_additional_tasks<T>(
client: Client,
chain_spec: Arc<ChainSpec>,
tasks: T,
num_additional_tasks: usize,
) -> Self
where
T: TaskSpawner,
{
EthTransactionValidatorBuilder::new(chain_spec)
.with_additional_tasks(num_additional_tasks)
.build(client, tasks)
}

/// Returns the configured chain id
pub fn chain_id(&self) -> u64 {
self.inner.chain_id()
}
}

#[async_trait::async_trait]
impl<Client, Tx> TransactionValidator for EthTransactionValidator<Client, Tx>
where
Client: StateProviderFactory + Clone + 'static,
Tx: PoolTransaction + 'static,
{
type Transaction = Tx;

async fn validate_transaction(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> TransactionValidationOutcome<Self::Transaction> {
let hash = *transaction.hash();
let (tx, rx) = oneshot::channel();
{
let to_validation_task = self.to_validation_task.clone();
let to_validation_task = to_validation_task.lock().await;
let validator = Arc::clone(&self.inner);
let res = to_validation_task
.send(Box::pin(async move {
let res = validator.validate_transaction(origin, transaction).await;
let _ = tx.send(res);
}))
.await;
if res.is_err() {
return TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
)
}
}

match rx.await {
Ok(res) => res,
Err(_) => TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
),
}
}
}

/// A builder for [EthTransactionValidator]
/// A builder for [TransactionValidationTaskExecutor]
#[derive(Debug, Clone)]
pub struct EthTransactionValidatorBuilder {
chain_spec: Arc<ChainSpec>,
Expand Down Expand Up @@ -241,7 +141,7 @@ impl EthTransactionValidatorBuilder {
self
}

/// Builds a [EthTransactionValidator]
/// Builds a [TransactionValidationTaskExecutor]
///
/// The validator will spawn `additional_tasks` additional tasks for validation.
///
Expand All @@ -250,7 +150,7 @@ impl EthTransactionValidatorBuilder {
self,
client: Client,
tasks: T,
) -> EthTransactionValidator<Client, Tx>
) -> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>>
where
T: TaskSpawner,
{
Expand Down Expand Up @@ -300,13 +200,16 @@ impl EthTransactionValidatorBuilder {

let to_validation_task = Arc::new(Mutex::new(tx));

EthTransactionValidator { inner: Arc::new(inner), to_validation_task }
TransactionValidationTaskExecutor {
validator: EthTransactionValidator { inner: Arc::new(inner) },
to_validation_task,
}
}
}

/// A [TransactionValidator] implementation that validates ethereum transaction.
#[derive(Debug, Clone)]
struct EthTransactionValidatorInner<Client, T> {
pub struct EthTransactionValidatorInner<Client, T> {
/// Spec of the chain
chain_spec: Arc<ChainSpec>,
/// This type fetches account info from the db
Expand Down Expand Up @@ -335,7 +238,7 @@ struct EthTransactionValidatorInner<Client, T> {

impl<Client, Tx> EthTransactionValidatorInner<Client, Tx> {
/// Returns the configured chain id
fn chain_id(&self) -> u64 {
pub fn chain_id(&self) -> u64 {
self.chain_spec.chain().id()
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/transaction-pool/src/validate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod task;
pub use eth::{EthTransactionValidator, EthTransactionValidatorBuilder};

/// A spawnable task that performs transaction validation.
pub use task::ValidationTask;
pub use task::{TransactionValidationTaskExecutor, ValidationTask};

/// Validation constants.
pub use constants::{MAX_CODE_SIZE, MAX_INIT_CODE_SIZE, TX_MAX_SIZE, TX_SLOT_SIZE};
Expand Down Expand Up @@ -150,7 +150,7 @@ pub trait TransactionValidator: Send + Sync {
/// example nonce or balance changes. Hence, any validation checks must be applied in this
/// function.
///
/// See [EthTransactionValidator] for a reference implementation.
/// See [TransactionValidationTaskExecutor] for a reference implementation.
async fn validate_transaction(
&self,
origin: TransactionOrigin,
Expand Down
Loading

0 comments on commit efab153

Please sign in to comment.