Skip to content

Commit

Permalink
les: implement request distributor, fix blocking issues (ethereum#3660)
Browse files Browse the repository at this point in the history
* les: implement request distributor, fix blocking issues
* core: moved header validation before chain mutex lock
  • Loading branch information
zsfelfoldi authored and fjl committed Mar 22, 2017
1 parent 1c1dc0e commit 525116d
Show file tree
Hide file tree
Showing 19 changed files with 877 additions and 322 deletions.
7 changes: 6 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,11 @@ Error: %v
// of the header retrieval mechanisms already need to verify nonces, as well as
// because nonces can be verified sparsely, not needing to check each.
func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
start := time.Now()
if i, err := self.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
return i, err
}

// Make sure only one thread manipulates the chain at once
self.chainmu.Lock()
defer self.chainmu.Unlock()
Expand All @@ -1328,7 +1333,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
return err
}

return self.hc.InsertHeaderChain(chain, checkFreq, whFunc)
return self.hc.InsertHeaderChain(chain, whFunc, start)
}

// writeHeader writes a header into the local chain, given that its parent is
Expand Down
13 changes: 9 additions & 4 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ type WhCallback func(*types.Header) error
// should be done or not. The reason behind the optional check is because some
// of the header retrieval mechanisms already need to verfy nonces, as well as
// because nonces can be verified sparsely, not needing to check each.
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, writeHeader WhCallback) (int, error) {

func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() {
Expand All @@ -231,9 +232,6 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, w
chain[i-1].Hash().Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4])
}
}
// Collect some import statistics to report on
stats := struct{ processed, ignored int }{}
start := time.Now()

// Generate the list of headers that should be POW verified
verify := make([]bool, len(chain))
Expand Down Expand Up @@ -309,6 +307,13 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, w
}
}
}

return 0, nil
}

func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
// Collect some import statistics to report on
stats := struct{ processed, ignored int }{}
// All headers passed verification, import them into the database
for i, header := range chain {
// Short circuit insertion if shutting down
Expand Down
2 changes: 2 additions & 0 deletions les/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.LightMode, config.NetworkId, eth.eventMux, eth.pow, eth.blockchain, nil, chainDb, odr, relay); err != nil {
return nil, err
}
relay.ps = eth.protocolManager.peers
relay.reqDist = eth.protocolManager.reqDist

eth.ApiBackend = &LesApiBackend{eth, nil}
eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend)
Expand Down
259 changes: 259 additions & 0 deletions les/distributor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package light implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
package les

import (
"container/list"
"errors"
"sync"
"time"
)

// ErrNoPeers is returned if no peers capable of serving a queued request are available
var ErrNoPeers = errors.New("no suitable peers available")

// requestDistributor implements a mechanism that distributes requests to
// suitable peers, obeying flow control rules and prioritizing them in creation
// order (even when a resend is necessary).
type requestDistributor struct {
reqQueue *list.List
lastReqOrder uint64
stopChn, loopChn chan struct{}
loopNextSent bool
lock sync.Mutex

getAllPeers func() map[distPeer]struct{}
}

// distPeer is an LES server peer interface for the request distributor.
// waitBefore returns either the necessary waiting time before sending a request
// with the given upper estimated cost or the estimated remaining relative buffer
// value after sending such a request (in which case the request can be sent
// immediately). At least one of these values is always zero.
type distPeer interface {
waitBefore(uint64) (time.Duration, float64)
canQueue() bool
queueSend(f func())
}

// distReq is the request abstraction used by the distributor. It is based on
// three callback functions:
// - getCost returns the upper estimate of the cost of sending the request to a given peer
// - canSend tells if the server peer is suitable to serve the request
// - request prepares sending the request to the given peer and returns a function that
// does the actual sending. Request order should be preserved but the callback itself should not
// block until it is sent because other peers might still be able to receive requests while
// one of them is blocking. Instead, the returned function is put in the peer's send queue.
type distReq struct {
getCost func(distPeer) uint64
canSend func(distPeer) bool
request func(distPeer) func()

reqOrder uint64
sentChn chan distPeer
element *list.Element
}

