Skip to content

Commit

Permalink
les: improved header fetcher and server statistics
Browse files Browse the repository at this point in the history
zsfelfoldi committed Dec 10, 2016

Verified

This commit was signed with the committer’s verified signature.
sdispater Sébastien Eustace
1 parent e67500a commit af8a742
Showing 10 changed files with 812 additions and 481 deletions.
754 changes: 572 additions & 182 deletions les/fetcher.go

Large diffs are not rendered by default.

53 changes: 22 additions & 31 deletions les/handler.go
Original file line number Diff line number Diff line change
@@ -24,10 +24,8 @@ import (
"math/big"
"net"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@@ -60,7 +58,7 @@ const (
MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
MaxTxSend = 64 // Amount of transactions to be send per request

disableClientRemovePeer = true
disableClientRemovePeer = false
)

// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -157,44 +155,27 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
var entry *poolEntry
peer := manager.newPeer(int(version), networkId, p, rw)
if manager.serverPool != nil {
addr := p.RemoteAddr().(*net.TCPAddr)
entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port))
entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
if entry == nil {
return fmt.Errorf("unwanted connection")
}
}
peer := manager.newPeer(int(version), networkId, p, rw)
peer.poolEntry = entry
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
start := mclock.Now()
err := manager.handle(peer)
if entry != nil {
connTime := time.Duration(mclock.Now() - start)
stopped := false
select {
case <-manager.quitSync:
stopped = true
default:
}
//fmt.Println("connTime", peer.id, connTime, stopped, err)
quality := float64(1)
setQuality := true
if connTime < time.Minute*10 {
quality = 0
if stopped {
setQuality = false
}
}
manager.serverPool.disconnect(entry, quality, setQuality)
manager.serverPool.disconnect(entry)
}
return err
case <-manager.quitSync:
if entry != nil {
manager.serverPool.disconnect(entry, 0, false)
manager.serverPool.disconnect(entry)
}
return p2p.DiscQuitting
}
@@ -224,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
manager.fetcher = newLightFetcher(manager)
}

if odr != nil {
@@ -254,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) {
glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
if pm.lightSync {
pm.downloader.UnregisterPeer(id)
pm.odr.UnregisterPeer(peer)
if pm.txrelay != nil {
pm.txrelay.removePeer(id)
}
if pm.fetcher != nil {
pm.fetcher.removePeer(peer)
}
}
if err := pm.peers.Unregister(id); err != nil {
glog.V(logger.Error).Infoln("Removal failed:", err)
@@ -276,8 +258,10 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) {
lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
if pm.lightSync {
// start sync handler
if srvr != nil {
if srvr != nil { // srvr is nil during testing
pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
pm.odr.serverPool = pm.serverPool
pm.fetcher = newLightFetcher(pm)
}
go pm.syncer()
} else {
@@ -369,12 +353,17 @@ func (pm *ProtocolManager) handle(p *peer) error {
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
return err
}
pm.odr.RegisterPeer(p)
if pm.txrelay != nil {
pm.txrelay.addPeer(p)
}

pm.fetcher.notify(p, nil)
p.lock.Lock()
head := p.headInfo
p.lock.Unlock()
if pm.fetcher != nil {
pm.fetcher.addPeer(p)
pm.fetcher.announce(p, head)
}

if p.poolEntry != nil {
pm.serverPool.registered(p.poolEntry)
@@ -460,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "%v: %v", msg, err)
}
glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
pm.fetcher.notify(p, &req)
if pm.fetcher != nil {
go pm.fetcher.announce(p, &req)
}

case GetBlockHeadersMsg:
glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
@@ -558,7 +549,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.GotReply(resp.ReqID, resp.BV)
if pm.fetcher.requestedID(resp.ReqID) {
if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
11 changes: 11 additions & 0 deletions les/helper_test.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ import (
"math/big"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -334,3 +335,13 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
func (p *testPeer) close() {
p.app.Close()
}

type testServerPool peer

func (p *testServerPool) selectPeer(func(*peer) (bool, uint64)) *peer {
return (*peer)(p)
}

func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {

}
55 changes: 26 additions & 29 deletions les/odr.go
Original file line number Diff line number Diff line change
@@ -37,22 +37,26 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

type odrPeerSelector interface {
selectPeer(func(*peer) (bool, uint64)) *peer
adjustResponseTime(*poolEntry, time.Duration, bool)
}

type LesOdr struct {
light.OdrBackend
db ethdb.Database
stop chan struct{}
removePeer peerDropFn
mlock, clock sync.Mutex
sentReqs map[uint64]*sentReq
peers *odrPeerSet
serverPool odrPeerSelector
lastReqID uint64
}

func NewLesOdr(db ethdb.Database) *LesOdr {
return &LesOdr{
db: db,
stop: make(chan struct{}),
peers: newOdrPeerSet(),
sentReqs: make(map[uint64]*sentReq),
}
}
@@ -77,16 +81,6 @@ type sentReq struct {
answered chan struct{} // closed and set to nil when any peer answers it
}

// RegisterPeer registers a new LES peer to the ODR capable peer set
func (self *LesOdr) RegisterPeer(p *peer) error {
return self.peers.register(p)
}

// UnregisterPeer removes a peer from the ODR capable peer set
func (self *LesOdr) UnregisterPeer(p *peer) {
self.peers.unregister(p)
}

const (
MsgBlockBodies = iota
MsgCode
@@ -142,29 +136,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha

select {
case <-delivered:
servTime := uint64(mclock.Now() - stime)
self.peers.updateTimeout(peer, false)
self.peers.updateServTime(peer, servTime)
if self.serverPool != nil {
self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
}
return
case <-time.After(softRequestTimeout):
close(timeout)
if self.peers.updateTimeout(peer, true) {
self.removePeer(peer.id)
}
case <-self.stop:
return
}

select {
case <-delivered:
servTime := uint64(mclock.Now() - stime)
self.peers.updateServTime(peer, servTime)
return
case <-time.After(hardRequestTimeout):
self.removePeer(peer.id)
go self.removePeer(peer.id)
case <-self.stop:
return
}
if self.serverPool != nil {
self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
}
}

// networkRequest sends a request to known peers until an answer is received
@@ -193,7 +184,13 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro

exclude := make(map[*peer]struct{})
for {
if peer := self.peers.bestPeer(lreq, exclude); peer == nil {
var p *peer
if self.serverPool != nil {
p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
return true, p.fcServer.CanSend(lreq.GetCost(p))
})
}
if p == nil {
select {
case <-ctx.Done():
return ctx.Err()
@@ -202,17 +199,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
case <-time.After(retryPeers):
}
} else {
exclude[peer] = struct{}{}
exclude[p] = struct{}{}
delivered := make(chan struct{})
timeout := make(chan struct{})
req.lock.Lock()
req.sentTo[peer] = delivered
req.sentTo[p] = delivered
req.lock.Unlock()
reqWg.Add(1)
cost := lreq.GetCost(peer)
peer.fcServer.SendRequest(reqID, cost)
go self.requestPeer(req, peer, delivered, timeout, reqWg)
lreq.Request(reqID, peer)
cost := lreq.GetCost(p)
p.fcServer.SendRequest(reqID, cost)
go self.requestPeer(req, p, delivered, timeout, reqWg)
lreq.Request(reqID, p)

select {
case <-ctx.Done():
Loading

0 comments on commit af8a742

Please sign in to comment.