Skip to content

Commit

Permalink
feat: implement transaction fetcher, w/o redundant tx hash request (p…
Browse files Browse the repository at this point in the history
…aradigmxyz#5058)

Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
smatthewenglish and mattsse authored Oct 26, 2023
1 parent f782a33 commit c1a6e42
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 31 deletions.
7 changes: 7 additions & 0 deletions crates/net/eth-wire/src/types/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ pub struct PooledTransactions(
pub Vec<PooledTransactionsElement>,
);

impl PooledTransactions {
/// Returns an iterator over the transaction hashes in this response.
pub fn hashes(&self) -> impl Iterator<Item = &B256> + '_ {
self.0.iter().map(|tx| tx.hash())
}
}

impl From<Vec<TransactionSigned>> for PooledTransactions {
fn from(txs: Vec<TransactionSigned>) -> Self {
PooledTransactions(txs.into_iter().map(Into::into).collect())
Expand Down
8 changes: 8 additions & 0 deletions crates/net/network/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ impl PeerRequest {
}
}
}

/// Consumes the type and returns the inner [`GetPooledTransactions`] variant.
pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
match self {
PeerRequest::GetPooledTransactions { request, .. } => Some(request),
_ => None,
}
}
}

/// Corresponding variant for [`PeerRequest`].
Expand Down
235 changes: 204 additions & 31 deletions crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::trace;

Expand All @@ -55,6 +55,9 @@ const GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES: usize = 256;
const GET_POOLED_TRANSACTION_SOFT_LIMIT_SIZE: GetPooledTransactionLimit =
GetPooledTransactionLimit::SizeSoftLimit(2 * 1024 * 1024);

/// How many peers we keep track of for each missing transaction.
const MAX_ALTERNATIVE_PEERS_PER_TX: usize = 3;

/// The future for inserting a function into the pool
pub type PoolImportFuture = Pin<Box<dyn Future<Output = PoolResult<TxHash>> + Send + 'static>>;

Expand Down Expand Up @@ -149,8 +152,8 @@ pub struct TransactionsManager<Pool> {
///
/// From which we get all new incoming transaction related messages.
network_events: UnboundedReceiverStream<NetworkEvent>,
/// All currently active requests for pooled transactions.
inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
/// Transaction fetcher to handle inflight and missing transaction requests.
transaction_fetcher: TransactionFetcher,
/// All currently pending transactions grouped by peers.
///
/// This way we can track incoming transactions and prevent multiple pool imports for the same
Expand Down Expand Up @@ -192,7 +195,7 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
pool,
network,
network_events,
inflight_requests: Default::default(),
transaction_fetcher: Default::default(),
transactions_by_peers: Default::default(),
pool_imports: Default::default(),
peers: Default::default(),
Expand Down Expand Up @@ -231,7 +234,9 @@ where

#[inline]
fn update_request_metrics(&self) {
self.metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
self.metrics
.inflight_transaction_requests
.set(self.transaction_fetcher.inflight_requests.len() as f64);
}

/// Request handler for an incoming request for transactions
Expand Down Expand Up @@ -503,16 +508,9 @@ where
hashes.truncate(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES);

// request the missing transactions
let (response, rx) = oneshot::channel();
let req = PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(hashes),
response,
};

if peer.request_tx.try_send(req).is_ok() {
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx))
} else {
// peer channel is saturated, drop the request
let request_sent =
self.transaction_fetcher.request_transactions_from_peer(hashes, peer);
if !request_sent {
self.metrics.egress_peer_channel_full.increment(1);
return
}
Expand Down Expand Up @@ -542,7 +540,12 @@ where
.into_iter()
.map(PooledTransactionsElement::try_from_broadcast)
.filter_map(Result::ok)
.collect();
.collect::<Vec<_>>();

// mark the transactions as received
self.transaction_fetcher.on_received_full_transactions_broadcast(
non_blob_txs.iter().map(|tx| tx.hash()),
);

self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);

Expand Down Expand Up @@ -775,22 +778,19 @@ where

this.update_request_metrics();

