Skip to content

Commit

Permalink
feat: remove peers after several unsuccessful attempts (paradigmxyz#3780
Browse files Browse the repository at this point in the history
)
  • Loading branch information
mattsse authored Jul 14, 2023
1 parent bc91caf commit 77faa04
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 9 deletions.
2 changes: 1 addition & 1 deletion crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ mod tests {
fn test_store_config() {
with_tempdir("config-store-test", |config_path| {
let config = Config::default();
confy::store_path(config_path, config).unwrap();
confy::store_path(config_path, config).expect("Failed to store config");
})
}

Expand Down
18 changes: 15 additions & 3 deletions crates/net/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,18 @@ impl SessionError for EthStreamError {
return match err {
DisconnectReason::TooManyPeers |
DisconnectReason::AlreadyConnected |
DisconnectReason::PingTimeout |
DisconnectReason::DisconnectRequested |
DisconnectReason::TcpSubsystemError => Some(BackoffKind::Low),
_ => {

DisconnectReason::ProtocolBreach |
DisconnectReason::UselessPeer |
DisconnectReason::IncompatibleP2PProtocolVersion |
DisconnectReason::NullNodeIdentity |
DisconnectReason::ClientQuitting |
DisconnectReason::UnexpectedHandshakeIdentity |
DisconnectReason::ConnectedToSelf |
DisconnectReason::SubprotocolSpecific => {
// These are considered fatal, and are handled by the
// [`SessionError::is_fatal_protocol_error`]
Some(BackoffKind::High)
Expand Down Expand Up @@ -245,8 +255,10 @@ impl SessionError for io::Error {
// these usually happen when the remote instantly drops the connection, for example
// if the previous connection isn't properly cleaned up yet and the peer is temp.
// banned.
ErrorKind::ConnectionRefused | ErrorKind::ConnectionReset | ErrorKind::BrokenPipe => {
Some(BackoffKind::Low)
ErrorKind::ConnectionReset | ErrorKind::BrokenPipe => Some(BackoffKind::Low),
ErrorKind::ConnectionRefused => {
// peer is unreachable, e.g. port not open or down
Some(BackoffKind::High)
}
_ => Some(BackoffKind::Medium),
}
Expand Down
100 changes: 95 additions & 5 deletions crates/net/network/src/peers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ pub(crate) struct PeersManager {
connect_trusted_nodes_only: bool,
/// Timestamp of the last time [Self::tick] was called.
last_tick: Instant,
/// Maximum number of backoff attempts before we give up on a peer and dropping.
max_backoff_count: u32,
}

impl PeersManager {
Expand All @@ -125,7 +127,7 @@ impl PeersManager {
trusted_nodes,
connect_trusted_nodes_only,
basic_nodes,
..
max_backoff_count,
} = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
let now = Instant::now();
Expand Down Expand Up @@ -161,6 +163,7 @@ impl PeersManager {
backoff_durations,
connect_trusted_nodes_only,
last_tick: Instant::now(),
max_backoff_count,
}
}

Expand Down Expand Up @@ -294,7 +297,7 @@ impl PeersManager {
self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
}

/// Temporarily puts the peer in timeout
/// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
trace!(target: "net::peers", ?peer_id, "backing off");

Expand Down Expand Up @@ -448,9 +451,9 @@ impl PeersManager {
trace!(target: "net::peers", ?remote_addr, ?peer_id, ?err, "fatal connection error");
// remove the peer to which we can't establish a connection due to protocol related
// issues.
if let Some(peer) = self.peers.remove(peer_id) {
if let Some((peer_id, peer)) = self.peers.remove_entry(peer_id) {
self.connection_info.decr_state(peer.state);
self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
}

// ban the peer
Expand All @@ -465,6 +468,7 @@ impl PeersManager {
}
} else {
let mut backoff_until = None;
let mut remove_peer = false;

if let Some(peer) = self.peers.get_mut(peer_id) {
if let Some(kind) = err.should_backoff() {
Expand All @@ -488,8 +492,20 @@ impl PeersManager {

self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;

if peer.severe_backoff_counter > self.max_backoff_count && !peer.is_trusted() {
// mark peer for removal if it has been backoff too many times and is _not_
// trusted
remove_peer = true;
}
}
if let Some(backoff_until) = backoff_until {

// remove peer if it has been marked for removal
if remove_peer {
let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
} else if let Some(backoff_until) = backoff_until {
// otherwise, backoff the peer if marked as such
self.backoff_peer_until(*peer_id, backoff_until);
}
}
Expand Down Expand Up @@ -1052,6 +1068,16 @@ pub struct PeersConfig {
pub trusted_nodes: HashSet<NodeRecord>,
/// Connect to trusted nodes only?
pub connect_trusted_nodes_only: bool,
/// Maximum number of backoff attempts before we give up on a peer and dropping.
///
/// The max time spent of a peer before it's removed from the set is determined by the
/// configured backoff duration and the max backoff count.
///
/// With a backoff counter of 5 and a backoff duration of 1h, the minimum time spent of the
/// peer in the table is the sum of all backoffs (1h + 2h + 3h + 4h + 5h = 15h).
///
/// Note: this does not apply to trusted peers.
pub max_backoff_count: u32,
/// Basic nodes to connect to.
#[cfg_attr(feature = "serde", serde(skip))]
pub basic_nodes: HashSet<NodeRecord>,
Expand All @@ -1067,6 +1093,8 @@ pub struct PeersConfig {
pub reputation_weights: ReputationChangeWeights,
/// How long to backoff peers that are we failed to connect to for non-fatal reasons, such as
/// [`DisconnectReason::TooManyPeers`].
///
/// The backoff duration increases with number of backoff attempts.
pub backoff_durations: PeerBackoffDurations,
}

Expand All @@ -1083,6 +1111,7 @@ impl Default for PeersConfig {
trusted_nodes: Default::default(),
connect_trusted_nodes_only: false,
basic_nodes: Default::default(),
max_backoff_count: 5,
}
}
}
Expand Down Expand Up @@ -1134,6 +1163,12 @@ impl PeersConfig {
self
}

/// Configures the max allowed backoff count.
pub fn with_max_backoff_count(mut self, max_backoff_count: u32) -> Self {
self.max_backoff_count = max_backoff_count;
self
}

/// Read from file nodes available at launch. Ignored if None.
pub fn with_basic_nodes_from_file(
self,
Expand Down Expand Up @@ -1575,6 +1610,61 @@ mod test {
assert!(peers.peers.get(&peer).is_none());
}

#[tokio::test]
async fn test_remove_on_max_backoff_count() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::default();
let mut peers = PeersManager::new(config.clone());
peers.add_peer(peer, socket_addr, None);
let peer_struct = peers.peers.get_mut(&peer).unwrap();

// Simulate a peer that was already backed off once
peer_struct.severe_backoff_counter = config.max_backoff_count;

match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}

poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;

peers.on_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(
io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
),
);

match event!(peers) {
PeerAction::PeerRemoved(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}

poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;

assert!(peers.peers.get(&peer).is_none());
}

#[tokio::test]
async fn test_ban_on_pending_drop() {
let peer = PeerId::random();
Expand Down

0 comments on commit 77faa04

Please sign in to comment.