From b42249fffbb94ef0dc227f67bfd73179cc05c4ac Mon Sep 17 00:00:00 2001 From: Jon Cinque Date: Thu, 24 Aug 2023 13:04:00 +0200 Subject: [PATCH] tpu-client: Speed up performance by awaiting all futures at once (#32945) * tpu-client: Await all futures at once * Add timeout when sending to not waste time on down nodes * Update comment to make it clearer that we're not spiking --- tpu-client/src/nonblocking/tpu_client.rs | 147 +++++++++++++++++++++-- 1 file changed, 140 insertions(+), 7 deletions(-) diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 2ddc1cfce21806..79c36520000c74 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -2,7 +2,10 @@ pub use crate::tpu_client::Result; use { crate::tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS}, bincode::serialize, - futures_util::{future::join_all, stream::StreamExt}, + futures_util::{ + future::{join_all, FutureExt, TryFutureExt}, + stream::StreamExt, + }, log::*, solana_connection_cache::{ connection_cache::{ @@ -29,6 +32,8 @@ use { }, std::{ collections::{HashMap, HashSet}, + future::Future, + iter, net::SocketAddr, str::FromStr, sync::{ @@ -45,6 +50,7 @@ use { #[cfg(feature = "spinner")] use { crate::tpu_client::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL}, + indicatif::ProgressBar, solana_rpc_client::spinner::{self, SendTransactionProgress}, solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, solana_sdk::{message::Message, signers::Signers, transaction::TransactionError}, @@ -247,6 +253,100 @@ pub struct TpuClient< connection_cache: Arc>, } +/// Helper function which generates futures to all be awaited together for maximum +/// throughput +#[cfg(feature = "spinner")] +fn send_wire_transaction_futures<'a, P, M, C>( + progress_bar: &'a ProgressBar, + progress: &'a SendTransactionProgress, + index: usize, + num_transactions: usize, + wire_transaction: Vec, + leaders: Vec, + connection_cache: &'a ConnectionCache, +) -> Vec> + 'a> +where + P: ConnectionPool, + M: ConnectionManager, +{ + const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); + let sleep_duration = SEND_TRANSACTION_INTERVAL.saturating_mul(index as u32); + let send_timeout = SEND_TIMEOUT_INTERVAL.saturating_add(sleep_duration); + leaders + .into_iter() + .map(|addr| { + timeout_future( + send_timeout, + sleep_and_send_wire_transaction_to_addr( + sleep_duration, + connection_cache, + addr, + wire_transaction.clone(), + ), + ) + .boxed_local() // required to make types work simply + }) + .chain(iter::once( + timeout_future( + send_timeout, + sleep_and_set_message( + sleep_duration, + progress_bar, + progress, + index, + num_transactions, + ), + ) + .boxed_local(), // required to make types work simply + )) + .collect::>() +} + +// Wrap an existing future with a timeout. +// +// Useful for end-users who don't need a persistent connection to each validator, +// and want to abort more quickly. +fn timeout_future<'a, Fut: Future> + 'a>( + timeout_duration: Duration, + future: Fut, +) -> impl Future> + 'a { + timeout(timeout_duration, future) + .unwrap_or_else(|_| Err(TransportError::Custom("Timed out".to_string()))) + .boxed_local() +} + +#[cfg(feature = "spinner")] +async fn sleep_and_set_message( + sleep_duration: Duration, + progress_bar: &ProgressBar, + progress: &SendTransactionProgress, + index: usize, + num_transactions: usize, +) -> TransportResult<()> { + sleep(sleep_duration).await; + progress.set_message_for_confirmed_transactions( + progress_bar, + &format!("Sending {}/{} transactions", index + 1, num_transactions,), + ); + Ok(()) +} + +#[cfg(feature = "spinner")] +async fn sleep_and_send_wire_transaction_to_addr( + sleep_duration: Duration, + connection_cache: &ConnectionCache, + addr: SocketAddr, + wire_transaction: Vec, +) -> TransportResult<()> +where + P: ConnectionPool, + M: ConnectionManager, +{ + sleep(sleep_duration).await; + let conn = connection_cache.get_nonblocking_connection(&addr); + conn.send_data(&wire_transaction).await +} + async fn send_wire_transaction_to_addr( connection_cache: &ConnectionCache, addr: &SocketAddr, @@ -459,15 +559,48 @@ where // Periodically re-send all pending transactions if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL { + // Prepare futures for all transactions + let mut futures = vec![]; for (index, (_i, transaction)) in pending_transactions.values().enumerate() { - if !self.send_transaction(transaction).await { + let wire_transaction = serialize(transaction).unwrap(); + let leaders = self + .leader_tpu_service + .leader_tpu_sockets(self.fanout_slots); + futures.extend(send_wire_transaction_futures( + &progress_bar, + &progress, + index, + num_transactions, + wire_transaction, + leaders, + &self.connection_cache, + )); + } + + // Start the process of sending them all + let results = join_all(futures).await; + + progress.set_message_for_confirmed_transactions( + &progress_bar, + "Checking sent transactions", + ); + for (index, (tx_results, (_i, transaction))) in results + .chunks(self.fanout_slots as usize) + .zip(pending_transactions.values()) + .enumerate() + { + // Only report an error if every future in the chunk errored + if tx_results.iter().all(|r| r.is_err()) { + progress.set_message_for_confirmed_transactions( + &progress_bar, + &format!( + "Resending failed transaction {} of {}", + index + 1, + num_transactions, + ), + ); let _result = self.rpc_client.send_transaction(transaction).await.ok(); } - progress.set_message_for_confirmed_transactions( - &progress_bar, - &format!("Sending {}/{} transactions", index + 1, num_transactions,), - ); - sleep(SEND_TRANSACTION_INTERVAL).await; } last_resend = Instant::now(); }