Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
adds metrics for gossip push fanout (#29065)
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri authored Dec 4, 2022
1 parent fcde8c8 commit 718f433
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 11 deletions.
10 changes: 8 additions & 2 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1511,11 +1511,17 @@ impl ClusterInfo {
}
fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.id();
let mut push_messages = {
let (mut push_messages, num_entries, num_nodes) = {
let _st = ScopedTimer::from(&self.stats.new_push_requests);
self.gossip
.new_push_messages(self.drain_push_queue(), timestamp())
};
self.stats
.push_fanout_num_entries
.add_relaxed(num_entries as u64);
self.stats
.push_fanout_num_nodes
.add_relaxed(num_nodes as u64);
if self.require_stake_for_gossip(stakes) {
push_messages.retain(|_, data| {
retain_staked(data, stakes);
Expand Down Expand Up @@ -3723,7 +3729,7 @@ RPC Enabled Nodes: 1"#;
&SocketAddrSpace::Unspecified,
);
//check that all types of gossip messages are signed correctly
let push_messages = cluster_info
let (push_messages, _, _) = cluster_info
.gossip
.new_push_messages(cluster_info.drain_push_queue(), timestamp());
// there should be some pushes ready
Expand Down
12 changes: 12 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ pub struct GossipStats {
pub(crate) pull_requests_count: Counter,
pub(crate) purge: Counter,
pub(crate) purge_count: Counter,
pub(crate) push_fanout_num_entries: Counter,
pub(crate) push_fanout_num_nodes: Counter,
pub(crate) push_message_count: Counter,
pub(crate) push_message_pushes: Counter,
pub(crate) push_message_value_count: Counter,
Expand Down Expand Up @@ -443,6 +445,16 @@ pub(crate) fn submit_gossip_stats(
i64
),
("push_message_count", stats.push_message_count.clear(), i64),
(
"push_fanout_num_entries",
stats.push_fanout_num_entries.clear(),
i64
),
(
"push_fanout_num_nodes",
stats.push_fanout_num_nodes.clear(),
i64
),
(
"push_message_pushes",
stats.push_message_pushes.clear(),
Expand Down
6 changes: 5 additions & 1 deletion gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ impl CrdsGossip {
&self,
pending_push_messages: Vec<CrdsValue>,
now: u64,
) -> HashMap<Pubkey, Vec<CrdsValue>> {
) -> (
HashMap<Pubkey, Vec<CrdsValue>>,
usize, // number of values
usize, // number of push messages
) {
{
let mut crds = self.crds.write().unwrap();
for entry in pending_push_messages {
Expand Down
18 changes: 11 additions & 7 deletions gossip/src/crds_gossip_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,16 @@ impl CrdsGossipPush {
&self,
crds: &RwLock<Crds>,
now: u64,
) -> HashMap<Pubkey, Vec<CrdsValue>> {
) -> (
HashMap<Pubkey, Vec<CrdsValue>>,
usize, // number of values
usize, // number of push messages
) {
let active_set = self.active_set.read().unwrap();
let active_set_len = active_set.len();
let push_fanout = self.push_fanout.min(active_set_len);
if push_fanout == 0 {
return HashMap::default();
return (HashMap::default(), 0, 0);
}
let mut num_pushes = 0;
let mut num_values = 0;
Expand Down Expand Up @@ -318,7 +322,7 @@ impl CrdsGossipPush {
for target_pubkey in push_messages.keys().copied() {
last_pushed_to.put(target_pubkey, now);
}
push_messages
(push_messages, num_values, num_pushes)
}

/// Add the `from` to the peer's filter of nodes.
Expand Down Expand Up @@ -997,7 +1001,7 @@ mod tests {
[Ok(origin)]
);
assert_eq!(push.active_set.read().unwrap().len(), 1);
assert_eq!(push.new_push_messages(&crds, 0), expected);
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
}
#[test]
fn test_personalized_push_messages() {
Expand Down Expand Up @@ -1051,7 +1055,7 @@ mod tests {
.into_iter()
.collect();
assert_eq!(push.active_set.read().unwrap().len(), 3);
assert_eq!(push.new_push_messages(&crds, now), expected);
assert_eq!(push.new_push_messages(&crds, now).0, expected);
}
#[test]
fn test_process_prune() {
Expand Down Expand Up @@ -1096,7 +1100,7 @@ mod tests {
&peer.label().pubkey(),
&[new_msg.label().pubkey()],
);
assert_eq!(push.new_push_messages(&crds, 0), expected);
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
}
#[test]
fn test_purge_old_pending_push_messages() {
Expand Down Expand Up @@ -1131,7 +1135,7 @@ mod tests {
push.process_push_message(&crds, &Pubkey::default(), vec![new_msg], 1),
[Ok(origin)],
);
assert_eq!(push.new_push_messages(&crds, 0), expected);
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion gossip/tests/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ fn network_run_push(
Duration::from_millis(node.gossip.pull.crds_timeout),
);
node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts);
(node_pubkey, node.gossip.new_push_messages(vec![], now))
(node_pubkey, node.gossip.new_push_messages(vec![], now).0)
})
.collect();
let transfered: Vec<_> = requests
Expand Down

0 comments on commit 718f433

Please sign in to comment.