Skip to content

Commit

Permalink
feat(net/peer): add peer with udp socket (paradigmxyz#9156)
Browse files Browse the repository at this point in the history
Signed-off-by: jsvisa <[email protected]>
  • Loading branch information
jsvisa authored Jun 29, 2024
1 parent 57c4f7e commit b93e70c
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 88 deletions.
30 changes: 23 additions & 7 deletions crates/net/network-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,39 @@ pub trait PeersInfo: Send + Sync {

/// Provides an API for managing the peers of the network.
pub trait Peers: PeersInfo {
/// Adds a peer to the peer set.
fn add_peer(&self, peer: PeerId, addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Basic, addr);
/// Adds a peer to the peer set with UDP `SocketAddr`.
fn add_peer(&self, peer: PeerId, tcp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Basic, tcp_addr, None);
}

/// Adds a peer to the peer set with TCP and UDP `SocketAddr`.
fn add_peer_with_udp(&self, peer: PeerId, tcp_addr: SocketAddr, udp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Basic, tcp_addr, Some(udp_addr));
}

/// Adds a trusted [`PeerId`] to the peer set.
///
/// This allows marking a peer as trusted without having to know the peer's address.
fn add_trusted_peer_id(&self, peer: PeerId);

/// Adds a trusted peer to the peer set.
fn add_trusted_peer(&self, peer: PeerId, addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, addr);
/// Adds a trusted peer to the peer set with UDP `SocketAddr`.
fn add_trusted_peer(&self, peer: PeerId, tcp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, tcp_addr, None);
}

/// Adds a trusted peer with TCP and UDP `SocketAddr` to the peer set.
fn add_trusted_peer_with_udp(&self, peer: PeerId, tcp_addr: SocketAddr, udp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, tcp_addr, Some(udp_addr));
}

/// Adds a peer to the known peer set, with the given kind.
fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr);
fn add_peer_kind(
&self,
peer: PeerId,
kind: PeerKind,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
);

/// Returns the rpc [`PeerInfo`] for all connected [`PeerKind::Trusted`] peers.
fn get_trusted_peers(
Expand Down
9 changes: 8 additions & 1 deletion crates/net/network-api/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,14 @@ impl PeersInfo for NoopNetwork {
impl Peers for NoopNetwork {
fn add_trusted_peer_id(&self, _peer: PeerId) {}

fn add_peer_kind(&self, _peer: PeerId, _kind: PeerKind, _addr: SocketAddr) {}
fn add_peer_kind(
&self,
_peer: PeerId,
_kind: PeerKind,
_tcp_addr: SocketAddr,
_udp_addr: Option<SocketAddr>,
) {
}

async fn get_peers_by_kind(&self, _kind: PeerKind) -> Result<Vec<PeerInfo>, NetworkError> {
Ok(vec![])
Expand Down
21 changes: 12 additions & 9 deletions crates/net/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
cache::LruMap,
error::{NetworkError, ServiceKind},
manager::DiscoveredEvent,
peers::PeerAddr,
};
use enr::Enr;
use futures::StreamExt;
Expand Down Expand Up @@ -40,7 +41,7 @@ pub struct Discovery {
/// All nodes discovered via discovery protocol.
///
/// These nodes can be ephemeral and are updated via the discovery protocol.
discovered_nodes: LruMap<PeerId, SocketAddr>,
discovered_nodes: LruMap<PeerId, PeerAddr>,
/// Local ENR of the discovery v4 service (discv5 ENR has same [`PeerId`]).
local_enr: NodeRecord,
/// Handler to interact with the Discovery v4 service
Expand Down Expand Up @@ -204,12 +205,14 @@ impl Discovery {

/// Processes an incoming [`NodeRecord`] update from a discovery service
fn on_node_record_update(&mut self, record: NodeRecord, fork_id: Option<ForkId>) {
let id = record.id;
let addr = record.tcp_addr();
let peer_id = record.id;
let tcp_addr = record.tcp_addr();
let udp_addr = record.udp_addr();
let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
_ =
self.discovered_nodes.get_or_insert(id, || {
self.discovered_nodes.get_or_insert(peer_id, || {
self.queued_events.push_back(DiscoveryEvent::NewNode(
DiscoveredEvent::EventQueued { peer_id: id, socket_addr: addr, fork_id },
DiscoveredEvent::EventQueued { peer_id, addr, fork_id },
));

addr
Expand All @@ -224,8 +227,8 @@ impl Discovery {
DiscoveryUpdate::EnrForkId(node, fork_id) => {
self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id))
}
DiscoveryUpdate::Removed(node) => {
self.discovered_nodes.remove(&node);
DiscoveryUpdate::Removed(peer_id) => {
self.discovered_nodes.remove(&peer_id);
}
DiscoveryUpdate::Batch(updates) => {
for update in updates {
Expand Down Expand Up @@ -427,15 +430,15 @@ mod tests {
assert_eq!(
DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
peer_id: discv4_id_2,
socket_addr: discv4_enr_2.tcp_addr(),
addr: PeerAddr::new(discv4_enr_2.tcp_addr(), Some(discv4_enr_2.udp_addr())),
fork_id: None
}),
event_node_1
);
assert_eq!(
DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
peer_id: discv4_id_1,
socket_addr: discv4_enr_1.tcp_addr(),
addr: PeerAddr::new(discv4_enr_1.tcp_addr(), Some(discv4_enr_1.udp_addr())),
fork_id: None
}),
event_node_2
Expand Down
4 changes: 2 additions & 2 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
network::{NetworkHandle, NetworkHandleMessage},
peers::{PeersHandle, PeersManager},
peers::{PeerAddr, PeersHandle, PeersManager},
poll_nested_stream_with_budget,
protocol::IntoRlpxSubProtocol,
session::SessionManager,
Expand Down Expand Up @@ -1030,7 +1030,7 @@ pub enum NetworkEvent {

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveredEvent {
EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option<ForkId> },
EventQueued { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
}

#[derive(Debug, Default)]
Expand Down
23 changes: 18 additions & 5 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use crate::{
config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest,
peers::PeersHandle, protocol::RlpxSubProtocol, swarm::NetworkConnectionState,
transactions::TransactionsHandle, FetchClient,
config::NetworkMode,
discovery::DiscoveryEvent,
manager::NetworkEvent,
message::PeerRequest,
peers::{PeerAddr, PeersHandle},
protocol::RlpxSubProtocol,
swarm::NetworkConnectionState,
transactions::TransactionsHandle,
FetchClient,
};
use enr::Enr;
use parking_lot::Mutex;
Expand Down Expand Up @@ -257,7 +263,14 @@ impl Peers for NetworkHandle {

/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known
/// set, with the given kind.
fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr) {
fn add_peer_kind(
&self,
peer: PeerId,
kind: PeerKind,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
) {
let addr = PeerAddr::new(tcp_addr, udp_addr);
self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
}

Expand Down Expand Up @@ -420,7 +433,7 @@ pub(crate) enum NetworkHandleMessage {
/// Marks a peer as trusted.
AddTrustedPeerId(PeerId),
/// Adds an address for a peer, including its ID, kind, and socket address.
AddPeerAddress(PeerId, PeerKind, SocketAddr),
AddPeerAddress(PeerId, PeerKind, PeerAddr),
/// Removes a peer from the peerset corresponding to the given kind.
RemovePeer(PeerId, PeerKind),
/// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason.
Expand Down
Loading

0 comments on commit b93e70c

Please sign in to comment.