tpu-client: Speed up performance by awaiting all futures at once (#32945)

* tpu-client: Await all futures at once

* Add timeout when sending to not waste time on down nodes

* Update comment to make it clearer that we're not spiking
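
The pattern behind the first two bullets: rather than sending to each leader and waiting before moving on, the client builds one send future per (transaction, leader) pair, caps each with a timeout so a down node costs at most the timeout, and awaits them all together. A minimal standalone sketch of that pattern follows; `send_to`, the addresses, and the 5-second cap are illustrative placeholders, not taken from the commit.

```rust
use std::time::Duration;

use futures_util::future::join_all;
use tokio::time::timeout;

// Hypothetical stand-in for a QUIC/UDP send to a leader's TPU port.
async fn send_to(addr: &str, payload: &[u8]) -> Result<(), String> {
    let _ = (addr, payload); // real network I/O would happen here
    Ok(())
}

#[tokio::main]
async fn main() {
    let leaders = ["203.0.113.1:8009", "203.0.113.2:8009"];
    let payload = vec![0u8; 64];

    // One future per leader, each capped at 5 seconds so a dead node
    // cannot stall the rest of the batch behind it.
    let sends = leaders.iter().map(|&addr| {
        let payload = payload.clone();
        async move {
            timeout(Duration::from_secs(5), send_to(addr, &payload))
                .await
                .unwrap_or_else(|_| Err("timed out".to_string()))
        }
    });

    // Await every send at once instead of one leader at a time.
    let results = join_all(sends).await;
    println!("{} sends attempted", results.len());
}
```
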
joncinque authored Aug 24, 2023
1 parent 329c6f1 commit b42249f
Showing 1 changed file with 140 additions and 7 deletions.
147 changes: 140 additions & 7 deletions tpu-client/src/nonblocking/tpu_client.rs
@@ -2,7 +2,10 @@ pub use crate::tpu_client::Result;
use {
crate::tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS},
bincode::serialize,
futures_util::{future::join_all, stream::StreamExt},
futures_util::{
future::{join_all, FutureExt, TryFutureExt},
stream::StreamExt,
},
log::*,
solana_connection_cache::{
connection_cache::{
@@ -29,6 +32,8 @@ use {
},
std::{
collections::{HashMap, HashSet},
future::Future,
iter,
net::SocketAddr,
str::FromStr,
sync::{
@@ -45,6 +50,7 @@ use {
#[cfg(feature = "spinner")]
use {
crate::tpu_client::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL},
indicatif::ProgressBar,
solana_rpc_client::spinner::{self, SendTransactionProgress},
solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
solana_sdk::{message::Message, signers::Signers, transaction::TransactionError},
@@ -247,6 +253,100 @@ pub struct TpuClient<
connection_cache: Arc<ConnectionCache<P, M, C>>,
}

/// Helper function which generates futures to all be awaited together for maximum
/// throughput
#[cfg(feature = "spinner")]
fn send_wire_transaction_futures<'a, P, M, C>(
progress_bar: &'a ProgressBar,
progress: &'a SendTransactionProgress,
index: usize,
num_transactions: usize,
wire_transaction: Vec<u8>,
leaders: Vec<SocketAddr>,
connection_cache: &'a ConnectionCache<P, M, C>,
) -> Vec<impl Future<Output = TransportResult<()>> + 'a>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
{
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
let sleep_duration = SEND_TRANSACTION_INTERVAL.saturating_mul(index as u32);
let send_timeout = SEND_TIMEOUT_INTERVAL.saturating_add(sleep_duration);
leaders
.into_iter()
.map(|addr| {
timeout_future(
send_timeout,
sleep_and_send_wire_transaction_to_addr(
sleep_duration,
connection_cache,
addr,
wire_transaction.clone(),
),
)
.boxed_local() // required to make types work simply
})
.chain(iter::once(
timeout_future(
send_timeout,
sleep_and_set_message(
sleep_duration,
progress_bar,
progress,
index,
num_transactions,
),
)
.boxed_local(), // required to make types work simply
))
.collect::<Vec<_>>()
}
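
A note on the two `.boxed_local()` calls above: the per-leader send futures and the single progress-message future are distinct opaque `impl Future` types, so each is boxed into a common `LocalBoxFuture` before being chained into one `Vec`. A tiny illustration of that type-erasure step, with meaningless placeholder values (not part of the commit):

```rust
use futures_util::future::{join_all, FutureExt, LocalBoxFuture};

// Two async blocks have two distinct anonymous types; boxing erases the
// difference so both can live in the same Vec.
fn mixed_futures() -> Vec<LocalBoxFuture<'static, u32>> {
    vec![
        async { 1u32 }.boxed_local(),
        async { 1u32 + 1 }.boxed_local(),
    ]
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let total: u32 = join_all(mixed_futures()).await.into_iter().sum();
    assert_eq!(total, 3);
}
```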

// Wrap an existing future with a timeout.
//
// Useful for end-users who don't need a persistent connection to each validator,
// and want to abort more quickly.
fn timeout_future<'a, Fut: Future<Output = TransportResult<()>> + 'a>(
timeout_duration: Duration,
future: Fut,
) -> impl Future<Output = TransportResult<()>> + 'a {
timeout(timeout_duration, future)
.unwrap_or_else(|_| Err(TransportError::Custom("Timed out".to_string())))
.boxed_local()
}
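
`timeout_future` relies on `tokio::time::timeout`, which resolves to `Ok(inner_result)` when the wrapped future finishes first and to an error when the timer fires first; `TryFutureExt::unwrap_or_else` then folds the timer case into a `TransportError`, so callers always see a plain `TransportResult<()>`. Roughly the same behavior written with an explicit `match`, as a sketch rather than part of the commit:

```rust
async fn timeout_future_explicit<Fut>(
    timeout_duration: Duration,
    future: Fut,
) -> TransportResult<()>
where
    Fut: Future<Output = TransportResult<()>>,
{
    match timeout(timeout_duration, future).await {
        Ok(send_result) => send_result, // the send finished (ok or not) in time
        Err(_elapsed) => Err(TransportError::Custom("Timed out".to_string())),
    }
}
```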

#[cfg(feature = "spinner")]
async fn sleep_and_set_message(
sleep_duration: Duration,
progress_bar: &ProgressBar,
progress: &SendTransactionProgress,
index: usize,
num_transactions: usize,
) -> TransportResult<()> {
sleep(sleep_duration).await;
progress.set_message_for_confirmed_transactions(
progress_bar,
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
);
Ok(())
}

#[cfg(feature = "spinner")]
async fn sleep_and_send_wire_transaction_to_addr<P, M, C>(
sleep_duration: Duration,
connection_cache: &ConnectionCache<P, M, C>,
addr: SocketAddr,
wire_transaction: Vec<u8>,
) -> TransportResult<()>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
{
sleep(sleep_duration).await;
let conn = connection_cache.get_nonblocking_connection(&addr);
conn.send_data(&wire_transaction).await
}
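
Both `sleep_and_*` helpers wait `index * SEND_TRANSACTION_INTERVAL` before doing their work, so even though every future is awaited concurrently, the actual sends are staggered rather than fired as one burst (the "not spiking" point from the commit message). A quick illustration of the resulting delay schedule, using a made-up interval value:

```rust
use std::time::Duration;

fn main() {
    // Illustrative value only; the real constant is crate::tpu_client::SEND_TRANSACTION_INTERVAL.
    let send_transaction_interval = Duration::from_millis(10);
    for index in 0..5u32 {
        let sleep_duration = send_transaction_interval.saturating_mul(index);
        println!("transaction {index}: send starts after {sleep_duration:?}");
    }
}
```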

async fn send_wire_transaction_to_addr<P, M, C>(
connection_cache: &ConnectionCache<P, M, C>,
addr: &SocketAddr,
@@ -459,15 +559,48 @@ where

// Periodically re-send all pending transactions
if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
// Prepare futures for all transactions
let mut futures = vec![];
for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
if !self.send_transaction(transaction).await {
let wire_transaction = serialize(transaction).unwrap();
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
futures.extend(send_wire_transaction_futures(
&progress_bar,
&progress,
index,
num_transactions,
wire_transaction,
leaders,
&self.connection_cache,
));
}

// Start the process of sending them all
let results = join_all(futures).await;

progress.set_message_for_confirmed_transactions(
&progress_bar,
"Checking sent transactions",
);
for (index, (tx_results, (_i, transaction))) in results
.chunks(self.fanout_slots as usize)
.zip(pending_transactions.values())
.enumerate()
{
// Only report an error if every future in the chunk errored
if tx_results.iter().all(|r| r.is_err()) {
progress.set_message_for_confirmed_transactions(
&progress_bar,
&format!(
"Resending failed transaction {} of {}",
index + 1,
num_transactions,
),
);
let _result = self.rpc_client.send_transaction(transaction).await.ok();
}
progress.set_message_for_confirmed_transactions(
&progress_bar,
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
);
sleep(SEND_TRANSACTION_INTERVAL).await;
}
last_resend = Instant::now();
}
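
`join_all` returns results in the order the futures were created, so `chunks(...)` and `zip(pending_transactions.values())` regroup the flat result list back into one group per transaction, and the RPC fallback fires only when every send in a group failed. A small standalone illustration of that regroup-and-fallback shape, with made-up data and fanout:

```rust
fn main() {
    // Made-up results for 2 transactions with a fanout of 3, flattened in
    // creation order, the same shape join_all produces.
    let fanout = 3usize;
    let results: Vec<Result<(), &str>> = vec![
        Ok(()), Err("timed out"), Ok(()),                   // transaction 0: some sends landed
        Err("timed out"), Err("refused"), Err("timed out"), // transaction 1: every send failed
    ];
    let transactions = ["tx0", "tx1"];

    for (index, (tx_results, tx)) in results.chunks(fanout).zip(transactions.iter()).enumerate() {
        // Mirrors the rule above: fall back to RPC only if the whole group errored.
        if tx_results.iter().all(|r| r.is_err()) {
            println!("resending {tx} ({} of {}) over RPC", index + 1, transactions.len());
        }
    }
}
```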