Skip to content

Commit

Permalink
feat: integrate blobstore in pool (paradigmxyz#4266)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Aug 18, 2023
1 parent 24632ac commit 8516fef
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 26 deletions.
5 changes: 4 additions & 1 deletion bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ use reth_stages::{
MetricEventsSender, MetricsListener,
};
use reth_tasks::TaskExecutor;
use reth_transaction_pool::{TransactionPool, TransactionValidationTaskExecutor};
use reth_transaction_pool::{
blobstore::InMemoryBlobStore, TransactionPool, TransactionValidationTaskExecutor,
};
use secp256k1::SecretKey;
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
Expand Down Expand Up @@ -269,6 +271,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
ctx.task_executor.clone(),
1,
),
InMemoryBlobStore::default(),
self.txpool.pool_config(),
);
info!(target: "reth::cli", "Transaction pool initialized");
Expand Down
43 changes: 32 additions & 11 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
//! - providing existing transactions
//! - ordering and providing the best transactions for block production
//! - monitoring memory footprint and enforce pool size limits
//! - storing blob data for transactions in a separate blobstore on insertion
//!
//! ## Assumptions
//!
Expand Down Expand Up @@ -86,6 +87,13 @@
//! that provides the `TransactionPool` interface.
//!
//!
//! ## Blob Transactions
//!
//! Blob transaction can be quite large hence they are stored in a separate blobstore. The pool is
//! responsible for inserting blob data for new transactions into the blobstore.
//! See also [ValidTransaction](validate::ValidTransaction)
//!
//!
//! ## Examples
//!
//! Listen for new transactions and print them:
Expand All @@ -95,9 +103,11 @@
//! use reth_provider::{ChainSpecProvider, StateProviderFactory};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool, TransactionPool};
//! use reth_transaction_pool::blobstore::InMemoryBlobStore;
//! async fn t<C>(client: C) where C: StateProviderFactory + ChainSpecProvider + Clone + 'static{
//! let pool = Pool::eth_pool(
//! TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
//! InMemoryBlobStore::default(),
//! Default::default(),
//! );
//! let mut transactions = pool.pending_transactions_listener();
Expand All @@ -120,13 +130,15 @@
//! use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider, StateProviderFactory};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
//! use reth_transaction_pool::blobstore::InMemoryBlobStore;
//! use reth_transaction_pool::maintain::maintain_transaction_pool_future;
//! async fn t<C, St>(client: C, stream: St)
//! where C: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
//! St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
//! {
//! let pool = Pool::eth_pool(
//! TransactionValidationTaskExecutor::eth(client.clone(), MAINNET.clone(), TokioTaskExecutor::default()),
//! InMemoryBlobStore::default(),
//! Default::default(),
//! );
//!
Expand All @@ -151,6 +163,7 @@ use std::{
use tokio::sync::mpsc::Receiver;
use tracing::{instrument, trace};

use crate::blobstore::BlobStore;
pub use crate::{
config::{
PoolConfig, PriceBumpConfig, SubPoolLimit, DEFAULT_PRICE_BUMP, REPLACE_BLOB_PRICE_BUMP,
Expand Down Expand Up @@ -194,25 +207,26 @@ pub mod test_utils;

/// A shareable, generic, customizable `TransactionPool` implementation.
#[derive(Debug)]
pub struct Pool<V: TransactionValidator, T: TransactionOrdering> {
pub struct Pool<V, T: TransactionOrdering, S> {
/// Arc'ed instance of the pool internals
pool: Arc<PoolInner<V, T>>,
pool: Arc<PoolInner<V, T, S>>,
}

// === impl Pool ===

impl<V, T> Pool<V, T>
impl<V, T, S> Pool<V, T, S>
where
V: TransactionValidator,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
/// Create a new transaction pool instance.
pub fn new(validator: V, ordering: T, config: PoolConfig) -> Self {
Self { pool: Arc::new(PoolInner::new(validator, ordering, config)) }
pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
Self { pool: Arc::new(PoolInner::new(validator, ordering, blob_store, config)) }
}

/// Returns the wrapped pool.
pub(crate) fn inner(&self) -> &PoolInner<V, T> {
pub(crate) fn inner(&self) -> &PoolInner<V, T, S> {
&self.pool
}

Expand Down Expand Up @@ -261,13 +275,15 @@ where
}
}

impl<Client>
impl<Client, S>
Pool<
TransactionValidationTaskExecutor<EthTransactionValidator<Client, EthPooledTransaction>>,
CoinbaseTipOrdering<EthPooledTransaction>,
S,
>
where
Client: StateProviderFactory + Clone + 'static,
S: BlobStore,
{
/// Returns a new [Pool] that uses the default [TransactionValidationTaskExecutor] when
/// validating [EthPooledTransaction]s and ords via [CoinbaseTipOrdering]
Expand All @@ -279,9 +295,11 @@ where
/// use reth_primitives::MAINNET;
/// use reth_tasks::TokioTaskExecutor;
/// use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
/// use reth_transaction_pool::blobstore::InMemoryBlobStore;
/// # fn t<C>(client: C) where C: StateProviderFactory + Clone + 'static{
/// let pool = Pool::eth_pool(
/// TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
/// InMemoryBlobStore::default(),
/// Default::default(),
/// );
/// # }
Expand All @@ -290,18 +308,20 @@ where
validator: TransactionValidationTaskExecutor<
EthTransactionValidator<Client, EthPooledTransaction>,
>,
blob_store: S,
config: PoolConfig,
) -> Self {
Self::new(validator, CoinbaseTipOrdering::default(), config)
Self::new(validator, CoinbaseTipOrdering::default(), blob_store, config)
}
}

/// implements the `TransactionPool` interface for various transaction pool API consumers.
#[async_trait::async_trait]
impl<V, T> TransactionPool for Pool<V, T>
impl<V, T, S> TransactionPool for Pool<V, T, S>
where
V: TransactionValidator,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
type Transaction = T::Transaction;

Expand Down Expand Up @@ -440,10 +460,11 @@ where
}
}

