Skip to content

Commit

Permalink
cmd, discovery, server: allow B to set MaxBroadcastPrice to filter ac…
Browse files Browse the repository at this point in the history
…ceptable O's stored in DBcache
  • Loading branch information
kyriediculous committed Jun 25, 2019
1 parent 7808c60 commit aa92e9d
Show file tree
Hide file tree
Showing 10 changed files with 485 additions and 32 deletions.
22 changes: 17 additions & 5 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ func main() {
initializeRound := flag.Bool("initializeRound", false, "Set to true if running as a transcoder and the node should automatically initialize new rounds")
ticketEV := flag.String("ticketEV", "1000000000", "The expected value for PM tickets")

// Price Info
// Orchestrator base pricing info
pricePerUnit := flag.Int("pricePerUnit", 0, "The price per 'pixelsPerUnit' amount pixels")
// Broadcaster max acceptable price
maxPricePerUnit := flag.Int("maxPricePerUnit", 0, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price")
// Unit of pixels for both O's basePriceInfo and B's MaxBroadcastPrice
pixelsPerUnit := flag.Int("pixelsPerUnit", 1, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel")

// Metrics & logging:
Expand Down Expand Up @@ -321,13 +324,11 @@ func main() {
// Set price per pixel base info
if *pixelsPerUnit <= 0 {
// Can't divide by 0
glog.Fatalf("The amount of pixels per unit must be greater than 0, provided %d instead\n", *pixelsPerUnit)
return
panic(fmt.Errorf("The amount of pixels per unit must be greater than 0, provided %d instead\n", *pixelsPerUnit))
}
if *pricePerUnit <= 0 {
// Prevent orchestrator from unknowingly provide free transcoding
glog.Fatalf("Price per unit of pixels must be greater than 0, provided %d instead\n", *pricePerUnit)
return
panic(fmt.Errorf("Price per unit of pixels must be greater than 0, provided %d instead\n", *pricePerUnit))
}
n.PriceInfo = big.NewRat(int64(*pricePerUnit), int64(*pixelsPerUnit))
glog.Infof("Price: %d wei for %d pixels\n ", *pricePerUnit, *pixelsPerUnit)
Expand Down Expand Up @@ -387,6 +388,17 @@ func main() {
// TODO: Initialize Sender with an implementation
// of RoundsManager that reads from a cache
n.Sender = pm.NewSender(n.Eth, n.Eth)

if *pixelsPerUnit <= 0 {
// Can't divide by 0
panic(fmt.Errorf("The amount of pixels per unit must be greater than 0, provided %d instead\n", *pixelsPerUnit))
}
if *maxPricePerUnit > 0 {
server.BroadcastCfg.SetMaxPrice(big.NewRat(int64(*maxPricePerUnit), int64(*pixelsPerUnit)))
} else {
glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *maxPricePerUnit)
glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values")
}
}

// Start services
Expand Down
13 changes: 10 additions & 3 deletions cmd/livepeer_cli/wizard_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,14 @@ func (w *wizard) allTranscodingOptions() map[int]string {
}

func (w *wizard) setBroadcastConfig() {
fmt.Printf("Enter broadcast max price per segment in Wei - ")
maxPricePerSegment := w.readBigInt()
fmt.Printf("Enter a maximum transcoding price per pixel, in wei per pixels (pricePerUnit / pixelsPerUnit).\n")
fmt.Printf("eg. 1 wei / 10 pixels = 0,1 wei per pixel \n")
fmt.Printf("\n")
fmt.Printf("Enter amount of pixels that make up a single unit (default: 1 pixel) - ")
pixelsPerUnit := w.readDefaultInt(1)
fmt.Printf("\n")
fmt.Printf("Enter the maximum price to pay for %d pixels in Wei (required) - ", pixelsPerUnit)
maxPricePerUnit := w.readDefaultInt(0)

opts := w.allTranscodingOptions()
if opts == nil {
Expand All @@ -69,7 +75,8 @@ func (w *wizard) setBroadcastConfig() {
}

val := url.Values{
"maxPricePerSegment": {fmt.Sprintf("%v", maxPricePerSegment.String())},
"pixelsPerUnit": {fmt.Sprintf("%v", strconv.Itoa(pixelsPerUnit))},
"maxPricePerUnit": {fmt.Sprintf("%v", strconv.Itoa(maxPricePerUnit))},
"transcodingOptions": {fmt.Sprintf("%v", transOpts)},
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/livepeer_cli/wizard_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (w *wizard) getEthBalance() string {
return e
}

func (w *wizard) getBroadcastConfig() (*big.Int, string) {
func (w *wizard) getBroadcastConfig() (*big.Rat, string) {
resp, err := http.Get(fmt.Sprintf("http://%v:%v/getBroadcastConfig", w.host, w.httpPort))
if err != nil {
glog.Errorf("Error getting broadcast config: %v", err)
Expand All @@ -375,7 +375,7 @@ func (w *wizard) getBroadcastConfig() (*big.Int, string) {
}

var config struct {
MaxPricePerSegment *big.Int
MaxPrice *big.Rat
TranscodingOptions string
}
err = json.Unmarshal(result, &config)
Expand All @@ -384,7 +384,7 @@ func (w *wizard) getBroadcastConfig() (*big.Int, string) {
return nil, ""
}

return config.MaxPricePerSegment, config.TranscodingOptions
return config.MaxPrice, config.TranscodingOptions
}

func (w *wizard) getOrchestratorInfo() (lpTypes.Transcoder, error) {
Expand Down
12 changes: 11 additions & 1 deletion discovery/db_discovery.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package discovery

import (
"math/big"
"net/url"
"time"

"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
lpTypes "github.com/livepeer/go-livepeer/eth/types"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-livepeer/server"

"github.com/golang/glog"
)
Expand Down Expand Up @@ -69,7 +71,15 @@ func (dbo *DBOrchestratorPoolCache) GetOrchestrators(numOrchestrators int) ([]*n
uris = append(uris, uri)
}

orchPool := NewOrchestratorPool(dbo.node, uris)
pred := func(info *net.OrchestratorInfo) bool {
price := server.BroadcastCfg.MaxPrice()
if price != nil {
return big.NewRat(info.PriceInfo.PricePerUnit, info.PriceInfo.PixelsPerUnit).Cmp(price) <= 0
}
return true
}

orchPool := NewOrchestratorPoolWithPred(dbo.node, uris, pred)

orchInfos, err := orchPool.GetOrchestrators(numOrchestrators)
if err != nil || len(orchInfos) <= 0 {
Expand Down
14 changes: 13 additions & 1 deletion discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var serverGetOrchInfo = server.GetOrchestratorInfo
type orchestratorPool struct {
uris []*url.URL
bcast server.Broadcaster
pred func(info *net.OrchestratorInfo) bool
}

var perm = func(len int) []int { return rand.Perm(len) }
Expand Down Expand Up @@ -57,6 +58,17 @@ func NewOrchestratorPool(node *core.LivepeerNode, addresses []string) *orchestra
return &orchestratorPool{bcast: bcast, uris: randomizedUris}
}

func NewOrchestratorPoolWithPred(node *core.LivepeerNode, addresses []string, pred func(*net.OrchestratorInfo) bool) *orchestratorPool {
// if livepeer running in offchain mode, return nil
if node.Eth == nil {
glog.Error("Could not refresh DB list of orchestrators: LivepeerNode nil")
return nil
}
pool := NewOrchestratorPool(node, addresses)
pool.pred = pred
return pool
}

func NewOnchainOrchestratorPool(node *core.LivepeerNode) *orchestratorPool {
// if livepeer running in offchain mode, return nil
if node.Eth == nil {
Expand Down Expand Up @@ -97,7 +109,7 @@ func (o *orchestratorPool) GetOrchestrators(numOrchestrators int) ([]*net.Orches
respLock.Lock()
defer respLock.Unlock()
numResp++
if err == nil {
if err == nil && (o.pred == nil || o.pred(info)) {
orchInfos = append(orchInfos, info)
numSuccessResp++
} else if monitor.Enabled {
Expand Down
Loading

0 comments on commit aa92e9d

Please sign in to comment.