Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/skip waiting 100 sign with lock free message validation #4740

Draft
wants to merge 9 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Final commit for 1s version.
  • Loading branch information
Frozen committed Aug 19, 2024
commit 2029b6c1bb746277faef6665d3b48cfbceac7e47
5 changes: 3 additions & 2 deletions api/service/legacysync/epoch_syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func syncLoop(bc core.BlockChain, syncConfig *SyncConfig) (timeout int) {
utils.Logger().Info().
Msgf("[EPOCHSYNC] No peers to sync (isBeacon: %t, ShardID: %d, peersCount: %d)",
isBeacon, bc.ShardID(), syncConfig.PeersCount())
return 10
return 5
}

utils.Logger().Info().
Expand All @@ -120,7 +120,7 @@ func syncLoop(bc core.BlockChain, syncConfig *SyncConfig) (timeout int) {
Uint64("currentEpoch", curEpoch).
Int("peers count", syncConfig.PeersCount()).
Msg("[EPOCHSYNC] Node is now IN SYNC!")
return 60
return 2
}
if otherEpoch < curEpoch {
for _, peerCfg := range syncConfig.GetPeers() {
Expand Down Expand Up @@ -231,6 +231,7 @@ func processWithPayload(payload [][]byte, bc core.BlockChain) error {
// CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error {
var err error
fmt.Println("CreateEpochSync")
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, selfPeerID, waitForEachPeerToConnect)
return err
}
151 changes: 151 additions & 0 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,157 @@ func (consensus *Consensus) finalCommit(viewID uint64) {
}
}

// finalCommit uses locks, not suited to be called internally
func (consensus *Consensus) finalCommit1s(viewID uint64, nextBlockDue time.Time) {
waitTime := 0 * time.Millisecond
maxWaitTime := time.Until(nextBlockDue) - 200*time.Millisecond
if maxWaitTime > waitTime {
waitTime = maxWaitTime
}
consensus.GetLogger().Info().Str("waitTime", waitTime.String()).
Msg("[OnCommit] Starting Grace Period")
time.Sleep(waitTime)

consensus.mutex.Lock()
defer consensus.mutex.Unlock()
if viewID != consensus.getCurBlockViewID() {
return
}
numCommits := consensus.decider.SignersCount(quorum.Commit)

consensus.getLogger().Info().
Int64("NumCommits", numCommits).
Msg("[finalCommit] Finalizing Consensus")
beforeCatchupNum := consensus.getBlockNum()

leaderPriKey, err := consensus.getConsensusLeaderPrivateKey()
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[finalCommit] leader not found")
return
}
// Construct committed message
network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey})
if err != nil {
consensus.getLogger().Warn().Err(err).
Msg("[finalCommit] Unable to construct Committed message")
return
}
var (
msgToSend = network.Bytes
FBFTMsg = network.FBFTMsg
commitSigAndBitmap = FBFTMsg.Payload
)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)
// find correct block content
curBlockHash := consensus.blockHash
block := consensus.fBFTLog.GetBlockByHash(curBlockHash)
if block == nil {
consensus.getLogger().Warn().
Str("blockHash", hex.EncodeToString(curBlockHash[:])).
Msg("[finalCommit] Cannot find block by hash")
return
}

if err := consensus.verifyLastCommitSig(commitSigAndBitmap, block); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[finalCommit] failed verifying last commit sig")
return
}
consensus.getLogger().Info().Hex("new", commitSigAndBitmap).Msg("[finalCommit] Overriding commit signatures!!")

if err := consensus.Blockchain().WriteCommitSig(block.NumberU64(), commitSigAndBitmap); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[finalCommit] failed writting commit sig")
}

// Send committed message before block insertion.
// if leader successfully finalizes the block, send committed message to validators
// Note: leader already sent 67% commit in preCommit. The 100% commit won't be sent immediately
// to save network traffic. It will only be sent in retry if consensus doesn't move forward.
// Or if the leader is changed for next block, the 100% committed sig will be sent to the next leader immediately.
if !consensus.isLeader() || block.IsLastBlockInEpoch() {
// send immediately
if err := consensus.msgSender.SendWithRetry(
block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[finalCommit] Cannot send committed message")
} else {
consensus.getLogger().Info().
Hex("blockHash", curBlockHash[:]).
Uint64("blockNum", consensus.BlockNum()).
Msg("[finalCommit] Sent Committed Message")
}
consensus.getLogger().Info().Msg("[finalCommit] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
} else {
// delayed send
consensus.msgSender.DelayedSendWithRetry(
block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend))
consensus.getLogger().Info().
Hex("blockHash", curBlockHash[:]).
Uint64("blockNum", consensus.BlockNum()).
Hex("lastCommitSig", commitSigAndBitmap).
Msg("[finalCommit] Queued Committed Message")
}

