Skip to content

Commit

Permalink
keep track of block being synced (#12903)
Browse files Browse the repository at this point in the history
* keep track of block being synced

* gazelle

* use maps

* shutup deepsource

* change godoc

* Radek's review
  • Loading branch information
potuz authored Sep 15, 2023
1 parent 4d120b5 commit dd73f76
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 2 deletions.
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"chain_info.go",
"chain_info_forkchoice.go",
"currently_syncing_block.go",
"error.go",
"execution_engine.go",
"forkchoice_update_execution.go",
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/chain_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,3 +525,8 @@ func (s *Service) recoverStateSummary(ctx context.Context, blockRoot [32]byte) (
}
return nil, errBlockDoesNotExist
}

// BlockBeingSynced returns whether the block with the given root is currently being synced
func (s *Service) BlockBeingSynced(root [32]byte) bool {
return s.blockBeingSynced.isSyncing(root)
}
27 changes: 27 additions & 0 deletions beacon-chain/blockchain/currently_syncing_block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package blockchain

import "sync"

type currentlySyncingBlock struct {
sync.Mutex
roots map[[32]byte]struct{}
}

func (b *currentlySyncingBlock) set(root [32]byte) {
b.Lock()
defer b.Unlock()
b.roots[root] = struct{}{}
}

func (b *currentlySyncingBlock) unset(root [32]byte) {
b.Lock()
defer b.Unlock()
delete(b.roots, root)
}

func (b *currentlySyncingBlock) isSyncing(root [32]byte) bool {
b.Lock()
defer b.Unlock()
_, ok := b.roots[root]
return ok
}
4 changes: 4 additions & 0 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type BlockReceiver interface {
ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error
HasBlock(ctx context.Context, root [32]byte) bool
RecentBlockSlot(root [32]byte) (primitives.Slot, error)
BlockBeingSynced([32]byte) bool
}

// BlobReceiver interface defines the methods of chain service for receiving new
Expand All @@ -58,6 +59,9 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlock")
defer span.End()
receivedTime := time.Now()
s.blockBeingSynced.set(blockRoot)
defer s.blockBeingSynced.unset(blockRoot)

blockCopy, err := block.Copy()
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Service struct {
clockWaiter startup.ClockWaiter
syncComplete chan struct{}
blobNotifiers *blobNotifierMap
blockBeingSynced *currentlySyncingBlock
}

// config options for the service.
Expand Down Expand Up @@ -134,6 +135,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
initSyncBlocks: make(map[[32]byte]interfaces.ReadOnlySignedBeaconBlock),
blobNotifiers: bn,
cfg: &config{ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache()},
blockBeingSynced: &currentlySyncingBlock{roots: make(map[[32]byte]struct{})},
}
for _, opt := range opts {
if err := opt(srv); err != nil {
Expand Down
7 changes: 6 additions & 1 deletion beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,4 +606,9 @@ func (s *ChainService) UnrealizedJustifiedPayloadBlockHash() [32]byte {
}

// SendNewBlobEvent mocks the same method in the chain service
func (s *ChainService) SendNewBlobEvent(_ [32]byte, _ uint64) {}
func (*ChainService) SendNewBlobEvent(_ [32]byte, _ uint64) {}

// BlockBeingSynced mocks the same method in the chain service
func (*ChainService) BlockBeingSynced(_ [32]byte) bool {
return false
}
7 changes: 6 additions & 1 deletion beacon-chain/sync/pending_blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
ctx, span := trace.StartSpan(ctx, "sendBatchRootRequest")
defer span.End()

roots = dedupRoots(roots)
for i := len(roots) - 1; i >= 0; i-- {
if s.cfg.chain.BlockBeingSynced(roots[i]) {
roots = append(roots[:i], roots[i+1:]...)
}
}
if len(roots) == 0 {
return nil
}
Expand All @@ -249,7 +255,6 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
if len(bestPeers) == 0 {
return nil
}
roots = dedupRoots(roots)
// Randomly choose a peer to query from our best peers. If that peer cannot return
// all the requested blocks, we randomly select another peer.
pid := bestPeers[randGen.Int()%len(bestPeers)]
Expand Down

0 comments on commit dd73f76

Please sign in to comment.