Skip to content

Commit

Permalink
op-batcher: wait for node sync & check recent L1 txs at startup to av…
Browse files Browse the repository at this point in the history
…oid duplicate txs (ethereum-optimism#10193)

* fix(batcher): check recent L1 txs at startup to avoid duplicate txs

* Add tests for checkRecentTxsOnStart

* Cleanup based on PR comments

* Move checkRecentTxs into op-service/eth package

* Address peer review comments

* Protect against reorg causing infinite loop in CheckRecentTxs

* Add missing WaitNodeSyncFlag to optionalFlags
  • Loading branch information
bitwiseguy authored Apr 22, 2024
1 parent 3fc229e commit 8661b79
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 2 deletions.
14 changes: 14 additions & 0 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,17 @@ type CLIConfig struct {
// Type of compressor to use. Must be one of [compressor.KindKeys].
Compressor string

// If Stopped is true, the batcher starts stopped and won't start batching right away.
// Batching needs to be started via an admin RPC.
Stopped bool

// Whether to wait for the sequencer to sync to a recent block at startup.
WaitNodeSync bool

// How many blocks back to look for recent batcher transactions during node sync at startup.
// If 0, the batcher will just use the current head.
CheckRecentTxsDepth int

BatchType uint

// DataAvailabilityType is one of the values defined in op-batcher/flags/types.go and dictates
Expand Down Expand Up @@ -118,6 +127,9 @@ func (c *CLIConfig) Check() error {
if c.BatchType > 1 {
return fmt.Errorf("unknown batch type: %v", c.BatchType)
}
if c.CheckRecentTxsDepth > 128 {
return fmt.Errorf("CheckRecentTxsDepth cannot be set higher than 128: %v", c.CheckRecentTxsDepth)
}
if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 {
return errors.New("too many frames for blob transactions, max 6")
}
Expand Down Expand Up @@ -157,6 +169,8 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name),
Compressor: ctx.String(flags.CompressorFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
WaitNodeSync: ctx.Bool(flags.WaitNodeSyncFlag.Name),
CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name),
Expand Down
41 changes: 41 additions & 0 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -26,6 +27,7 @@ var ErrBatcherNotRunning = errors.New("batcher is not running")

// L1Client describes the L1 chain queries the batcher depends on.
type L1Client interface {
// HeaderByNumber returns the L1 block header for the given block number.
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
// NonceAt returns the nonce of account at the given block number. It is part of
// this interface so that the batcher's L1 client can be handed to eth.CheckRecentTxs.
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}

type L2Client interface {
Expand Down Expand Up @@ -252,6 +254,13 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.

func (l *BatchSubmitter) loop() {
defer l.wg.Done()
if l.Config.WaitNodeSync {
err := l.waitNodeSync()
if err != nil {
l.Log.Error("Error waiting for node sync", "err", err)
return
}
}

receiptsCh := make(chan txmgr.TxReceipt[txID])
queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
Expand Down Expand Up @@ -283,6 +292,7 @@ func (l *BatchSubmitter) loop() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
}
}

for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -324,6 +334,37 @@ func (l *BatchSubmitter) loop() {
}
}

// waitNodeSync blocks until the rollup node has caught up to a target L1 block.
//
// The target defaults to the current L1 tip. When Config.CheckRecentTxsDepth is
// non-zero, the most recent blocks are first scanned for a transaction sent from
// the batcher's own address; if one is found, the block reported by
// eth.CheckRecentTxs is used as the target instead of the tip.
//
// The setup calls (rollup client lookup, L1 tip, recent tx scan) share a single
// context bounded by Config.NetworkTimeout. The wait itself runs on the shutdown
// context and polls every 12 seconds.
func (l *BatchSubmitter) waitNodeSync() error {
	setupCtx, cancel := context.WithTimeout(l.shutdownCtx, l.Config.NetworkTimeout)
	defer cancel()

	rollupClient, err := l.EndpointProvider.RollupClient(setupCtx)
	if err != nil {
		return fmt.Errorf("failed to get rollup client: %w", err)
	}

	tip, err := l.l1Tip(setupCtx)
	if err != nil {
		return fmt.Errorf("failed to retrieve l1 tip: %w", err)
	}

	// Start from the tip and only move the target when a recent batcher tx is found.
	target := tip.Number
	if depth := l.Config.CheckRecentTxsDepth; depth != 0 {
		l.Log.Info("Checking for recently submitted batcher transactions on L1")
		txBlock, found, err := eth.CheckRecentTxs(setupCtx, l.L1Client, depth, l.Txmgr.From())
		if err != nil {
			return fmt.Errorf("failed when checking recent batcher txs: %w", err)
		}
		if found {
			target = txBlock
		}
	}

	const pollInterval = 12 * time.Second
	return dial.WaitRollupSync(l.shutdownCtx, l.Log, rollupClient, target, pollInterval)
}

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queuing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
Expand Down
5 changes: 5 additions & 0 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type BatcherConfig struct {
// UsePlasma is true if the rollup config has a DA challenge address so the batcher
// will post inputs to the Plasma DA server and post commitments to blobs or calldata.
UsePlasma bool

WaitNodeSync bool
CheckRecentTxsDepth int
}

// BatcherService represents a full batch-submitter instance and its resources,
Expand Down Expand Up @@ -96,6 +99,8 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
bs.PollInterval = cfg.PollInterval
bs.MaxPendingTransactions = cfg.MaxPendingTransactions
bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout
bs.CheckRecentTxsDepth = cfg.CheckRecentTxsDepth
bs.WaitNodeSync = cfg.WaitNodeSync
if err := bs.initRPCClients(ctx, cfg); err != nil {
return err
}
Expand Down
16 changes: 16 additions & 0 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,20 @@ var (
Value: 2 * time.Minute,
EnvVars: prefixEnvVars("ACTIVE_SEQUENCER_CHECK_DURATION"),
}
// CheckRecentTxsDepthFlag sets how many L1 blocks back the batcher scans at startup
// for a recent batcher transaction. It defaults to 0, in which case no scan is done
// and the current L1 head is used as the sync target.
CheckRecentTxsDepthFlag = &cli.IntFlag{
Name: "check-recent-txs-depth",
Usage: "Indicates how many blocks back the batcher should look during startup for a recent batch tx on L1. This can " +
"speed up waiting for node sync. It should be set to the verifier confirmation depth of the sequencer (e.g. 4).",
Value: 0,
EnvVars: prefixEnvVars("CHECK_RECENT_TXS_DEPTH"),
}
// WaitNodeSyncFlag makes the batcher wait for the rollup node to sync before it
// starts its main batching loop. It is disabled by default.
WaitNodeSyncFlag = &cli.BoolFlag{
Name: "wait-node-sync",
Usage: "Indicates if, during startup, the batcher should wait for a recent batcher tx on L1 to " +
"finalize (via more block confirmations). This should help avoid duplicate batcher txs.",
Value: false,
EnvVars: prefixEnvVars("WAIT_NODE_SYNC"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
Expand All @@ -138,6 +152,8 @@ var requiredFlags = []cli.Flag{
}

var optionalFlags = []cli.Flag{
WaitNodeSyncFlag,
CheckRecentTxsDepthFlag,
SubSafetyMarginFlag,
PollIntervalFlag,
MaxPendingTransactionsFlag,
Expand Down
4 changes: 2 additions & 2 deletions op-service/dial/rollup_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func WaitRollupSync(
lgr log.Logger,
rollup SyncStatusProvider,
l1BlockTarget uint64,
pollDuration time.Duration,
pollInterval time.Duration,
) error {
for {
syncst, err := rollup.SyncStatus(ctx)
Expand All @@ -29,7 +29,7 @@ func WaitRollupSync(
}

lgr.Info("rollup current L1 block still behind target, retrying")
timer := time.NewTimer(pollDuration)
timer := time.NewTimer(pollInterval)
select {
case <-timer.C: // next try
case <-ctx.Done():
Expand Down
49 changes: 49 additions & 0 deletions op-service/eth/transactions.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package eth

import (
"context"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)

// L1Client is the minimal L1 chain access that CheckRecentTxs needs.
type L1Client interface {
// HeaderByNumber returns the header at the given block number. CheckRecentTxs
// passes nil to obtain the current head.
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
// NonceAt returns the nonce of account as of the given block number.
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}

// EncodeTransactions encodes a list of transactions into opaque transactions.
func EncodeTransactions(elems []*types.Transaction) ([]hexutil.Bytes, error) {
out := make([]hexutil.Bytes, len(elems))
Expand Down Expand Up @@ -42,3 +49,45 @@ func TransactionsToHashes(elems []*types.Transaction) []common.Hash {
}
return out
}

// CheckRecentTxs inspects the last depth blocks for transactions sent from addr by
// comparing the account nonce at the current head with the nonce depth blocks earlier.
//
// It returns either:
//   - the number of the block in which the most recent nonce change was observed
//     and true, if the nonce changed within the inspected range, or
//   - the oldest block checked and false, if the nonce did not change.
//
// depth must not be negative. If depth is larger than the current block number the
// inspected range is clamped to start at block 0, so no negative block number is
// ever sent to the client or returned to the caller.
//
// If the nonce at the oldest block is inconsistent with the nonce at the head (for
// example because a reorg happened between the two queries), the backward scan is
// bounded by the oldest block, so it always terminates.
func CheckRecentTxs(
	ctx context.Context,
	l1 L1Client,
	depth int,
	addr common.Address,
) (recentBlock uint64, found bool, err error) {
	if depth < 0 {
		return 0, false, fmt.Errorf("invalid depth %d: must not be negative", depth)
	}

	blockHeader, err := l1.HeaderByNumber(ctx, nil)
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve current block header: %w", err)
	}
	if blockHeader == nil || blockHeader.Number == nil {
		return 0, false, fmt.Errorf("failed to retrieve current block header: header or block number is nil")
	}

	// Work on a private copy: the header belongs to the client (it may be cached or
	// shared), so its Number must never be modified in place.
	currentBlock := new(big.Int).Set(blockHeader.Number)
	currentNonce, err := l1.NonceAt(ctx, addr, currentBlock)
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve current nonce: %w", err)
	}

	// Clamp at genesis. A negative value is not a valid block height, and clients
	// may interpret negative numbers as special block tags rather than heights.
	oldestBlock := new(big.Int).Sub(currentBlock, big.NewInt(int64(depth)))
	if oldestBlock.Sign() < 0 {
		oldestBlock.SetUint64(0)
	}
	previousNonce, err := l1.NonceAt(ctx, addr, oldestBlock)
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve previous nonce: %w", err)
	}

	if currentNonce == previousNonce {
		// No tx from addr within the inspected range.
		return oldestBlock.Uint64(), false, nil
	}

	// Decrease block num until we find the block before the most recent batcher tx was sent.
	// The scan stops once it has stepped past oldestBlock (reorg protection) and never
	// goes below block 0. A fresh big.Int is allocated for every query so that values
	// already handed to the client are not changed afterwards.
	one := big.NewInt(1)
	targetNonce := currentNonce - 1
	for currentNonce > targetNonce && currentBlock.Cmp(oldestBlock) >= 0 && currentBlock.Sign() > 0 {
		currentBlock = new(big.Int).Sub(currentBlock, one)
		currentNonce, err = l1.NonceAt(ctx, addr, currentBlock)
		if err != nil {
			return 0, false, fmt.Errorf("failed to retrieve nonce: %w", err)
		}
	}
	// currentBlock is the last block still showing the older nonce, so the tx was
	// included in the block right after it.
	return currentBlock.Uint64() + 1, true, nil
}
140 changes: 140 additions & 0 deletions op-service/eth/transactions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package eth

import (
"context"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

// MockL1Client is a testify-based mock of L1Client used by the tests in this file.
type MockL1Client struct {
// Mock records the expectations set with On(...) and the calls actually made.
mock.Mock
}

// NonceAt records the call on the mock and returns the nonce and error configured
// for the matching expectation. The configured first return value must be a uint64.
func (m *MockL1Client) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
	ret := m.Called(ctx, account, blockNumber)
	nonce := ret.Get(0).(uint64)
	return nonce, ret.Error(1)
}

// HeaderByNumber records the call on the mock and returns the header and error
// configured for the matching expectation. If the configured first return value is
// not a *types.Header, a nil header is returned alongside the configured error.
func (m *MockL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
	ret := m.Called(ctx, number)
	header, isHeader := ret.Get(0).(*types.Header)
	if !isHeader {
		return nil, ret.Error(1)
	}
	return header, ret.Error(1)
}

// TestTransactions_checkRecentTxs runs CheckRecentTxs against a mocked L1 client
// whose account nonce changes from 5 to 6 right after previousNonceBlock, and
// checks the returned block number and found flag.
//
// Only the NonceAt calls that CheckRecentTxs is expected to make are registered on
// the mock, so AssertExpectations also verifies which blocks were queried.
func TestTransactions_checkRecentTxs(t *testing.T) {
	tests := []struct {
		name               string
		currentBlock       uint64
		blockConfirms      uint64
		previousNonceBlock uint64
		expectedBlockNum   uint64
		expectedFound      bool
	}{
		{
			// Blocks       495 496 497 498 499 500
			// Nonce          5   5   5   6   6   6
			// call NonceAt   x   -   x   x   x   x
			name:               "NonceDiff_3Blocks",
			currentBlock:       500,
			blockConfirms:      5,
			previousNonceBlock: 497,
			expectedBlockNum:   498,
			expectedFound:      true,
		},
		{
			// Blocks       495 496 497 498 499 500
			// Nonce          5   5   5   5   5   6
			// call NonceAt   x   -   -   -   x   x
			name:               "NonceDiff_1Block",
			currentBlock:       500,
			blockConfirms:      5,
			previousNonceBlock: 499,
			expectedBlockNum:   500,
			expectedFound:      true,
		},
		{
			// Blocks       495 496 497 498 499 500
			// Nonce          6   6   6   6   6   6
			// call NonceAt   x   -   -   -   -   x
			name:               "NonceUnchanged",
			currentBlock:       500,
			blockConfirms:      5,
			previousNonceBlock: 400,
			expectedBlockNum:   495,
			expectedFound:      false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			l1Client := new(MockL1Client)
			ctx := context.Background()

			currentNonce := uint64(6)
			previousNonce := uint64(5)
			oldestBlock := tt.currentBlock - tt.blockConfirms

			l1Client.On("HeaderByNumber", ctx, (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(int64(tt.currentBlock))}, nil)

			// Setup mock calls for NonceAt, depending on how many times it's expected to be called
			if tt.previousNonceBlock < oldestBlock {
				// Nonce change lies outside the checked range: only the two endpoints are queried.
				l1Client.On("NonceAt", ctx, common.Address{}, big.NewInt(int64(tt.currentBlock))).Return(currentNonce, nil)
				l1Client.On("NonceAt", ctx, common.Address{}, big.NewInt(int64(oldestBlock))).Return(currentNonce, nil)
			} else {
				for block := tt.currentBlock; block >= oldestBlock; block-- {
					blockBig := big.NewInt(int64(block))
					switch {
					case block > oldestBlock && block < tt.previousNonceBlock:
						// The backward scan stops at previousNonceBlock, so blocks strictly between
						// it and the oldest block are never queried.
						t.Log("skipped block: ", block)
					case block <= tt.previousNonceBlock:
						t.Log("previousNonce set at block: ", block)
						l1Client.On("NonceAt", ctx, common.Address{}, blockBig).Return(previousNonce, nil)
					default:
						t.Log("currentNonce set at block: ", block)
						l1Client.On("NonceAt", ctx, common.Address{}, blockBig).Return(currentNonce, nil)
					}
				}
			}

			// Use the depth from the test case so the mock setup and the call under test cannot drift apart.
			blockNum, found, err := CheckRecentTxs(ctx, l1Client, int(tt.blockConfirms), common.Address{})
			require.NoError(t, err)
			require.Equal(t, tt.expectedBlockNum, blockNum)
			require.Equal(t, tt.expectedFound, found)

			l1Client.AssertExpectations(t)
		})
	}
}
// TestTransactions_checkRecentTxs_reorg covers the case where the nonce reported at
// the oldest checked block is higher than the nonce at the head, as could be seen
// when a reorg happens between the two queries. CheckRecentTxs must terminate
// instead of scanning backwards forever, and report the oldest checked block.
func TestTransactions_checkRecentTxs_reorg(t *testing.T) {
	const (
		headNonce = uint64(6)
		headBlock = uint64(500)
		depth     = uint64(5)
	)

	ctx := context.Background()
	l1Client := new(MockL1Client)

	nonceAt := func(block uint64, nonce uint64) {
		l1Client.On("NonceAt", ctx, common.Address{}, big.NewInt(int64(block))).Return(nonce, nil)
	}

	l1Client.On("HeaderByNumber", ctx, (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(int64(headBlock))}, nil)
	nonceAt(headBlock, headNonce)

	// The oldest block reports a higher nonce than the head. This expectation is
	// registered before the per-block ones below, keeping the original order.
	nonceAt(headBlock-depth, headNonce+1)
	for back := uint64(1); back <= 6; back++ {
		nonceAt(headBlock-back, headNonce)
	}

	blockNum, found, err := CheckRecentTxs(ctx, l1Client, 5, common.Address{})
	require.NoError(t, err)
	require.Equal(t, uint64(495), blockNum)
	require.True(t, found)

	l1Client.AssertExpectations(t)
}

0 comments on commit 8661b79

Please sign in to comment.