Skip to content

Commit

Permalink
les, les/lespay/client: add service value statistics and API (ethereu…
Browse files Browse the repository at this point in the history
…m#20837)

This PR adds service value measurement statistics to the light client. It
also adds a private API that makes these statistics accessible. A follow-up
PR will add the new server pool which uses these statistics to select
servers with good performance.

This document describes the function of the new components:
https://gist.github.com/zsfelfoldi/3c7ace895234b7b345ab4f71dab102d4

Co-authored-by: rjl493456442 <[email protected]>
Co-authored-by: rjl493456442 <[email protected]>
  • Loading branch information
zsfelfoldi and rjl493456442 authored Apr 9, 2020
1 parent 15540ae commit 0851646
Show file tree
Hide file tree
Showing 17 changed files with 2,144 additions and 40 deletions.
32 changes: 32 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var Modules = map[string]string{
"swarmfs": SwarmfsJs,
"txpool": TxpoolJs,
"les": LESJs,
"lespay": LESPayJs,
}

const ChequebookJs = `
Expand Down Expand Up @@ -856,3 +857,34 @@ web3._extend({
]
});
`

const LESPayJs = `
web3._extend({
property: 'lespay',
methods:
[
new web3._extend.Method({
name: 'distribution',
call: 'lespay_distribution',
params: 2
}),
new web3._extend.Method({
name: 'timeout',
call: 'lespay_timeout',
params: 2
}),
new web3._extend.Method({
name: 'value',
call: 'lespay_value',
params: 2
}),
],
properties:
[
new web3._extend.Property({
name: 'requestStats',
getter: 'lespay_requestStats'
}),
]
});
`
2 changes: 1 addition & 1 deletion les/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (b *benchmarkTxSend) init(h *serverHandler, count int) error {

func (b *benchmarkTxSend) request(peer *serverPeer, index int) error {
enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]})
return peer.sendTxs(0, enc)
return peer.sendTxs(0, 1, enc)
}

// benchmarkTxStatus implements requestBenchmark
Expand Down
50 changes: 41 additions & 9 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package les

import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/les/checkpointoracle"
lpc "github.com/ethereum/go-ethereum/les/lespay/client"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
Expand All @@ -49,15 +51,16 @@ import (
type LightEthereum struct {
lesCommons

peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
valueTracker *lpc.ValueTracker

bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
Expand All @@ -74,6 +77,10 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
if err != nil {
return nil, err
}
lespayDb, err := ctx.OpenDatabase("lespay", 0, 0, "eth/db/lespay")
if err != nil {
return nil, err
}
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis,
config.OverrideIstanbul, config.OverrideMuirGlacier)
if _, isCompat := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !isCompat {
Expand All @@ -99,7 +106,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
serverPool: newServerPool(chainDb, config.UltraLightServers),
valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
}
peers.subscribe((*vtSubscription)(leth.valueTracker))
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
leth.relay = newLesTxRelay(peers, leth.retriever)

Expand Down Expand Up @@ -154,6 +163,23 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
return leth, nil
}

// vtSubscription implements serverPeerSubscriber
type vtSubscription lpc.ValueTracker

// registerPeer implements serverPeerSubscriber
func (v *vtSubscription) registerPeer(p *serverPeer) {
vt := (*lpc.ValueTracker)(v)
p.setValueTracker(vt, vt.Register(p.ID()))
p.updateVtParams()
}

// unregisterPeer implements serverPeerSubscriber
func (v *vtSubscription) unregisterPeer(p *serverPeer) {
vt := (*lpc.ValueTracker)(v)
vt.Unregister(p.ID())
p.setValueTracker(nil, nil)
}

type LightDummyAPI struct{}

// Etherbase is the address that mining rewards will be send to
Expand Down Expand Up @@ -207,6 +233,11 @@ func (s *LightEthereum) APIs() []rpc.API {
Version: "1.0",
Service: NewPrivateLightAPI(&s.lesCommons),
Public: false,
}, {
Namespace: "lespay",
Version: "1.0",
Service: lpc.NewPrivateClientAPI(s.valueTracker),
Public: false,
},
}...)
}
Expand Down Expand Up @@ -266,6 +297,7 @@ func (s *LightEthereum) Stop() error {
s.engine.Close()
s.eventMux.Stop()
s.serverPool.stop()
s.valueTracker.Stop()
s.chainDb.Close()
s.wg.Wait()
log.Info("Light ethereum stopped")
Expand Down
8 changes: 8 additions & 0 deletions les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrRequestRejected, "")
}
p.updateFlowControl(update)
p.updateVtParams()

