forked from okx/xlayer-node
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathl2block.go
129 lines (112 loc) · 4.11 KB
/
l2block.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package state
import (
"context"
"errors"
"math/big"
"sync"
"time"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/ethereum/go-ethereum/core/types"
)
const newL2BlocksCheckInterval = 200 * time.Millisecond
// NewL2BlockEventHandler represent a func that will be called by the
// state when a NewL2BlockEvent is triggered
type NewL2BlockEventHandler func(e NewL2BlockEvent)
// NewL2BlockEvent is a struct provided from the state to the NewL2BlockEventHandler
// when a new l2 block is detected with data related to this new l2 block.
type NewL2BlockEvent struct {
Block types.Block
}
// StartToMonitorNewL2Blocks starts 2 go routines that will
// monitor new blocks and execute handlers registered to be executed
// when a new l2 block is detected. This is used by the RPC WebSocket
// filter subscription but can be used by any other component that
// needs to react to a new L2 block added to the state.
func (s *State) StartToMonitorNewL2Blocks() {
lastL2Block, err := s.GetLastL2Block(context.Background(), nil)
if errors.Is(err, ErrStateNotSynchronized) {
lastL2Block = types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)})
} else if err != nil {
log.Fatalf("failed to load the last l2 block: %v", err)
}
s.lastL2BlockSeen.Store(lastL2Block)
go s.monitorNewL2Blocks()
go s.handleEvents()
}
// RegisterNewL2BlockEventHandler add the provided handler to the list of handlers
// that will be triggered when a new l2 block event is triggered
func (s *State) RegisterNewL2BlockEventHandler(h NewL2BlockEventHandler) {
log.Info("new l2 block event handler registered")
s.newL2BlockEventHandlers = append(s.newL2BlockEventHandlers, h)
}
func (s *State) monitorNewL2Blocks() {
waitNextCycle := func() {
time.Sleep(newL2BlocksCheckInterval)
}
for {
if len(s.newL2BlockEventHandlers) == 0 {
waitNextCycle()
continue
}
lastL2Block, err := s.GetLastL2Block(context.Background(), nil)
if errors.Is(err, ErrStateNotSynchronized) {
waitNextCycle()
continue
} else if err != nil {
log.Errorf("failed to get last l2 block while monitoring new blocks: %v", err)
waitNextCycle()
continue
}
lastL2BlockSeen := s.lastL2BlockSeen.Load()
// not updates until now
if lastL2Block == nil || lastL2BlockSeen.NumberU64() >= lastL2Block.NumberU64() {
waitNextCycle()
continue
}
fromBlockNumber := lastL2BlockSeen.NumberU64() + uint64(1)
toBlockNumber := lastL2Block.NumberU64()
log.Debugf("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber)
for bn := fromBlockNumber; bn <= toBlockNumber; bn++ {
block, err := s.GetL2BlockByNumber(context.Background(), bn, nil)
if err != nil {
log.Errorf("failed to get l2 block while monitoring new blocks: %v", err)
break
}
log.Debugf("[monitorNewL2Blocks] sending NewL2BlockEvent for block %v", block.NumberU64())
start := time.Now()
s.newL2BlockEvents <- NewL2BlockEvent{
Block: *block,
}
s.lastL2BlockSeen.Store(block)
log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %v to be sent", block.NumberU64(), time.Since(start))
log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String())
}
// interval to check for new l2 blocks
waitNextCycle()
}
}
func (s *State) handleEvents() {
for newL2BlockEvent := range s.newL2BlockEvents {
log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64())
if len(s.newL2BlockEventHandlers) == 0 {
continue
}
wg := sync.WaitGroup{}
for _, handler := range s.newL2BlockEventHandlers {
wg.Add(1)
go func(h NewL2BlockEventHandler, e NewL2BlockEvent) {
defer func() {
wg.Done()
if r := recover(); r != nil {
log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r)
}
}()
log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64())
start := time.Now()
h(e)
log.Debugf("[handleEvents] new l2 block event handler for block %v took %v to be executed", e.Block.NumberU64(), time.Since(start))
}(handler, newL2BlockEvent)
}
wg.Wait()
}
}