Skip to content

Commit

Permalink
p2p: add network simulation framework (ethereum#14982)
Browse files Browse the repository at this point in the history
This commit introduces a network simulation framework which
can be used to run simulated networks of devp2p nodes. The
intention is to use this for testing protocols, performing
benchmarks and visualising emergent network behaviour.
  • Loading branch information
lmars authored and fjl committed Sep 25, 2017
1 parent 673007d commit 9feec51
Show file tree
Hide file tree
Showing 34 changed files with 6,522 additions and 69 deletions.
414 changes: 414 additions & 0 deletions cmd/p2psim/main.go

Large diffs are not rendered by default.

42 changes: 41 additions & 1 deletion node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package node

import (
"context"
"fmt"
"strings"
"time"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rpc"
"github.com/rcrowley/go-metrics"
)

Expand Down Expand Up @@ -73,6 +75,44 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
return true, nil
}

// PeerEvents creates an RPC subscription which receives peer events from the
// node's p2p.Server
func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
if server == nil {
return nil, ErrNodeStopped
}

// Create the subscription
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()

go func() {
events := make(chan *p2p.PeerEvent)
sub := server.SubscribeEvents(events)
defer sub.Unsubscribe()

for {
select {
case event := <-events:
notifier.Notify(rpcSub.ID, event)
case <-sub.Err():
return
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
}
}()

return rpcSub, nil
}

// StartRPC starts the HTTP RPC API server.
func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis *string) (bool, error) {
api.node.lock.Lock()
Expand Down Expand Up @@ -163,7 +203,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
}
}

if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins); err != nil {
if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins, api.node.config.WSExposeAll); err != nil {
return false, err
}
return true, nil
Expand Down
7 changes: 7 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ type Config struct {
// If the module list is empty, all RPC API endpoints designated public will be
// exposed.
WSModules []string `toml:",omitempty"`

// WSExposeAll exposes all API modules via the WebSocket RPC interface rather
// than just the public ones.
//
// *WARNING* Only set this if the node is running in a trusted network, exposing
// private APIs to untrusted users is a major security risk.
WSExposeAll bool `toml:",omitempty"`
}

// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
Expand Down
19 changes: 15 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
n.stopInProc()
return err
}
if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins); err != nil {
if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
n.stopHTTP()
n.stopIPC()
n.stopInProc()
Expand Down Expand Up @@ -412,7 +412,7 @@ func (n *Node) stopHTTP() {
}

// startWS initializes and starts the websocket RPC endpoint.
func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string) error {
func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
// Short circuit if the WS endpoint isn't being exposed
if endpoint == "" {
return nil
Expand All @@ -425,7 +425,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
Expand All @@ -441,7 +441,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", endpoint))
log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))

// All listeners booted successfully
n.wsEndpoint = endpoint
Expand Down Expand Up @@ -556,6 +556,17 @@ func (n *Node) Attach() (*rpc.Client, error) {
return rpc.DialInProc(n.inprocHandler), nil
}

// RPCHandler returns the in-process RPC request handler.
func (n *Node) RPCHandler() (*rpc.Server, error) {
n.lock.RLock()
defer n.lock.RUnlock()

if n.inprocHandler == nil {
return nil, ErrNodeStopped
}
return n.inprocHandler, nil
}

// Server retrieves the currently running P2P network layer. This method is meant
// only to inspect fields of the currently running server, life cycle management
// should be left to this Node entity.
Expand Down
23 changes: 20 additions & 3 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ const (
maxResolveDelay = time.Hour
)

// NodeDialer is used to connect to nodes in the network, typically by using
// an underlying net.Dialer but also using net.Pipe in tests
type NodeDialer interface {
Dial(*discover.Node) (net.Conn, error)
}

// TCPDialer implements the NodeDialer interface by using a net.Dialer to
// create TCP connections to nodes in the network
type TCPDialer struct {
*net.Dialer
}

// Dial creates a TCP connection to the node
func (t TCPDialer) Dial(dest *discover.Node) (net.Conn, error) {
addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
return t.Dialer.Dial("tcp", addr.String())
}

