Skip to content

Commit

Permalink
les: create utilities as common package (ethereum#20509)
Browse files Browse the repository at this point in the history
* les: move execqueue into utilities package

execqueue is a util for executing queued functions
in a serial order which is used by both les server
and les client. Move it to common package.

* les: move randselect to utilities package

weighted_random_selector is a helpful tool for randomly select
items maintained in a set but based on the item weight.

It's used anywhere is LES package, mainly by les client but will
be used in les server with very high chance. So move it into a
common package as the second step for les separation.

* les: rename to utils
  • Loading branch information
rjl493456442 authored Mar 31, 2020
1 parent 32d31c3 commit f78ffc0
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 90 deletions.
9 changes: 5 additions & 4 deletions les/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/les/utils"
)

// requestDistributor implements a mechanism that distributes requests to
Expand Down Expand Up @@ -194,7 +195,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
elem := d.reqQueue.Front()
var (
bestWait time.Duration
sel *weightedRandomSelect
sel *utils.WeightedRandomSelect
)

d.peerLock.RLock()
Expand All @@ -219,9 +220,9 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
wait, bufRemain := peer.waitBefore(cost)
if wait == 0 {
if sel == nil {
sel = newWeightedRandomSelect()
sel = utils.NewWeightedRandomSelect()
}
sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
sel.Update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
} else {
if bestWait == 0 || wait < bestWait {
bestWait = wait
Expand All @@ -239,7 +240,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
}

if sel != nil {
c := sel.choose().(selectPeerItem)
c := sel.Choose().(selectPeerItem)
return c.peer, c.req, 0
}
return nil, nil, bestWait
Expand Down
15 changes: 8 additions & 7 deletions les/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
Expand Down Expand Up @@ -135,7 +136,7 @@ type peerCommons struct {
headInfo blockInfo // Latest block information.

// Background task queue for caching peer tasks and executing in order.
sendQueue *execQueue
sendQueue *utils.ExecQueue

// Flow control agreement.
fcParams flowcontrol.ServerParams // The config for token bucket.
Expand All @@ -153,13 +154,13 @@ func (p *peerCommons) isFrozen() bool {

// canQueue returns an indicator whether the peer can queue a operation.
func (p *peerCommons) canQueue() bool {
return p.sendQueue.canQueue() && !p.isFrozen()
return p.sendQueue.CanQueue() && !p.isFrozen()
}

// queueSend caches a peer operation in the background task queue.
// Please ensure to check `canQueue` before call this function
func (p *peerCommons) queueSend(f func()) bool {
return p.sendQueue.queue(f)
return p.sendQueue.Queue(f)
}

// mustQueueSend starts a for loop and retry the caching if failed.
Expand Down Expand Up @@ -337,7 +338,7 @@ func (p *peerCommons) handshake(td *big.Int, head common.Hash, headNum uint64, g
// close closes the channel and notifies all background routines to exit.
func (p *peerCommons) close() {
close(p.closeCh)
p.sendQueue.quit()
p.sendQueue.Quit()
}

// serverPeer represents each node to which the client is connected.
Expand Down Expand Up @@ -375,7 +376,7 @@ func newServerPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2
id: peerIdToString(p.ID()),
version: version,
network: network,
sendQueue: newExecQueue(100),
sendQueue: utils.NewExecQueue(100),
closeCh: make(chan struct{}),
},
trusted: trusted,
Expand Down Expand Up @@ -407,7 +408,7 @@ func (p *serverPeer) rejectUpdate(size uint64) bool {
// frozen.
func (p *serverPeer) freeze() {
if atomic.CompareAndSwapUint32(&p.frozen, 0, 1) {
p.sendQueue.clear()
p.sendQueue.Clear()
}
}

Expand Down Expand Up @@ -652,7 +653,7 @@ func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWrite
id: peerIdToString(p.ID()),
version: version,
network: network,
sendQueue: newExecQueue(100),
sendQueue: utils.NewExecQueue(100),
closeCh: make(chan struct{}),
},
errCh: make(chan error, 1),
Expand Down
39 changes: 20 additions & 19 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
Expand Down Expand Up @@ -129,7 +130,7 @@ type serverPool struct {
adjustStats chan poolStatAdjust

knownQueue, newQueue poolEntryQueue
knownSelect, newSelect *weightedRandomSelect
knownSelect, newSelect *utils.WeightedRandomSelect
knownSelected, newSelected int
fastDiscover bool
connCh chan *connReq
Expand All @@ -152,8 +153,8 @@ func newServerPool(db ethdb.Database, ulcServers []string) *serverPool {
disconnCh: make(chan *disconnReq),
registerCh: make(chan *registerReq),
closeCh: make(chan struct{}),
knownSelect: newWeightedRandomSelect(),
newSelect: newWeightedRandomSelect(),
knownSelect: utils.NewWeightedRandomSelect(),
newSelect: utils.NewWeightedRandomSelect(),
fastDiscover: true,
trustedNodes: parseTrustedNodes(ulcServers),
}
Expand Down Expand Up @@ -402,8 +403,8 @@ func (pool *serverPool) eventLoop() {
entry.lastConnected = addr
entry.addr = make(map[string]*poolEntryAddress)
entry.addr[addr.strKey()] = addr
entry.addrSelect = *newWeightedRandomSelect()
entry.addrSelect.update(addr)
entry.addrSelect = *utils.NewWeightedRandomSelect()
entry.addrSelect.Update(addr)
req.result <- entry
}

Expand Down Expand Up @@ -459,7 +460,7 @@ func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
entry = &poolEntry{
node: node,
addr: make(map[string]*poolEntryAddress),
addrSelect: *newWeightedRandomSelect(),
addrSelect: *utils.NewWeightedRandomSelect(),
shortRetry: shortRetryCnt,
}
pool.entries[node.ID()] = entry
Expand All @@ -477,7 +478,7 @@ func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
entry.addr[addr.strKey()] = addr
}
addr.lastSeen = now
entry.addrSelect.update(addr)
entry.addrSelect.Update(addr)
if !entry.known {
pool.newQueue.setLatest(entry)
}
Expand Down Expand Up @@ -505,7 +506,7 @@ func (pool *serverPool) loadNodes() {
pool.entries[e.node.ID()] = e
if pool.trustedNodes[e.node.ID()] == nil {
pool.knownQueue.setLatest(e)
pool.knownSelect.update((*knownEntry)(e))
pool.knownSelect.Update((*knownEntry)(e))
}
}
}
Expand Down Expand Up @@ -556,8 +557,8 @@ func (pool *serverPool) saveNodes() {
// Note that it is called by the new/known queues from which the entry has already
// been removed so removing it from the queues is not necessary.
func (pool *serverPool) removeEntry(entry *poolEntry) {
pool.newSelect.remove((*discoveredEntry)(entry))
pool.knownSelect.remove((*knownEntry)(entry))
pool.newSelect.Remove((*discoveredEntry)(entry))
pool.knownSelect.Remove((*knownEntry)(entry))
entry.removed = true
delete(pool.entries, entry.node.ID())
}
Expand Down Expand Up @@ -586,8 +587,8 @@ func (pool *serverPool) setRetryDial(entry *poolEntry) {
// updateCheckDial is called when an entry can potentially be dialed again. It updates
// its selection weights and checks if new dials can/should be made.
func (pool *serverPool) updateCheckDial(entry *poolEntry) {
pool.newSelect.update((*discoveredEntry)(entry))
pool.knownSelect.update((*knownEntry)(entry))
pool.newSelect.Update((*discoveredEntry)(entry))
pool.knownSelect.Update((*knownEntry)(entry))
pool.checkDial()
}

Expand All @@ -596,15 +597,15 @@ func (pool *serverPool) updateCheckDial(entry *poolEntry) {
func (pool *serverPool) checkDial() {
fillWithKnownSelects := !pool.fastDiscover
for pool.knownSelected < targetKnownSelect {
entry := pool.knownSelect.choose()
entry := pool.knownSelect.Choose()
if entry == nil {
fillWithKnownSelects = false
break
}
pool.dial((*poolEntry)(entry.(*knownEntry)), true)
}
for pool.knownSelected+pool.newSelected < targetServerCount {
entry := pool.newSelect.choose()
entry := pool.newSelect.Choose()
if entry == nil {
break
}
Expand All @@ -615,7 +616,7 @@ func (pool *serverPool) checkDial() {
// is over, we probably won't find more in the near future so select more
// known entries if possible
for pool.knownSelected < targetServerCount {
entry := pool.knownSelect.choose()
entry := pool.knownSelect.Choose()
if entry == nil {
break
}
Expand All @@ -636,7 +637,7 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
} else {
pool.newSelected++
}
addr := entry.addrSelect.choose().(*poolEntryAddress)
addr := entry.addrSelect.Choose().(*poolEntryAddress)
log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
entry.dialed = addr
go func() {
Expand Down Expand Up @@ -684,7 +685,7 @@ type poolEntry struct {
addr map[string]*poolEntryAddress
node *enode.Node
lastConnected, dialed *poolEntryAddress
addrSelect weightedRandomSelect
addrSelect utils.WeightedRandomSelect

lastDiscovered mclock.AbsTime
known, knownSelected, trusted bool
Expand Down Expand Up @@ -734,8 +735,8 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port))
e.addr = make(map[string]*poolEntryAddress)
e.addr[addr.strKey()] = addr
e.addrSelect = *newWeightedRandomSelect()
e.addrSelect.update(addr)
e.addrSelect = *utils.NewWeightedRandomSelect()
e.addrSelect.Update(addr)
e.lastConnected = addr
e.connectStats = entry.CStat
e.delayStats = entry.DStat
Expand Down
37 changes: 19 additions & 18 deletions les/execqueue.go → les/utils/exec_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,35 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package les
package utils

import "sync"

// execQueue implements a queue that executes function calls in a single thread,
// ExecQueue implements a queue that executes function calls in a single thread,
// in the same order as they have been queued.
type execQueue struct {
type ExecQueue struct {
mu sync.Mutex
cond *sync.Cond
funcs []func()
closeWait chan struct{}
}

// newExecQueue creates a new execution queue.
func newExecQueue(capacity int) *execQueue {
q := &execQueue{funcs: make([]func(), 0, capacity)}
// NewExecQueue creates a new execution Queue.
func NewExecQueue(capacity int) *ExecQueue {
q := &ExecQueue{funcs: make([]func(), 0, capacity)}
q.cond = sync.NewCond(&q.mu)
go q.loop()
return q
}

func (q *execQueue) loop() {
func (q *ExecQueue) loop() {
for f := q.waitNext(false); f != nil; f = q.waitNext(true) {
f()
}
close(q.closeWait)
}

func (q *execQueue) waitNext(drop bool) (f func()) {
func (q *ExecQueue) waitNext(drop bool) (f func()) {
q.mu.Lock()
if drop && len(q.funcs) > 0 {
// Remove the function that just executed. We do this here instead of when
Expand All @@ -60,20 +60,20 @@ func (q *execQueue) waitNext(drop bool) (f func()) {
return f
}

func (q *execQueue) isClosed() bool {
func (q *ExecQueue) isClosed() bool {
return q.closeWait != nil
}

// canQueue returns true if more function calls can be added to the execution queue.
func (q *execQueue) canQueue() bool {
// CanQueue returns true if more function calls can be added to the execution Queue.
func (q *ExecQueue) CanQueue() bool {
q.mu.Lock()
ok := !q.isClosed() && len(q.funcs) < cap(q.funcs)
q.mu.Unlock()
return ok
}

// queue adds a function call to the execution queue. Returns true if successful.
func (q *execQueue) queue(f func()) bool {
// Queue adds a function call to the execution Queue. Returns true if successful.
func (q *ExecQueue) Queue(f func()) bool {
q.mu.Lock()
ok := !q.isClosed() && len(q.funcs) < cap(q.funcs)
if ok {
Expand All @@ -84,16 +84,17 @@ func (q *execQueue) queue(f func()) bool {
return ok
}

// clear drops all queued functions
func (q *execQueue) clear() {
// Clear drops all queued functions.
func (q *ExecQueue) Clear() {
q.mu.Lock()
q.funcs = q.funcs[:0]
q.mu.Unlock()
}

// quit stops the exec queue.
// quit waits for the current execution to finish before returning.
func (q *execQueue) quit() {
// Quit stops the exec Queue.
//
// Quit waits for the current execution to finish before returning.
func (q *ExecQueue) Quit() {
q.mu.Lock()
if !q.isClosed() {
q.closeWait = make(chan struct{})
Expand Down
20 changes: 9 additions & 11 deletions les/execqueue_test.go → les/utils/exec_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,19 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package les
package utils

import (
"testing"
)
import "testing"

func TestExecQueue(t *testing.T) {
var (
N = 10000
q = newExecQueue(N)
q = NewExecQueue(N)
counter int
execd = make(chan int)
testexit = make(chan struct{})
)
defer q.quit()
defer q.Quit()
defer close(testexit)

check := func(state string, wantOK bool) {
Expand All @@ -40,11 +38,11 @@ func TestExecQueue(t *testing.T) {
case <-testexit:
}
}
if q.canQueue() != wantOK {
t.Fatalf("canQueue() == %t for %s", !wantOK, state)
if q.CanQueue() != wantOK {
t.Fatalf("CanQueue() == %t for %s", !wantOK, state)
}
if q.queue(qf) != wantOK {
t.Fatalf("canQueue() == %t for %s", !wantOK, state)
if q.Queue(qf) != wantOK {
t.Fatalf("Queue() == %t for %s", !wantOK, state)
}
}

Expand All @@ -57,6 +55,6 @@ func TestExecQueue(t *testing.T) {
t.Fatal("execution out of order")
}
}
q.quit()
q.Quit()
check("closed queue", false)
}
Loading

0 comments on commit f78ffc0

Please sign in to comment.