Skip to content

Commit

Permalink
More transaction forwarding (solana-labs#19834)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge authored Sep 14, 2021
1 parent 6bb63f9 commit 83d0822
Showing 1 changed file with 71 additions and 2 deletions.
73 changes: 71 additions & 2 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! can do its processing in parallel with signature verification on the GPU.
use crate::{
cluster_info::ClusterInfo,
data_budget::DataBudget,
packet_hasher::PacketHasher,
poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry},
poh_service::{self, PohService},
Expand Down Expand Up @@ -260,6 +261,7 @@ impl BankingStage {
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let data_budget = Arc::new(DataBudget::default());
// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|i| {
Expand All @@ -276,6 +278,7 @@ impl BankingStage {
let transaction_status_sender = transaction_status_sender.clone();
let gossip_vote_sender = gossip_vote_sender.clone();
let duplicates = duplicates.clone();
let data_budget = data_budget.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
Expand All @@ -292,6 +295,7 @@ impl BankingStage {
transaction_status_sender,
gossip_vote_sender,
&duplicates,
&data_budget,
);
})
.unwrap()
Expand All @@ -315,11 +319,21 @@ impl BankingStage {
socket: &std::net::UdpSocket,
tpu_forwards: &std::net::SocketAddr,
unprocessed_packets: &UnprocessedPackets,
data_budget: &DataBudget,
) -> std::io::Result<()> {
let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter());
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5;
data_budget.update(INTERVAL_MS, |bytes| {
std::cmp::min(bytes + MAX_BYTES_PER_INTERVAL, MAX_BYTES_BUDGET)
});
for p in packets {
socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?;
if data_budget.take(p.meta.size) {
socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?;
}
}

Ok(())
Expand Down Expand Up @@ -486,6 +500,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
data_budget: &DataBudget,
) -> BufferedPacketsDecision {
let bank_start;
let (
Expand Down Expand Up @@ -536,6 +551,7 @@ impl BankingStage {
poh_recorder,
socket,
false,
&data_budget,
);
}
BufferedPacketsDecision::ForwardAndHold => {
Expand All @@ -546,6 +562,7 @@ impl BankingStage {
poh_recorder,
socket,
true,
&data_budget,
);
}
_ => (),
Expand All @@ -560,6 +577,7 @@ impl BankingStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
socket: &UdpSocket,
hold: bool,
data_budget: &DataBudget,
) {
if !enable_forwarding {
if !hold {
Expand All @@ -572,7 +590,7 @@ impl BankingStage {
Some(addr) => addr,
None => return,
};
let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets);
let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets, data_budget);
if hold {
buffered_packets.retain(|(_, index, _)| !index.is_empty());
for (_, _, forwarded) in buffered_packets.iter_mut() {
Expand All @@ -596,6 +614,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
data_budget: &DataBudget,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
Expand All @@ -614,6 +633,7 @@ impl BankingStage {
&gossip_vote_sender,
&banking_stage_stats,
&recorder,
&data_budget,
);
if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
Expand Down Expand Up @@ -2694,6 +2714,55 @@ mod tests {
Blockstore::destroy(&ledger_path).unwrap();
}

#[test]
fn test_forwarder_budget() {
solana_logger::setup();
// Create `Packets` with 1 unprocessed element
let single_element_packets = Packets::new(vec![Packet::default()]);
let mut unprocessed_packets: UnprocessedPackets =
vec![(single_element_packets, vec![0], false)]
.into_iter()
.collect();

let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);

let genesis_config_info = create_slow_genesis_config(10_000);
let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info;

let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger"),
);
let poh_config = PohConfig {
// limit tick count to avoid clearing working_bank at
// PohRecord then PohRecorderError(MaxHeightReached) at BankingStage
target_tick_count: Some(bank.max_tick_height() - 1),
..PohConfig::default()
};

let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blockstore, Some(poh_config));

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let data_budget = DataBudget::default();
BankingStage::handle_forwarding(
true,
&cluster_info,
&mut unprocessed_packets,
&poh_recorder,
&socket,
false,
&data_budget,
);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
Blockstore::destroy(&ledger_path).unwrap();
}

#[test]
fn test_push_unprocessed_batch_limit() {
solana_logger::setup();
Expand Down

0 comments on commit 83d0822

Please sign in to comment.