Skip to content

Commit

Permalink
swarm, p2p/protocols: Stream accounting (ethereum#18337)
Browse files Browse the repository at this point in the history
* swarm: completed 1st phase of swap accounting

* swarm, p2p/protocols: added stream pricing

* swarm/network/stream: gofmt simplify stream.go

* swarm: fixed review comments

* swarm: used snapshots for swap tests

* swarm: custom retrieve for swap (less cascaded requests at any one time)

* swarm: addressed PR comments

* swarm: log output formatting

* swarm: removed parallelism in swap tests

* swarm: swap tests simplification

* swarm: removed swap_test.go

* swarm/network/stream: added prefix space for comments

* swarm/network/stream: unit test for prices

* swarm/network/stream: don't hardcode price

* swarm/network/stream: fixed invalid price check
  • Loading branch information
holisticode authored and zelig committed Jan 7, 2019
1 parent 56a3f6c commit ae857e7
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 104 deletions.
148 changes: 74 additions & 74 deletions p2p/protocols/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,33 @@ import (
"github.com/ethereum/go-ethereum/metrics"
)

//define some metrics
// define some metrics
var (
//All metrics are cumulative
// All metrics are cumulative

//total amount of units credited
// total amount of units credited
mBalanceCredit metrics.Counter
//total amount of units debited
// total amount of units debited
mBalanceDebit metrics.Counter
//total amount of bytes credited
// total amount of bytes credited
mBytesCredit metrics.Counter
//total amount of bytes debited
// total amount of bytes debited
mBytesDebit metrics.Counter
//total amount of credited messages
// total amount of credited messages
mMsgCredit metrics.Counter
//total amount of debited messages
// total amount of debited messages
mMsgDebit metrics.Counter
//how many times local node had to drop remote peers
// how many times local node had to drop remote peers
mPeerDrops metrics.Counter
//how many times local node overdrafted and dropped
// how many times local node overdrafted and dropped
mSelfDrops metrics.Counter

MetricsRegistry metrics.Registry
)

//Prices defines how prices are being passed on to the accounting instance
// Prices defines how prices are being passed on to the accounting instance
type Prices interface {
//Return the Price for a message
// Return the Price for a message
Price(interface{}) *Price
}

Expand All @@ -57,20 +59,20 @@ const (
Receiver = Payer(false)
)

//Price represents the costs of a message
// Price represents the costs of a message
type Price struct {
Value uint64 //
PerByte bool //True if the price is per byte or for unit
Value uint64
PerByte bool // True if the price is per byte or for unit
Payer Payer
}

//For gives back the price for a message
//A protocol provides the message price in absolute value
//This method then returns the correct signed amount,
//depending on who pays, which is identified by the `payer` argument:
//`Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
//Thus: If Sending and sender pays, amount positive, otherwise negative
//If Receiving, and receiver pays, amount positive, otherwise negative
// For gives back the price for a message
// A protocol provides the message price in absolute value
// This method then returns the correct signed amount,
// depending on who pays, which is identified by the `payer` argument:
// `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
// Thus: If Sending and sender pays, amount positive, otherwise negative
// If Receiving, and receiver pays, amount positive, otherwise negative
func (p *Price) For(payer Payer, size uint32) int64 {
price := p.Value
if p.PerByte {
Expand All @@ -82,22 +84,22 @@ func (p *Price) For(payer Payer, size uint32) int64 {
return int64(price)
}

//Balance is the actual accounting instance
//Balance defines the operations needed for accounting
//Implementations internally maintain the balance for every peer
// Balance is the actual accounting instance
// Balance defines the operations needed for accounting
// Implementations internally maintain the balance for every peer
type Balance interface {
//Adds amount to the local balance with remote node `peer`;
//positive amount = credit local node
//negative amount = debit local node
// Adds amount to the local balance with remote node `peer`;
// positive amount = credit local node
// negative amount = debit local node
Add(amount int64, peer *Peer) error
}

//Accounting implements the Hook interface
//It interfaces to the balances through the Balance interface,
//while interfacing with protocols and its prices through the Prices interface
// Accounting implements the Hook interface
// It interfaces to the balances through the Balance interface,
// while interfacing with protocols and its prices through the Prices interface
type Accounting struct {
Balance //interface to accounting logic
Prices //interface to prices logic
Balance // interface to accounting logic
Prices // interface to prices logic
}

func NewAccounting(balance Balance, po Prices) *Accounting {
Expand All @@ -108,87 +110,85 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
return ah
}

//SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
//this registry should be independent of any other metrics as it persists at different endpoints.
//It also instantiates the given metrics and starts the persisting go-routine which
//at the passed interval writes the metrics to a LevelDB
// SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
// this registry should be independent of any other metrics as it persists at different endpoints.
// It also instantiates the given metrics and starts the persisting go-routine which
// at the passed interval writes the metrics to a LevelDB
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
//create an empty registry
registry := metrics.NewRegistry()
//instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry)
//create the DB and start persisting
return NewAccountingMetrics(registry, reportInterval, path)
// create an empty registry
MetricsRegistry = metrics.NewRegistry()
// instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
// create the DB and start persisting
return NewAccountingMetrics(MetricsRegistry, reportInterval, path)
}

//Implement Hook.Send
// Send takes a peer, a size and a msg and
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
// - credits/debits local node using balance interface
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
// - credits/debits local node using balance interface
func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error {
//get the price for a message (through the protocol spec)
// get the price for a message (through the protocol spec)
price := ah.Price(msg)
//this message doesn't need accounting
// this message doesn't need accounting
if price == nil {
return nil
}
//evaluate the price for sending messages
// evaluate the price for sending messages
costToLocalNode := price.For(Sender, size)
//do the accounting
// do the accounting
err := ah.Add(costToLocalNode, peer)
//record metrics: just increase counters for user-facing metrics
// record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
}

//Implement Hook.Receive
// Receive takes a peer, a size and a msg and
// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
// - credits/debits local node using balance interface
// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
// - credits/debits local node using balance interface
func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error {
//get the price for a message (through the protocol spec)
// get the price for a message (through the protocol spec)
price := ah.Price(msg)
//this message doesn't need accounting
// this message doesn't need accounting
if price == nil {
return nil
}
//evaluate the price for receiving messages
// evaluate the price for receiving messages
costToLocalNode := price.For(Receiver, size)
//do the accounting
// do the accounting
err := ah.Add(costToLocalNode, peer)
//record metrics: just increase counters for user-facing metrics
// record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
}

//record some metrics
//this is not an error handling. `err` is returned by both `Send` and `Receive`
//`err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
//if the limit has been violated and `err` is thus not nil:
// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
// record some metrics
// this is not an error handling. `err` is returned by both `Send` and `Receive`
// `err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
// if the limit has been violated and `err` is thus not nil:
// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
if price > 0 {
mBalanceCredit.Inc(price)
mBytesCredit.Inc(int64(size))
mMsgCredit.Inc(1)
if err != nil {
//increase the number of times a remote node has been dropped due to "overdraft"
// increase the number of times a remote node has been dropped due to "overdraft"
mPeerDrops.Inc(1)
}
} else {
mBalanceDebit.Inc(price)
mBytesDebit.Inc(int64(size))
mMsgDebit.Inc(1)
if err != nil {
//increase the number of times the local node has done an "overdraft" in respect to other nodes
// increase the number of times the local node has done an "overdraft" in respect to other nodes
mSelfDrops.Inc(1)
}
}
Expand Down
Loading

0 comments on commit ae857e7

Please sign in to comment.