Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print peers count. #4719

Draft
wants to merge 68 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
cbb62fd
Updated protobuf from outdated github.com/golang/protobuf to google.g…
Frozen Mar 7, 2024
56ad3fa
add hardfork to make testnet external (#4640)
diego1q2w Mar 7, 2024
8d2b36d
Got rid of redundant logic with isBackup. (#4639)
Frozen Mar 7, 2024
fa5efdc
Send sings count from leader to prometheus. (#4638)
Frozen Mar 7, 2024
c32319c
set SnapshotLimit to zero by default for all networks
GheisMohammadi Mar 11, 2024
4bdddf7
Merge pull request #4641 from harmony-one/fix/snapshot_disabled
sophoah Mar 13, 2024
12b84f3
Enable testnet leader rotation and external (#4649)
diego1q2w Mar 28, 2024
eb468b9
bring back the lost maxRate epochs (#4648)
diego1q2w Apr 2, 2024
7a6a801
Additional logging on a block proposing. (#4658)
Frozen Apr 19, 2024
587d2f0
Update devnet internal vote power back to 90%
sophoah Apr 23, 2024
3e66b85
Merge pull request #4662 from sophoah/dev
adsorptionenthalpy Apr 24, 2024
782eb4f
Additional logs and cleanup. (#4656)
Frozen Apr 25, 2024
0e29f52
fix validator state when trying to update the max-rate (#4647)
diego1q2w Apr 30, 2024
a3881d4
Improve staged stream sync (#4660)
GheisMohammadi May 2, 2024
1375a3b
pprof for bootnode (#4664)
Frozen May 9, 2024
5a33e2a
improve logs to reduce OUT of SYNC info logs (#4667)
GheisMohammadi May 14, 2024
b526c3a
Force sync when view change and received message with higher height.
Frozen Apr 21, 2024
2ece5a6
Removed unused method `NthNextHmyExt` (#4670)
Frozen May 21, 2024
3dad825
fix: close file (#4672)
testwill May 21, 2024
3d29596
fix boot node instability issue, refactor p2p host configurations and…
GheisMohammadi May 24, 2024
8abcc74
Fix for crosslink snap db. (#4675)
Frozen May 25, 2024
338ff01
Removed threshold 10 epochs. (#4671)
Frozen May 27, 2024
c148e5f
Extracts `GetAddressForBLSKey` functionality from `Node` struct. (#4642)
Frozen May 27, 2024
a7560fe
Crosslink heartbeat verify through current committee (#4673)
Frozen May 28, 2024
a8a2a13
Update testnet internal vote power back to 90%
sophoah May 29, 2024
905bf58
[testnet] add harmony internal node in sharding config
sophoah May 30, 2024
7233e1f
add muxer flag to bootnode
GheisMohammadi May 29, 2024
7d06fdc
add support for multiple muxers to p2p host (#4682)
GheisMohammadi Jun 3, 2024
434abca
Fix possible panic when the leader is unknown. (#4684)
Frozen Jun 5, 2024
b194d2b
Devnet TopMaxRateEpoch 1976
Frozen Jun 14, 2024
ef74501
Fixed order
Frozen Jun 14, 2024
d748647
Configurable reward frequency. (#4700)
Frozen Jun 25, 2024
d6da9ff
Block proposing moved from Node to Consensus. (#4698)
Frozen Jun 25, 2024
878e75b
Fix: travis CI changed the way how we install docker - travis updated…
mur-me Jun 27, 2024
be65e8c
enables p2p transport security by default (#4705)
GheisMohammadi Jul 3, 2024
74f7b5c
Removed unused parameter disableViewChange. (#4699)
Frozen Jul 3, 2024
d81611e
Fixed condition which is always true. (#4695)
Frozen Jul 3, 2024
1bafc6c
Genesis block hash log. (#4712)
Frozen Jul 15, 2024
6e772e2
small fixes on legacy sync (#4713)
GheisMohammadi Jul 15, 2024
3503972
100% coverage for UpdateMaxCommissionFee. (#4708)
Frozen Jul 18, 2024
8306c60
Fixed PeerStore usage. (#4706)
Frozen Jul 19, 2024
a65cf71
Fixed data race. (#4686)
Frozen Jul 19, 2024
16fe018
fix lock release issue in stream sync (#4714)
GheisMohammadi Jul 19, 2024
73df9c5
update a few of deprecated dependencies (#4715)
GheisMohammadi Jul 19, 2024
252bda9
Removed trailing zeros. (#4709)
Frozen Jul 19, 2024
900529d
add linux static quick command to make file (#4704)
GheisMohammadi Jul 19, 2024
0e74201
Broadcast sign power (#4683)
Frozen Jul 23, 2024
8070f87
Fixed panic.
Frozen Jul 23, 2024
c126c61
Tests.
Frozen Jun 25, 2024
f2be022
Test.
Frozen Jun 26, 2024
49ead49
1.21.11
Frozen Jun 28, 2024
f204b08
Fixed rosetta
Frozen Jun 30, 2024
b149cdb
Fixed rosetta
Frozen Jul 3, 2024
c643a3c
Fixed bootstrap
Frozen Jul 4, 2024
9cf4cd2
Custom genesis generation for [email protected]+.
Frozen Jul 16, 2024
1939b71
Fixed 1.19 imports.
Frozen Jul 16, 2024
293309c
Fixed 1.19 imports.
Frozen Jul 17, 2024
672a2fe
Replaced 1.20 keys.
Frozen Jul 18, 2024
1412f47
100% coverage.
Frozen Jul 18, 2024
963d519
Fixed trailing zeroes.
Frozen Jul 19, 2024
2bd71bf
Rebased & Fixed protobuf usage.
Frozen Jul 23, 2024
edceca8
PrintPeers
Frozen Jul 23, 2024
c69df17
No security
Frozen Jul 23, 2024
73df3bd
Mplex compability.
Frozen Jul 23, 2024
5d869db
Mplex 6&0 compability.
Frozen Jul 23, 2024
92af759
Fixed failing tests.
Frozen Jul 24, 2024
45c0539
Fixed failing tests.
Frozen Jul 24, 2024
74f8a29
Added logs.
Frozen Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Improve staged stream sync (#4660)
* disable stream sync for partner network and other networks
* remove unused tx in block manager
* add log for staged sync creation
* change bucket for stroring progress in staged stream sync, fix saveBlocks tx issue
* refactor stream sync db initialization
* set SnapshotLimit to zero for localnet
* add new logs for stream request manager
  • Loading branch information
GheisMohammadi authored May 2, 2024
commit a3881d434d69fb35ee6e4e1b2752415347700ce9
5 changes: 1 addition & 4 deletions api/service/stagedstreamsync/block_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/ethereum/go-ethereum/common"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog"
)

Expand All @@ -19,7 +18,6 @@ type BlockDownloadDetails struct {
// blockDownloadManager is the helper structure for get blocks request management
type blockDownloadManager struct {
chain blockChain
tx kv.RwTx

targetBN uint64
requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received
Expand All @@ -32,10 +30,9 @@ type blockDownloadManager struct {
lock sync.Mutex
}

func newBlockDownloadManager(tx kv.RwTx, chain blockChain, targetBN uint64, logger zerolog.Logger) *blockDownloadManager {
func newBlockDownloadManager(chain blockChain, targetBN uint64, logger zerolog.Logger) *blockDownloadManager {
return &blockDownloadManager{
chain: chain,
tx: tx,
targetBN: targetBN,
requesting: make(map[uint64]struct{}),
processing: make(map[uint64]struct{}),
Expand Down
52 changes: 27 additions & 25 deletions api/service/stagedstreamsync/stage_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
currProgress := uint64(0)
targetHeight := s.state.currentCycle.TargetHeight

if useInternalTx {
var err error
tx, err = b.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}

if errV := CreateView(ctx, b.configs.db, tx, func(etx kv.Tx) error {
if currProgress, err = s.CurrentStageProgress(etx); err != nil {
return err
Expand All @@ -97,24 +106,14 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
return nil
}

// size := uint64(0)
startTime := time.Now()
// startBlock := currProgress
if b.configs.logProgress {
fmt.Print("\033[s") // save the cursor position
}

if useInternalTx {
var err error
tx, err = b.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}

// Fetch blocks from neighbors
s.state.gbm = newBlockDownloadManager(tx, b.configs.bc, targetHeight, s.state.logger)
s.state.gbm = newBlockDownloadManager(b.configs.bc, targetHeight, s.state.logger)

// Setup workers to fetch blocks from remote node
var wg sync.WaitGroup
Expand Down Expand Up @@ -188,7 +187,7 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload
gbm.HandleRequestError(batch, err, stid)
b.configs.protocol.RemoveStream(stid)
} else {
if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil {
if err = b.saveBlocks(ctx, nil, batch, blockBytes, sigBytes, loopID, stid); err != nil {
panic(ErrSaveBlocksToDbFailed)
}
gbm.HandleRequestResult(batch, blockBytes, sigBytes, loopID, stid)
Expand Down Expand Up @@ -239,8 +238,9 @@ func (b *StageBodies) verifyBlockAndExtractReceiptsData(batchBlockBytes [][]byte
// redownloadBadBlock tries to redownload the bad block from other streams
func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) error {

batch := make([]uint64, 1)
batch = append(batch, s.state.invalidBlock.Number)
batch := []uint64{s.state.invalidBlock.Number}

badBlockDownloadLoop:

for {
if b.configs.protocol.NumStreams() == 0 {
Expand All @@ -253,21 +253,20 @@ func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) err
}
continue
}
isOneOfTheBadStreams := false
for _, id := range s.state.invalidBlock.StreamID {
if id == stid {
// TODO: if block is invalid then call StreamFailed
b.configs.protocol.StreamFailed(stid, "re-download bad block from this stream failed")
isOneOfTheBadStreams = true
break
continue badBlockDownloadLoop
}
}
if isOneOfTheBadStreams {
continue
}
s.state.gbm.SetDownloadDetails(batch, 0, stid)
if errU := b.configs.blockDBs[0].Update(ctx, func(tx kv.RwTx) error {
if err = b.saveBlocks(ctx, tx, batch, blockBytes, sigBytes, 0, stid); err != nil {
return errors.Errorf("[STAGED_STREAM_SYNC] saving re-downloaded bad block to db failed.")
utils.Logger().Error().
Err(err).
Msg("[STAGED_STREAM_SYNC] saving re-downloaded bad block to db failed")
return errors.Errorf("%s: %s", ErrSaveBlocksToDbFailed.Error(), err.Error())
}
return nil
}); errU != nil {
Expand Down Expand Up @@ -314,11 +313,14 @@ func validateGetBlocksResult(requested []uint64, result []*types.Block) error {
// saveBlocks saves the blocks into db
func (b *StageBodies) saveBlocks(ctx context.Context, tx kv.RwTx, bns []uint64, blockBytes [][]byte, sigBytes [][]byte, loopID int, stid sttypes.StreamID) error {

tx, err := b.configs.blockDBs[loopID].BeginRw(ctx)
if err != nil {
return err
if tx == nil {
var err error
tx, err = b.configs.blockDBs[loopID].BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
defer tx.Rollback()

for i := uint64(0); i < uint64(len(blockBytes)); i++ {
block := blockBytes[i]
Expand Down
8 changes: 4 additions & 4 deletions api/service/stagedstreamsync/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func GetStageID(stage SyncStageID, isBeacon bool, prune bool) []byte {
// GetStageProgress retrieves saved progress of a given sync stage from the database
func GetStageProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error) {
stgID := GetStageID(stage, isBeacon, false)
v, err := db.GetOne(kv.SyncStageProgress, stgID)
v, err := db.GetOne(StageProgressBucket, stgID)
if err != nil {
return 0, err
}
Expand All @@ -50,13 +50,13 @@ func GetStageProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, e
// SaveStageProgress saves progress of given sync stage
func SaveStageProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error {
stgID := GetStageID(stage, isBeacon, false)
return db.Put(kv.SyncStageProgress, stgID, marshalData(progress))
return db.Put(StageProgressBucket, stgID, marshalData(progress))
}

// GetStageCleanUpProgress retrieves saved progress of given sync stage from the database
func GetStageCleanUpProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error) {
stgID := GetStageID(stage, isBeacon, true)
v, err := db.GetOne(kv.SyncStageProgress, stgID)
v, err := db.GetOne(StageProgressBucket, stgID)
if err != nil {
return 0, err
}
Expand All @@ -66,5 +66,5 @@ func GetStageCleanUpProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (ui
// SaveStageCleanUpProgress stores the progress of the clean up for a given sync stage to the database
func SaveStageCleanUpProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error {
stgID := GetStageID(stage, isBeacon, true)
return db.Put(kv.SyncStageProgress, stgID, marshalData(progress))
return db.Put(StageProgressBucket, stgID, marshalData(progress))
}
61 changes: 35 additions & 26 deletions api/service/stagedstreamsync/syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/log/v3"
"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -62,23 +61,34 @@ func CreateStagedSync(ctx context.Context,
var mainDB kv.RwDB
dbs := make([]kv.RwDB, config.Concurrency)
if config.UseMemDB {
mainDB = memdb.New(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir))
mdbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)
logger.Info().
Str("path", mdbPath).
Msg(WrapStagedSyncMsg("creating main db in memory"))
mainDB = mdbx.NewMDBX(log.New()).InMem(mdbPath).MustOpen()
for i := 0; i < config.Concurrency; i++ {
dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir)
dbs[i] = memdb.New(dbPath)
logger.Info().
Str("path", dbPath).
Msg(WrapStagedSyncMsg("creating blocks db in memory"))
dbs[i] = mdbx.NewMDBX(log.New()).InMem(dbPath).MustOpen()
}
} else {
mdbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)
logger.Info().
Str("path", getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)).
Msg(WrapStagedSyncMsg("creating main db"))
mainDB = mdbx.NewMDBX(log.New()).Path(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)).MustOpen()
Str("path", mdbPath).
Msg(WrapStagedSyncMsg("creating main db in disk"))
mainDB = mdbx.NewMDBX(log.New()).Path(mdbPath).MustOpen()
for i := 0; i < config.Concurrency; i++ {
dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir)
logger.Info().
Str("path", dbPath).
Msg(WrapStagedSyncMsg("creating blocks db in disk"))
dbs[i] = mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen()
}
}

if errInitDB := initDB(ctx, mainDB, dbs, config.Concurrency); errInitDB != nil {
if errInitDB := initDB(ctx, mainDB, dbs); errInitDB != nil {
logger.Error().Err(errInitDB).Msg("create staged sync instance failed")
return nil, errInitDB
}
Expand Down Expand Up @@ -118,7 +128,8 @@ func CreateStagedSync(ctx context.Context,
Str("dbDir", dbDir).
Bool("serverOnly", config.ServerOnly).
Int("minStreams", config.MinStreams).
Msg(WrapStagedSyncMsg("staged sync created successfully"))
Str("dbDir", dbDir).
Msg(WrapStagedSyncMsg("staged stream sync created successfully"))

return New(
bc,
Expand All @@ -134,7 +145,7 @@ func CreateStagedSync(ctx context.Context,
}

// initDB inits the sync loop main database and create buckets
func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB, concurrency int) error {
func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB) error {

// create buckets for mainDB
tx, errRW := mainDB.BeginRw(ctx)
Expand All @@ -143,43 +154,41 @@ func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB, concurrency int)
}
defer tx.Rollback()

for _, name := range Buckets {
if err := tx.CreateBucket(GetStageName(name, false, false)); err != nil {
for _, bucketName := range Buckets {
if err := tx.CreateBucket(bucketName); err != nil {
return err
}
}
if err := tx.Commit(); err != nil {
return err
}

// create buckets for block cache DBs
for _, db := range dbs {
tx, errRW := db.BeginRw(ctx)
if errRW != nil {
return errRW
createBlockBuckets := func(db kv.RwDB) error {
tx, err := db.BeginRw(ctx)
if err != nil {
return err
}

defer tx.Rollback()
if err := tx.CreateBucket(BlocksBucket); err != nil {
return err
}
if err := tx.CreateBucket(BlockSignaturesBucket); err != nil {
return err
}

if err := tx.Commit(); err != nil {
return err
}
return nil
}

return nil
}

// getMemDbTempPath returns the path of the temporary cache database for memdb
func getMemDbTempPath(dbDir string, dbIndex int) string {
if dbIndex >= 0 {
return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/memdb/db"), dbIndex)
// create buckets for block cache DBs
for _, db := range dbs {
if err := createBlockBuckets(db); err != nil {
return err
}
}
return filepath.Join(dbDir, "cache/memdb/db_main")

return nil
}

// getBlockDbPath returns the path of the cache database which stores blocks
Expand Down
18 changes: 18 additions & 0 deletions api/service/stagedsync/syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,24 @@ func CreateStagedSync(
finishCfg,
)

utils.Logger().Info().
Str("ip", ip).
Str("port", port).
Uint32("shard", bc.ShardID()).
Bool("isExplorer", isExplorer).
Bool("TurboMode", TurboMode).
Bool("memdb", UseMemDB).
Bool("doubleCheckBlockHashes", doubleCheckBlockHashes).
Uint64("maxBlocksPerCycle", maxBlocksPerCycle).
Uint64("maxBackgroundBlocks", maxBackgroundBlocks).
Uint64("maxMemSyncCycleSize", maxMemSyncCycleSize).
Bool("verifyAllSig", verifyAllSig).
Uint64("verifyHeaderBatchSize", verifyHeaderBatchSize).
Int("insertChainBatchSize", insertChainBatchSize).
Bool("debugMode", debugMode).
Str("dbDir", dbDir).
Msg("[STAGED_SYNC] staged sync created successfully")

return New(ctx,
ip,
port,
Expand Down
1 change: 1 addition & 0 deletions cmd/harmony/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func getDefaultCacheConfig(nt nodeconfig.NetworkType) harmonyconfig.CacheConfig
case nodeconfig.Localnet:
cacheConfig.Disabled = false
cacheConfig.Preimages = false
cacheConfig.SnapshotLimit = 0
default:
cacheConfig.Disabled = false
cacheConfig.Preimages = true
Expand Down
8 changes: 4 additions & 4 deletions cmd/harmony/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ var (
}

defaultPartnerSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
Enabled: false,
SyncMode: 0,
Downloader: true,
Downloader: false,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 2,
Expand All @@ -257,9 +257,9 @@ var (
}

defaultElseSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
Enabled: false,
SyncMode: 0,
Downloader: true,
Downloader: false,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4,
Expand Down
5 changes: 4 additions & 1 deletion p2p/stream/common/requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (rm *requestManager) loop() {
if req == nil {
break loop
}
rm.logger.Debug().Str("request", req.String()).
Msg("add new incoming request to pending queue")
rm.addPendingRequest(req, st)
b, err := req.Encode()
if err != nil {
Expand Down Expand Up @@ -202,7 +204,8 @@ func (rm *requestManager) loop() {
func (rm *requestManager) handleNewRequest(req *request) bool {
rm.lock.Lock()
defer rm.lock.Unlock()

rm.logger.Debug().Str("request", req.String()).
Msg("add new outgoing request to waiting queue")
err := rm.addRequestToWaitings(req, reqPriorityLow)
if err != nil {
rm.logger.Warn().Err(err).Msg("failed to add new request to waitings")
Expand Down