forked from berachain/polaris-reth
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Example: Manual P2P (paradigmxyz#4736)
Co-authored-by: Matthias Seitz <[email protected]>
- Loading branch information
1 parent
2de29fb
commit 0a4e428
Showing
4 changed files
with
191 additions
and
1 deletion.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
[package] | ||
name = "manual-p2p" | ||
version = "0.0.0" | ||
publish = false | ||
edition.workspace = true | ||
license.workspace = true | ||
|
||
[dependencies] | ||
once_cell = "1.17.0" | ||
eyre = "0.6.8" | ||
|
||
reth-primitives.workspace = true | ||
reth-network.workspace = true | ||
reth-discv4.workspace = true | ||
reth-eth-wire.workspace = true | ||
reth-ecies.workspace = true | ||
|
||
futures.workspace = true | ||
secp256k1.workspace = true | ||
tokio.workspace = true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
//! Low level example of connecting to and communicating with a peer. | ||
//! | ||
//! Run with | ||
//! | ||
//! ```not_rust | ||
//! cargo run -p manual-p2p | ||
//! ``` | ||
use std::time::Duration; | ||
|
||
use futures::StreamExt; | ||
use once_cell::sync::Lazy; | ||
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4ConfigBuilder, DEFAULT_DISCOVERY_ADDRESS}; | ||
use reth_ecies::{stream::ECIESStream, util::pk2id}; | ||
use reth_eth_wire::{ | ||
EthMessage, EthStream, HelloMessage, P2PStream, Status, UnauthedEthStream, UnauthedP2PStream, | ||
}; | ||
use reth_network::config::rng_secret_key; | ||
use reth_primitives::{mainnet_nodes, Chain, Hardfork, Head, NodeRecord, MAINNET, MAINNET_GENESIS}; | ||
use secp256k1::{SecretKey, SECP256K1}; | ||
use tokio::net::TcpStream; | ||
|
||
type AuthedP2PStream = P2PStream<ECIESStream<TcpStream>>; | ||
type AuthedEthStream = EthStream<P2PStream<ECIESStream<TcpStream>>>; | ||
|
||
pub static MAINNET_BOOT_NODES: Lazy<Vec<NodeRecord>> = Lazy::new(mainnet_nodes); | ||
|
||
#[tokio::main] | ||
async fn main() -> eyre::Result<()> { | ||
// Setup configs related to this 'node' by creating a new random | ||
let our_key = rng_secret_key(); | ||
let our_enr = NodeRecord::from_secret_key(DEFAULT_DISCOVERY_ADDRESS, &our_key); | ||
|
||
// Setup discovery v4 protocol to find peers to talk to | ||
let mut discv4_cfg = Discv4ConfigBuilder::default(); | ||
discv4_cfg.add_boot_nodes(MAINNET_BOOT_NODES.clone()).lookup_interval(Duration::from_secs(1)); | ||
|
||
// Start discovery protocol | ||
let discv4 = Discv4::spawn(our_enr.udp_addr(), our_enr, our_key, discv4_cfg.build()).await?; | ||
let mut discv4_stream = discv4.update_stream().await?; | ||
|
||
while let Some(update) = discv4_stream.next().await { | ||
tokio::spawn(async move { | ||
if let DiscoveryUpdate::Added(peer) = update { | ||
// Boot nodes hard at work, lets not disturb them | ||
if MAINNET_BOOT_NODES.contains(&peer) { | ||
return | ||
} | ||
|
||
let (p2p_stream, their_hello) = match handshake_p2p(peer, our_key).await { | ||
Ok(s) => s, | ||
Err(e) => { | ||
println!("Failed P2P handshake with peer {}, {}", peer.address, e); | ||
return | ||
} | ||
}; | ||
|
||
let (eth_stream, their_status) = match handshake_eth(p2p_stream).await { | ||
Ok(s) => s, | ||
Err(e) => { | ||
println!("Failed ETH handshake with peer {}, {}", peer.address, e); | ||
return | ||
} | ||
}; | ||
|
||
println!( | ||
"Successfully connected to a peer at {}:{} ({}) using eth-wire version eth/{}", | ||
peer.address, peer.tcp_port, their_hello.client_version, their_status.version | ||
); | ||
|
||
snoop(peer, eth_stream).await; | ||
} | ||
}); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
// Perform a P2P handshake with a peer | ||
async fn handshake_p2p( | ||
peer: NodeRecord, | ||
key: SecretKey, | ||
) -> eyre::Result<(AuthedP2PStream, HelloMessage)> { | ||
let outgoing = TcpStream::connect((peer.address, peer.tcp_port)).await?; | ||
let ecies_stream = ECIESStream::connect(outgoing, key, peer.id).await?; | ||
|
||
let our_peer_id = pk2id(&key.public_key(SECP256K1)); | ||
let our_hello = HelloMessage::builder(our_peer_id).build(); | ||
|
||
Ok(UnauthedP2PStream::new(ecies_stream).handshake(our_hello).await?) | ||
} | ||
|
||
// Perform a ETH Wire handshake with a peer | ||
async fn handshake_eth(p2p_stream: AuthedP2PStream) -> eyre::Result<(AuthedEthStream, Status)> { | ||
let fork_filter = MAINNET.fork_filter(Head { | ||
timestamp: MAINNET.fork(Hardfork::Shanghai).as_timestamp().unwrap(), | ||
..Default::default() | ||
}); | ||
|
||
let status = Status::builder() | ||
.chain(Chain::mainnet()) | ||
.genesis(MAINNET_GENESIS) | ||
.forkid(Hardfork::Shanghai.fork_id(&MAINNET).unwrap()) | ||
.build(); | ||
|
||
let status = Status { version: p2p_stream.shared_capability().version(), ..status }; | ||
let eth_unauthed = UnauthedEthStream::new(p2p_stream); | ||
Ok(eth_unauthed.handshake(status, fork_filter).await?) | ||
} | ||
|
||
// Snoop by greedily capturing all broadcasts that the peer emits | ||
// note: this node cannot handle request so will be disconnected by peer when challenged | ||
async fn snoop(peer: NodeRecord, mut eth_stream: AuthedEthStream) { | ||
while let Some(Ok(update)) = eth_stream.next().await { | ||
match update { | ||
EthMessage::NewPooledTransactionHashes66(txs) => { | ||
println!("Got {} new tx hashes from peer {}", txs.0.len(), peer.address); | ||
} | ||
EthMessage::NewBlock(block) => { | ||
println!("Got new block data {:?} from peer {}", block, peer.address); | ||
} | ||
EthMessage::NewPooledTransactionHashes68(txs) => { | ||
println!("Got {} new tx hashes from peer {}", txs.hashes.len(), peer.address); | ||
} | ||
EthMessage::NewBlockHashes(block_hashes) => { | ||
println!( | ||
"Got {} new block hashes from peer {}", | ||
block_hashes.0.len(), | ||
peer.address | ||
); | ||
} | ||
EthMessage::GetNodeData(_) => { | ||
println!("Unable to serve GetNodeData request to peer {}", peer.address); | ||
} | ||
EthMessage::GetReceipts(_) => { | ||
println!("Unable to serve GetReceipts request to peer {}", peer.address); | ||
} | ||
EthMessage::GetBlockHeaders(_) => { | ||
println!("Unable to serve GetBlockHeaders request to peer {}", peer.address); | ||
} | ||
EthMessage::GetBlockBodies(_) => { | ||
println!("Unable to serve GetBlockBodies request to peer {}", peer.address); | ||
} | ||
EthMessage::GetPooledTransactions(_) => { | ||
println!("Unable to serve GetPooledTransactions request to peer {}", peer.address); | ||
} | ||
_ => {} | ||
} | ||
} | ||
} |