Skip to content

Commit

Permalink
fix: only propagate txs that are allowed to be propagated (paradigmxy…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Aug 3, 2023
1 parent 6ceaad6 commit 3f63a08
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 55 deletions.
3 changes: 2 additions & 1 deletion crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
let network_events = network.event_listener();
let (command_tx, command_rx) = mpsc::unbounded_channel();

// install a listener for new transactions
// install a listener for new pending transactions that are allowed to be propagated over
// the network
let pending = pool.pending_transactions_listener();

Self {
Expand Down
12 changes: 8 additions & 4 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ pub use crate::{
},
traits::{
AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount,
NewTransactionEvent, PoolSize, PoolTransaction, PooledTransaction, PropagateKind,
PropagatedTransactions, TransactionOrigin, TransactionPool, TransactionPoolExt,
NewTransactionEvent, PendingTransactionListenerKind, PoolSize, PoolTransaction,
PooledTransaction, PropagateKind, PropagatedTransactions, TransactionOrigin,
TransactionPool, TransactionPoolExt,
},
validate::{
EthTransactionValidator, TransactionValidationOutcome, TransactionValidator,
Expand Down Expand Up @@ -343,8 +344,11 @@ where
self.pool.add_all_transactions_event_listener()
}

fn pending_transactions_listener(&self) -> Receiver<TxHash> {
self.pool.add_pending_listener()
fn pending_transactions_listener_for(
&self,
kind: PendingTransactionListenerKind,
) -> Receiver<TxHash> {
self.pool.add_pending_listener(kind)
}

fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
Expand Down
37 changes: 26 additions & 11 deletions crates/transaction-pool/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
//! to be generic over it.
use crate::{
error::PoolError, AllPoolTransactions, AllTransactionsEvents, BestTransactions, BlockInfo,
NewTransactionEvent, PoolResult, PoolSize, PoolTransaction, PooledTransaction,
PropagatedTransactions, TransactionEvents, TransactionOrigin, TransactionPool,
TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction,
error::PoolError, traits::PendingTransactionListenerKind, AllPoolTransactions,
AllTransactionsEvents, BestTransactions, BlockInfo, NewTransactionEvent, PoolResult, PoolSize,
PoolTransaction, PooledTransaction, PropagatedTransactions, TransactionEvents,
TransactionOrigin, TransactionPool, TransactionValidationOutcome, TransactionValidator,
ValidPoolTransaction,
};
use reth_primitives::{Address, TxHash};
use std::{collections::HashSet, marker::PhantomData, sync::Arc};
Expand Down Expand Up @@ -77,7 +78,10 @@ impl TransactionPool for NoopTransactionPool {
AllTransactionsEvents { events: mpsc::channel(1).1 }
}

fn pending_transactions_listener(&self) -> Receiver<TxHash> {
fn pending_transactions_listener_for(
&self,
_kind: PendingTransactionListenerKind,
) -> Receiver<TxHash> {
mpsc::channel(1).1
}

Expand Down Expand Up @@ -166,29 +170,40 @@ impl TransactionPool for NoopTransactionPool {
/// A [`TransactionValidator`] that does nothing.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct NoopTransactionValidator<T>(PhantomData<T>);
pub struct MockTransactionValidator<T> {
propagate_local: bool,
_marker: PhantomData<T>,
}

#[async_trait::async_trait]
impl<T: PoolTransaction> TransactionValidator for NoopTransactionValidator<T> {
impl<T: PoolTransaction> TransactionValidator for MockTransactionValidator<T> {
type Transaction = T;

async fn validate_transaction(
&self,
_origin: TransactionOrigin,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> TransactionValidationOutcome<Self::Transaction> {
TransactionValidationOutcome::Valid {
balance: Default::default(),
state_nonce: 0,
transaction,
propagate: true,
propagate: if origin.is_local() { self.propagate_local } else { true },
}
}
}

impl<T> Default for NoopTransactionValidator<T> {
impl<T> MockTransactionValidator<T> {
/// Creates a new [`MockTransactionValidator`] that does not allow local transactions to be
/// propagated.
pub fn no_propagate_local() -> Self {
Self { propagate_local: false, _marker: Default::default() }
}
}

impl<T> Default for MockTransactionValidator<T> {
fn default() -> Self {
NoopTransactionValidator(PhantomData)
MockTransactionValidator { propagate_local: true, _marker: Default::default() }
}
}

Expand Down
86 changes: 56 additions & 30 deletions crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ mod events;
pub use events::{FullTransactionEvent, TransactionEvent};

mod listener;
use crate::pool::txpool::UpdateOutcome;
use crate::{pool::txpool::UpdateOutcome, traits::PendingTransactionListenerKind};
pub use listener::{AllTransactionsEvents, TransactionEvents};

mod best;
Expand All @@ -119,8 +119,8 @@ pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
config: PoolConfig,
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
/// Listeners for new ready transactions.
pending_transaction_listener: Mutex<Vec<mpsc::Sender<TxHash>>>,
/// Listeners for new pending transactions.
pending_transaction_listener: Mutex<Vec<PendingTransactionListener>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<mpsc::Sender<NewTransactionEvent<T::Transaction>>>>,
}
Expand Down Expand Up @@ -196,15 +196,19 @@ where
}

/// Adds a new transaction listener to the pool that gets notified about every new _pending_
/// transaction.
pub fn add_pending_listener(&self) -> mpsc::Receiver<TxHash> {
/// transaction inserted into the pool
pub fn add_pending_listener(
&self,
kind: PendingTransactionListenerKind,
) -> mpsc::Receiver<TxHash> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
self.pending_transaction_listener.lock().push(tx);
let (sender, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
let listener = PendingTransactionListener { sender, kind };
self.pending_transaction_listener.lock().push(listener);
rx
}

/// Adds a new transaction listener to the pool that gets notified about every new transaction
/// Adds a new transaction listener to the pool that gets notified about every new transaction.
pub fn add_new_transaction_listener(
&self,
) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
Expand Down Expand Up @@ -318,8 +322,8 @@ where
let hash = *added.hash();

// Notify about new pending transactions
if let Some(pending_hash) = added.as_pending() {
self.on_new_pending_transaction(pending_hash);
if added.is_pending() {
self.on_new_pending_transaction(&added);
}

// Notify tx event listeners
Expand Down Expand Up @@ -387,20 +391,31 @@ where
}

/// Notify all listeners about a new pending transaction.
fn on_new_pending_transaction(&self, ready: &TxHash) {
fn on_new_pending_transaction(&self, pending: &AddedTransaction<T::Transaction>) {
let tx_hash = *pending.hash();
let propagate_allowed = pending.is_propagate_allowed();

let mut transaction_listeners = self.pending_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) {
Ok(()) => true,
Err(err) => {
if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
ready,
);
true
} else {
false
transaction_listeners.retain_mut(|listener| {
if listener.kind.is_propagate_only() && !propagate_allowed {
// only emit this hash to listeners that are only allowed to receive propagate only
// transactions, such as network
return !listener.sender.is_closed()
}

match listener.sender.try_send(tx_hash) {
Ok(()) => true,
Err(err) => {
if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
tx_hash,
);
true
} else {
false
}
}
}
});
Expand Down Expand Up @@ -568,6 +583,14 @@ impl<V: TransactionValidator, T: TransactionOrdering> fmt::Debug for PoolInner<V
}
}

/// An active listener for new pending transactions.
#[derive(Debug)]
struct PendingTransactionListener {
sender: mpsc::Sender<TxHash>,
/// Whether to include transactions that should not be propagated over the network.
kind: PendingTransactionListenerKind,
}

/// Tracks an added transaction and all graph changes caused by adding it.
#[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> {
Expand Down Expand Up @@ -599,13 +622,9 @@ pub enum AddedTransaction<T: PoolTransaction> {
}

impl<T: PoolTransaction> AddedTransaction<T> {
/// Returns the hash of the transaction if it's pending
pub(crate) fn as_pending(&self) -> Option<&TxHash> {
if let AddedTransaction::Pending(tx) = self {
Some(tx.transaction.hash())
} else {
None
}
/// Returns whether the transaction is pending
pub(crate) fn is_pending(&self) -> bool {
matches!(self, AddedTransaction::Pending(_))
}

/// Returns the hash of the transaction
Expand All @@ -615,6 +634,13 @@ impl<T: PoolTransaction> AddedTransaction<T> {
AddedTransaction::Parked { transaction, .. } => transaction.hash(),
}
}
/// Returns if the transaction should be propagated.
pub(crate) fn is_propagate_allowed(&self) -> bool {
match self {
AddedTransaction::Pending(transaction) => transaction.transaction.propagate,
AddedTransaction::Parked { transaction, .. } => transaction.propagate,
}
}

/// Converts this type into the event type for listeners
pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
Expand Down
12 changes: 9 additions & 3 deletions crates/transaction-pool/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@ mod mock;
mod pool;

use crate::{
noop::NoopTransactionValidator, Pool, PoolTransaction, TransactionOrigin,
noop::MockTransactionValidator, Pool, PoolTransaction, TransactionOrigin,
TransactionValidationOutcome, TransactionValidator,
};
use async_trait::async_trait;
pub use mock::*;
use std::{marker::PhantomData, sync::Arc};

/// A [Pool] used for testing
pub type TestPool = Pool<NoopTransactionValidator<MockTransaction>, MockOrdering>;
pub type TestPool = Pool<MockTransactionValidator<MockTransaction>, MockOrdering>;

/// Returns a new [Pool] used for testing purposes
pub fn testing_pool() -> TestPool {
Pool::new(NoopTransactionValidator::default(), MockOrdering::default(), Default::default())
testing_pool_with_validator(MockTransactionValidator::default())
}
/// Returns a new [Pool] used for testing purposes
pub fn testing_pool_with_validator(
validator: MockTransactionValidator<MockTransaction>,
) -> TestPool {
Pool::new(validator, MockOrdering::default(), Default::default())
}
41 changes: 38 additions & 3 deletions crates/transaction-pool/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,23 @@ pub trait TransactionPool: Send + Sync + Clone {
/// Returns a new transaction change event stream for _all_ transactions in the pool.
fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction>;

/// Returns a new Stream that yields transactions hashes for new ready transactions.
/// Returns a new Stream that yields transactions hashes for new __pending__ transactions
/// inserted into the pool that are allowed to be propagated.
///
/// Consumer: RPC
fn pending_transactions_listener(&self) -> Receiver<TxHash>;
/// Note: This is intended for networking and will __only__ yield transactions that are allowed
/// to be propagated over the network.
///
/// Consumer: RPC/P2P
fn pending_transactions_listener(&self) -> Receiver<TxHash> {
self.pending_transactions_listener_for(PendingTransactionListenerKind::PropagateOnly)
}

/// Returns a new Stream that yields transactions hashes for new __pending__ transactions
/// inserted into the pool depending on the given [PendingTransactionListenerKind] argument.
fn pending_transactions_listener_for(
&self,
kind: PendingTransactionListenerKind,
) -> Receiver<TxHash>;

/// Returns a new stream that yields new valid transactions added to the pool.
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>>;
Expand Down Expand Up @@ -273,6 +286,28 @@ pub trait TransactionPoolExt: TransactionPool {
fn update_accounts(&self, accounts: Vec<ChangedAccount>);
}

/// Determines what kind of new pending transactions should be emitted by a stream of pending
/// transactions.
///
/// This gives control whether to include transactions that are allowed to be propagated.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PendingTransactionListenerKind {
/// Any new pending transactions
All,
/// Only transactions that are allowed to be propagated.
///
/// See also [ValidPoolTransaction]
PropagateOnly,
}

impl PendingTransactionListenerKind {
/// Returns true if we're only interested in transactions that are allowed to be propagated.
#[inline]
pub fn is_propagate_only(&self) -> bool {
matches!(self, Self::PropagateOnly)
}
}

/// A Helper type that bundles all transactions in the pool.
#[derive(Debug, Clone)]
pub struct AllPoolTransactions<T: PoolTransaction> {
Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-pool/src/validate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub struct ValidPoolTransaction<T: PoolTransaction> {
pub transaction: T,
/// The identifier for this transaction.
pub transaction_id: TransactionId,
/// Whether to propagate the transaction.
/// Whether it is allowed to propagate the transaction.
pub propagate: bool,
/// Timestamp when this was added to the pool.
pub timestamp: Instant,
Expand Down
31 changes: 29 additions & 2 deletions crates/transaction-pool/tests/it/listeners.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use assert_matches::assert_matches;
use reth_transaction_pool::{
test_utils::{testing_pool, MockTransactionFactory},
FullTransactionEvent, TransactionEvent, TransactionOrigin, TransactionPool,
noop::MockTransactionValidator,
test_utils::{testing_pool, testing_pool_with_validator, MockTransactionFactory},
FullTransactionEvent, PendingTransactionListenerKind, TransactionEvent, TransactionOrigin,
TransactionPool,
};
use std::{future::poll_fn, task::Poll};
use tokio_stream::StreamExt;

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -37,3 +40,27 @@ async fn txpool_listener_all() {
Some(FullTransactionEvent::Pending(hash)) if hash == transaction.transaction.get_hash()
);
}

#[tokio::test(flavor = "multi_thread")]
async fn txpool_listener_propagate_only() {
let txpool = testing_pool_with_validator(MockTransactionValidator::no_propagate_local());
let mut mock_tx_factory = MockTransactionFactory::default();
let transaction = mock_tx_factory.create_eip1559();
let expected = *transaction.hash();
let mut listener_network = txpool.pending_transactions_listener();
let mut listener_all =
txpool.pending_transactions_listener_for(PendingTransactionListenerKind::All);
let result =
txpool.add_transaction(TransactionOrigin::Local, transaction.transaction.clone()).await;
assert!(result.is_ok());

let inserted = listener_all.recv().await.unwrap();
assert_eq!(inserted, expected);

poll_fn(|cx| {
// no propagation
assert!(listener_network.poll_recv(cx).is_pending());
Poll::Ready(())
})
.await;
}

0 comments on commit 3f63a08

Please sign in to comment.