Skip to content

Commit

Permalink
Wrap sidecar in arcs (paradigmxyz#11554)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
kdonthi and mattsse authored Oct 30, 2024
1 parent bb8da98 commit 755fac0
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 46 deletions.
2 changes: 1 addition & 1 deletion crates/ethereum/payload/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ where
EthBuiltPayload::new(attributes.id, sealed_block, total_fees, Some(executed), requests);

// extend the payload with the blob sidecars from the executed txs
payload.extend_sidecars(blob_sidecars);
payload.extend_sidecars(blob_sidecars.into_iter().map(Arc::unwrap_or_clone));

Ok(BuildOutcome::Better { payload, cached_reads })
}
44 changes: 29 additions & 15 deletions crates/transaction-pool/src/blobstore/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl BlobStore for DiskFileBlobStore {
stat
}

fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.inner.get_one(tx)
}

Expand All @@ -114,14 +114,17 @@ impl BlobStore for DiskFileBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
self.inner.get_all(txs)
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
Expand Down Expand Up @@ -164,7 +167,7 @@ impl BlobStore for DiskFileBlobStore {

struct DiskFileBlobStoreInner {
blob_dir: PathBuf,
blob_cache: Mutex<LruMap<TxHash, BlobTransactionSidecar, ByLength>>,
blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
size_tracker: BlobStoreSize,
file_lock: RwLock<()>,
txs_to_delete: RwLock<HashSet<B256>>,
Expand Down Expand Up @@ -205,7 +208,7 @@ impl DiskFileBlobStoreInner {
fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
data.rlp_encode_fields(&mut buf);
self.blob_cache.lock().insert(tx, data);
self.blob_cache.lock().insert(tx, Arc::new(data));
let size = self.write_one_encoded(tx, &buf)?;

self.size_tracker.add_size(size);
Expand All @@ -227,7 +230,7 @@ impl DiskFileBlobStoreInner {
{
let mut cache = self.blob_cache.lock();
for (tx, data) in txs {
cache.insert(tx, data);
cache.insert(tx, Arc::new(data));
}
}
let mut add = 0;
Expand Down Expand Up @@ -278,15 +281,19 @@ impl DiskFileBlobStoreInner {
}

/// Retrieves the blob for the given transaction hash from the blob cache or disk.
fn get_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if let Some(blob) = self.blob_cache.lock().get(&tx) {
return Ok(Some(blob.clone()))
}
let blob = self.read_one(tx)?;

if let Some(blob) = &blob {
self.blob_cache.lock().insert(tx, blob.clone());
let blob_arc = Arc::new(blob.clone());
self.blob_cache.lock().insert(tx, blob_arc.clone());
return Ok(Some(blob_arc))
}
Ok(blob)

Ok(None)
}

/// Returns the path to the blob file for the given transaction hash.
Expand Down Expand Up @@ -374,7 +381,7 @@ impl DiskFileBlobStoreInner {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let mut res = Vec::with_capacity(txs.len());
let mut cache_miss = Vec::new();
{
Expand All @@ -396,8 +403,9 @@ impl DiskFileBlobStoreInner {
}
let mut cache = self.blob_cache.lock();
for (tx, data) in from_disk {
cache.insert(tx, data.clone());
res.push((tx, data));
let arc = Arc::new(data.clone());
cache.insert(tx, arc.clone());
res.push((tx, arc.clone()));
}

Ok(res)
Expand All @@ -407,7 +415,10 @@ impl DiskFileBlobStoreInner {
///
/// Returns an error if there are any missing blobs.
#[inline]
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
txs.into_iter()
.map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
.collect()
Expand Down Expand Up @@ -514,14 +525,17 @@ mod tests {
let blobs = rng_blobs(10);
let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
store.insert_all(blobs.clone()).unwrap();

// all cached
for (tx, blob) in &blobs {
assert!(store.is_cached(tx));
assert_eq!(store.get(*tx).unwrap().unwrap(), *blob);
let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
assert_eq!(b, *blob);
}

let all = store.get_all(all_hashes.clone()).unwrap();
for (tx, blob) in all {
assert!(blobs.contains(&(tx, blob)), "missing blob {tx:?}");
assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
}

assert!(store.contains(all_hashes[0]).unwrap());
Expand Down
21 changes: 11 additions & 10 deletions crates/transaction-pool/src/blobstore/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct InMemoryBlobStore {
#[derive(Debug, Default)]
struct InMemoryBlobStoreInner {
/// Storage for all blob data.
store: RwLock<HashMap<B256, BlobTransactionSidecar>>,
store: RwLock<HashMap<B256, Arc<BlobTransactionSidecar>>>,
size_tracker: BlobStoreSize,
}

Expand Down Expand Up @@ -75,7 +75,7 @@ impl BlobStore for InMemoryBlobStore {
}

// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(self.inner.store.read().get(&tx).cloned())
}

Expand All @@ -86,16 +86,17 @@ impl BlobStore for InMemoryBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let store = self.inner.store.read();
Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let store = self.inner.store.read();
txs.into_iter()
.map(|tx| store.get(&tx).cloned().ok_or_else(|| BlobStoreError::MissingSidecar(tx)))
.collect()
Ok(txs.into_iter().filter_map(|tx| store.get(&tx).cloned()).collect())
}

fn get_by_versioned_hashes(
Expand Down Expand Up @@ -134,7 +135,7 @@ impl BlobStore for InMemoryBlobStore {

/// Removes the given blob from the store and returns the size of the blob that was removed.
#[inline]
fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) -> usize {
fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>, tx: &B256) -> usize {
store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
}

Expand All @@ -143,11 +144,11 @@ fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) ->
/// We don't need to handle the size updates for replacements because transactions are unique.
#[inline]
fn insert_size(
store: &mut HashMap<B256, BlobTransactionSidecar>,
store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>,
tx: B256,
blob: BlobTransactionSidecar,
) -> usize {
let add = blob.size();
store.insert(tx, blob);
store.insert(tx, Arc::new(blob));
add
}
12 changes: 8 additions & 4 deletions crates/transaction-pool/src/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ pub use noop::NoopBlobStore;
use reth_primitives::BlobTransactionSidecar;
use std::{
fmt,
sync::atomic::{AtomicUsize, Ordering},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};

Expand Down Expand Up @@ -44,7 +47,7 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn cleanup(&self) -> BlobStoreCleanupStat;

/// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Checks if the given transaction hash is in the blob store.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
Expand All @@ -58,13 +61,14 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError>;
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError>;

/// Returns the exact [`BlobTransactionSidecar`] for the given transaction hashes in the exact
/// order they were requested.
///
/// Returns an error if any of the blobs are not found in the blob store.
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;
fn get_exact(&self, txs: Vec<B256>)
-> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_by_versioned_hashes(
Expand Down
10 changes: 7 additions & 3 deletions crates/transaction-pool/src/blobstore/noop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobTransactionSidecar};
use alloy_eips::eip4844::BlobAndProofV1;
use alloy_primitives::B256;
use std::sync::Arc;

/// A blobstore implementation that does nothing
#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Default)]
Expand Down Expand Up @@ -28,7 +29,7 @@ impl BlobStore for NoopBlobStore {
BlobStoreCleanupStat::default()
}

fn get(&self, _tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, _tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(None)
}

Expand All @@ -39,11 +40,14 @@ impl BlobStore for NoopBlobStore {
fn get_all(
&self,
_txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
Ok(vec![])
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(vec![])
}
Expand Down
9 changes: 6 additions & 3 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,21 +561,24 @@ where
self.pool.unique_senders()
}

fn get_blob(&self, tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_blob(
&self,
tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get(tx_hash)
}

fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
self.pool.blob_store().get_all(tx_hashes)
}

fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get_exact(tx_hashes)
}

Expand Down
2 changes: 2 additions & 0 deletions crates/transaction-pool/src/maintain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::{
collections::HashSet,
hash::{Hash, Hasher},
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::oneshot;
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -328,6 +329,7 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
pool.get_blob(tx.hash)
.ok()
.flatten()
.map(Arc::unwrap_or_clone)
.and_then(|sidecar| {
PooledTransactionsElementEcRecovered::try_from_blob_transaction(
tx, sidecar,
Expand Down
9 changes: 6 additions & 3 deletions crates/transaction-pool/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,21 +275,24 @@ impl TransactionPool for NoopTransactionPool {
Default::default()
}

fn get_blob(&self, _tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_blob(
&self,
_tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(None)
}

fn get_all_blobs(
&self,
_tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
Ok(vec![])
}

fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if tx_hashes.is_empty() {
return Ok(vec![])
}
Expand Down
4 changes: 3 additions & 1 deletion crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ where
/// Caution: this assumes the given transaction is eip-4844
fn get_blob_transaction(&self, transaction: TransactionSigned) -> Option<BlobTransaction> {
if let Ok(Some(sidecar)) = self.blob_store.get(transaction.hash()) {
if let Ok(blob) = BlobTransaction::try_from_signed(transaction, sidecar) {
if let Ok(blob) =
BlobTransaction::try_from_signed(transaction, Arc::unwrap_or_clone(sidecar))
{
return Some(blob)
}
}
Expand Down
9 changes: 6 additions & 3 deletions crates/transaction-pool/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,10 @@ pub trait TransactionPool: Send + Sync + Clone {

/// Returns the [BlobTransactionSidecar] for the given transaction hash if it exists in the blob
/// store.
fn get_blob(&self, tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
fn get_blob(
&self,
tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Returns all [BlobTransactionSidecar] for the given transaction hashes if they exists in the
/// blob store.
Expand All @@ -453,7 +456,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError>;
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError>;

/// Returns the exact [BlobTransactionSidecar] for the given transaction hashes in the order
/// they were requested.
Expand All @@ -462,7 +465,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_blobs_for_versioned_hashes(
Expand Down
Loading

0 comments on commit 755fac0

Please sign in to comment.