Skip to content

Commit

Permalink
Gossiper abstraction (autonity#855)
Browse files Browse the repository at this point in the history
* move Gossip and AskSync logic into a segregated Gossiper interface
* remove ctx from gossip, signAndBroadcast and Broadcast function signatures - it is unused
* change tendermint services from using reflection to custom constructor functions
* move tendermint services to node config
  • Loading branch information
lorenzo-dev1 authored Nov 15, 2023
1 parent 1dd6b8b commit d581810
Show file tree
Hide file tree
Showing 45 changed files with 1,075 additions and 524 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ mock-gen:
mockgen -source=consensus/consensus.go -package=consensus -destination=consensus/consensus_mock.go
mockgen -source=consensus/tendermint/core/interfaces/tendermint.go -package=interfaces -destination=consensus/tendermint/core/interfaces/tendermint_mock.go
mockgen -source=accounts/abi/bind/backend.go -package=bind -destination=accounts/abi/bind/backend_mock.go

mockgen -source=consensus/tendermint/core/interfaces/gossiper.go -package=interfaces -destination=consensus/tendermint/core/interfaces/gossiper_mock.go
mockgen -source=consensus/tendermint/core/interfaces/broadcaster.go -package=interfaces -destination=consensus/tendermint/core/interfaces/broadcaster_mock.go

lint-dead:
@./.github/tools/golangci-lint run \
Expand Down
4 changes: 2 additions & 2 deletions consensus/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ const (

// Broadcaster defines the interface to enqueue blocks to fetcher and find peer
type Broadcaster interface {
// Enqueue add a block into fetcher queue
// Enqueue adds a block into fetcher queue
Enqueue(id string, block *types.Block)
// FindPeers retrives connected peers by addresses
// FindPeers retrieves connected peers by addresses
FindPeers(map[common.Address]struct{}) map[common.Address]ethereum.Peer
}
105 changes: 19 additions & 86 deletions consensus/tendermint/backend/backend.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package backend

import (
"context"
"crypto/ecdsa"
"errors"
"math/big"
"sync"
"time"

"github.com/autonity/autonity/accounts/abi"
"github.com/autonity/autonity/common"
"github.com/autonity/autonity/consensus"
"github.com/autonity/autonity/consensus/misc"
"github.com/autonity/autonity/consensus/tendermint/bft"
tendermintCore "github.com/autonity/autonity/consensus/tendermint/core"
"github.com/autonity/autonity/consensus/tendermint/core/interfaces"
tctypes "github.com/autonity/autonity/consensus/tendermint/core/types"
Expand All @@ -33,6 +30,8 @@ const (
fetcherID = "tendermint"
// ring buffer to be able to handle at maximum 10 rounds, 20 committee and 3 messages types
ringCapacity = 10 * 20 * 3
// while asking sync for consensus messages, if we do not find any peers we try again after 10 ms
retryPeriod = 10
)

var (
Expand Down Expand Up @@ -70,12 +69,11 @@ func New(privateKey *ecdsa.PrivateKey,
}

backend.pendingMessages.SetCapacity(ringCapacity)
core := tendermintCore.New(backend)
core := tendermintCore.New(backend, services)

backend.gossiper = NewGossiper(backend.recentMessages, backend.knownMessages, backend.address, backend.logger, backend.stopped)
if services != nil {
core.SetBroadcaster(services.Broadcaster)
core.SetPrevoter(services.Prevoter)
core.SetPrecommitter(services.Precommitter)
core.SetProposer(services.Proposer)
backend.gossiper = services.Gossiper(backend)
}
backend.core = core
return backend
Expand Down Expand Up @@ -107,8 +105,10 @@ type Backend struct {
// we save the last received p2p.messages in the ring buffer
pendingMessages ring.Ring

// event subscription for ChainHeadEvent event
// interface to enqueue blocks to fetcher and find peers
Broadcaster consensus.Broadcaster
// interface to gossip consensus messages
gossiper interfaces.Gossiper

//TODO: ARCChace is patented by IBM, so probably need to stop using it
//Update: patent has expired https://patents.google.com/patent/US7167953B2/en
Expand All @@ -133,9 +133,9 @@ func (sb *Backend) Address() common.Address {
}

// Broadcast implements tendermint.Backend.SignAndBroadcast
func (sb *Backend) Broadcast(ctx context.Context, committee types.Committee, payload []byte) error {
func (sb *Backend) Broadcast(committee types.Committee, payload []byte) error {
// send to others
sb.Gossip(ctx, committee, payload)
sb.Gossip(committee, payload)
// send to self
msg := events.MessageEvent{
Payload: payload,
Expand All @@ -149,83 +149,12 @@ func (sb *Backend) postEvent(event interface{}) {
}

func (sb *Backend) AskSync(header *types.Header) {
sb.logger.Info("Consensus liveness lost, broadcasting sync request..")

targets := make(map[common.Address]struct{})
for _, val := range header.Committee {
if val.Address != sb.Address() {
targets[val.Address] = struct{}{}
}
}

if sb.Broadcaster != nil && len(targets) > 0 {
for {
ps := sb.Broadcaster.FindPeers(targets)
// If we didn't find any peers try again in 10ms or exit if we have
// been stopped.
if len(ps) == 0 {
t := time.NewTimer(10 * time.Millisecond)
select {
case <-t.C:
continue
case <-sb.stopped:
return
}
}
count := new(big.Int)
for addr, p := range ps {
//ask to a quorum nodes to sync, 1 must then be honest and updated
if count.Cmp(bft.Quorum(header.TotalVotingPower())) >= 0 {
break
}
sb.logger.Debug("Asking sync to", "addr", addr)
go p.Send(SyncMsg, []byte{}) //nolint

member := header.CommitteeMember(addr)
if member == nil {
sb.logger.Error("could not retrieve member from address")
continue
}
count.Add(count, member.VotingPower)
}
break
}
}
sb.gossiper.AskSync(header)
}

// Broadcast implements tendermint.Backend.Gossip
func (sb *Backend) Gossip(ctx context.Context, committee types.Committee, payload []byte) {
hash := types.RLPHash(payload)
sb.knownMessages.Add(hash, true)

targets := make(map[common.Address]struct{})
for _, val := range committee {
if val.Address != sb.Address() {
targets[val.Address] = struct{}{}
}
}

if sb.Broadcaster != nil && len(targets) > 0 {
ps := sb.Broadcaster.FindPeers(targets)
for addr, p := range ps {
ms, ok := sb.recentMessages.Get(addr)
var m *lru.ARCCache
if ok {
m, _ = ms.(*lru.ARCCache)
if _, k := m.Get(hash); k {
// This peer had this event, skip it
continue
}
} else {
m, _ = lru.NewARC(inmemoryMessages)
}

m.Add(hash, true)
sb.recentMessages.Add(addr, m)

go p.Send(TendermintMsg, payload) //nolint
}
}
// Gossip implements tendermint.Backend.Gossip
func (sb *Backend) Gossip(committee types.Committee, payload []byte) {
sb.gossiper.Gossip(committee, payload)
}

// KnownMsgHash dumps the known messages in case of gossiping.
Expand All @@ -241,6 +170,10 @@ func (sb *Backend) Logger() log.Logger {
return sb.logger
}

func (sb *Backend) Gossiper() interfaces.Gossiper {
return sb.gossiper
}

// Commit implements tendermint.Backend.Commit
func (sb *Backend) Commit(proposal *types.Block, round int64, seals [][]byte) error {
h := proposal.Header()
Expand Down
52 changes: 28 additions & 24 deletions consensus/tendermint/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ import (
"context"
"crypto/ecdsa"
"fmt"
"math"
"math/big"
"os"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"

ethereum "github.com/autonity/autonity"
"github.com/autonity/autonity/accounts/abi/bind/backends"
"github.com/autonity/autonity/consensus/misc"
Expand All @@ -14,15 +23,8 @@ import (
"github.com/autonity/autonity/consensus/tendermint/core/message"
"github.com/autonity/autonity/event"
"github.com/autonity/autonity/p2p/enode"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"math"
"math/big"
"os"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"

lru "github.com/hashicorp/golang-lru"

Expand Down Expand Up @@ -60,15 +62,16 @@ func TestAskSync(t *testing.T) {
for _, p := range addresses {
m[p] = struct{}{}
}
recentMessages, err := lru.NewARC(inmemoryMessages)
require.NoError(t, err)
knownMessages, err := lru.NewARC(inmemoryMessages)
if err != nil {
t.Fatalf("Expected <nil>, got %v", err)
}
require.NoError(t, err)

broadcaster := consensus.NewMockBroadcaster(ctrl)
broadcaster.EXPECT().FindPeers(m).Return(peers)
b := &Backend{
knownMessages: knownMessages,
gossiper: NewGossiper(recentMessages, knownMessages, common.Address{}, log.New(), make(chan struct{})),
logger: log.New("backend", "test", "id", 0),
}
b.SetBroadcaster(broadcaster)
Expand All @@ -86,10 +89,9 @@ func TestGossip(t *testing.T) {
header := newTestHeader(5)
validators := header.Committee
payload, err := rlp.EncodeToBytes([]byte("data"))
require.NoError(t, err)
hash := types.RLPHash(payload)
if err != nil {
t.Fatalf("Expected <nil>, got %v", err)
}

addresses := make([]common.Address, 0, len(validators))
peers := make(map[common.Address]ethereum.Peer)
counter := uint64(0)
Expand Down Expand Up @@ -119,26 +121,22 @@ func TestGossip(t *testing.T) {
broadcaster.EXPECT().FindPeers(m).Return(peers)

knownMessages, err := lru.NewARC(inmemoryMessages)
if err != nil {
t.Fatalf("Expected <nil>, got %v", err)
}
require.NoError(t, err)
recentMessages, err := lru.NewARC(inmemoryMessages)
if err != nil {
t.Fatalf("Expected <nil>, got %v", err)
}
require.NoError(t, err)
address3Cache, err := lru.NewARC(inmemoryMessages)
if err != nil {
t.Fatalf("Expected <nil>, got %v", err)
}
require.NoError(t, err)

address3Cache.Add(hash, true)
recentMessages.Add(addresses[3], address3Cache)
b := &Backend{
knownMessages: knownMessages,
recentMessages: recentMessages,
gossiper: NewGossiper(recentMessages, knownMessages, common.Address{}, log.New(), make(chan struct{})),
}
b.SetBroadcaster(broadcaster)

b.Gossip(context.Background(), validators, payload)
b.Gossip(validators, payload)
<-time.NewTimer(2 * time.Second).C
if atomic.LoadUint64(&counter) != 4 {
t.Fatalf("gossip message transmission failure")
Expand Down Expand Up @@ -360,8 +358,11 @@ func TestCommit(t *testing.T) {
broadcaster := consensus.NewMockBroadcaster(ctrl)
broadcaster.EXPECT().Enqueue(fetcherID, gomock.Any())

gossiper := interfaces.NewMockGossiper(ctrl)
gossiper.EXPECT().SetBroadcaster(broadcaster).Times(1)
b := &Backend{
Broadcaster: broadcaster,
gossiper: gossiper,
logger: log.New("backend", "test", "id", 0),
}
b.SetBroadcaster(broadcaster)
Expand Down Expand Up @@ -412,8 +413,11 @@ func TestSyncPeer(t *testing.T) {
tendermintC := interfaces.NewMockTendermint(ctrl)
tendermintC.EXPECT().CurrentHeightMessages().Return(messages)

gossiper := interfaces.NewMockGossiper(ctrl)
gossiper.EXPECT().SetBroadcaster(broadcaster).Times(1)
b := &Backend{
logger: log.New("backend", "test", "id", 0),
gossiper: gossiper,
recentMessages: recentMessages,
core: tendermintC,
}
Expand Down
Loading

0 comments on commit d581810

Please sign in to comment.