Skip to content

Commit

Permalink
implements cursor for gossip crds table queries (solana-labs#16952)
Browse files Browse the repository at this point in the history
VersionedCrdsValue.insert_timestamp is used for fetching the crds values
inserted since the last query:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298

So it is crucial that insert_timestamp does not go backward in time when
new values are inserted into the table. However, std::time::SystemTime is
not monotonic; moreover, due to workload, lock contention, thread
scheduling, etc., new values may be inserted with a stale timestamp far in
the past. Additionally, reading the system time for the above purpose is
inefficient and unnecessary.

This commit adds an ordinal index to crds values indicating their insertion
order. Additionally, it implements a new Cursor type for fetching the values
inserted since the last query.
  • Loading branch information
behzadnouri authored May 6, 2021
1 parent d19526e commit fa86a33
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 261 deletions.
125 changes: 35 additions & 90 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::{
cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
contact_info::ContactInfo,
crds::Cursor,
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
Expand Down Expand Up @@ -1120,20 +1121,13 @@ impl ClusterInfo {
Ok(())
}

/// Get votes in the crds
/// * since - The timestamp of when the vote inserted must be greater than
/// since. This allows the bank to query for new votes only.
///
/// * return - The votes, and the max timestamp from the new set.
pub fn get_votes(&self, since: u64) -> (Vec<CrdsValueLabel>, Vec<Transaction>, u64) {
let mut max_ts = since;
let (labels, txs): (Vec<CrdsValueLabel>, Vec<Transaction>) = self
/// Returns votes inserted since the given cursor.
pub fn get_votes(&self, cursor: &mut Cursor) -> (Vec<CrdsValueLabel>, Vec<Transaction>) {
let (labels, txs): (_, Vec<_>) = self
.time_gossip_read_lock("get_votes", &self.stats.get_votes)
.crds
.get_votes()
.filter(|vote| vote.insert_timestamp > since)
.get_votes(cursor)
.map(|vote| {
max_ts = std::cmp::max(vote.insert_timestamp, max_ts);
let transaction = match &vote.value.data {
CrdsData::Vote(_, vote) => vote.transaction().clone(),
_ => panic!("this should not happen!"),
Expand All @@ -1142,7 +1136,7 @@ impl ClusterInfo {
})
.unzip();
inc_new_counter_info!("cluster_info-get_votes-count", txs.len());
(labels, txs, max_ts)
(labels, txs)
}

pub(crate) fn push_duplicate_shred(&self, shred: &Shred, other_payload: &[u8]) -> Result<()> {
Expand Down Expand Up @@ -1180,52 +1174,15 @@ impl ClusterInfo {
.map(map)
}

pub fn get_lowest_slot_for_node<F, Y>(
&self,
pubkey: &Pubkey,
since: Option<u64>,
map: F,
) -> Option<Y>
where
F: FnOnce(&LowestSlot, u64) -> Y,
{
self.gossip
.read()
.unwrap()
.crds
.get(&CrdsValueLabel::LowestSlot(*pubkey))
.filter(|x| {
since
.map(|since| x.insert_timestamp > since)
.unwrap_or(true)
pub(crate) fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec<EpochSlots> {
let gossip = self.gossip.read().unwrap();
let entries = gossip.crds.get_epoch_slots(cursor);
entries
.map(|entry| match &entry.value.data {
CrdsData::EpochSlots(_, slots) => slots.clone(),
_ => panic!("this should not happen!"),
})
.map(|x| map(x.value.lowest_slot().unwrap(), x.insert_timestamp))
}

pub fn get_epoch_slots_since(
&self,
timestamp: u64,
) -> (
Vec<EpochSlots>,
Option<u64>, // Most recent insert timestmap.
) {
let mut max_ts = 0;
let vals: Vec<_> = self
.gossip
.read()
.unwrap()
.crds
.get_epoch_slots_since(timestamp)
.map(|value| {
max_ts = std::cmp::max(max_ts, value.insert_timestamp);
match &value.value.data {
CrdsData::EpochSlots(_, slots) => slots.clone(),
_ => panic!("this should not happen!"),
}
})
.collect();
let max_ts = if vals.is_empty() { None } else { Some(max_ts) };
(vals, max_ts)
.collect()
}

pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
Expand Down Expand Up @@ -3670,7 +3627,8 @@ mod tests {
);
cluster_info.push_vote(&unrefresh_tower, unrefresh_tx.clone());
cluster_info.flush_push_queue();
let (_, votes, max_ts) = cluster_info.get_votes(0);
let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![unrefresh_tx.clone()]);

// Now construct vote for the slot to be refreshed later
Expand All @@ -3691,9 +3649,9 @@ mod tests {
// shouldn't add the vote
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
cluster_info.flush_push_queue();
let (_, votes, max_ts) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![]);
let (_, votes, _) = cluster_info.get_votes(0);
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), 1);
assert!(votes.contains(&unrefresh_tx));

Expand All @@ -3702,7 +3660,7 @@ mod tests {
cluster_info.flush_push_queue();

// Should be two votes in gossip
let (_, votes, _) = cluster_info.get_votes(0);
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), 2);
assert!(votes.contains(&unrefresh_tx));
assert!(votes.contains(&refresh_tx));
Expand All @@ -3728,12 +3686,12 @@ mod tests {
cluster_info.flush_push_queue();

// The diff since the last query with `cursor` should only be the latest refreshed vote
let (_, votes, _) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
assert_eq!(votes[0], latest_refresh_tx);

// Should still be two votes in gossip
let (_, votes, _) = cluster_info.get_votes(0);
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), 2);
assert!(votes.contains(&unrefresh_tx));
assert!(votes.contains(&latest_refresh_tx));
Expand All @@ -3747,10 +3705,9 @@ mod tests {
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);

// make sure empty crds is handled correctly
let now = timestamp();
let (_, votes, max_ts) = cluster_info.get_votes(now);
let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![]);
assert_eq!(max_ts, now);

// add a vote
let vote = Vote::new(
Expand All @@ -3770,8 +3727,7 @@ mod tests {
cluster_info.push_vote(&tower, tx.clone());
cluster_info.flush_push_queue();

// -1 to make sure that the clock is strictly lower then when insert occurred
let (labels, votes, max_ts) = cluster_info.get_votes(now - 1);
let (labels, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![tx]);
assert_eq!(labels.len(), 1);
match labels[0] {
Expand All @@ -3781,12 +3737,9 @@ mod tests {

_ => panic!("Bad match"),
}
assert!(max_ts >= now - 1);

// make sure the cursor filter works: querying again with the same cursor returns no votes
let (_, votes, new_max_ts) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![]);
assert_eq!(max_ts, new_max_ts);
}

fn new_vote_transaction<R: Rng>(rng: &mut R, slots: Vec<Slot>) -> Transaction {
Expand All @@ -3804,8 +3757,8 @@ mod tests {

#[test]
fn test_push_votes_with_tower() {
let get_vote_slots = |cluster_info: &ClusterInfo, now| -> Vec<Slot> {
let (labels, _, _) = cluster_info.get_votes(now);
let get_vote_slots = |cluster_info: &ClusterInfo| -> Vec<Slot> {
let (labels, _) = cluster_info.get_votes(&mut Cursor::default());
let gossip = cluster_info.gossip.read().unwrap();
let mut vote_slots = HashSet::new();
for label in labels {
Expand All @@ -3819,7 +3772,6 @@ mod tests {
vote_slots.into_iter().collect()
};
let mut rng = rand::thread_rng();
let now = timestamp();
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
Expand All @@ -3830,7 +3782,7 @@ mod tests {
let vote = new_vote_transaction(&mut rng, vec![slot]);
cluster_info.push_vote(&tower, vote);
}
let vote_slots = get_vote_slots(&cluster_info, now);
let vote_slots = get_vote_slots(&cluster_info);
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
for vote_slot in vote_slots {
assert!(vote_slot < MAX_LOCKOUT_HISTORY as u64);
Expand All @@ -3841,7 +3793,7 @@ mod tests {
tower.remove(23);
let vote = new_vote_transaction(&mut rng, vec![slot]);
cluster_info.push_vote(&tower, vote);
let vote_slots = get_vote_slots(&cluster_info, now);
let vote_slots = get_vote_slots(&cluster_info);
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
for vote_slot in vote_slots {
assert!(vote_slot <= slot);
Expand All @@ -3855,7 +3807,7 @@ mod tests {
tower.remove(5);
let vote = new_vote_transaction(&mut rng, vec![slot]);
cluster_info.push_vote(&tower, vote);
let vote_slots = get_vote_slots(&cluster_info, now);
let vote_slots = get_vote_slots(&cluster_info);
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
for vote_slot in vote_slots {
assert!(vote_slot <= slot);
Expand All @@ -3869,23 +3821,17 @@ mod tests {
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
let (slots, since) = cluster_info.get_epoch_slots_since(0);
let slots = cluster_info.get_epoch_slots(&mut Cursor::default());
assert!(slots.is_empty());
assert!(since.is_none());
cluster_info.push_epoch_slots(&[0]);
cluster_info.flush_push_queue();

let (slots, since) = cluster_info.get_epoch_slots_since(std::u64::MAX);
assert!(slots.is_empty());
assert_eq!(since, None);

let (slots, since) = cluster_info.get_epoch_slots_since(0);
let mut cursor = Cursor::default();
let slots = cluster_info.get_epoch_slots(&mut cursor);
assert_eq!(slots.len(), 1);
assert!(since.is_some());

let (slots, since2) = cluster_info.get_epoch_slots_since(since.unwrap() + 1);
let slots = cluster_info.get_epoch_slots(&mut cursor);
assert!(slots.is_empty());
assert_eq!(since2, None);
}

#[test]
Expand Down Expand Up @@ -4228,10 +4174,9 @@ mod tests {
cluster_info.flush_push_queue();
cluster_info.push_epoch_slots(&range[16000..]);
cluster_info.flush_push_queue();
let (slots, since) = cluster_info.get_epoch_slots_since(0);
let slots = cluster_info.get_epoch_slots(&mut Cursor::default());
let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect();
assert_eq!(slots, range);
assert!(since.is_some());
}

#[test]
Expand Down
14 changes: 5 additions & 9 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
crds::Cursor,
crds_value::CrdsValueLabel,
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
Expand Down Expand Up @@ -326,23 +327,18 @@ impl ClusterInfoVoteListener {
verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender,
verified_vote_transactions_sender: VerifiedVoteTransactionsSender,
) -> Result<()> {
let mut last_ts = 0;
loop {
if exit.load(Ordering::Relaxed) {
return Ok(());
}
let (labels, votes, new_ts) = cluster_info.get_votes(last_ts);
let mut cursor = Cursor::default();
while !exit.load(Ordering::Relaxed) {
let (labels, votes) = cluster_info.get_votes(&mut cursor);
inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());

last_ts = new_ts;
if !votes.is_empty() {
let (vote_txs, packets) = Self::verify_votes(votes, labels);
verified_vote_transactions_sender.send(vote_txs)?;
verified_vote_label_packets_sender.send(packets)?;
}

sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
}
Ok(())
}

#[allow(clippy::type_complexity)]
Expand Down
Loading

0 comments on commit fa86a33

Please sign in to comment.