Skip to content

Commit

Permalink
fix(rotation): fixed race condition for last block (#1244)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Nov 22, 2024
1 parent 34018b9 commit a1be6f2
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 22 deletions.
2 changes: 1 addition & 1 deletion block/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error {
}
select {
case <-ctx.Done():
return ctx.Err()
return nil
case <-ticker.C:
}
}
Expand Down
6 changes: 1 addition & 5 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,7 @@ func (m *Manager) Start(ctx context.Context) error {

// Start the settlement sync loop in the background
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
err := m.SettlementSyncLoop(ctx)
if err != nil {
m.freezeNode(err)
}
return nil
return m.SettlementSyncLoop(ctx)
})

// Monitor sequencer set updates
Expand Down
18 changes: 16 additions & 2 deletions block/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package block

import (
"context"
"errors"
"fmt"

"github.com/dymensionxyz/dymint/p2p"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
return fmt.Errorf("checking should rotate: %w", err)
}
if shouldRotate {
m.rotate(ctx)
m.rotate(ctx) // panics afterwards
}

// populate the bytes produced channel
Expand All @@ -85,7 +86,20 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
})

// Monitor and handling of the rotation
go m.MonitorProposerRotation(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.MonitorProposerRotation(ctx)
})

go func() {
err = eg.Wait()
// Check if loops exited due to sequencer rotation signal
if errors.Is(err, errRotationRequested) {
m.rotate(ctx)
} else if err != nil {
m.logger.Error("block manager exited with error", "error", err)
m.freezeNode(err)
}
}()

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil
case <-ticker.C:
// Only produce if I'm the current rollapp proposer.
if !m.AmIProposerOnRollapp() {
Expand Down
2 changes: 1 addition & 1 deletion block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil
case retainHeight := <-m.pruningC:
var pruningHeight uint64
if m.RunMode == RunModeProposer { // do not delete anything that we might submit in future
Expand Down
19 changes: 13 additions & 6 deletions block/sequencers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,30 @@ const (
ProposerMonitorInterval = 3 * time.Minute
)

func (m *Manager) MonitorProposerRotation(ctx context.Context) {
var errRotationRequested = fmt.Errorf("sequencer rotation started. signal to stop production")

func (m *Manager) MonitorProposerRotation(ctx context.Context) error {
ticker := time.NewTicker(ProposerMonitorInterval) // TODO: make this configurable
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
next, err := m.SLClient.GetNextProposer()
nextProposer, err := m.SLClient.GetNextProposer()
if err != nil {
m.logger.Error("Check rotation in progress", "err", err)
continue
}
if next != nil {
m.rotate(ctx)
// no rotation in progress
if nextProposer == nil {
continue
}

// we get here once a sequencer rotation signal is received
m.logger.Info("Sequencer rotation started.", "nextSeqAddr", nextProposer.SettlementAddress)
return errRotationRequested
}
}
}
Expand Down Expand Up @@ -103,7 +110,7 @@ func (m *Manager) ShouldRotate() (bool, error) {
func (m *Manager) rotate(ctx context.Context) {
// Get Next Proposer from SL. We assume such exists (even if empty proposer) otherwise function wouldn't be called.
nextProposer, err := m.SLClient.GetNextProposer()
if err != nil {
if err != nil || nextProposer == nil {
panic(fmt.Sprintf("rotate: fetch next proposer set from Hub: %v", err))
}

Expand Down
6 changes: 3 additions & 3 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func SubmitLoopInner(
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
Expand All @@ -78,7 +78,7 @@ func SubmitLoopInner(
// we block here until we get a progress nudge from the submitter thread
select {
case <-ctx.Done():
return ctx.Err()
return nil
case <-trigger.C:
}
}
Expand All @@ -92,7 +92,7 @@ func SubmitLoopInner(
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil
case <-ticker.C:
case <-submitter.C:
}
Expand Down
4 changes: 2 additions & 2 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil
case <-m.settlementSyncingC:
m.logger.Info("syncing to target height", "targetHeight", m.LastSettlementHeight.Load())

for currH := m.State.NextHeight(); currH <= m.LastSettlementHeight.Load(); currH = m.State.NextHeight() {
// if context has been cancelled, stop syncing
if ctx.Err() != nil {
return ctx.Err()
return nil
}
// if we have the block locally, we don't need to fetch it from the DA.
// it will only happen in case of rollback.
Expand Down
2 changes: 1 addition & 1 deletion block/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (m *Manager) SettlementValidateLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil
case <-m.settlementValidationC:
targetValidationHeight := min(m.LastSettlementHeight.Load(), m.State.Height())
m.logger.Info("validating state updates to target height", "targetHeight", targetValidationHeight)
Expand Down

0 comments on commit a1be6f2

Please sign in to comment.