Skip to content

Commit

Permalink
client: Speed up send-and-confirm-parallel (#33032)
Browse files Browse the repository at this point in the history
* Rename blockheight -> block height

* Clean up comments

* Simplify tpu-client implementation

* client: Destructure more in match arm

* client: Avoid passing Arc where it isn't needed

* Refactor sending into a new function

* Chain work after send to make last part cleaner

* Do all sending at once in the same future

* Sleep before sending to avoid overwhelming
  • Loading branch information
joncinque authored Aug 29, 2023
1 parent 555741e commit 2124f0a
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 87 deletions.
199 changes: 116 additions & 83 deletions client/src/send_and_confirm_transactions_in_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use {
},
bincode::serialize,
dashmap::DashMap,
futures_util::future::{join_all, TryFutureExt},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::spinner::{self, SendTransactionProgress},
solana_rpc_client_api::{
client_error::ErrorKind,
request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS},
response::RpcSimulateTransactionResult,
},
solana_sdk::{
hash::Hash,
Expand All @@ -31,11 +33,12 @@ use {

const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10);
const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2);
const SEND_INTERVAL: Duration = Duration::from_millis(10);
type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;

#[derive(Clone, Debug)]
struct TransactionData {
last_valid_blockheight: u64,
last_valid_block_height: u64,
message: Message,
index: usize,
serialized_transaction: Vec<u8>,
Expand All @@ -44,7 +47,7 @@ struct TransactionData {
#[derive(Clone, Debug, Copy)]
struct BlockHashData {
pub blockhash: Hash,
pub last_valid_blockheight: u64,
pub last_valid_block_height: u64,
}

#[derive(Clone, Debug, Copy)]
Expand All @@ -53,6 +56,7 @@ pub struct SendAndConfirmConfig {
pub resign_txs_count: Option<usize>,
}

/// Sends and confirms transactions concurrently in a sync context
pub fn send_and_confirm_transactions_in_parallel_blocking<T: Signers + ?Sized>(
rpc_client: Arc<BlockingRpcClient>,
tpu_client: Option<QuicTpuClient>,
Expand All @@ -77,18 +81,18 @@ fn create_blockhash_data_updating_task(
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
if let Ok((blockhash, last_valid_blockheight)) = rpc_client
if let Ok((blockhash, last_valid_block_height)) = rpc_client
.get_latest_blockhash_with_commitment(rpc_client.commitment())
.await
{
*blockhash_data_rw.write().await = BlockHashData {
blockhash,
last_valid_blockheight,
last_valid_block_height,
};
}

if let Ok(blockheight) = rpc_client.get_block_height().await {
current_block_height.store(blockheight, Ordering::Relaxed);
if let Ok(block_height) = rpc_client.get_block_height().await {
current_block_height.store(block_height, Ordering::Relaxed);
}
tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await;
}
Expand All @@ -112,10 +116,10 @@ fn create_transaction_confirmation_task(
let transactions_to_verify: Vec<Signature> = unconfirmed_transasction_map
.iter()
.filter(|x| {
let is_not_expired = current_block_height <= x.last_valid_blockheight;
let is_not_expired = current_block_height <= x.last_valid_block_height;
// transaction expired between last and current check
let is_recently_expired = last_block_height <= x.last_valid_blockheight
&& current_block_height > x.last_valid_blockheight;
let is_recently_expired = last_block_height <= x.last_valid_block_height
&& current_block_height > x.last_valid_block_height;
is_not_expired || is_recently_expired
})
.map(|x| *x.key())
Expand Down Expand Up @@ -177,15 +181,65 @@ fn progress_from_context_and_block_height(
}
}

async fn send_transaction_with_rpc_fallback(
rpc_client: &RpcClient,
tpu_client: &Option<QuicTpuClient>,
transaction: Transaction,
serialized_transaction: Vec<u8>,
context: &SendingContext,
index: usize,
counter: usize,
) -> Result<()> {
tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
let send_over_rpc = if let Some(tpu_client) = tpu_client {
!tpu_client
.send_wire_transaction(serialized_transaction.clone())
.await
} else {
true
};
if send_over_rpc {
if let Err(e) = rpc_client.send_transaction(&transaction).await {
match &e.kind {
ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
// fall through on io error, we will retry the transaction
}
ErrorKind::TransactionError(transaction_error) => {
context.error_map.insert(index, transaction_error.clone());
return Ok(());
}
ErrorKind::RpcError(RpcError::RpcResponseError {
data:
RpcResponseErrorData::SendTransactionPreflightFailure(
RpcSimulateTransactionResult {
err: Some(transaction_error),
..
},
),
..
}) => {
context.error_map.insert(index, transaction_error.clone());
return Ok(());
}
_ => {
return Err(TpuSenderError::from(e));
}
}
}
}
Ok(())
}

async fn sign_all_messages_and_send<T: Signers + ?Sized>(
progress_bar: &Option<indicatif::ProgressBar>,
rpc_client: Arc<RpcClient>,
rpc_client: &RpcClient,
tpu_client: &Option<QuicTpuClient>,
messages_with_index: Vec<(usize, Message)>,
signers: &T,
context: &SendingContext,
) -> Result<()> {
let current_transaction_count = messages_with_index.len();
let mut futures = vec![];
// send all the transaction messages
for (counter, (index, message)) in messages_with_index.iter().enumerate() {
let mut transaction = Transaction::new_unsigned(message.clone());
Expand All @@ -196,71 +250,48 @@ async fn sign_all_messages_and_send<T: Signers + ?Sized>(
.try_sign(signers, blockhashdata.blockhash)
.expect("Transaction should be signable");
let serialized_transaction = serialize(&transaction).expect("Transaction should serailize");
let send_over_rpc = if let Some(tpu_client) = tpu_client {
!tpu_client
.send_wire_transaction(serialized_transaction.clone())
.await
} else {
true
};
if send_over_rpc {
if let Err(e) = rpc_client.send_transaction(&transaction).await {
match &e.kind {
ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
// fall through on io error, we will retry the transaction
}
ErrorKind::TransactionError(transaction_error) => {
context.error_map.insert(*index, transaction_error.clone());
continue;
}
ErrorKind::RpcError(rpc_error) => {
if let RpcError::RpcResponseError {
data:
RpcResponseErrorData::SendTransactionPreflightFailure(simulation_result),
..
} = rpc_error
{
if let Some(transaction_error) = &simulation_result.err {
context.error_map.insert(*index, transaction_error.clone());
continue;
}
}
return Err(TpuSenderError::from(e));
}
_ => {
return Err(TpuSenderError::from(e));
}
}
}
}

let signature = transaction.signatures[0];
// send to confirm the transaction
context.unconfirmed_transasction_map.insert(
signature,
TransactionData {
index: *index,
serialized_transaction,
last_valid_blockheight: blockhashdata.last_valid_blockheight,
message: message.clone(),
},
);

if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(
futures.push(
send_transaction_with_rpc_fallback(
rpc_client,
tpu_client,
transaction,
serialized_transaction.clone(),
context,
blockhashdata.last_valid_blockheight,
);
progress.set_message_for_confirmed_transactions(
progress_bar,
&format!(
"Sending {}/{} transactions",
counter + 1,
current_transaction_count,
),
);
}
*index,
counter,
)
.and_then(move |_| async move {
// send to confirm the transaction
context.unconfirmed_transasction_map.insert(
signature,
TransactionData {
index: *index,
serialized_transaction,
last_valid_block_height: blockhashdata.last_valid_block_height,
message: message.clone(),
},
);
if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(
context,
blockhashdata.last_valid_block_height,
);
progress.set_message_for_confirmed_transactions(
progress_bar,
&format!(
"Sending {}/{} transactions",
counter + 1,
current_transaction_count,
),
);
}
Ok(())
}),
);
}
// collect to convert Vec<Result<_>> to Result<Vec<_>>
join_all(futures).await.into_iter().collect::<Result<_>>()?;
Ok(())
}

Expand All @@ -275,7 +306,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
let transactions_to_confirm = unconfirmed_transasction_map.len();
let max_valid_block_height = unconfirmed_transasction_map
.iter()
.map(|x| x.last_valid_blockheight)
.map(|x| x.last_valid_block_height)
.max();

if let Some(mut max_valid_block_height) = max_valid_block_height {
Expand All @@ -293,7 +324,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
while !unconfirmed_transasction_map.is_empty()
&& current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
{
let blockheight = current_block_height.load(Ordering::Relaxed);
let block_height = current_block_height.load(Ordering::Relaxed);

if let Some(progress_bar) = progress_bar {
let progress =
Expand All @@ -310,7 +341,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
// any transactions sent over RPC will be automatically rebroadcast by the RPC server
let txs_to_resend_over_tpu = unconfirmed_transasction_map
.iter()
.filter(|x| blockheight < x.last_valid_blockheight)
.filter(|x| block_height < x.last_valid_block_height)
.map(|x| x.serialized_transaction.clone())
.collect();
let _ = tpu_client
Expand All @@ -327,7 +358,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
if let Some(max_valid_block_height_in_remaining_transaction) =
unconfirmed_transasction_map
.iter()
.map(|x| x.last_valid_blockheight)
.map(|x| x.last_valid_block_height)
.max()
{
max_valid_block_height = max_valid_block_height_in_remaining_transaction;
Expand All @@ -344,9 +375,11 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
}
}

/// This is a new method which will be able to send and confirm a large amount of transactions
/// Sends and confirms transactions concurrently
///
/// The sending and confirmation of transactions is done in parallel tasks
/// The signer sign the transaction just before sending so that blockhash is not expired
/// The method signs transactions just before sending so that blockhash does not
/// expire.
pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
rpc_client: Arc<RpcClient>,
tpu_client: Option<QuicTpuClient>,
Expand All @@ -355,12 +388,12 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
config: SendAndConfirmConfig,
) -> Result<Vec<Option<TransactionError>>> {
// get current blockhash and corresponding last valid block height
let (blockhash, last_valid_blockheight) = rpc_client
let (blockhash, last_valid_block_height) = rpc_client
.get_latest_blockhash_with_commitment(rpc_client.commitment())
.await?;
let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData {
blockhash,
last_valid_blockheight,
last_valid_block_height,
}));

// check if all the messages are signable by the signers
Expand All @@ -372,7 +405,7 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
})
.collect::<std::result::Result<Vec<()>, SignerError>>()?;