block.SetCurrentCommitSig(commitSigAndBitmap)
err = consensus.commitBlock(block, FBFTMsg)

if err != nil || consensus.BlockNum()-beforeCatchupNum != 1 {
consensus.getLogger().Err(err).
Uint64("beforeCatchupBlockNum", beforeCatchupNum).
Msg("[finalCommit] Leader failed to commit the confirmed block")
}

// Dump new block into level db
// In current code, we add signatures in block in tryCatchup, the block dump to explorer does not contains signatures
// but since explorer doesn't need signatures, it should be fine
// in future, we will move signatures to next block
//explorer.GetStorageInstance(consensus.leader.IP, consensus.leader.Port, true).Dump(block, beforeCatchupNum)

if consensus.consensusTimeout[timeoutBootstrap].IsActive() {
consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.getLogger().Info().Msg("[finalCommit] stop bootstrap timer only once")
}

consensus.getLogger().Info().
Uint64("blockNum", block.NumberU64()).
Uint64("epochNum", block.Epoch().Uint64()).
Uint64("ViewId", block.Header().ViewID().Uint64()).
Str("blockHash", block.Hash().String()).
Int("numTxns", len(block.Transactions())).
Int("numStakingTxns", len(block.StakingTransactions())).
Msg("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!")

consensus.UpdateLeaderMetrics(float64(numCommits), float64(block.NumberU64()))

// If still the leader, send commit sig/bitmap to finish the new block proposal,
// else, the block proposal will timeout by itself.
if consensus.isLeader() {
if block.IsLastBlockInEpoch() {
// No pipelining
go func() {
consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal")
consensus.ReadySignal(NewProposal(SyncProposal))
}()
} else {
// pipelining
go func() {
select {
case consensus.GetCommitSigChannel() <- commitSigAndBitmap:
case <-time.After(CommitSigSenderTimeout):
utils.Logger().Error().Err(err).Msg("[finalCommit] channel not received after 6s for commitSigAndBitmap")
}
}()
}
}
}

// BlockCommitSigs returns the byte array of aggregated
// commit signature and bitmap signed on the block
func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
Expand Down
15 changes: 9 additions & 6 deletions consensus/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,17 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
logger.Info().Msg("[OnCommit] 2/3 Enough commits received")
consensus.fBFTLog.MarkBlockVerified(blockObj)

if !blockObj.IsLastBlockInEpoch() {
// only do early commit if it's not epoch block to avoid problems
consensus.preCommitAndPropose(blockObj)
}

if consensus.Blockchain().Config().IsOneSecond(currentHeader.Epoch()) {
go consensus.finalCommit(viewID)
if !blockObj.IsLastBlockInEpoch() {
// only do early commit if it's not epoch block to avoid problems
consensus.preCommitAndPropose(blockObj)
}
go consensus.finalCommit1s(viewID, consensus.NextBlockDue)
} else {
if !blockObj.IsLastBlockInEpoch() {
// only do early commit if it's not epoch block to avoid problems
consensus.preCommitAndPropose(blockObj)
}
go consensus.finalCommit(viewID)
}

