Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/skip waiting 100 sign with lock free message validation #4740

Draft
wants to merge 9 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Lock free consensus methods.
  • Loading branch information
Frozen committed Aug 19, 2024
commit 989eca1e0b1d6bc373add2390d439a31c7f03128
1 change: 0 additions & 1 deletion cmd/harmony/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,6 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
Msg("Init Blockchain")

currentNode.Consensus.Registry().SetNodeConfig(currentNode.NodeConfig)
currentConsensus.PostConsensusJob = currentNode.PostConsensusProcessing
// update consensus information based on the blockchain
currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation())
currentConsensus.NextBlockDue = time.Now()
Expand Down
4 changes: 2 additions & 2 deletions consensus/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool {
Str("logMsgSenderKey", logMsgs[0].SenderPubkeys[0].Bytes.Hex()).
Str("logMsgBlockHash", logMsgs[0].BlockHash.Hex()).
Str("recvMsg", recvMsg.String()).
Str("LeaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Str("LeaderKey", consensus.getLeaderPubKey().Bytes.Hex()).
Msg("[OnAnnounce] Leader is malicious")
if consensus.isViewChangingMode() {
consensus.getLogger().Debug().Msg(
Expand All @@ -96,7 +96,7 @@ func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool {
}
}
consensus.getLogger().Debug().
Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Str("leaderKey", consensus.getLeaderPubKey().Bytes.Hex()).
Msg("[OnAnnounce] Announce message received again")
}
return consensus.isRightBlockNumCheck(recvMsg)
Expand Down
14 changes: 6 additions & 8 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/harmony-one/abool"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
Expand Down Expand Up @@ -84,7 +85,7 @@ type Consensus struct {
// private/public keys of current node
priKey multibls.PrivateKeys
// the publickey of leader
LeaderPubKey *bls.PublicKeyWrapper
leaderPubKey unsafe.Pointer //*bls.PublicKeyWrapper
// blockNum: the next blockNumber that FBFT is going to agree on,
// should be equal to the blockNumber of next block
blockNum uint64
Expand All @@ -104,9 +105,6 @@ type Consensus struct {
readySignal chan Proposal
// Channel to send full commit signatures to finish new block proposal
commitSigChannel chan []byte
// The post-consensus job func passed from Node object
// Called when consensus on a new block is done
PostConsensusJob func(*types.Block) error
// verified block to state sync broadcast
VerifiedNewBlock chan *types.Block
// Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf
Expand Down Expand Up @@ -230,7 +228,7 @@ func (consensus *Consensus) GetLeaderPubKey() *bls_cosi.PublicKeyWrapper {
}

func (consensus *Consensus) getLeaderPubKey() *bls_cosi.PublicKeyWrapper {
return consensus.LeaderPubKey
return (*bls_cosi.PublicKeyWrapper)(atomic.LoadPointer(&consensus.leaderPubKey))
}

func (consensus *Consensus) SetLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
Expand All @@ -240,7 +238,7 @@ func (consensus *Consensus) SetLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
}

func (consensus *Consensus) setLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
consensus.LeaderPubKey = pub
atomic.StorePointer(&consensus.leaderPubKey, unsafe.Pointer(pub))
}

func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys {
Expand All @@ -259,7 +257,7 @@ func (consensus *Consensus) getLeaderPrivateKey(leaderKey *bls_core.PublicKey) (

// getConsensusLeaderPrivateKey returns consensus leader private key if node is the leader
func (consensus *Consensus) getConsensusLeaderPrivateKey() (*bls.PrivateKeyWrapper, error) {
return consensus.getLeaderPrivateKey(consensus.LeaderPubKey.Object)
return consensus.getLeaderPrivateKey(consensus.getLeaderPubKey().Object)
}

func (consensus *Consensus) IsBackup() bool {
Expand Down Expand Up @@ -287,7 +285,7 @@ func New(
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: State{mode: Normal},
current: NewState(Normal),
decider: Decider,
registry: registry,
MinPeers: minPeers,
Expand Down
10 changes: 2 additions & 8 deletions consensus/consensus_msg_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa
sender.Retry(&msgRetry)
}()
}
// MessageSender lays inside consensus, but internally calls consensus public api.
// Tt would be deadlock if run in current thread.
go sender.host.SendMessageToGroups(groups, p2pMsg)
return nil
return sender.host.SendMessageToGroups(groups, p2pMsg)
}

// DelayedSendWithRetry is similar to SendWithRetry but without the initial message sending but only retries.
Expand All @@ -89,10 +86,7 @@ func (sender *MessageSender) DelayedSendWithRetry(blockNum uint64, msgType msg_p

// SendWithoutRetry sends message without retry logic.
func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error {
// MessageSender lays inside consensus, but internally calls consensus public api.
// It would be deadlock if run in current thread.
go sender.host.SendMessageToGroups(groups, p2pMsg)
return nil
return sender.host.SendMessageToGroups(groups, p2pMsg)
}

// Retry will retry the consensus message for <RetryTimes> times.
Expand Down
33 changes: 14 additions & 19 deletions consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi

allKeys := consensus.decider.Participants()
if len(allKeys) != 0 {
consensus.LeaderPubKey = &allKeys[0]
consensus.setLeaderPubKey(&allKeys[0])
consensus.getLogger().Info().
Str("info", consensus.LeaderPubKey.Bytes.Hex()).Msg("Setting leader as first validator, because provided new keys")
Str("info", consensus.getLeaderPubKey().Bytes.Hex()).Msg("Setting leader as first validator, because provided new keys")
} else {
consensus.getLogger().Error().
Msg("[UpdatePublicKeys] Participants is empty")
Expand Down Expand Up @@ -172,8 +172,6 @@ func (consensus *Consensus) resetState() {

// IsValidatorInCommittee returns whether the given validator BLS address is part of my committee
func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKey) bool {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.isValidatorInCommittee(pubKey)
}

Expand Down Expand Up @@ -207,9 +205,8 @@ func (consensus *Consensus) SetIsBackup(isBackup bool) {
}

// Mode returns the mode of consensus
// Method is thread safe.
func (consensus *Consensus) Mode() Mode {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.mode()
}

Expand Down Expand Up @@ -239,11 +236,11 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {
if !msg.HasSingleSender() {
return errors.New("Leader message can not have multiple sender keys")
}
consensus.LeaderPubKey = msg.SenderPubkeys[0]
consensus.setLeaderPubKey(msg.SenderPubkeys[0])
consensus.IgnoreViewIDCheck.UnSet()
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().
Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Str("leaderKey", consensus.getLeaderPubKey().Bytes.Hex()).
Msg("[checkViewID] Start consensus timer")
return nil
} else if msg.ViewID > consensus.getCurBlockViewID() {
Expand Down Expand Up @@ -399,7 +396,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
}

// update public keys in the committee
oldLeader := consensus.LeaderPubKey
oldLeader := consensus.getLeaderPubKey()
pubKeys, _ := committeeToSet.BLSPublicKeys()

consensus.getLogger().Info().
Expand Down Expand Up @@ -436,7 +433,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
consensus.getLogger().Info().
Str("leaderPubKey", leaderPubKey.Bytes.Hex()).
Msgf("[UpdateConsensusInformation] Most Recent LeaderPubKey Updated Based on BlockChain, blocknum: %d", curHeader.NumberU64())
consensus.LeaderPubKey = leaderPubKey
consensus.setLeaderPubKey(leaderPubKey)
}
}

Expand All @@ -453,8 +450,8 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
}

// If the leader changed and I myself become the leader
if (oldLeader != nil && consensus.LeaderPubKey != nil &&
!consensus.LeaderPubKey.Object.IsEqual(oldLeader.Object)) && consensus.isLeader() {
if (oldLeader != nil && consensus.getLeaderPubKey() != nil &&
!consensus.getLeaderPubKey().Object.IsEqual(oldLeader.Object)) && consensus.isLeader() {
go func() {
consensus.GetLogger().Info().
Str("myKey", myPubKeys.SerializeToHexStr()).
Expand All @@ -474,20 +471,19 @@ func (consensus *Consensus) updateConsensusInformation() Mode {

// IsLeader check if the node is a leader or not by comparing the public key of
// the node with the leader public key
// Method is thread safe
func (consensus *Consensus) IsLeader() bool {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()

return consensus.isLeader()
}

// isLeader check if the node is a leader or not by comparing the public key of
// the node with the leader public key. This function assume it runs under lock.
func (consensus *Consensus) isLeader() bool {
if consensus.LeaderPubKey == nil {
pub := consensus.getLeaderPubKey()
if pub == nil {
return false
}
obj := consensus.LeaderPubKey.Object
obj := pub.Object
for _, key := range consensus.priKey {
if key.Pub.Object.IsEqual(obj) {
return true
Expand Down Expand Up @@ -624,12 +620,11 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
}

// NumSignaturesIncludedInBlock returns the number of signatures included in the block
// Method is thread safe.
func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0)
consensus.mutex.Lock()
members := consensus.decider.Participants()
pubKeys := consensus.getPublicKeys()
consensus.mutex.Unlock()

// TODO(audit): do not reconstruct the Mask
mask := bls.NewMask(members)
Expand Down
11 changes: 4 additions & 7 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ const (
CommitSigSenderTimeout = 10 * time.Second
)

// IsViewChangingMode return true if curernt mode is viewchanging
// IsViewChangingMode return true if current mode is viewchanging.
// Method is thread safe.
func (consensus *Consensus) IsViewChangingMode() bool {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.isViewChangingMode()
}

Expand All @@ -68,7 +67,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p
// Do easier check before signature check
if msg.Type == msg_pb.MessageType_ANNOUNCE || msg.Type == msg_pb.MessageType_PREPARED || msg.Type == msg_pb.MessageType_COMMITTED {
// Only validator needs to check whether the message is from the correct leader
if !bytes.Equal(senderKey[:], consensus.LeaderPubKey.Bytes[:]) &&
if !bytes.Equal(senderKey[:], consensus.getLeaderPubKey().Bytes[:]) &&
consensus.current.Mode() == Normal && !consensus.IgnoreViewIDCheck.IsSet() {
return errSenderPubKeyNotLeader
}
Expand Down Expand Up @@ -666,9 +665,7 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
}

consensus.FinishFinalityCount()
go func() {
consensus.PostConsensusJob(blk)
}()
consensus.PostConsensusProcessing(blk)
consensus.setupForNewConsensus(blk, committedMsg)
utils.Logger().Info().Uint64("blockNum", blk.NumberU64()).
Str("hash", blk.Header().Hash().Hex()).
Expand Down
12 changes: 4 additions & 8 deletions consensus/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@ func (consensus *Consensus) getConsensusPhase() string {

// GetConsensusMode returns the current mode of the consensus
func (consensus *Consensus) GetConsensusMode() string {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.current.mode.String()
return consensus.current.Mode().String()
}

// GetCurBlockViewID returns the current view ID of the consensus
// Method is thread safe.
func (consensus *Consensus) GetCurBlockViewID() uint64 {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.getCurBlockViewID()
}

Expand All @@ -31,10 +28,9 @@ func (consensus *Consensus) getCurBlockViewID() uint64 {
return consensus.current.GetCurBlockViewID()
}

// GetViewChangingID returns the current view changing ID of the consensus
// GetViewChangingID returns the current view changing ID of the consensus.
// Method is thread safe.
func (consensus *Consensus) GetViewChangingID() uint64 {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.current.GetViewChangingID()
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/double_sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
break
}

leaderAddr, err := subComm.AddressForBLSKey(consensus.LeaderPubKey.Bytes)
leaderAddr, err := subComm.AddressForBLSKey(consensus.getLeaderPubKey().Bytes)
if err != nil {
consensus.getLogger().Err(err).Str("msg", recvMsg.String()).
Msg("could not find address for leader bls key")
Expand Down
2 changes: 1 addition & 1 deletion consensus/enums.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package consensus
import "fmt"

// Mode is the current
type Mode byte
type Mode uint32

const (
// Normal ..
Expand Down
Loading