// dialstate schedules dials and discovery lookups.
// it get's a chance to compute new tasks on every iteration
// of the main loop in Server.run.
Expand Down Expand Up @@ -318,14 +336,13 @@ func (t *dialTask) resolve(srv *Server) bool {

// dial performs the actual connection attempt.
func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
fd, err := srv.Dialer.Dial("tcp", addr.String())
fd, err := srv.Dialer.Dial(dest)
if err != nil {
log.Trace("Dial error", "task", t, "err", err)
return false
}
mfd := newMeteredConn(fd, false)
srv.setupConn(mfd, t.flags, dest)
srv.SetupConn(mfd, t.flags, dest)
return true
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func TestDialResolve(t *testing.T) {
}

// Now run the task, it should resolve the ID once.
config := Config{Dialer: &net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}}
config := Config{Dialer: TCPDialer{&net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}}}
srv := &Server{ntab: table, Config: config}
tasks[0].Do(srv)
if !reflect.DeepEqual(table.resolveCalls, []discover.NodeID{dest.ID}) {
Expand Down
40 changes: 40 additions & 0 deletions p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ func (n *Node) UnmarshalText(text []byte) error {
// The node identifier is a marshaled elliptic curve public key.
type NodeID [NodeIDBits / 8]byte

// Bytes returns a byte slice representation of the NodeID
func (n NodeID) Bytes() []byte {
return n[:]
}

// NodeID prints as a long hexadecimal number.
func (n NodeID) String() string {
return fmt.Sprintf("%x", n[:])
Expand All @@ -240,6 +245,41 @@ func (n NodeID) TerminalString() string {
return hex.EncodeToString(n[:8])
}

// MarshalText implements the encoding.TextMarshaler interface.
func (n NodeID) MarshalText() ([]byte, error) {
return []byte(hex.EncodeToString(n[:])), nil
}

// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (n *NodeID) UnmarshalText(text []byte) error {
id, err := HexID(string(text))
if err != nil {
return err
}
*n = id
return nil
}

// BytesID converts a byte slice to a NodeID
func BytesID(b []byte) (NodeID, error) {
var id NodeID
if len(b) != len(id) {
return id, fmt.Errorf("wrong length, want %d bytes", len(id))
}
copy(id[:], b)
return id, nil
}

// MustBytesID converts a byte slice to a NodeID.
// It panics if the byte slice is not a valid NodeID.
func MustBytesID(b []byte) NodeID {
id, err := BytesID(b)
if err != nil {
panic(err)
}
return id
}

// HexID converts a hex string to a NodeID.
// The string may be prefixed with 0x.
func HexID(in string) (NodeID, error) {
Expand Down
30 changes: 30 additions & 0 deletions p2p/discover/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package discover

import (
"bytes"
"fmt"
"math/big"
"math/rand"
Expand Down Expand Up @@ -192,6 +193,35 @@ func TestHexID(t *testing.T) {
}
}

func TestNodeID_textEncoding(t *testing.T) {
ref := NodeID{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10,
0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20,
0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30,
0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x40,
0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x50,
0x51, 0x52, 0x53, 0x54, 0x55, 0x56, 0x57, 0x58, 0x59, 0x60,
0x61, 0x62, 0x63, 0x64,
}
hex := "01020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364"

text, err := ref.MarshalText()
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(text, []byte(hex)) {
t.Fatalf("text encoding did not match\nexpected: %s\ngot: %s", hex, text)
}

id := new(NodeID)
if err := id.UnmarshalText(text); err != nil {
t.Fatal(err)
}
if *id != ref {
t.Fatalf("text decoding did not match\nexpected: %s\ngot: %s", ref, id)
}
}

func TestNodeID_recover(t *testing.T) {
prv := newkey()
hash := make([]byte, 32)
Expand Down
66 changes: 66 additions & 0 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp"
)

Expand Down Expand Up @@ -271,3 +273,67 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error {
}
return nil
}

// msgEventer wraps a MsgReadWriter and sends events whenever a message is sent
// or received
type msgEventer struct {
MsgReadWriter

feed *event.Feed
peerID discover.NodeID
Protocol string
}

// newMsgEventer returns a msgEventer which sends message events to the given
// feed
func newMsgEventer(rw MsgReadWriter, feed *event.Feed, peerID discover.NodeID, proto string) *msgEventer {
return &msgEventer{
MsgReadWriter: rw,
feed: feed,
peerID: peerID,
Protocol: proto,
}
}

// ReadMsg reads a message from the underlying MsgReadWriter and emits a
// "message received" event
func (self *msgEventer) ReadMsg() (Msg, error) {
msg, err := self.MsgReadWriter.ReadMsg()
if err != nil {
return msg, err
}
self.feed.Send(&PeerEvent{
Type: PeerEventTypeMsgRecv,
Peer: self.peerID,
Protocol: self.Protocol,
MsgCode: &msg.Code,
MsgSize: &msg.Size,
})
return msg, nil
}

// WriteMsg writes a message to the underlying MsgReadWriter and emits a
// "message sent" event
func (self *msgEventer) WriteMsg(msg Msg) error {
err := self.MsgReadWriter.WriteMsg(msg)
if err != nil {
return err
}
self.feed.Send(&PeerEvent{
Type: PeerEventTypeMsgSend,
Peer: self.peerID,
Protocol: self.Protocol,
MsgCode: &msg.Code,
MsgSize: &msg.Size,
})
return nil
}

// Close closes the underlying MsgReadWriter if it implements the io.Closer
// interface
func (self *msgEventer) Close() error {
if v, ok := self.MsgReadWriter.(io.Closer); ok {
return v.Close()
}
return nil
}
Loading

0 comments on commit 9feec51

Please sign in to comment.