Expand Down
2 changes: 2 additions & 0 deletions core/epochchain.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"fmt"
"math/big"
"sync/atomic"
"time"
Expand Down Expand Up @@ -118,6 +119,7 @@ func (bc *EpochChain) InsertChain(blocks types.Blocks, _ bool) (int, error) {
if len(blocks) == 0 {
return 0, nil
}
fmt.Println("InsertEpocj")
bc.mu <- struct{}{}
defer func() {
<-bc.mu
Expand Down
88 changes: 44 additions & 44 deletions internal/params/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,50 +273,50 @@ var (

// LocalnetChainConfig contains the chain parameters to run for local development.
LocalnetChainConfig = &ChainConfig{
ChainID: TestnetChainID,
EthCompatibleChainID: EthTestnetShard0ChainID,
EthCompatibleShard0ChainID: EthTestnetShard0ChainID,
EthCompatibleEpoch: big.NewInt(0),
CrossTxEpoch: big.NewInt(0),
CrossLinkEpoch: big.NewInt(2),
AggregatedRewardEpoch: big.NewInt(3),
StakingEpoch: big.NewInt(2),
PreStakingEpoch: big.NewInt(0),
QuickUnlockEpoch: big.NewInt(0),
FiveSecondsEpoch: big.NewInt(0),
TwoSecondsEpoch: big.NewInt(0),
SixtyPercentEpoch: EpochTBD, // Never enable it for localnet as localnet has no external validator setup
RedelegationEpoch: big.NewInt(0),
NoEarlyUnlockEpoch: big.NewInt(0),
VRFEpoch: big.NewInt(0),
PrevVRFEpoch: big.NewInt(0),
MinDelegation100Epoch: big.NewInt(0),
MinCommissionRateEpoch: big.NewInt(0),
MinCommissionPromoPeriod: big.NewInt(10),
EPoSBound35Epoch: big.NewInt(0),
EIP155Epoch: big.NewInt(0),
S3Epoch: big.NewInt(0),
DataCopyFixEpoch: big.NewInt(0),
IstanbulEpoch: big.NewInt(0),
ReceiptLogEpoch: big.NewInt(0),
SHA3Epoch: big.NewInt(0),
HIP6And8Epoch: EpochTBD, // Never enable it for localnet as localnet has no external validator setup
StakingPrecompileEpoch: big.NewInt(2),
ChainIdFixEpoch: big.NewInt(0),
SlotsLimitedEpoch: EpochTBD, // epoch to enable HIP-16
CrossShardXferPrecompileEpoch: big.NewInt(1),
AllowlistEpoch: EpochTBD,
LeaderRotationInternalValidatorsEpoch: big.NewInt(5),
LeaderRotationExternalValidatorsEpoch: big.NewInt(6),
FeeCollectEpoch: big.NewInt(2),
ValidatorCodeFixEpoch: big.NewInt(2),
HIP30Epoch: EpochTBD,
BlockGas30MEpoch: big.NewInt(0),
TopMaxRateEpoch: EpochTBD,
MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
TestnetExternalEpoch: EpochTBD,
IsOneSecondEpoch: big.NewInt(2),
ChainID: TestnetChainID,
EthCompatibleChainID: EthTestnetShard0ChainID,
EthCompatibleShard0ChainID: EthTestnetShard0ChainID,
EthCompatibleEpoch: big.NewInt(0),
CrossTxEpoch: big.NewInt(0),
CrossLinkEpoch: big.NewInt(2),
AggregatedRewardEpoch: big.NewInt(3),
StakingEpoch: big.NewInt(2),
PreStakingEpoch: big.NewInt(0),
QuickUnlockEpoch: big.NewInt(0),
FiveSecondsEpoch: big.NewInt(0),
TwoSecondsEpoch: big.NewInt(0),
SixtyPercentEpoch: EpochTBD, // Never enable it for localnet as localnet has no external validator setup
RedelegationEpoch: big.NewInt(0),
NoEarlyUnlockEpoch: big.NewInt(0),
VRFEpoch: big.NewInt(0),
PrevVRFEpoch: big.NewInt(0),
MinDelegation100Epoch: big.NewInt(0),
MinCommissionRateEpoch: big.NewInt(0),
MinCommissionPromoPeriod: big.NewInt(10),
EPoSBound35Epoch: big.NewInt(0),
EIP155Epoch: big.NewInt(0),
S3Epoch: big.NewInt(0),
DataCopyFixEpoch: big.NewInt(0),
IstanbulEpoch: big.NewInt(0),
ReceiptLogEpoch: big.NewInt(0),
SHA3Epoch: big.NewInt(0),
HIP6And8Epoch: EpochTBD, // Never enable it for localnet as localnet has no external validator setup
StakingPrecompileEpoch: big.NewInt(2),
ChainIdFixEpoch: big.NewInt(0),
SlotsLimitedEpoch: EpochTBD, // epoch to enable HIP-16
CrossShardXferPrecompileEpoch: big.NewInt(1),
AllowlistEpoch: EpochTBD,
//LeaderRotationInternalValidatorsEpoch: big.NewInt(5),
//LeaderRotationExternalValidatorsEpoch: big.NewInt(6),
FeeCollectEpoch: big.NewInt(2),
ValidatorCodeFixEpoch: big.NewInt(2),
HIP30Epoch: EpochTBD,
BlockGas30MEpoch: big.NewInt(0),
TopMaxRateEpoch: EpochTBD,
MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
TestnetExternalEpoch: EpochTBD,
IsOneSecondEpoch: big.NewInt(2),
}

// AllProtocolChanges ...
Expand Down
4 changes: 2 additions & 2 deletions test/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ function launch_localnet() {
if ${VERBOSE}; then
verbosity=5
else
verbosity=3
verbosity=5
fi

base_args=(--log_folder "${log_folder}" --min_peers "${MIN}" --bootnodes "${BN_MA}" "--network_type=$NETWORK" --blspass file:"${ROOT}/.hmy/blspass.txt" "--dns=false" "--verbosity=${verbosity}" "--p2p.security.max-conn-per-ip=100")
base_args=(--log_folder "${log_folder}" --min_peers "${MIN}" --bootnodes "${BN_MA}" "--network_type=$NETWORK" --blspass file:"${ROOT}/.hmy/blspass.txt" "--dns=true" "--sync=false" "--dns.client=true" "--sync.downloader=false" "--sync.stagedsync=false" "--verbosity=${verbosity}" "--p2p.security.max-conn-per-ip=100")
sleep 2

# Start nodes
Expand Down