impl<V: TransactionValidator, T: TransactionOrdering> TransactionPoolExt for Pool<V, T>
impl<V: TransactionValidator, T: TransactionOrdering, S> TransactionPoolExt for Pool<V, T, S>
where
V: TransactionValidator,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
#[instrument(skip(self), target = "txpool")]
fn set_block_info(&self, info: BlockInfo) {
Expand All @@ -460,7 +481,7 @@ where
}
}

impl<V: TransactionValidator, T: TransactionOrdering> Clone for Pool<V, T> {
impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {
fn clone(&self) -> Self {
Self { pool: Arc::clone(&self.pool) }
}
Expand Down
10 changes: 10 additions & 0 deletions crates/transaction-pool/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ pub struct TxPoolMetrics {
pub(crate) performed_state_updates: Counter,
}

/// Transaction pool blobstore metrics
#[derive(Metrics)]
#[metrics(scope = "transaction_pool")]
pub struct BlobStoreMetrics {
/// Number of failed inserts into the blobstore
pub(crate) blobstore_failed_inserts: Counter,
/// Number of failed deletes into the blobstore
pub(crate) blobstore_failed_deletes: Counter,
}

/// Transaction pool maintenance metrics
#[derive(Metrics)]
#[metrics(scope = "transaction_pool")]
Expand Down
67 changes: 59 additions & 8 deletions crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,23 @@ use crate::{
};
use best::BestTransactions;
use parking_lot::{Mutex, RwLock};
use reth_primitives::{Address, TxHash, H256};
use reth_primitives::{Address, BlobTransactionSidecar, TxHash, H256};
use std::{
collections::{HashMap, HashSet},
fmt,
sync::Arc,
time::Instant,
};
use tokio::sync::mpsc;
use tracing::{debug, trace};
use tracing::{debug, trace, warn};

mod events;
pub use events::{FullTransactionEvent, TransactionEvent};

mod listener;
use crate::{
pool::txpool::UpdateOutcome, traits::PendingTransactionListenerKind, validate::ValidTransaction,
blobstore::BlobStore, metrics::BlobStoreMetrics, pool::txpool::UpdateOutcome,
traits::PendingTransactionListenerKind, validate::ValidTransaction,
};
pub use listener::{AllTransactionsEvents, TransactionEvents};

Expand All @@ -110,11 +111,16 @@ pub mod txpool;
mod update;

/// Transaction pool internals.
pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
pub struct PoolInner<V, T, S>
where
T: TransactionOrdering,
{
/// Internal mapping of addresses to plain ints.
identifiers: RwLock<SenderIdentifiers>,
/// Transaction validation.
validator: V,
/// Storage for blob transactions
blob_store: S,
/// The internal pool that manages all transactions.
pool: RwLock<TxPool<T>>,
/// Pool settings.
Expand All @@ -125,17 +131,20 @@ pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
pending_transaction_listener: Mutex<Vec<PendingTransactionListener>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<mpsc::Sender<NewTransactionEvent<T::Transaction>>>>,
/// Metrics for the blob store
blob_store_metrics: BlobStoreMetrics,
}

// === impl PoolInner ===

impl<V, T> PoolInner<V, T>
impl<V, T, S> PoolInner<V, T, S>
where
V: TransactionValidator,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
/// Create a new transaction pool instance.
pub(crate) fn new(validator: V, ordering: T, config: PoolConfig) -> Self {
pub(crate) fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
Self {
identifiers: Default::default(),
validator,
Expand All @@ -144,6 +153,8 @@ where
pending_transaction_listener: Default::default(),
transaction_listener: Default::default(),
config,
blob_store,
blob_store_metrics: Default::default(),
}
}

Expand Down Expand Up @@ -316,7 +327,8 @@ where
let transaction_id = TransactionId::new(sender_id, transaction.nonce());
let encoded_length = transaction.encoded_length();

let (transaction, _maybe_sidecar) = match transaction {
// split the valid transaction and the blob sidecar if it has any
let (transaction, maybe_sidecar) = match transaction {
ValidTransaction::Valid(tx) => (tx, None),
ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
debug_assert!(
Expand All @@ -339,6 +351,16 @@ where
let added = self.pool.write().add_transaction(tx, balance, state_nonce)?;
let hash = *added.hash();

// transaction was successfully inserted into the pool
if let Some(sidecar) = maybe_sidecar {
// store the sidecar in the blob store
self.insert_blob(hash, sidecar);
}
if let Some(replaced) = added.replaced_blob_transaction() {
// delete the replaced transaction from the blob store
self.delete_blob(replaced);
}

// Notify about new pending transactions
if let Some(pending) = added.as_pending() {
self.on_new_pending_transaction(pending);
Expand Down Expand Up @@ -625,9 +647,25 @@ where
pub(crate) fn discard_worst(&self) -> HashSet<TxHash> {
self.pool.write().discard_worst().into_iter().map(|tx| *tx.hash()).collect()
}

/// Inserts a blob transaction into the blob store
fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecar) {
if let Err(err) = self.blob_store.insert(hash, blob) {
warn!(target: "txpool", ?err, "[{:?}] failed to insert blob", hash);
self.blob_store_metrics.blobstore_failed_inserts.increment(1);
}
}

/// Delete a blob from the blob store
fn delete_blob(&self, blob: TxHash) {
if let Err(err) = self.blob_store.delete(blob) {
warn!(target: "txpool", ?err, "[{:?}] failed to delete blobs", blob);
self.blob_store_metrics.blobstore_failed_deletes.increment(1);
}
}
}

impl<V: TransactionValidator, T: TransactionOrdering> fmt::Debug for PoolInner<V, T> {
impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
}
Expand Down Expand Up @@ -723,6 +761,19 @@ impl<T: PoolTransaction> AddedTransaction<T> {
}
}

/// Returns the the replaced transaction if there was one
pub(crate) fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
match self {
AddedTransaction::Pending(tx) => tx.replaced.as_ref(),
AddedTransaction::Parked { replaced, .. } => replaced.as_ref(),
}
}

/// Returns the hash of the replaced transaction if it is a blob transaction.
pub(crate) fn replaced_blob_transaction(&self) -> Option<H256> {
self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
}

/// Returns the hash of the transaction
pub(crate) fn hash(&self) -> &TxHash {
match self {
Expand Down
9 changes: 5 additions & 4 deletions crates/transaction-pool/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ mod mock;
mod pool;

use crate::{
noop::MockTransactionValidator, Pool, PoolTransaction, TransactionOrigin,
TransactionValidationOutcome, TransactionValidator,
blobstore::InMemoryBlobStore, noop::MockTransactionValidator, Pool, PoolTransaction,
TransactionOrigin, TransactionValidationOutcome, TransactionValidator,
};
use async_trait::async_trait;
pub use mock::*;
use std::{marker::PhantomData, sync::Arc};

/// A [Pool] used for testing
pub type TestPool = Pool<MockTransactionValidator<MockTransaction>, MockOrdering>;
pub type TestPool =
Pool<MockTransactionValidator<MockTransaction>, MockOrdering, InMemoryBlobStore>;

/// Returns a new [Pool] used for testing purposes
pub fn testing_pool() -> TestPool {
Expand All @@ -23,5 +24,5 @@ pub fn testing_pool() -> TestPool {
pub fn testing_pool_with_validator(
validator: MockTransactionValidator<MockTransaction>,
) -> TestPool {
Pool::new(validator, MockOrdering::default(), Default::default())
Pool::new(validator, MockOrdering::default(), InMemoryBlobStore::default(), Default::default())
}
6 changes: 4 additions & 2 deletions examples/network-txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
use reth_network::{config::rng_secret_key, NetworkConfig, NetworkManager};
use reth_provider::test_utils::NoopProvider;
use reth_transaction_pool::{
validate::ValidTransaction, CoinbaseTipOrdering, EthPooledTransaction, PoolTransaction,
TransactionOrigin, TransactionPool, TransactionValidationOutcome, TransactionValidator,
blobstore::InMemoryBlobStore, validate::ValidTransaction, CoinbaseTipOrdering,
EthPooledTransaction, PoolTransaction, TransactionOrigin, TransactionPool,
TransactionValidationOutcome, TransactionValidator,
};

#[tokio::main]
Expand All @@ -25,6 +26,7 @@ async fn main() -> eyre::Result<()> {
let pool = reth_transaction_pool::Pool::new(
OkValidator::default(),
CoinbaseTipOrdering::default(),
InMemoryBlobStore::default(),
Default::default(),
);

Expand Down

0 comments on commit 8516fef

Please sign in to comment.