if req.Hash != (common.Hash{}) {
if p.announceType == announceTypeNone {
Expand All @@ -205,6 +206,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
if h.fetcher.requestedID(resp.ReqID) {
h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
Expand All @@ -222,6 +224,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgBlockBodies,
ReqID: resp.ReqID,
Expand All @@ -237,6 +240,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgCode,
ReqID: resp.ReqID,
Expand All @@ -252,6 +256,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgReceipts,
ReqID: resp.ReqID,
Expand All @@ -267,6 +272,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgProofsV2,
ReqID: resp.ReqID,
Expand All @@ -282,6 +288,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgHelperTrieProofs,
ReqID: resp.ReqID,
Expand All @@ -297,6 +304,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgTxStatus,
ReqID: resp.ReqID,
Expand Down
107 changes: 107 additions & 0 deletions les/lespay/client/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package client

import (
"time"

"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/p2p/enode"
)

// PrivateClientAPI implements the lespay client side API
type PrivateClientAPI struct {
vt *ValueTracker
}

// NewPrivateClientAPI creates a PrivateClientAPI
func NewPrivateClientAPI(vt *ValueTracker) *PrivateClientAPI {
return &PrivateClientAPI{vt}
}

// parseNodeStr converts either an enode address or a plain hex node id to enode.ID
func parseNodeStr(nodeStr string) (enode.ID, error) {
if id, err := enode.ParseID(nodeStr); err == nil {
return id, nil
}
if node, err := enode.Parse(enode.ValidSchemes, nodeStr); err == nil {
return node.ID(), nil
} else {
return enode.ID{}, err
}
}

// RequestStats returns the current contents of the reference request basket, with
// request values meaning average per request rather than total.
func (api *PrivateClientAPI) RequestStats() []RequestStatsItem {
return api.vt.RequestStats()
}

// Distribution returns a distribution as a series of (X, Y) chart coordinates,
// where the X axis is the response time in seconds while the Y axis is the amount of
// service value received with a response time close to the X coordinate.
// The distribution is optionally normalized to a sum of 1.
// If nodeStr == "" then the global distribution is returned, otherwise the individual
// distribution of the specified server node.
func (api *PrivateClientAPI) Distribution(nodeStr string, normalized bool) (RtDistribution, error) {
var expFactor utils.ExpirationFactor
if !normalized {
expFactor = utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now()))
}
if nodeStr == "" {
return api.vt.RtStats().Distribution(normalized, expFactor), nil
}
if id, err := parseNodeStr(nodeStr); err == nil {
return api.vt.GetNode(id).RtStats().Distribution(normalized, expFactor), nil
} else {
return RtDistribution{}, err
}
}

// Timeout suggests a timeout value based on either the global distribution or the
// distribution of the specified node. The parameter is the desired rate of timeouts
// assuming a similar distribution in the future.
// Note that the actual timeout should have a sensible minimum bound so that operating
// under ideal working conditions for a long time (for example, using a local server
// with very low response times) will not make it very hard for the system to accommodate
// longer response times in the future.
func (api *PrivateClientAPI) Timeout(nodeStr string, failRate float64) (float64, error) {
if nodeStr == "" {
return float64(api.vt.RtStats().Timeout(failRate)) / float64(time.Second), nil
}
if id, err := parseNodeStr(nodeStr); err == nil {
return float64(api.vt.GetNode(id).RtStats().Timeout(failRate)) / float64(time.Second), nil
} else {
return 0, err
}
}

// Value calculates the total service value provided either globally or by the specified
// server node, using a weight function based on the given timeout.
func (api *PrivateClientAPI) Value(nodeStr string, timeout float64) (float64, error) {
wt := TimeoutWeights(time.Duration(timeout * float64(time.Second)))
expFactor := utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now()))
if nodeStr == "" {
return api.vt.RtStats().Value(wt, expFactor), nil
}
if id, err := parseNodeStr(nodeStr); err == nil {
return api.vt.GetNode(id).RtStats().Value(wt, expFactor), nil
} else {
return 0, err
}
}
Loading

0 comments on commit 0851646

Please sign in to comment.