Skip to content

Commit

Permalink
Fix blocking issue in runDial() (0xPolygon#259)
Browse files Browse the repository at this point in the history
* Rename PeerEventTypes

* Add comment to PeerEventType

* Add comment to PeerEventType

* Add PeerAddedToDialQueue

* Fix TestPeerEvent_EmitAndSubscribe

* Add runDial unit test

* Fix wrong call in PeerAdded handler in routing table

* Add constants for priority of dial queue

* Fix TestPeerEvent_EmitAndSubscribe

* Disable discover in TestPeerEvent_EmitAndSubscribe

* Fix emitEvent

* Fix TestRunDial
  • Loading branch information
Kourin1996 authored Dec 9, 2021
1 parent a9ee5ed commit 34cc105
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 60 deletions.
6 changes: 3 additions & 3 deletions network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *discovery) setup() error {

d.routingTable.PeerAdded = func(p peer.ID) {
info := d.srv.host.Peerstore().PeerInfo(p)
d.srv.dialQueue.add(&info, 10)
d.srv.addToDialQueue(&info, PriorityRandomDial)
}
d.routingTable.PeerRemoved = func(p peer.ID) {
d.srv.dialQueue.del(p)
Expand All @@ -105,7 +105,7 @@ func (d *discovery) setup() error {
err = d.srv.SubscribeFn(func(evnt *PeerEvent) {
peerID := evnt.PeerID
switch evnt.Type {
case PeerEventConnected:
case PeerConnected:
// add peer to the routing table and to our local peer
_, err := d.routingTable.TryAddPeer(peerID, false, false)
if err != nil {
Expand All @@ -118,7 +118,7 @@ func (d *discovery) setup() error {
stream: nil,
})
d.peersLock.Unlock()
case PeerEventDisconnected:
case PeerDisconnected:
d.routingTable.RemovePeer(peerID)
d.peersLock.Lock()
d.peers.delete(peerID)
Expand Down
5 changes: 1 addition & 4 deletions network/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ func (i *identity) setup() {
defer func() {
if i.isPending(peerID) {
i.delPending(peerID)
i.srv.emitEvent(&PeerEvent{
PeerID: peerID,
Type: PeerEventDialCompleted,
})
i.srv.emitEvent(peerID, PeerDialCompleted)
}
}()

Expand Down
99 changes: 57 additions & 42 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ import (

const DefaultLibp2pPort int = 1478

const MinimumPeerConnections int64 = 1
const (
MinimumPeerConnections int64 = 1
MinimumBootNodes int = 2 // MinimumBootNodes is set to 2 so that a bootnode can reconnect to the network through another bootnode after restarting.
)

// MinimumBootNodes Count is set to 2 so that, a bootnode can reconnect to the network using other bootnode after restarting.
const MinimumBootNodes int = 2
// Priorities passed to the dial queue when a peer is enqueued.
// NOTE(review): the dial queue implementation is not visible here; presumably
// a lower value is dialed first — confirm against dialQueue.
const (
// PriorityRequestedDial is used for peers enqueued by an explicit Join request.
PriorityRequestedDial uint64 = 1
// PriorityRandomDial is used for peers enqueued automatically: peers added to
// the discovery routing table and randomly chosen bootnodes.
PriorityRandomDial uint64 = 10
)

type Config struct {
NoDiscover bool
Expand Down Expand Up @@ -247,20 +253,19 @@ func (s *Server) checkPeerConnections() {
//TODO: dial peers from the peerstore
} else {
randomNode := s.getRandomBootNode()
s.dialQueue.add(randomNode, 10)
s.addToDialQueue(randomNode, PriorityRandomDial)
}

}
}

}

func (s *Server) runDial() {
// watch for events of peers included or removed
notifyCh := make(chan struct{})
err := s.SubscribeFn(func(evnt *PeerEvent) {
// only concerned about PeerConnected, PeerFailedToConnect, PeerDisconnected, PeerDialCompleted, and PeerAddedToDialQueue
switch evnt.Type {
case PeerEventConnected, PeerEventConnectedFailed, PeerEventDisconnected, PeerEventDialCompleted:
case PeerConnected, PeerFailedToConnect, PeerDisconnected, PeerDialCompleted, PeerAddedToDialQueue:
default:
return
}
Expand Down Expand Up @@ -291,10 +296,7 @@ func (s *Server) runDial() {
if s.isConnected(tt.addr.ID) {
// the node is already connected, send an event to wake up
// any join watchers
s.emitEvent(&PeerEvent{
PeerID: tt.addr.ID,
Type: PeerEventDialConnectedNode,
})
s.emitEvent(tt.addr.ID, PeerAlreadyConnected)
} else {
// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
Expand Down Expand Up @@ -367,10 +369,7 @@ func (s *Server) addPeer(id peer.ID) {
}
s.peers[id] = p

s.emitEvent(&PeerEvent{
PeerID: id,
Type: PeerEventConnected,
})
s.emitEvent(id, PeerConnected)
}

func (s *Server) delPeer(id peer.ID) {
Expand All @@ -382,10 +381,7 @@ func (s *Server) delPeer(id peer.ID) {
delete(s.peers, id)
s.host.Network().ClosePeer(id)

s.emitEvent(&PeerEvent{
PeerID: id,
Type: PeerEventDisconnected,
})
s.emitEvent(id, PeerDisconnected)
}

func (s *Server) Disconnect(peer peer.ID, reason string) {
Expand Down Expand Up @@ -446,7 +442,7 @@ func (s *Server) JoinAddr(addr string, timeout time.Duration) error {

func (s *Server) Join(addr *peer.AddrInfo, timeout time.Duration) error {
s.logger.Info("Join request", "addr", addr.String())
s.dialQueue.add(addr, 1)
s.addToDialQueue(addr, PriorityRequestedDial)

if timeout == 0 {
return nil
Expand Down Expand Up @@ -479,8 +475,10 @@ func (s *Server) watch(peerID peer.ID, dur time.Duration) error {

func (s *Server) runJoinWatcher() error {
return s.SubscribeFn(func(evnt *PeerEvent) {
// only concerned about 'PeerEventConnected' and 'PeerEventConnectedFailed'
if evnt.Type != PeerEventConnected && evnt.Type != PeerEventConnectedFailed && evnt.Type != PeerEventDialConnectedNode {
switch evnt.Type {
// only concerned about PeerConnected, PeerFailedToConnect, and PeerAlreadyConnected
case PeerConnected, PeerFailedToConnect, PeerAlreadyConnected:
default:
return
}

Expand Down Expand Up @@ -550,8 +548,18 @@ func (s *Server) AddrInfo() *peer.AddrInfo {
}
}

func (s *Server) emitEvent(evnt *PeerEvent) {
if err := s.emitterPeerEvent.Emit(*evnt); err != nil {
// addToDialQueue enqueues addr in the dial queue with the given priority and
// then emits a PeerAddedToDialQueue event for that peer.
func (s *Server) addToDialQueue(addr *peer.AddrInfo, priority uint64) {
s.dialQueue.add(addr, priority)
// runDial subscribes to PeerAddedToDialQueue, so emitting it after the
// entry is queued notifies the dial loop about the new entry.
s.emitEvent(addr.ID, PeerAddedToDialQueue)
}

// emitEvent publishes a PeerEvent of the given type for peerID on the server's
// peer-event emitter. A failure to emit is logged and otherwise ignored.
func (s *Server) emitEvent(peerID peer.ID, typ PeerEventType) {
	err := s.emitterPeerEvent.Emit(PeerEvent{PeerID: peerID, Type: typ})
	if err == nil {
		return
	}

	s.logger.Info("failed to emit event", "peer", peerID, "type", typ, "err", err)
}
Expand Down Expand Up @@ -704,32 +712,39 @@ func AddrInfoToString(addr *peer.AddrInfo) string {
return dialAddress + "/p2p/" + addr.ID.String()
}

type PeerConnectedEvent struct {
Peer peer.ID
Err error
}

type PeerDisconnectedEvent struct {
Peer peer.ID
}
type PeerEventType uint

const (
PeerEventConnected = "PeerConnected"
PeerEventConnectedFailed = "PeerConnectedFailed"
PeerEventDisconnected = "PeerDisconnected"
PeerEventDialConnectedNode = "PeerDialConnectedNode"
PeerEventDialCompleted = "PeerDialCompleted"
PeerConnected PeerEventType = iota // Emitted when a peer connected
PeerFailedToConnect // Emitted when a peer failed to connect
PeerDisconnected // Emitted when a peer disconnected from node
PeerAlreadyConnected // Emitted when a peer already connected on dial
PeerDialCompleted // Emitted when a peer completed dial
PeerAddedToDialQueue // Emitted when a peer is added to dial queue
)

// peerEventToName maps each PeerEventType to its human-readable name.
// PeerEventType.String looks names up here, so a new event type needs an
// entry in this table or it will print as "unknown".
var peerEventToName = map[PeerEventType]string{
PeerConnected: "PeerConnected",
PeerFailedToConnect: "PeerFailedToConnect",
PeerDisconnected: "PeerDisconnected",
PeerAlreadyConnected: "PeerAlreadyConnected",
PeerDialCompleted: "PeerDialCompleted",
PeerAddedToDialQueue: "PeerAddedToDialQueue",
}

// String returns the registered name of the event type, or "unknown" when the
// value has no entry in peerEventToName.
func (s PeerEventType) String() string {
	if name, found := peerEventToName[s]; found {
		return name
	}

	return "unknown"
}

type PeerEvent struct {
// PeerID is the id of the peer that triggered
// the event
PeerID peer.ID

// Type is the type of the event
Type string

// Desc is used to include more contextual
// information for the event
Desc string
Type PeerEventType
}
109 changes: 101 additions & 8 deletions network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -99,37 +100,58 @@ func asyncWaitForEvent(s *Server, timeout time.Duration, handler func(*PeerEvent

func disconnectedPeerHandler(p peer.ID) func(evnt *PeerEvent) bool {
return func(evnt *PeerEvent) bool {
return evnt.Type == PeerEventDisconnected && evnt.PeerID == p
return evnt.Type == PeerDisconnected && evnt.PeerID == p
}
}

func connectedPeerHandler(p peer.ID) func(evnt *PeerEvent) bool {
return func(evnt *PeerEvent) bool {
return evnt.Type == PeerEventConnected && evnt.PeerID == p
return evnt.Type == PeerConnected && evnt.PeerID == p
}
}

func TestPeerEvent_EmitAndSubscribe(t *testing.T) {
srv0 := CreateServer(t, nil)

srv0 := CreateServer(t, func(c *Config) {
c.NoDiscover = true
})
sub, err := srv0.Subscribe()
assert.NoError(t, err)

count := 10
events := []PeerEventType{
PeerConnected,
PeerFailedToConnect,
PeerDisconnected,
PeerAlreadyConnected,
PeerDialCompleted,
PeerAddedToDialQueue,
}

getIDAndEventType := func(i int) (peer.ID, PeerEventType) {
id := peer.ID(strconv.Itoa(i))
event := events[i%len(events)]
return id, event
}

t.Run("serial", func(t *testing.T) {
for i := 0; i < count; i++ {
srv0.emitEvent(&PeerEvent{})
sub.Get()
id, event := getIDAndEventType(i)
srv0.emitEvent(id, event)

received := sub.Get()
assert.Equal(t, &PeerEvent{id, event}, received)
}
})

t.Run("parallel", func(t *testing.T) {
for i := 0; i < count; i++ {
srv0.emitEvent(&PeerEvent{})
id, event := getIDAndEventType(i)
srv0.emitEvent(id, event)
}
for i := 0; i < count; i++ {
sub.Get()
received := sub.Get()
id, event := getIDAndEventType(i)
assert.Equal(t, &PeerEvent{id, event}, received)
}
})
}
Expand Down Expand Up @@ -457,6 +479,77 @@ func TestSelfConnection_WithBootNodes(t *testing.T) {
}
}

// TestRunDial checks connection establishment through Server.Join, with
// discovery disabled, under different MaxPeers settings.
func TestRunDial(t *testing.T) {
	// setupServers creates one server per entry of maxPeers, using the entry
	// as that server's MaxPeers and disabling discovery.
	setupServers := func(t *testing.T, maxPeers []uint64) []*Server {
		servers := make([]*Server, len(maxPeers))
		for idx := range servers {
			// Copy the limit so the config callback does not capture the
			// loop variable.
			limit := maxPeers[idx]
			servers[idx] = CreateServer(t, func(c *Config) {
				c.MaxPeers = limit
				c.NoDiscover = true
			})
		}
		return servers
	}

	// connectToPeer asks srv to join addr and reports whether a PeerConnected
	// event for addr was observed before the timeout expired.
	connectToPeer := func(t *testing.T, srv *Server, addr *peer.AddrInfo, timeout time.Duration) bool {
		t.Helper()

		connectedCh := asyncWaitForEvent(srv, timeout, connectedPeerHandler(addr.ID))
		// With a zero timeout Join returns right after enqueueing the dial;
		// the error is still checked rather than silently dropped.
		assert.NoError(t, srv.Join(addr, 0))
		return <-connectedCh
	}

	// closeServers shuts down every given server.
	closeServers := func(servers ...*Server) {
		for _, s := range servers {
			s.Close()
		}
	}

	t.Run("should connect to all peers", func(t *testing.T) {
		maxPeers := []uint64{2, 1, 1}
		servers := setupServers(t, maxPeers)
		defer closeServers(servers...)
		srv, peers := servers[0], servers[1:]

		for idx, p := range peers {
			addr := p.AddrInfo()
			connected := connectToPeer(t, srv, addr, 5*time.Second)
			assert.Truef(t, connected, "should connect to peer %d[%s], but didn't\n", idx, addr.ID)
		}
	})

	t.Run("should fail to connect to some peers due to reaching limit", func(t *testing.T) {
		maxPeers := []uint64{2, 1, 1, 1}
		servers := setupServers(t, maxPeers)
		defer closeServers(servers...)
		srv, peers := servers[0], servers[1:]

		for idx, p := range peers {
			addr := p.AddrInfo()
			connected := connectToPeer(t, srv, addr, 5*time.Second)
			// Only the first maxPeers[0] peers fit under srv's own limit.
			if uint64(idx) < maxPeers[0] {
				assert.Truef(t, connected, "should connect to peer %d[%s], but didn't\n", idx, addr.ID)
			} else {
				assert.Falsef(t, connected, "should fail to connect to peer %d[%s], but connected\n", idx, addr.ID)
			}
		}
	})

	t.Run("should try to connect after adding a peer to queue", func(t *testing.T) {
		// peers[0] has MaxPeers 0, so it can't connect to any peer
		maxPeers := []uint64{1, 0, 1}
		servers := setupServers(t, maxPeers)
		srv, peers := servers[0], servers[1:]
		// NOTE(review): peers[0] is never closed in this subtest — confirm
		// whether that is intentional.
		defer closeServers(srv, peers[1])

		connected := connectToPeer(t, srv, peers[0].AddrInfo(), 15*time.Second)
		assert.False(t, connected)

		// A later Join must still be dialed after the earlier failed attempt.
		connected = connectToPeer(t, srv, peers[1].AddrInfo(), 15*time.Second)
		assert.True(t, connected)
	})
}

func TestMinimumBootNodeCount(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 3 additions & 3 deletions protocol/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (s *Syncer) Start() {
}

switch evnt.Type {
case network.PeerEventConnected:
case network.PeerConnected:
stream, err := s.server.NewStream(syncerV1, evnt.PeerID)
if err != nil {
s.logger.Error("failed to open a stream", "err", err)
Expand All @@ -362,7 +362,7 @@ func (s *Syncer) Start() {
s.logger.Error("failed to handle user", "err", err)
}

case network.PeerEventDisconnected:
case network.PeerDisconnected:
if err := s.DeletePeer(evnt.PeerID); err != nil {
s.logger.Error("failed to delete user", "err", err)
}
Expand Down Expand Up @@ -623,4 +623,4 @@ func getHeader(clt proto.V1Client, num *uint64, hash *types.Hash) (*types.Header
return nil, err
}
return header, nil
}
}

0 comments on commit 34cc105

Please sign in to comment.