// get current blockheight
// get current block height
let block_height = rpc_client.get_block_height().await?;
let current_block_height = Arc::new(AtomicU64::new(block_height));

Expand All @@ -382,7 +415,7 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
progress_bar
});

// blockhash and blockheight update task
// blockhash and block height update task
let block_data_task = create_blockhash_data_updating_task(
rpc_client.clone(),
blockhash_data_rw.clone(),
Expand Down Expand Up @@ -436,7 +469,7 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(

sign_all_messages_and_send(
&progress_bar,
rpc_client.clone(),
&rpc_client,
&tpu_client,
messages_with_index,
signers,
Expand Down
5 changes: 1 addition & 4 deletions tpu-client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ fn timeout_future<'a, Fut: Future<Output = TransportResult<()>> + 'a>(
) -> impl Future<Output = TransportResult<()>> + 'a {
timeout(timeout_duration, future)
.unwrap_or_else(|_| Err(TransportError::Custom("Timed out".to_string())))
.boxed_local()
}

#[cfg(feature = "spinner")]
Expand All @@ -331,7 +330,6 @@ async fn sleep_and_set_message(
Ok(())
}

#[cfg(feature = "spinner")]
async fn sleep_and_send_wire_transaction_to_addr<P, M, C>(
sleep_duration: Duration,
connection_cache: &ConnectionCache<P, M, C>,
Expand All @@ -343,8 +341,7 @@ where
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
{
sleep(sleep_duration).await;
let conn = connection_cache.get_nonblocking_connection(&addr);
conn.send_data(&wire_transaction).await
send_wire_transaction_to_addr(connection_cache, &addr, wire_transaction).await
}

async fn send_wire_transaction_to_addr<P, M, C>(
Expand Down

0 comments on commit 2124f0a

Please sign in to comment.