Skip to content

Commit

Permalink
Merge pull request livepeer#1291 from livepeer/nv/o-active-check2
Browse files Browse the repository at this point in the history
cmd, core, eth/watchers: O checks that it is active in ProcessPayment
  • Loading branch information
kyriediculous authored Jan 21, 2020
2 parents b4b77f9 + 467ac3d commit 42702b8
Show file tree
Hide file tree
Showing 11 changed files with 500 additions and 98 deletions.
33 changes: 24 additions & 9 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,14 +466,6 @@ func main() {
n.SetBasePrice(big.NewRat(int64(*pricePerUnit), int64(*pixelsPerUnit)))
glog.Infof("Price: %d wei for %d pixels\n ", *pricePerUnit, *pixelsPerUnit)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := setupOrchestrator(ctx, n, *initializeRound); err != nil {
glog.Errorf("Error setting up orchestrator: %v", err)
return
}

ev, _ := new(big.Int).SetString(*ticketEV, 10)
if ev == nil {
glog.Errorf("-ticketEV must be a valid integer, but %v provided. Restart the node with a different valid value for -ticketEV", *ticketEV)
Expand All @@ -485,6 +477,14 @@ func main() {
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := setupOrchestrator(ctx, n, *initializeRound); err != nil {
glog.Errorf("Error setting up orchestrator: %v", err)
return
}

sigVerifier := &pm.DefaultSigVerifier{}
validator := pm.NewValidator(sigVerifier, roundsWatcher)
gpm := eth.NewGasPriceMonitor(backend, blockPollingTime)
Expand Down Expand Up @@ -745,7 +745,7 @@ func main() {
return
}

orch := core.NewOrchestrator(s.LivepeerNode)
orch := core.NewOrchestrator(s.LivepeerNode, roundsWatcher)

go func() {
server.StartTranscodeServer(orch, *httpAddr, s.HTTPMux, n.WorkDir, n.TranscoderManager != nil)
Expand Down Expand Up @@ -868,6 +868,21 @@ func setupOrchestrator(ctx context.Context, n *core.LivepeerNode, initializeRoun
return err
}

// add orchestrator to DB
orch, err := n.Eth.GetTranscoder(n.Eth.Account().Address)
if err != nil {
return err
}

err = n.Database.UpdateOrch(&common.DBOrch{
EthereumAddr: n.Eth.Account().Address.Hex(),
ActivationRound: common.ToInt64(orch.ActivationRound),
DeactivationRound: common.ToInt64(orch.DeactivationRound),
})
if err != nil {
return err
}

if !active {
glog.Infof("Orchestrator %v is inactive", n.Eth.Account().Address.Hex())
} else {
Expand Down
58 changes: 58 additions & 0 deletions cmd/livepeer/livepeer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package main

import (
"context"
"errors"
"math/big"
"testing"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/eth"
lpTypes "github.com/livepeer/go-livepeer/eth/types"
"github.com/livepeer/go-livepeer/pm"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSetupOrchestrator(t *testing.T) {
require := require.New(t)
assert := assert.New(t)

dbh, dbraw, err := common.TempDB(t)
require.Nil(err)

defer dbh.Close()
defer dbraw.Close()

orch := pm.RandAddress()

stubEthClient := &eth.StubClient{
Orch: &lpTypes.Transcoder{
Address: orch,
ActivationRound: big.NewInt(5),
DeactivationRound: big.NewInt(10),
},
TranscoderAddress: orch,
}

n, err := core.NewLivepeerNode(stubEthClient, "", dbh)
require.Nil(err)

err = setupOrchestrator(context.Background(), n, false)
assert.Nil(err)

orchs, err := dbh.SelectOrchs(&common.DBOrchFilter{
Addresses: []ethcommon.Address{orch},
})
assert.Nil(err)
assert.Len(orchs, 1)
assert.Equal(orchs[0].ActivationRound, int64(5))
assert.Equal(orchs[0].DeactivationRound, int64(10))

// test eth.GetTranscoder error
stubEthClient.Err = errors.New("GetTranscoder error")
err = setupOrchestrator(context.Background(), n, false)
assert.EqualError(err, "GetTranscoder error")
}
5 changes: 5 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"math/big"
"net/url"

ethcommon "github.com/ethereum/go-ethereum/common"
Expand All @@ -23,3 +24,7 @@ type OrchestratorStore interface {
SelectOrchs(filter *DBOrchFilter) ([]*DBOrch, error)
UpdateOrch(orch *DBOrch) error
}

type RoundsManager interface {
LastInitializedRound() *big.Int
}
11 changes: 11 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"math/big"
"math/rand"
"regexp"
Expand All @@ -23,6 +24,8 @@ import (
// HTTPTimeout timeout used in HTTP connections between nodes
const HTTPTimeout = 8 * time.Second

const maxInt64 = int64(math.MaxInt64)

var (
ErrParseBigInt = fmt.Errorf("failed to parse big integer")
ErrProfile = fmt.Errorf("failed to parse profile")
Expand Down Expand Up @@ -246,3 +249,11 @@ var RandomIDGenerator = func(length uint) string {
func RandName() string {
return RandomIDGenerator(10)
}

func ToInt64(val *big.Int) int64 {
if val.Cmp(big.NewInt(maxInt64)) > 0 {
return maxInt64
}

return val.Int64()
}
8 changes: 8 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,11 @@ func TestBaseTokenAmountToFixed(t *testing.T) {
assert.Nil(err)
assert.Zero(fp)
}

func TestToInt64(t *testing.T) {
// test val > math.MaxInt64 => val = math.MaxInt64
val, _ := new(big.Int).SetString("9223372036854775808", 10) // 2^63
assert.Equal(t, int64(math.MaxInt64), ToInt64(val))
// test val < math.MaxInt64
assert.Equal(t, int64(5), ToInt64(big.NewInt(5)))
}
Loading

0 comments on commit 42702b8

Please sign in to comment.