Skip to content

Commit

Permalink
remove changeset.Walk func (erigontech#2716)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Sep 22, 2021
1 parent 133eec0 commit 3de5063
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 218 deletions.
60 changes: 15 additions & 45 deletions cmd/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,19 +888,6 @@ func validateTxLookups2(db kv.RwDB, startBlock uint64, interruptCh chan bool) {
}
}

func getModifiedAccounts(chaindata string) {
// TODO(tjayrush): The call to GetModifiedAccounts needs a database tx
fmt.Println("hack - getModiiedAccounts is temporarily disabled.")
db := mdbx.MustOpen(chaindata)
defer db.Close()
tool.Check(db.View(context.Background(), func(tx kv.Tx) error {
addrs, err := changeset.GetModifiedAccounts(tx, 49300, 49400)
tool.Check(err)
fmt.Printf("Len(addrs)=%d\n", len(addrs))
return nil
}))
}

type Receiver struct {
defaultReceiver *trie.RootHashAggregator
accountMap map[string]*accounts.Account
Expand Down Expand Up @@ -1012,52 +999,44 @@ func testGetProof(chaindata string, address common.Address, rewind int, regen bo
log.Info("GetProof", "address", address, "storage keys", len(storageKeys), "head", *headNumber, "block", block,
"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))

ts := dbutils.EncodeBlockNumber(block + 1)
accountMap := make(map[string]*accounts.Account)

if err := changeset.Walk(tx, kv.AccountChangeSet, ts, 0, func(blockN uint64, address, v []byte) (bool, error) {
if blockN > *headNumber {
return false, nil
}

if err := changeset.ForRange(tx, kv.AccountChangeSet, block+1, *headNumber+1, func(blockN uint64, address, v []byte) error {
var addrHash, err = common.HashData(address)
if err != nil {
return false, err
return err
}
k := addrHash[:]

if _, ok := accountMap[string(k)]; !ok {
if len(v) > 0 {
var a accounts.Account
if innerErr := a.DecodeForStorage(v); innerErr != nil {
return false, innerErr
return innerErr
}
accountMap[string(k)] = &a
} else {
accountMap[string(k)] = nil
}
}
return true, nil
return nil
}); err != nil {
return err
}
runtime.ReadMemStats(&m)
log.Info("Constructed account map", "size", len(accountMap),
"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))
storageMap := make(map[string][]byte)
if err := changeset.Walk(tx, kv.StorageChangeSet, ts, 0, func(blockN uint64, address, v []byte) (bool, error) {
if blockN > *headNumber {
return false, nil
}
if err := changeset.ForRange(tx, kv.StorageChangeSet, block+1, *headNumber+1, func(blockN uint64, address, v []byte) error {
var addrHash, err = common.HashData(address)
if err != nil {
return false, err
return err
}
k := addrHash[:]
if _, ok := storageMap[string(k)]; !ok {
storageMap[string(k)] = v
}
return true, nil
return nil
}); err != nil {
return err
}
Expand Down Expand Up @@ -1278,29 +1257,23 @@ func changeSetStats(chaindata string, block1, block2 uint64) error {
return err1
}
defer tx.Rollback()
if err := changeset.Walk(tx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(block1), 0, func(blockN uint64, k, v []byte) (bool, error) {
if blockN >= block2 {
return false, nil
}
if err := changeset.ForRange(tx, kv.AccountChangeSet, block1, block2, func(blockN uint64, k, v []byte) error {
if (blockN-block1)%100000 == 0 {
fmt.Printf("at the block %d for accounts, booster size: %d\n", blockN, len(accounts))
}
accounts[string(common.CopyBytes(k))] = struct{}{}
return true, nil
return nil
}); err != nil {
return err
}

storage := make(map[string]struct{})
if err := changeset.Walk(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(block1), 0, func(blockN uint64, k, v []byte) (bool, error) {
if blockN >= block2 {
return false, nil
}
if err := changeset.ForRange(tx, kv.StorageChangeSet, block1, block2, func(blockN uint64, k, v []byte) error {
if (blockN-block1)%100000 == 0 {
fmt.Printf("at the block %d for accounts, booster size: %d\n", blockN, len(accounts))
}
storage[string(common.CopyBytes(k))] = struct{}{}
return true, nil
return nil
}); err != nil {
return err
}
Expand All @@ -1319,11 +1292,11 @@ func searchChangeSet(chaindata string, key []byte, block uint64) error {
}
defer tx.Rollback()

if err := changeset.Walk(tx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(block), 0, func(blockN uint64, k, v []byte) (bool, error) {
if err := changeset.ForEach(tx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(block), func(blockN uint64, k, v []byte) error {
if bytes.Equal(k, key) {
fmt.Printf("Found in block %d with value %x\n", blockN, v)
}
return true, nil
return nil
}); err != nil {
return err
}
Expand All @@ -1339,11 +1312,11 @@ func searchStorageChangeSet(chaindata string, key []byte, block uint64) error {
return err1
}
defer tx.Rollback()
if err := changeset.Walk(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(block), 0, func(blockN uint64, k, v []byte) (bool, error) {
if err := changeset.ForEach(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(block), func(blockN uint64, k, v []byte) error {
if bytes.Equal(k, key) {
fmt.Printf("Found in block %d with value %x\n", blockN, v)
}
return true, nil
return nil
}); err != nil {
return err
}
Expand Down Expand Up @@ -2384,9 +2357,6 @@ func main() {
case "val-tx-lookup-2":
ValidateTxLookups2(*chaindata)

case "modiAccounts":
getModifiedAccounts(*chaindata)

case "slice":
dbSlice(*chaindata, *bucket, common.FromHex(*hash))

Expand Down
20 changes: 10 additions & 10 deletions cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,19 +534,19 @@ func loopExec(db kv.RwDB, ctx context.Context, unwind uint64) error {
func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset.ChangeSet, expectedStorageChanges *changeset.ChangeSet) error {
i := 0
sort.Sort(expectedAccountChanges)
err := changeset.Walk(db, kv.AccountChangeSet, dbutils.EncodeBlockNumber(blockNum), 8*8, func(blockN uint64, k, v []byte) (bool, error) {
err := changeset.ForPrefix(db, kv.AccountChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error {
c := expectedAccountChanges.Changes[i]
i++
if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) {
return true, nil
return nil
}

fmt.Printf("Unexpected account changes in block %d\n", blockNum)
fmt.Printf("In the database: ======================\n")
fmt.Printf("0x%x: %x\n", k, v)
fmt.Printf("Expected: ==========================\n")
fmt.Printf("0x%x %x\n", c.Key, c.Value)
return false, fmt.Errorf("check change set failed")
return fmt.Errorf("check change set failed")
})
if err != nil {
return err
Expand All @@ -560,19 +560,19 @@ func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset

i = 0
sort.Sort(expectedStorageChanges)
err = changeset.Walk(db, kv.StorageChangeSet, dbutils.EncodeBlockNumber(blockNum), 8*8, func(blockN uint64, k, v []byte) (bool, error) {
err = changeset.ForPrefix(db, kv.StorageChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error {
c := expectedStorageChanges.Changes[i]
i++
if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) {
return true, nil
return nil
}

fmt.Printf("Unexpected storage changes in block %d\n", blockNum)
fmt.Printf("In the database: ======================\n")
fmt.Printf("0x%x: %x\n", k, v)
fmt.Printf("Expected: ==========================\n")
fmt.Printf("0x%x %x\n", c.Key, c.Value)
return false, fmt.Errorf("check change set failed")
return fmt.Errorf("check change set failed")
})
if err != nil {
return err
Expand All @@ -587,20 +587,20 @@ func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset
func checkHistory(tx kv.Tx, changeSetBucket string, blockNum uint64) error {
indexBucket := changeset.Mapper[changeSetBucket].IndexBucket
blockNumBytes := dbutils.EncodeBlockNumber(blockNum)
if err := changeset.Walk(tx, changeSetBucket, blockNumBytes, 0, func(blockN uint64, address, v []byte) (bool, error) {
if err := changeset.ForEach(tx, changeSetBucket, blockNumBytes, func(blockN uint64, address, v []byte) error {
k := dbutils.CompositeKeyWithoutIncarnation(address)
from := blockN
if from > 0 {
from--
}
bm, innerErr := bitmapdb.Get64(tx, indexBucket, k, from, blockN+1)
if innerErr != nil {
return false, innerErr
return innerErr
}
if !bm.Contains(blockN) {
return false, fmt.Errorf("checkHistory failed: bucket=%s,block=%d,addr=%x", changeSetBucket, blockN, k)
return fmt.Errorf("checkHistory failed: bucket=%s,block=%d,addr=%x", changeSetBucket, blockN, k)
}
return true, nil
return nil
}); err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/rpcdaemon/commands/debug_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ func (api *PrivateDebugAPIImpl) GetModifiedAccountsByNumber(ctx context.Context,
return nil, fmt.Errorf("start block (%d) is later than the latest block (%d)", startNum, latestBlock)
}

endNum := startNum // allows for single param calls
endNum := startNum + 1 // allows for single param calls
if endNumber != nil {
// forces negative numbers to fail (too large) but allows zero
endNum = uint64(endNumber.Int64())
endNum = uint64(endNumber.Int64()) + 1
}

// is endNum too big?
Expand Down Expand Up @@ -193,7 +193,7 @@ func (api *PrivateDebugAPIImpl) GetModifiedAccountsByHash(ctx context.Context, s
return nil, fmt.Errorf("start block %x not found", startHash)
}
startNum := startBlock.NumberU64()
endNum := startNum // allows for single parameter calls
endNum := startNum + 1 // allows for single parameter calls

if endHash != nil {
endBlock, err := rawdb.ReadBlockByHash(tx, *endHash)
Expand All @@ -203,7 +203,7 @@ func (api *PrivateDebugAPIImpl) GetModifiedAccountsByHash(ctx context.Context, s
if endBlock == nil {
return nil, fmt.Errorf("end block %x not found", *endHash)
}
endNum = endBlock.NumberU64()
endNum = endBlock.NumberU64() + 1
}

if startNum > endNum {
Expand Down
12 changes: 6 additions & 6 deletions cmd/state/commands/check_change_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,11 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64,
sort.Sort(accountChanges)
i := 0
match := true
err = changeset.Walk(historyTx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(blockNum), 8*8, func(blockN uint64, k, v []byte) (bool, error) {
err = changeset.ForPrefix(historyTx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error {
c := accountChanges.Changes[i]
if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) {
i++
return true, nil
return nil
}

match = false
Expand All @@ -178,7 +178,7 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64,
fmt.Printf("%d: 0x%x: %x\n", i, k, v)
fmt.Printf("Expected: ==========================\n")
fmt.Printf("%d: 0x%x %x\n", i, c.Key, c.Value)
return false, nil
return nil
})
if err != nil {
return err
Expand All @@ -197,18 +197,18 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64,
expectedStorageChanges = changeset.NewChangeSet()
}
sort.Sort(expectedStorageChanges)
err = changeset.Walk(historyTx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(blockNum), 8*8, func(blockN uint64, k, v []byte) (bool, error) {
err = changeset.ForPrefix(historyTx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error {
c := expectedStorageChanges.Changes[i]
i++
if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) {
return false, nil
return nil
}

fmt.Printf("Unexpected storage changes in block %d\nIn the database: ======================\n", blockNum)
fmt.Printf("0x%x: %x\n", k, v)
fmt.Printf("Expected: ==========================\n")
fmt.Printf("0x%x %x\n", c.Key, c.Value)
return true, fmt.Errorf("check change set failed")
return fmt.Errorf("check change set failed")
})
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions cmd/state/verify/check_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,25 @@ func CheckIndex(ctx context.Context, chaindata string, changeSetBucket string, i
startTime := time.Now()

i := 0
if err := changeset.Walk(tx, changeSetBucket, nil, 0, func(blockN uint64, k, v []byte) (bool, error) {
if err := changeset.ForEach(tx, changeSetBucket, nil, func(blockN uint64, k, v []byte) error {
i++
if i%100_000 == 0 {
fmt.Printf("Processed %dK, %s\n", blockN/1000, time.Since(startTime))
}
select {
default:
case <-ctx.Done():
return false, ctx.Err()
return ctx.Err()
}

bm, innerErr := bitmapdb.Get64(tx, indexBucket, dbutils.CompositeKeyWithoutIncarnation(k), blockN-1, blockN+1)
if innerErr != nil {
return false, innerErr
return innerErr
}
if !bm.Contains(blockN) {
return false, fmt.Errorf("%v,%v", blockN, common.Bytes2Hex(k))
return fmt.Errorf("%v,%v", blockN, common.Bytes2Hex(k))
}
return true, nil
return nil
}); err != nil {
return err
}
Expand Down
8 changes: 3 additions & 5 deletions common/changeset/account_changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ func FindAccount(c kv.CursorDupSort, blockNumber uint64, key []byte) ([]byte, er
}

// GetModifiedAccounts returns a list of addresses that were modified in the block range
// [startNum:endNum)
func GetModifiedAccounts(db kv.Tx, startNum, endNum uint64) ([]common.Address, error) {
changedAddrs := make(map[common.Address]struct{})
if err := Walk(db, kv.AccountChangeSet, dbutils.EncodeBlockNumber(startNum), 0, func(blockN uint64, k, v []byte) (bool, error) {
if blockN > endNum {
return false, nil
}
if err := ForRange(db, kv.AccountChangeSet, startNum, endNum, func(blockN uint64, k, v []byte) error {
changedAddrs[common.BytesToAddress(k)] = struct{}{}
return true, nil
return nil
}); err != nil {
return nil, err
}
Expand Down
25 changes: 23 additions & 2 deletions common/changeset/changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,35 @@ func AvailableStorageFrom(tx kv.Tx) (uint64, error) {
return binary.BigEndian.Uint64(k), nil
}

func Walk(db kv.Tx, bucket string, startkey []byte, fixedbits int, walker func(blockN uint64, k, v []byte) (bool, error)) error {
// [from:to)
func ForRange(db kv.Tx, bucket string, from, to uint64, walker func(blockN uint64, k, v []byte) error) error {
var blockN uint64
c, err := db.Cursor(bucket)
if err != nil {
return err
}
defer c.Close()
return ethdb.Walk(c, startkey, fixedbits, func(k, v []byte) (bool, error) {
return ethdb.Walk(c, dbutils.EncodeBlockNumber(from), 0, func(k, v []byte) (bool, error) {
blockN, k, v = FromDBFormat(k, v)
if blockN >= to {
return false, nil
}
if err := walker(blockN, k, v); err != nil {
return false, err
}
return true, nil
})
}
func ForEach(db kv.Tx, bucket string, startkey []byte, walker func(blockN uint64, k, v []byte) error) error {
var blockN uint64
return db.ForEach(bucket, startkey, func(k, v []byte) error {
blockN, k, v = FromDBFormat(k, v)
return walker(blockN, k, v)
})
}
func ForPrefix(db kv.Tx, bucket string, startkey []byte, walker func(blockN uint64, k, v []byte) error) error {
var blockN uint64
return db.ForPrefix(bucket, startkey, func(k, v []byte) error {
blockN, k, v = FromDBFormat(k, v)
return walker(blockN, k, v)
})
Expand Down
Loading

0 comments on commit 3de5063

Please sign in to comment.