Skip to content

Commit

Permalink
Fixed Invalid block production with invalid txpool txs. (erigontech#5205
Browse files Browse the repository at this point in the history
)

* updated mining

* print writer

* stageloop

* removed logs

* mining exec

* preemptive stuff

* create and execution

* demote to Debug

Co-authored-by: giuliorebuffo <[email protected]>
  • Loading branch information
Giulio2002 and giuliorebuffo authored Aug 27, 2022
1 parent 20e8820 commit 2c43ffe
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 18 deletions.
130 changes: 119 additions & 11 deletions eth/stagedsync/stage_mining_create_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"time"

mapset "github.com/deckarep/golang-set"
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon-lib/txpool"
types2 "github.com/ledgerwatch/erigon-lib/types"
"github.com/ledgerwatch/erigon/common"
Expand All @@ -21,6 +23,7 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/types/accounts"
"github.com/ledgerwatch/erigon/eth/ethutils"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
Expand Down Expand Up @@ -125,9 +128,9 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc

blockNum := executionAt + 1
var txs []types.Transaction
if err = cfg.txPool2DB.View(context.Background(), func(tx kv.Tx) error {
if err = cfg.txPool2DB.View(context.Background(), func(poolTx kv.Tx) error {
txSlots := types2.TxsRlp{}
if err := cfg.txPool2.Best(200, &txSlots, tx); err != nil {
if err := cfg.txPool2.Best(200, &txSlots, poolTx); err != nil {
return err
}

Expand All @@ -144,21 +147,16 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
if transaction.GetChainID().ToBig().Cmp(cfg.chainConfig.ChainID) != 0 {
continue
}
txs = append(txs, transaction)
}
var sender common.Address
for i := range txs {
var sender common.Address
copy(sender[:], txSlots.Senders.At(i))
txs[i].SetSender(sender)
// Check if tx nonce is too low
txs = append(txs, transaction)
txs[len(txs)-1].SetSender(sender)
}

return nil
}); err != nil {
return err
}
current.RemoteTxs = types.NewTransactionsFixedOrder(txs)
// txpool v2 - doesn't prioritise local txs over remote
current.LocalTxs = types.NewTransactionsFixedOrder(nil)
log.Debug(fmt.Sprintf("[%s] Candidate txs", logPrefix), "amount", len(txs))
localUncles, remoteUncles, err := readNonCanonicalHeaders(tx, blockNum, cfg.engine, coinbase, txPoolLocals)
if err != nil {
Expand Down Expand Up @@ -211,6 +209,14 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc
header.Coinbase = coinbase
header.Extra = cfg.miner.MiningConfig.ExtraData

txs, err = filterBadTransactions(tx, txs, cfg.chainConfig, blockNum, header.BaseFee)
if err != nil {
return err
}
current.RemoteTxs = types.NewTransactionsFixedOrder(txs)
// txpool v2 - doesn't prioritise local txs over remote
current.LocalTxs = types.NewTransactionsFixedOrder(nil)

log.Info(fmt.Sprintf("[%s] Start mine", logPrefix), "block", executionAt+1, "baseFee", header.BaseFee, "gasLimit", header.GasLimit)

stateReader := state.NewPlainStateReader(tx)
Expand Down Expand Up @@ -342,3 +348,105 @@ func readNonCanonicalHeaders(tx kv.Tx, blockNum uint64, engine consensus.Engine,
}
return
}

func filterBadTransactions(tx kv.Tx, transactions []types.Transaction, config params.ChainConfig, blockNumber uint64, baseFee *big.Int) ([]types.Transaction, error) {
var filtered []types.Transaction
simulationTx := memdb.NewMemoryBatch(tx)
defer simulationTx.Rollback()
gasBailout := config.Consensus == params.ParliaConsensus

missedTxs := 0
for len(transactions) > 0 && missedTxs != len(transactions) {
transaction := transactions[0]
sender, ok := transaction.GetSender()
if !ok {
transactions = transactions[:1]
continue
}
var account accounts.Account
ok, err := rawdb.ReadAccount(simulationTx, sender, &account)
if err != nil {
return nil, err
}
if !ok {
transactions = transactions[:1]
continue
}
// Check transaction nonce
if account.Nonce > transaction.GetNonce() {
transactions = transactions[1:]
continue
}
if account.Nonce < transaction.GetNonce() {
missedTxs++
transactions = append(transactions[1:], transaction)
continue
}
missedTxs = 0

// Make sure the sender is an EOA (EIP-3607)
if !account.IsEmptyCodeHash() {
transactions = transactions[1:]
continue
}

baseFee256 := uint256.NewInt(0)
if baseFee256.SetFromBig(baseFee) {
return nil, fmt.Errorf("bad baseFee")
}
// Make sure the transaction gasFeeCap is greater than the block's baseFee.
if config.IsLondon(blockNumber) {
if !transaction.GetFeeCap().IsZero() || !transaction.GetTip().IsZero() {
if err := core.CheckEip1559TxGasFeeCap(sender, transaction.GetFeeCap(), transaction.GetTip(), baseFee256); err != nil {
transactions = transactions[1:]
continue
}
}
}
txnGas := transaction.GetGas()
txnPrice := transaction.GetPrice()
value := transaction.GetValue()
accountBalance := account.Balance

want := uint256.NewInt(0)
want.SetUint64(txnGas)
want, overflow := want.MulOverflow(want, txnPrice)
if overflow {
transactions = transactions[1:]
continue
}

if transaction.GetFeeCap() != nil {
want.SetUint64(txnGas)
want, overflow = want.MulOverflow(want, transaction.GetFeeCap())
if overflow {
transactions = transactions[1:]
continue
}
want, overflow = want.AddOverflow(want, value)
if overflow {
transactions = transactions[1:]
continue
}
}

if accountBalance.Cmp(want) < 0 {
if !gasBailout {
transactions = transactions[1:]
continue
}
}
// Updates account in the simulation
account.Nonce++
account.Balance.Sub(&account.Balance, want)
accountBuffer := make([]byte, account.EncodingLengthForStorage())
account.EncodeForStorage(accountBuffer)
if err := simulationTx.Put(kv.PlainState, sender[:], accountBuffer); err != nil {
return nil, err
}
// Mark transaction as valid
filtered = append(filtered, transaction)
transactions = transactions[1:]
}
return filtered, nil
}
14 changes: 7 additions & 7 deletions eth/stagedsync/stage_mining_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c
// empty block is necessary to keep the liveness of the network.
if noempty {
if !localTxs.Empty() {
logs, err := addTransactionsToMiningBlock(logPrefix, current, cfg.chainConfig, cfg.vmConfig, getHeader, contractHasTEVM, cfg.engine, localTxs, cfg.miningState.MiningConfig.Etherbase, ibs, quit, cfg.interrupt)
logs, err := addTransactionsToMiningBlock(logPrefix, current, cfg.chainConfig, cfg.vmConfig, getHeader, contractHasTEVM, cfg.engine, localTxs, cfg.miningState.MiningConfig.Etherbase, ibs, stateReader, quit, cfg.interrupt)
if err != nil {
return err
}
Expand All @@ -106,7 +106,7 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c
//}
}
if !remoteTxs.Empty() {
logs, err := addTransactionsToMiningBlock(logPrefix, current, cfg.chainConfig, cfg.vmConfig, getHeader, contractHasTEVM, cfg.engine, remoteTxs, cfg.miningState.MiningConfig.Etherbase, ibs, quit, cfg.interrupt)
logs, err := addTransactionsToMiningBlock(logPrefix, current, cfg.chainConfig, cfg.vmConfig, getHeader, contractHasTEVM, cfg.engine, remoteTxs, cfg.miningState.MiningConfig.Etherbase, ibs, stateReader, quit, cfg.interrupt)
if err != nil {
return err
}
Expand Down Expand Up @@ -143,7 +143,7 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c
select {
case w.taskCh <- &task{receipts: receipts, state: s, tds: w.env.tds, block: block, createdAt: time.Now(), ctx: ctx}:
log.Warn("mining: worker task event",
log.Debug("mining: worker task event",
"number", block.NumberU64(),
"hash", block.Hash().String(),
"parentHash", block.ParentHash().String(),
Expand All @@ -170,7 +170,7 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c
return nil
}

func addTransactionsToMiningBlock(logPrefix string, current *MiningBlock, chainConfig params.ChainConfig, vmConfig *vm.Config, getHeader func(hash common.Hash, number uint64) *types.Header, contractHasTEVM func(common.Hash) (bool, error), engine consensus.Engine, txs types.TransactionsStream, coinbase common.Address, ibs *state.IntraBlockState, quit <-chan struct{}, interrupt *int32) (types.Logs, error) {
func addTransactionsToMiningBlock(logPrefix string, current *MiningBlock, chainConfig params.ChainConfig, vmConfig *vm.Config, getHeader func(hash common.Hash, number uint64) *types.Header, contractHasTEVM func(common.Hash) (bool, error), engine consensus.Engine, txs types.TransactionsStream, coinbase common.Address, ibs *state.IntraBlockState, stateReader state.StateReader, quit <-chan struct{}, interrupt *int32) (types.Logs, error) {
header := current.Header
tcount := 0
gasPool := new(core.GasPool).AddGas(current.Header.GasLimit)
Expand All @@ -180,8 +180,9 @@ func addTransactionsToMiningBlock(logPrefix string, current *MiningBlock, chainC
noop := state.NewNoopWriter()

var miningCommitTx = func(txn types.Transaction, coinbase common.Address, vmConfig *vm.Config, chainConfig params.ChainConfig, ibs *state.IntraBlockState, current *MiningBlock) ([]*types.Log, error) {
snap := ibs.Snapshot()
ibs.Prepare(txn.Hash(), common.Hash{}, tcount)
gasSnap := gasPool.Gas()
snap := ibs.Snapshot()
receipt, _, err := core.ApplyTransaction(&chainConfig, core.GetHashFn(header, getHeader), engine, &coinbase, gasPool, ibs, noop, header, txn, &header.GasUsed, *vmConfig, contractHasTEVM)
if err != nil {
ibs.RevertToSnapshot(snap)
Expand Down Expand Up @@ -228,7 +229,6 @@ func addTransactionsToMiningBlock(logPrefix string, current *MiningBlock, chainC
}

// Start executing the transaction
ibs.Prepare(txn.Hash(), common.Hash{}, tcount)
logs, err := miningCommitTx(txn, coinbase, vmConfig, chainConfig, ibs, current)

if errors.Is(err, core.ErrGasLimitReached) {
Expand Down Expand Up @@ -273,7 +273,7 @@ func NotifyPendingLogs(logPrefix string, notifier ChainEventNotifier, logs types
}

if notifier == nil {
log.Warn(fmt.Sprintf("[%s] rpc notifier is not set, rpc daemon won't be updated about pending logs", logPrefix))
log.Debug(fmt.Sprintf("[%s] rpc notifier is not set, rpc daemon won't be updated about pending logs", logPrefix))
return
}
notifier.OnNewPendingLogs(logs)
Expand Down

0 comments on commit 2c43ffe

Please sign in to comment.