Skip to content

Commit

Permalink
Add Vote PubSub endpoint for live gossip votes. (solana-labs#10045)
Browse files Browse the repository at this point in the history
* Add Vote PubSub endpoint for live gossip votes.

* Updated tests for Vote RPC and Vote Listener

* Add JSON RPC documentation for Vote RPC.

* Base58 encode hash in Vote RPC response.
  • Loading branch information
Reisen authored May 17, 2020
1 parent 9222bc2 commit bfcfbab
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 21 deletions.
105 changes: 87 additions & 18 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
crds_value::CrdsValueLabel,
poh_recorder::PohRecorder,
result::{Error, Result},
rpc_subscriptions::RpcSubscriptions,
sigverify,
verified_vote_packets::VerifiedVotePackets,
};
Expand Down Expand Up @@ -202,6 +203,7 @@ impl ClusterInfoVoteListener {
poh_recorder: &Arc<Mutex<PohRecorder>>,
vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>,
) -> Self {
let exit_ = exit.clone();

Expand Down Expand Up @@ -242,6 +244,7 @@ impl ClusterInfoVoteListener {
verified_vote_transactions_receiver,
vote_tracker,
&bank_forks,
subscriptions,
);
})
.unwrap();
Expand Down Expand Up @@ -364,6 +367,7 @@ impl ClusterInfoVoteListener {
vote_txs_receiver: VerifiedVoteTransactionsReceiver,
vote_tracker: Arc<VoteTracker>,
bank_forks: &RwLock<BankForks>,
subscriptions: Arc<RpcSubscriptions>,
) -> Result<()> {
loop {
if exit.load(Ordering::Relaxed) {
Expand All @@ -373,9 +377,12 @@ impl ClusterInfoVoteListener {
let root_bank = bank_forks.read().unwrap().root_bank().clone();
vote_tracker.process_new_root_bank(&root_bank);

if let Err(e) =
Self::get_and_process_votes(&vote_txs_receiver, &vote_tracker, root_bank.slot())
{
if let Err(e) = Self::get_and_process_votes(
&vote_txs_receiver,
&vote_tracker,
root_bank.slot(),
subscriptions.clone(),
) {
match e {
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
return Ok(());
Expand All @@ -389,21 +396,37 @@ impl ClusterInfoVoteListener {
}
}

#[cfg(test)]
pub fn get_and_process_votes_for_tests(
vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
vote_tracker: &Arc<VoteTracker>,
last_root: Slot,
subscriptions: Arc<RpcSubscriptions>,
) -> Result<()> {
Self::get_and_process_votes(vote_txs_receiver, vote_tracker, last_root, subscriptions)
}

fn get_and_process_votes(
vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
vote_tracker: &Arc<VoteTracker>,
last_root: Slot,
subscriptions: Arc<RpcSubscriptions>,
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?;
while let Ok(new_txs) = vote_txs_receiver.try_recv() {
vote_txs.extend(new_txs);
}
Self::process_votes(vote_tracker, vote_txs, last_root);
Self::process_votes(vote_tracker, vote_txs, last_root, subscriptions);
Ok(())
}

fn process_votes(vote_tracker: &VoteTracker, vote_txs: Vec<Transaction>, root: Slot) {
fn process_votes(
vote_tracker: &VoteTracker,
vote_txs: Vec<Transaction>,
root: Slot,
subscriptions: Arc<RpcSubscriptions>,
) {
let mut diff: HashMap<Slot, HashSet<Arc<Pubkey>>> = HashMap::new();
{
let all_slot_trackers = &vote_tracker.slot_vote_trackers;
Expand Down Expand Up @@ -455,7 +478,7 @@ impl ClusterInfoVoteListener {
continue;
}

for slot in vote.slots {
for &slot in vote.slots.iter() {
if slot <= root {
continue;
}
Expand All @@ -480,6 +503,8 @@ impl ClusterInfoVoteListener {
.or_default()
.insert(unduplicated_pubkey.unwrap());
}

subscriptions.notify_vote(&vote);
}
}
}
Expand Down Expand Up @@ -519,6 +544,8 @@ impl ClusterInfoVoteListener {
#[cfg(test)]
mod tests {
use super::*;
use crate::commitment::BlockCommitmentCache;
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
use solana_perf::packet;
use solana_runtime::{
bank::Bank,
Expand Down Expand Up @@ -615,7 +642,7 @@ mod tests {

#[test]
fn test_update_new_root() {
let (vote_tracker, bank, _) = setup();
let (vote_tracker, bank, _, _) = setup();

// Check outdated slots are purged with new root
let new_voter = Arc::new(Pubkey::new_rand());
Expand Down Expand Up @@ -656,7 +683,7 @@ mod tests {

#[test]
fn test_update_new_leader_schedule_epoch() {
let (vote_tracker, bank, _) = setup();
let (vote_tracker, bank, _, _) = setup();

// Check outdated slots are purged with new root
let leader_schedule_epoch = bank.get_leader_schedule_epoch(bank.slot());
Expand Down Expand Up @@ -698,7 +725,7 @@ mod tests {
#[test]
fn test_process_votes() {
// Create some voters at genesis
let (vote_tracker, _, validator_voting_keypairs) = setup();
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
let (votes_sender, votes_receiver) = unbounded();

let vote_slots = vec![1, 2];
Expand All @@ -717,7 +744,13 @@ mod tests {
});

// Check that all the votes were registered for each validator correctly
ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap();
ClusterInfoVoteListener::get_and_process_votes(
&votes_receiver,
&vote_tracker,
0,
subscriptions,
)
.unwrap();
for vote_slot in vote_slots {
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
let r_slot_vote_tracker = slot_vote_tracker.read().unwrap();
Expand All @@ -736,7 +769,7 @@ mod tests {
#[test]
fn test_process_votes2() {
// Create some voters at genesis
let (vote_tracker, _, validator_voting_keypairs) = setup();
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
// Send some votes to process
let (votes_sender, votes_receiver) = unbounded();

Expand All @@ -760,7 +793,13 @@ mod tests {
}

// Check that all the votes were registered for each validator correctly
ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap();
ClusterInfoVoteListener::get_and_process_votes(
&votes_receiver,
&vote_tracker,
0,
subscriptions,
)
.unwrap();
for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() {
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap();
let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap();
Expand All @@ -779,7 +818,7 @@ mod tests {
#[test]
fn test_get_voters_by_epoch() {
// Create some voters at genesis
let (vote_tracker, bank, validator_voting_keypairs) = setup();
let (vote_tracker, bank, validator_voting_keypairs, _) = setup();
let last_known_epoch = bank.get_leader_schedule_epoch(bank.slot());
let last_known_slot = bank
.epoch_schedule()
Expand Down Expand Up @@ -850,11 +889,23 @@ mod tests {
100,
);
let bank = Bank::new(&genesis_config);
let exit = Arc::new(AtomicBool::new(false));
let bank_forks = BankForks::new(0, bank);
let bank = bank_forks.get(0).unwrap().clone();
let vote_tracker = VoteTracker::new(&bank);
let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(bank_forks)),
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
blockstore.clone(),
))),
));

// Send a vote to process, should add a reference to the pubkey for that voter
// in the tracker
let validator0_keypairs = &validator_voting_keypairs[0];
let vote_tracker = VoteTracker::new(&bank);
let vote_tx = vec![vote_transaction::new_vote_transaction(
// Must vote > root to be processed
vec![bank.slot() + 1],
Expand All @@ -865,7 +916,7 @@ mod tests {
&validator0_keypairs.vote_keypair,
)];

ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0);
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0, subscriptions.clone());
let ref_count = Arc::strong_count(
&vote_tracker
.keys
Expand Down Expand Up @@ -915,7 +966,7 @@ mod tests {
})
.collect();

ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0);
ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions);

let ref_count = Arc::strong_count(
&vote_tracker
Expand All @@ -929,7 +980,12 @@ mod tests {
assert_eq!(ref_count, current_ref_count);
}

fn setup() -> (Arc<VoteTracker>, Arc<Bank>, Vec<ValidatorVoteKeypairs>) {
fn setup() -> (
Arc<VoteTracker>,
Arc<Bank>,
Vec<ValidatorVoteKeypairs>,
Arc<RpcSubscriptions>,
) {
let validator_voting_keypairs: Vec<_> = (0..10)
.map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new()))
.collect();
Expand All @@ -941,6 +997,18 @@ mod tests {
);
let bank = Bank::new(&genesis_config);
let vote_tracker = VoteTracker::new(&bank);
let exit = Arc::new(AtomicBool::new(false));
let bank_forks = BankForks::new(0, bank);
let bank = bank_forks.get(0).unwrap().clone();
let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(bank_forks)),
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
blockstore.clone(),
))),
));

// Integrity Checks
let current_epoch = bank.epoch();
Expand All @@ -967,8 +1035,9 @@ mod tests {
assert_eq!(*vote_tracker.current_epoch.read().unwrap(), current_epoch);
(
Arc::new(vote_tracker),
Arc::new(bank),
bank,
validator_voting_keypairs,
subscriptions,
)
}

Expand Down
Loading

0 comments on commit bfcfbab

Please sign in to comment.