// Advance all requests.
while let Poll::Ready(Some(GetPooledTxResponse { peer_id, result })) =
this.inflight_requests.poll_next_unpin(cx)
{
match result {
Ok(Ok(txs)) => {
this.import_transactions(peer_id, txs.0, TransactionSource::Response)
}
Ok(Err(req_err)) => {
this.on_request_error(peer_id, req_err);
}
Err(_) => {
// request channel closed/dropped
this.on_request_error(peer_id, RequestError::ChannelClosed)
let fetch_event = this.transaction_fetcher.poll(cx);
match fetch_event {
Poll::Ready(FetchEvent::TransactionsFetched { peer_id, transactions }) => {
if let Some(txns) = transactions {
this.import_transactions(peer_id, txns, TransactionSource::Response);
}
}
Poll::Ready(FetchEvent::FetchError { peer_id, error }) => {
this.on_request_error(peer_id, error);
}
Poll::Pending => {
// No event ready at the moment, nothing to do here.
}
}

this.update_request_metrics();
Expand Down Expand Up @@ -965,6 +965,8 @@ struct GetPooledTxRequest {

struct GetPooledTxResponse {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes
requested_hashes: Vec<TxHash>,
result: Result<RequestResult<PooledTransactions>, RecvError>,
}

Expand All @@ -991,7 +993,18 @@ impl Future for GetPooledTxRequestFut {
let mut req = self.as_mut().project().inner.take().expect("polled after completion");
match req.response.poll_unpin(cx) {
Poll::Ready(result) => {
Poll::Ready(GetPooledTxResponse { peer_id: req.peer_id, result })
let request_hashes: Vec<TxHash> = match &result {
Ok(Ok(pooled_txs)) => {
pooled_txs.0.iter().map(|tx_elem| *tx_elem.hash()).collect()
}
_ => Vec::new(),
};

Poll::Ready(GetPooledTxResponse {
peer_id: req.peer_id,
requested_hashes: request_hashes,
result,
})
}
Poll::Pending => {
self.project().inner.set(Some(req));
Expand All @@ -1015,6 +1028,166 @@ struct Peer {
client_version: Arc<String>,
}

/// The type responsible for fetching missing transactions from peers.
///
/// This will keep track of unique transaction hashes that are currently being fetched and submits
/// new requests on announced hashes.
#[derive(Debug, Default)]
struct TransactionFetcher {
/// All currently active requests for pooled transactions.
inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
/// Set that tracks all hashes that are currently being fetched.
inflight_hash_to_fallback_peers: HashMap<TxHash, Vec<PeerId>>,
}

// === impl TransactionFetcher ===

impl TransactionFetcher {
/// Removes the specified hashes from inflight tracking.
#[inline]
fn remove_inflight_hashes<'a, I>(&mut self, hashes: I)
where
I: IntoIterator<Item = &'a TxHash>,
{
for &hash in hashes {
self.inflight_hash_to_fallback_peers.remove(&hash);
}
}

/// Advances all inflight requests and returns the next event.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchEvent> {
if let Poll::Ready(Some(GetPooledTxResponse { peer_id, requested_hashes, result })) =
self.inflight_requests.poll_next_unpin(cx)
{
return match result {
Ok(Ok(txs)) => {
// clear received hashes
self.remove_inflight_hashes(txs.hashes());

// TODO: re-request missing hashes, for now clear all of them
self.remove_inflight_hashes(requested_hashes.iter());

Poll::Ready(FetchEvent::TransactionsFetched {
peer_id,
transactions: Some(txs.0),
})
}
Ok(Err(req_err)) => {
// TODO: re-request missing hashes
self.remove_inflight_hashes(&requested_hashes);
Poll::Ready(FetchEvent::FetchError { peer_id, error: req_err })
}
Err(_) => {
// TODO: re-request missing hashes
self.remove_inflight_hashes(&requested_hashes);
// request channel closed/dropped
Poll::Ready(FetchEvent::FetchError {
peer_id,
error: RequestError::ChannelClosed,
})
}
}
}
Poll::Pending
}

/// Removes the provided transaction hashes from the inflight requests set.
///
/// This is called when we receive full transactions that are currently scheduled for fetching.
#[inline]
fn on_received_full_transactions_broadcast<'a>(
&mut self,
hashes: impl IntoIterator<Item = &'a TxHash>,
) {
self.remove_inflight_hashes(hashes)
}

/// Requests the missing transactions from the announced hashes of the peer
///
/// This filters all announced hashes that are already in flight, and requests the missing,
/// while marking the given peer as an alternative peer for the hashes that are already in
/// flight.
fn request_transactions_from_peer(
&mut self,
mut announced_hashes: Vec<TxHash>,
peer: &Peer,
) -> bool {
let peer_id: PeerId = peer.request_tx.peer_id;
// 1. filter out inflight hashes, and register the peer as fallback for all inflight hashes
announced_hashes.retain(|&hash| {
match self.inflight_hash_to_fallback_peers.entry(hash) {
Entry::Vacant(entry) => {
// the hash is not in inflight hashes, insert it and retain in the vector
entry.insert(vec![peer_id]);
true
}
Entry::Occupied(mut entry) => {
// the hash is already in inflight, add this peer as a backup if not more than 3
// backups already
let backups = entry.get_mut();
if backups.len() < MAX_ALTERNATIVE_PEERS_PER_TX {
backups.push(peer_id);
}
false
}
}
});

// 2. request all missing from peer
if announced_hashes.is_empty() {
// nothing to request
return false
}

let (response, rx) = oneshot::channel();
let req: PeerRequest = PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(announced_hashes),
response,
};

// try to send the request to the peer
if let Err(err) = peer.request_tx.try_send(req) {
// peer channel is full
match err {
TrySendError::Full(req) | TrySendError::Closed(req) => {
// need to do some cleanup so
let req = req.into_get_pooled_transactions().expect("is get pooled tx");

// we know that the peer is the only entry in the map, so we can remove all
for hash in req.0.into_iter() {
self.inflight_hash_to_fallback_peers.remove(&hash);
}
}
}
return false
} else {
//create a new request for it, from that peer
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx))
}

true
}
}

/// Represents possible events from fetching transactions.
#[derive(Debug)]
enum FetchEvent {
/// Triggered when transactions are successfully fetched.
TransactionsFetched {
/// The ID of the peer from which transactions were fetched.
peer_id: PeerId,
/// The transactions that were fetched, if available.
transactions: Option<Vec<PooledTransactionsElement>>,
},
/// Triggered when there is an error in fetching transactions.
FetchError {
/// The ID of the peer from which an attempt to fetch transactions resulted in an error.
peer_id: PeerId,
/// The specific error that occurred while fetching.
error: RequestError,
},
}

/// Commands to send to the [`TransactionsManager`]
#[derive(Debug)]
enum TransactionsCommand {
Expand Down

0 comments on commit c1a6e42

Please sign in to comment.