Skip to content

Commit

Permalink
skips retransmit for shreds with unknown slot leader (solana-labs#19472)
Browse files Browse the repository at this point in the history
Shreds' signatures should be verified before they reach retransmit
stage, and if the leader is unknown they should fail signature check.
Therefore retransmit-stage can as well expect to know who the slot
leader is and otherwise just skip the shred.

Blockstore checking signature of recovered shreds before sending them to
retransmit stage:
https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/blockstore.rs#L884-L930

Shred signature verifier:
https://github.com/solana-labs/solana/blob/4305d4b7b/core/src/sigverify_shreds.rs#L41-L57
https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/sigverify_shreds.rs#L105
  • Loading branch information
behzadnouri authored Sep 1, 2021
1 parent 82a6bbe commit 6d9818b
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 57 deletions.
2 changes: 1 addition & 1 deletion core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ pub fn broadcast_shreds(
let mut result = Ok(());
let mut shred_select = Measure::start("shred_select");
// Only the leader broadcasts shreds.
let leader = Some(cluster_info.id());
let leader = cluster_info.id();
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
Expand Down
2 changes: 1 addition & 1 deletion core/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
let packets: Vec<_> = shreds
.iter()
.filter_map(|shred| {
let seed = shred.seed(Some(self_pubkey), &root_bank);
let seed = shred.seed(self_pubkey, &root_bank);
let node = cluster_nodes.get_broadcast_peer(seed)?;
if !socket_addr_space.check(&node.tvu) {
return None;
Expand Down
24 changes: 10 additions & 14 deletions core/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,21 @@ impl ClusterNodes<RetransmitStage> {
&self,
shred_seed: [u8; 32],
fanout: usize,
slot_leader: Option<Pubkey>,
slot_leader: Pubkey,
) -> (
Vec<&ContactInfo>, // neighbors
Vec<&ContactInfo>, // children
) {
// Exclude leader from list of nodes.
let index = self.index.iter().copied();
let (weights, index): (Vec<u64>, Vec<usize>) = match slot_leader {
None => {
error!("unknown leader for shred slot");
index.unzip()
}
Some(slot_leader) if slot_leader == self.pubkey => {
error!("retransmit from slot leader: {}", slot_leader);
index.unzip()
}
Some(slot_leader) => index
let (weights, index): (Vec<u64>, Vec<usize>) = if slot_leader == self.pubkey {
error!("retransmit from slot leader: {}", slot_leader);
self.index.iter().copied().unzip()
} else {
self.index
.iter()
.filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
.unzip(),
.copied()
.unzip()
};
let index: Vec<_> = {
let shuffle = weighted_shuffle(weights.into_iter(), shred_seed);
Expand Down Expand Up @@ -462,7 +458,7 @@ mod tests {
let (neighbors_indices, children_indices) =
compute_retransmit_peers(fanout, self_index, &shuffled_index);
let (neighbors, children) =
cluster_nodes.get_retransmit_peers(shred_seed, fanout, Some(slot_leader));
cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader);
assert_eq!(children.len(), children_indices.len());
for (node, index) in children.into_iter().zip(children_indices) {
assert_eq!(*node, peers[index]);
Expand Down
9 changes: 8 additions & 1 deletion core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,14 @@ fn retransmit(

let mut compute_turbine_peers = Measure::start("turbine_start");
// TODO: consider using root-bank here for leader lookup!
let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
// Shreds' signatures should be verified before they reach here, and if
// the leader is unknown they should fail signature check. So here we
// should expect to know the slot leader and otherwise skip the shred.
let slot_leader =
match leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)) {
Some(pubkey) => pubkey,
None => continue,
};
let cluster_nodes =
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
let shred_seed = shred.seed(slot_leader, &root_bank);
Expand Down
76 changes: 36 additions & 40 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,31 @@
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
//! payload can fit into one coding shred / packet.
use crate::{blockstore::MAX_DATA_SHREDS_PER_SLOT, erasure::Session};
use bincode::config::Options;
use core::cell::RefCell;
use rayon::{
iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator},
slice::ParallelSlice,
ThreadPool,
use {
crate::{blockstore::MAX_DATA_SHREDS_PER_SLOT, erasure::Session},
bincode::config::Options,
rayon::{
iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator},
slice::ParallelSlice,
ThreadPool,
},
serde::{Deserialize, Serialize},
solana_entry::entry::{create_ticks, Entry},
solana_measure::measure::Measure,
solana_perf::packet::{limited_deserialize, Packet},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::bank::Bank,
solana_sdk::{
clock::Slot,
feature_set,
hash::{hashv, Hash},
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
},
std::{cell::RefCell, convert::TryInto, mem::size_of},
thiserror::Error,
};
use serde::{Deserialize, Serialize};
use solana_entry::entry::{create_ticks, Entry};
use solana_measure::measure::Measure;
use solana_perf::packet::{limited_deserialize, Packet};
use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::bank::Bank;
use solana_sdk::{
clock::Slot,
feature_set,
hash::hashv,
hash::Hash,
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
};
use std::mem::size_of;
use thiserror::Error;

#[derive(Default, Clone)]
pub struct ProcessShredsStats {
Expand Down Expand Up @@ -466,23 +466,19 @@ impl Shred {
self.common_header.signature
}

pub fn seed(&self, leader_pubkey: Option<Pubkey>, root_bank: &Bank) -> [u8; 32] {
if let Some(leader_pubkey) = leader_pubkey {
if enable_deterministic_seed(self.slot(), root_bank) {
let h = hashv(&[
&self.slot().to_le_bytes(),
&self.index().to_le_bytes(),
&leader_pubkey.to_bytes(),
]);
return h.to_bytes();
}
pub fn seed(&self, leader_pubkey: Pubkey, root_bank: &Bank) -> [u8; 32] {
if enable_deterministic_seed(self.slot(), root_bank) {
hashv(&[
&self.slot().to_le_bytes(),
&self.index().to_le_bytes(),
&leader_pubkey.to_bytes(),
])
.to_bytes()
} else {
let signature = self.common_header.signature.as_ref();
let offset = signature.len().checked_sub(32).unwrap();
signature[offset..].try_into().unwrap()
}

let mut seed = [0; 32];
let seed_len = seed.len();
let sig = self.common_header.signature.as_ref();
seed[0..seed_len].copy_from_slice(&sig[(sig.len() - seed_len)..]);
seed
}

pub fn is_data(&self) -> bool {
Expand Down

0 comments on commit 6d9818b

Please sign in to comment.