Skip to content

Commit

Permalink
gossip-service: Spy multiple nodes by pubkey at once (#31597)
Browse files Browse the repository at this point in the history
* gossip: Spy multiple pubkeys

* Address behzad's feedback
  • Loading branch information
joncinque authored May 11, 2023
1 parent 7229213 commit c0e107a
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 28 deletions.
2 changes: 1 addition & 1 deletion accounts-cluster-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ fn main() {
Some(&entrypoint_addr),
None, // num_nodes
Duration::from_secs(60), // timeout
None, // find_node_by_pubkey
None, // find_nodes_by_pubkey
Some(&entrypoint_addr), // find_node_by_gossip_addr
None, // my_gossip_addr
0, // my_shred_version
Expand Down
2 changes: 1 addition & 1 deletion dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ fn main() {
Some(&cmd_params.entrypoint_addr),
None, // num_nodes
Duration::from_secs(60), // timeout
None, // find_node_by_pubkey
None, // find_nodes_by_pubkey
Some(&cmd_params.entrypoint_addr), // find_node_by_gossip_addr
None, // my_gossip_addr
0, // my_shred_version
Expand Down
30 changes: 16 additions & 14 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub fn discover_cluster(
Some(entrypoint),
Some(num_nodes),
DISCOVER_CLUSTER_TIMEOUT,
None, // find_node_by_pubkey
None, // find_nodes_by_pubkey
None, // find_node_by_gossip_addr
None, // my_gossip_addr
0, // my_shred_version
Expand All @@ -130,7 +130,7 @@ pub fn discover(
entrypoint: Option<&SocketAddr>,
num_nodes: Option<usize>, // num_nodes only counts validators, excludes spy nodes
timeout: Duration,
find_node_by_pubkey: Option<Pubkey>,
find_nodes_by_pubkey: Option<&[Pubkey]>,
find_node_by_gossip_addr: Option<&SocketAddr>,
my_gossip_addr: Option<&SocketAddr>,
my_shred_version: u16,
Expand Down Expand Up @@ -163,7 +163,7 @@ pub fn discover(
spy_ref.clone(),
num_nodes,
timeout,
find_node_by_pubkey,
find_nodes_by_pubkey,
find_node_by_gossip_addr,
);

Expand Down Expand Up @@ -231,7 +231,7 @@ fn spy(
spy_ref: Arc<ClusterInfo>,
num_nodes: Option<usize>,
timeout: Duration,
find_node_by_pubkey: Option<Pubkey>,
find_nodes_by_pubkey: Option<&[Pubkey]>,
find_node_by_gossip_addr: Option<&SocketAddr>,
) -> (
bool, // if found the specified nodes
Expand All @@ -252,8 +252,10 @@ fn spy(
.collect::<Vec<_>>();
tvu_peers = spy_ref.all_tvu_peers();

let found_node_by_pubkey = if let Some(pubkey) = find_node_by_pubkey {
all_peers.iter().any(|node| node.pubkey() == &pubkey)
let found_nodes_by_pubkey = if let Some(pubkeys) = find_nodes_by_pubkey {
pubkeys
.iter()
.all(|pubkey| all_peers.iter().any(|node| node.pubkey() == pubkey))
} else {
false
};
Expand All @@ -273,15 +275,15 @@ fn spy(
nodes.dedup();

if nodes.len() >= num {
if found_node_by_pubkey || found_node_by_gossip_addr {
if found_nodes_by_pubkey || found_node_by_gossip_addr {
met_criteria = true;
}

if find_node_by_pubkey.is_none() && find_node_by_gossip_addr.is_none() {
if find_nodes_by_pubkey.is_none() && find_node_by_gossip_addr.is_none() {
met_criteria = true;
}
}
} else if found_node_by_pubkey || found_node_by_gossip_addr {
} else if found_nodes_by_pubkey || found_node_by_gossip_addr {
met_criteria = true;
}
if i % 20 == 0 {
Expand Down Expand Up @@ -395,27 +397,27 @@ mod tests {
assert!(met_criteria);

// Find specific node by pubkey
let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, TIMEOUT, Some(peer0), None);
let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, TIMEOUT, Some(&[peer0]), None);
assert!(met_criteria);
let (met_criteria, _, _, _) = spy(
spy_ref.clone(),
None,
TIMEOUT,
Some(solana_sdk::pubkey::new_rand()),
Some(&[solana_sdk::pubkey::new_rand()]),
None,
);
assert!(!met_criteria);

// Find num_nodes *and* specific node by pubkey
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), TIMEOUT, Some(peer0), None);
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), TIMEOUT, Some(&[peer0]), None);
assert!(met_criteria);
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(3), TIMEOUT, Some(peer0), None);
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(3), TIMEOUT, Some(&[peer0]), None);
assert!(!met_criteria);
let (met_criteria, _, _, _) = spy(
spy_ref.clone(),
Some(1),
TIMEOUT,
Some(solana_sdk::pubkey::new_rand()),
Some(&[solana_sdk::pubkey::new_rand()]),
None,
);
assert!(!met_criteria);
Expand Down
31 changes: 19 additions & 12 deletions gossip/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
},
solana_clap_utils::{
hidden_unless_forced,
input_parsers::keypair_of,
input_parsers::{keypair_of, pubkeys_of},
input_validators::{is_keypair_or_ask_keyword, is_port, is_pubkey},
},
solana_gossip::{
Expand Down Expand Up @@ -142,6 +142,7 @@ fn parse_matches() -> ArgMatches<'static> {
.value_name("PUBKEY")
.takes_value(true)
.validator(is_pubkey)
.multiple(true)
.help("Public key of a specific node to wait for"),
)
.arg(&shred_version_arg)
Expand Down Expand Up @@ -182,7 +183,7 @@ fn process_spy_results(
validators: Vec<ContactInfo>,
num_nodes: Option<usize>,
num_nodes_exactly: Option<usize>,
pubkey: Option<Pubkey>,
pubkeys: Option<&[Pubkey]>,
) {
if timeout.is_some() {
if let Some(num) = num_nodes {
Expand All @@ -196,10 +197,12 @@ fn process_spy_results(
exit(1);
}
}
if let Some(node) = pubkey {
if !validators.iter().any(|x| x.pubkey() == &node) {
eprintln!("Error: Could not find node {node:?}");
exit(1);
if let Some(nodes) = pubkeys {
for node in nodes {
if !validators.iter().any(|x| x.pubkey() == node) {
eprintln!("Error: Could not find node {node:?}");
exit(1);
}
}
}
}
Expand All @@ -222,9 +225,7 @@ fn process_spy(matches: &ArgMatches, socket_addr_space: SocketAddrSpace) -> std:
let timeout = matches
.value_of("timeout")
.map(|secs| secs.to_string().parse().unwrap());
let pubkey = matches
.value_of("node_pubkey")
.map(|pubkey_str| pubkey_str.parse::<Pubkey>().unwrap());
let pubkeys = pubkeys_of(matches, "node_pubkey");
let shred_version = value_t_or_exit!(matches, "shred_version", u16);
let identity_keypair = keypair_of(matches, "identity");

Expand All @@ -248,14 +249,20 @@ fn process_spy(matches: &ArgMatches, socket_addr_space: SocketAddrSpace) -> std:
entrypoint_addr.as_ref(),
num_nodes,
discover_timeout,
pubkey, // find_node_by_pubkey
pubkeys.as_deref(), // find_nodes_by_pubkey
None, // find_node_by_gossip_addr
Some(&gossip_addr), // my_gossip_addr
shred_version,
socket_addr_space,
)?;

process_spy_results(timeout, validators, num_nodes, num_nodes_exactly, pubkey);
process_spy_results(
timeout,
validators,
num_nodes,
num_nodes_exactly,
pubkeys.as_deref(),
);

Ok(())
}
Expand Down Expand Up @@ -283,7 +290,7 @@ fn process_rpc_url(
entrypoint_addr.as_ref(),
Some(1), // num_nodes
Duration::from_secs(timeout),
None, // find_node_by_pubkey
None, // find_nodes_by_pubkey
entrypoint_addr.as_ref(), // find_node_by_gossip_addr
None, // my_gossip_addr
shred_version,
Expand Down

0 comments on commit c0e107a

Please sign in to comment.