// newRequestDistributor creates a new request distributor
func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor {
r := &requestDistributor{
reqQueue: list.New(),
loopChn: make(chan struct{}, 2),
stopChn: stopChn,
getAllPeers: getAllPeers,
}
go r.loop()
return r
}

// distMaxWait is the maximum waiting time after which further necessary waiting
// times are recalculated based on new feedback from the servers
const distMaxWait = time.Millisecond * 10

// main event loop
func (d *requestDistributor) loop() {
for {
select {
case <-d.stopChn:
d.lock.Lock()
elem := d.reqQueue.Front()
for elem != nil {
close(elem.Value.(*distReq).sentChn)
elem = elem.Next()
}
d.lock.Unlock()
return
case <-d.loopChn:
d.lock.Lock()
d.loopNextSent = false
loop:
for {
peer, req, wait := d.nextRequest()
if req != nil && wait == 0 {
chn := req.sentChn // save sentChn because remove sets it to nil
d.remove(req)
send := req.request(peer)
if send != nil {
peer.queueSend(send)
}
chn <- peer
close(chn)
} else {
if wait == 0 {
// no request to send and nothing to wait for; the next
// queued request will wake up the loop
break loop
}
d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
if wait > distMaxWait {
// waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
wait = distMaxWait
}
go func() {
time.Sleep(wait)
d.loopChn <- struct{}{}
}()
break loop
}
}
d.lock.Unlock()
}
}
}

// selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
type selectPeerItem struct {
peer distPeer
req *distReq
weight int64
}

// Weight implements wrsItem interface
func (sp selectPeerItem) Weight() int64 {
return sp.weight
}

// nextRequest returns the next possible request from any peer, along with the
// associated peer and necessary waiting time
func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
peers := d.getAllPeers()

elem := d.reqQueue.Front()
var (
bestPeer distPeer
bestReq *distReq
bestWait time.Duration
sel *weightedRandomSelect
)

for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
req := elem.Value.(*distReq)
canSend := false
for peer, _ := range peers {
if peer.canQueue() && req.canSend(peer) {
canSend = true
cost := req.getCost(peer)
wait, bufRemain := peer.waitBefore(cost)
if wait == 0 {
if sel == nil {
sel = newWeightedRandomSelect()
}
sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
} else {
if bestReq == nil || wait < bestWait {
bestPeer = peer
bestReq = req
bestWait = wait
}
}
delete(peers, peer)
}
}
next := elem.Next()
if !canSend && elem == d.reqQueue.Front() {
close(req.sentChn)
d.remove(req)
}
elem = next
}

if sel != nil {
c := sel.choose().(selectPeerItem)
return c.peer, c.req, 0
}
return bestPeer, bestReq, bestWait
}

// queue adds a request to the distribution queue, returns a channel where the
// receiving peer is sent once the request has been sent (request callback returned).
// If the request is cancelled or timed out without suitable peers, the channel is
// closed without sending any peer references to it.
func (d *requestDistributor) queue(r *distReq) chan distPeer {
d.lock.Lock()
defer d.lock.Unlock()

if r.reqOrder == 0 {
d.lastReqOrder++
r.reqOrder = d.lastReqOrder
}

back := d.reqQueue.Back()
if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
r.element = d.reqQueue.PushBack(r)
} else {
before := d.reqQueue.Front()
for before.Value.(*distReq).reqOrder < r.reqOrder {
before = before.Next()
}
r.element = d.reqQueue.InsertBefore(r, before)
}

if !d.loopNextSent {
d.loopNextSent = true
d.loopChn <- struct{}{}
}

r.sentChn = make(chan distPeer, 1)
return r.sentChn
}

// cancel removes a request from the queue if it has not been sent yet (returns
// false if it has been sent already). It is guaranteed that the callback functions
// will not be called after cancel returns.
func (d *requestDistributor) cancel(r *distReq) bool {
d.lock.Lock()
defer d.lock.Unlock()

if r.sentChn == nil {
return false
}

close(r.sentChn)
d.remove(r)
return true
}

// remove removes a request from the queue
func (d *requestDistributor) remove(r *distReq) {
r.sentChn = nil
if r.element != nil {
d.reqQueue.Remove(r.element)
r.element = nil
}
}
Loading

0 comments on commit 525116d

Please sign in to comment.