Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Replace get_tmp_ledger_path!() with self cleaning version (#33702)
Browse files Browse the repository at this point in the history
This macro is used a lot for tests to create a ledger path in order to
open a Blockstore. Files will be left on disk unless the test remembers
to call Blockstore::destroy() on the directory. So, instead of requiring
this, use the get_tmp_ledger_path_auto_delete!() macro that creates a
TempDir (which automatically deletes itself when it goes out of scope).
  • Loading branch information
steviez authored Oct 21, 2023
1 parent 5a96352 commit 56ccffd
Show file tree
Hide file tree
Showing 18 changed files with 2,081 additions and 2,211 deletions.
396 changes: 196 additions & 200 deletions banking-bench/src/main.rs

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions client-test/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
futures_util::StreamExt,
rand::Rng,
serde_json::{json, Value},
solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path_auto_delete},
solana_pubsub_client::{nonblocking, pubsub_client::PubsubClient},
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
Expand Down Expand Up @@ -233,8 +233,8 @@ fn test_block_subscription() {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));

// setup Blockstore
let ledger_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&ledger_path).unwrap();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let blockstore = Arc::new(blockstore);

// populate ledger with test txs
Expand Down
252 changes: 123 additions & 129 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use {
blockstore::Blockstore,
blockstore_processor::process_entries_for_tests,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
get_tmp_ledger_path_auto_delete,
},
solana_perf::{
packet::{to_packet_batches, Packet},
Expand Down Expand Up @@ -83,49 +83,46 @@ fn check_txs(receiver: &Arc<Receiver<WorkingBankEntry>>, ref_tx_count: usize) {
fn bench_consume_buffered(bencher: &mut Bencher) {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000);
let bank = Arc::new(Bank::new_for_benches(&genesis_config));
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, _signal_receiver) =
create_test_recorder(bank, blockstore, None, None);

let recorder = poh_recorder.read().unwrap().new_recorder();
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let tx = test_tx();
let transactions = vec![tx; 4194304];
let batches = transactions
.iter()
.filter_map(|transaction| {
let packet = Packet::from_data(None, transaction).ok().unwrap();
DeserializedPacket::new(packet).ok()
})
.collect::<Vec<_>>();
let batches_len = batches.len();
let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(batches, 2 * batches_len),
ThreadType::Transactions,
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, _signal_receiver) =
create_test_recorder(bank, blockstore, None, None);

let recorder = poh_recorder.read().unwrap().new_recorder();
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let tx = test_tx();
let transactions = vec![tx; 4194304];
let batches = transactions
.iter()
.filter_map(|transaction| {
let packet = Packet::from_data(None, transaction).ok().unwrap();
DeserializedPacket::new(packet).ok()
})
.collect::<Vec<_>>();
let batches_len = batches.len();
let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(batches, 2 * batches_len),
ThreadType::Transactions,
);
let (s, _r) = unbounded();
let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64)));
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
consumer.consume_buffered_packets(
&bank_start,
&mut transaction_buffer,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
);
let (s, _r) = unbounded();
let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64)));
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
consumer.consume_buffered_packets(
&bank_start,
&mut transaction_buffer,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
);
});
});

exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
let _unused = Blockstore::destroy(&ledger_path);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}

fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec<Transaction> {
Expand Down Expand Up @@ -279,95 +276,92 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
packet_batches
});

let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, signal_receiver) =
create_test_recorder(bank.clone(), blockstore, None, None);
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
};
let cluster_info = Arc::new(cluster_info);
let (s, _r) = unbounded();
let _banking_stage = BankingStage::new(
BlockProductionMethod::ThreadLocalMultiIterator,
&cluster_info,
&poh_recorder,
non_vote_receiver,
tpu_vote_receiver,
gossip_vote_receiver,
None,
s,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
);

let chunk_len = verified.len() / CHUNKS;
let mut start = 0;

// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
let signal_receiver = Arc::new(signal_receiver);
let signal_receiver2 = signal_receiver;
bencher.iter(move || {
let now = Instant::now();
let mut sent = 0;
if let Some(vote_packets) = &vote_packets {
tpu_vote_sender
.send(BankingPacketBatch::new((
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
.unwrap();
gossip_vote_sender
.send(BankingPacketBatch::new((
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
.unwrap();
}
for v in verified[start..start + chunk_len].chunks(chunk_len / num_threads) {
debug!(
"sending... {}..{} {} v.len: {}",
start,
start + chunk_len,
timestamp(),
v.len(),
);
for xv in v {
sent += xv.len();
}
non_vote_sender
.send(BankingPacketBatch::new((v.to_vec(), None)))
.unwrap();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, signal_receiver) =
create_test_recorder(bank.clone(), blockstore, None, None);
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
};
let cluster_info = Arc::new(cluster_info);
let (s, _r) = unbounded();
let _banking_stage = BankingStage::new(
BlockProductionMethod::ThreadLocalMultiIterator,
&cluster_info,
&poh_recorder,
non_vote_receiver,
tpu_vote_receiver,
gossip_vote_receiver,
None,
s,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
);

let chunk_len = verified.len() / CHUNKS;
let mut start = 0;

// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
let signal_receiver = Arc::new(signal_receiver);
let signal_receiver2 = signal_receiver;
bencher.iter(move || {
let now = Instant::now();
let mut sent = 0;
if let Some(vote_packets) = &vote_packets {
tpu_vote_sender
.send(BankingPacketBatch::new((
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
.unwrap();
gossip_vote_sender
.send(BankingPacketBatch::new((
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
.unwrap();
}
for v in verified[start..start + chunk_len].chunks(chunk_len / num_threads) {
debug!(
"sending... {}..{} {} v.len: {}",
start,
start + chunk_len,
timestamp(),
v.len(),
);
for xv in v {
sent += xv.len();
}
non_vote_sender
.send(BankingPacketBatch::new((v.to_vec(), None)))
.unwrap();
}

check_txs(&signal_receiver2, txes / CHUNKS);

// This signature clear may not actually clear the signatures
// in this chunk, but since we rotate between CHUNKS then
// we should clear them by the time we come around again to re-use that chunk.
bank.clear_signatures();
trace!(
"time: {} checked: {} sent: {}",
duration_as_us(&now.elapsed()),
txes / CHUNKS,
sent,
);
start += chunk_len;
start %= verified.len();
});
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
let _unused = Blockstore::destroy(&ledger_path);
check_txs(&signal_receiver2, txes / CHUNKS);

// This signature clear may not actually clear the signatures
// in this chunk, but since we rotate between CHUNKS then
// we should clear them by the time we come around again to re-use that chunk.
bank.clear_signatures();
trace!(
"time: {} checked: {} sent: {}",
duration_as_us(&now.elapsed()),
txes / CHUNKS,
sent,
);
start += chunk_len;
start %= verified.len();
});
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}

#[bench]
Expand Down
Loading

0 comments on commit 56ccffd

Please sign in to comment.