les: historical data garbage collection (ethereum#19570)
This change introduces garbage collection for the light client. Historical
chain data is deleted periodically. If you want to disable the GC, use
the --light.nopruning flag.
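
Pruning is the default after this change; to opt out, a light client would be started with something like the following (the flag is from this change, the rest of the invocation is assumed):

    geth --syncmode light --light.nopruning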
rjl493456442 authored Jul 13, 2020
1 parent b8dd089 commit 6eef141
Showing 45 changed files with 843 additions and 215 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
@@ -98,6 +98,7 @@ var (
utils.LightEgressFlag,
utils.LightMaxPeersFlag,
utils.LegacyLightPeersFlag,
+ utils.LightNoPruneFlag,
utils.LightKDFFlag,
utils.UltraLightServersFlag,
utils.UltraLightFractionFlag,
1 change: 1 addition & 0 deletions cmd/geth/usage.go
@@ -96,6 +96,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.UltraLightServersFlag,
utils.UltraLightFractionFlag,
utils.UltraLightOnlyAnnounceFlag,
+ utils.LightNoPruneFlag,
},
},
{
18 changes: 15 additions & 3 deletions cmd/utils/flags.go
@@ -282,6 +282,10 @@ var (
Name: "ulc.onlyannounce",
Usage: "Ultra light server sends announcements only",
}
+ LightNoPruneFlag = cli.BoolFlag{
+     Name:  "light.nopruning",
+     Usage: "Disable ancient light chain data pruning",
+ }
// Ethash settings
EthashCacheDirFlag = DirectoryFlag{
Name: "ethash.cachedir",
@@ -1070,6 +1074,9 @@ func setLes(ctx *cli.Context, cfg *eth.Config) {
if ctx.GlobalIsSet(UltraLightOnlyAnnounceFlag.Name) {
cfg.UltraLightOnlyAnnounce = ctx.GlobalBool(UltraLightOnlyAnnounceFlag.Name)
}
+ if ctx.GlobalIsSet(LightNoPruneFlag.Name) {
+     cfg.LightNoPrune = ctx.GlobalBool(LightNoPruneFlag.Name)
+ }
}

// makeDatabaseHandles raises out the number of allowed file handles per process
@@ -1800,12 +1807,17 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node) ethdb.Database {
var (
    cache   = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheDatabaseFlag.Name) / 100
    handles = makeDatabaseHandles()
+
+     err     error
+     chainDb ethdb.Database
)
- name := "chaindata"
if ctx.GlobalString(SyncModeFlag.Name) == "light" {
-     name = "lightchaindata"
+     name := "lightchaindata"
+     chainDb, err = stack.OpenDatabase(name, cache, handles, "")
+ } else {
+     name := "chaindata"
+     chainDb, err = stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "")
}
- chainDb, err := stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "")
if err != nil {
Fatalf("Could not open database: %v", err)
}
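
Note the split above: in light mode the node now opens a plain key-value database ("lightchaindata") via OpenDatabase, without a freezer. Light chain data therefore stays in the key-value store where the new garbage collector can delete it, instead of migrating into the append-only ancient store.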
2 changes: 1 addition & 1 deletion consensus/clique/clique.go
@@ -369,7 +369,7 @@ func (c *Clique) snapshot(chain consensus.ChainReader, number uint64, hash commo
// at a checkpoint block without a parent (light client CHT), or we have piled
// up more headers than allowed to be reorged (chain reinit from a freezer),
// consider the checkpoint trusted and snapshot it.
- if number == 0 || (number%c.config.Epoch == 0 && (len(headers) > params.ImmutabilityThreshold || chain.GetHeaderByNumber(number-1) == nil)) {
+ if number == 0 || (number%c.config.Epoch == 0 && (len(headers) > params.FullImmutabilityThreshold || chain.GetHeaderByNumber(number-1) == nil)) {
checkpoint := chain.GetHeaderByNumber(number)
if checkpoint != nil {
hash := checkpoint.Hash()
8 changes: 4 additions & 4 deletions core/blockchain.go
@@ -901,14 +901,14 @@ func (bc *BlockChain) Stop() {
recent := bc.GetBlockByNumber(number - offset)

log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
- if err := triedb.Commit(recent.Root(), true); err != nil {
+ if err := triedb.Commit(recent.Root(), true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
}
}
if snapBase != (common.Hash{}) {
log.Info("Writing snapshot state to disk", "root", snapBase)
- if err := triedb.Commit(snapBase, true); err != nil {
+ if err := triedb.Commit(snapBase, true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
}
@@ -1442,7 +1442,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.

// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
- if err := triedb.Commit(root, false); err != nil {
+ if err := triedb.Commit(root, false, nil); err != nil {
return NonStatTy, err
}
} else {
@@ -1476,7 +1476,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
- triedb.Commit(header.Root, true)
+ triedb.Commit(header.Root, true, nil)
lastWrite = chosen
bc.gcproc = 0
}
9 changes: 8 additions & 1 deletion core/chain_indexer.go
@@ -46,6 +46,9 @@ type ChainIndexerBackend interface {

// Commit finalizes the section metadata and stores it into the database.
Commit() error

+ // Prune deletes the chain index older than the given threshold.
+ Prune(threshold uint64) error
}

// ChainIndexerChain interface is used for connecting the indexer to a blockchain
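
The new Prune hook is the entry point the light client's GC uses to drop stale index data. Below is a minimal sketch of a conforming backend, assuming a hypothetical type and key layout that are not part of this commit (the commit's own trivial example is the no-op in core/chain_indexer_test.go further down); it assumes the ethdb iterator API used elsewhere in this diff and encoding/binary:

    // pruneBackend is a hypothetical ChainIndexerBackend; only Prune is shown.
    type pruneBackend struct {
        db ethdb.Database
    }

    // Prune walks a made-up "index-" keyspace whose keys end in an 8-byte
    // big-endian section number and deletes every section below the threshold.
    func (b *pruneBackend) Prune(threshold uint64) error {
        it := b.db.NewIterator([]byte("index-"), nil)
        defer it.Release()
        for it.Next() {
            key := it.Key()
            if len(key) < 8 {
                continue
            }
            if section := binary.BigEndian.Uint64(key[len(key)-8:]); section < threshold {
                if err := b.db.Delete(key); err != nil {
                    return err
                }
            }
        }
        return it.Error()
    }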
@@ -386,7 +389,6 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.log.Trace("Processing new chain section", "section", section)

// Reset and partial processing
-
if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
c.setValidSections(0)
return common.Hash{}, err
@@ -459,6 +461,11 @@ func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) {
}
}

+ // Prune deletes all chain data older than given threshold.
+ func (c *ChainIndexer) Prune(threshold uint64) error {
+     return c.backend.Prune(threshold)
+ }

// loadValidSections reads the number of valid sections from the index database
// and caches is into the local state.
func (c *ChainIndexer) loadValidSections() {
4 changes: 4 additions & 0 deletions core/chain_indexer_test.go
@@ -236,3 +236,7 @@ func (b *testChainIndexBackend) Commit() error {
}
return nil
}

+ func (b *testChainIndexBackend) Prune(threshold uint64) error {
+     return nil
+ }
2 changes: 1 addition & 1 deletion core/chain_makers.go
@@ -220,7 +220,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
if err != nil {
panic(fmt.Sprintf("state write error: %v", err))
}
- if err := statedb.Database().TrieDB().Commit(root, false); err != nil {
+ if err := statedb.Database().TrieDB().Commit(root, false, nil); err != nil {
panic(fmt.Sprintf("trie write error: %v", err))
}
return block, b.receipts
8 changes: 4 additions & 4 deletions core/dao_test.go
@@ -79,7 +79,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import contra-fork chain for expansion: %v", err)
}
- if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil {
+ if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
t.Fatalf("failed to commit contra-fork head for expansion: %v", err)
}
blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
@@ -104,7 +104,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import pro-fork chain for expansion: %v", err)
}
- if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil {
+ if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
t.Fatalf("failed to commit pro-fork head for expansion: %v", err)
}
blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
@@ -130,7 +130,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import contra-fork chain for expansion: %v", err)
}
- if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil {
+ if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
t.Fatalf("failed to commit contra-fork head for expansion: %v", err)
}
blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
@@ -150,7 +150,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import pro-fork chain for expansion: %v", err)
}
- if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil {
+ if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
t.Fatalf("failed to commit pro-fork head for expansion: %v", err)
}
blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
2 changes: 1 addition & 1 deletion core/genesis.go
@@ -285,7 +285,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block {
head.Difficulty = params.GenesisDifficulty
}
statedb.Commit(false)
- statedb.Database().TrieDB().Commit(root, true)
+ statedb.Database().TrieDB().Commit(root, true, nil)

return types.NewBlock(head, nil, nil, nil)
}
33 changes: 33 additions & 0 deletions core/rawdb/accessors_chain.go
@@ -80,6 +80,39 @@ func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash {
return hashes
}

+ // ReadAllCanonicalHashes retrieves all canonical number and hash mappings at the
+ // certain chain range. If the accumulated entries reaches the given threshold,
+ // abort the iteration and return the semi-finish result.
+ func ReadAllCanonicalHashes(db ethdb.Iteratee, from uint64, to uint64, limit int) ([]uint64, []common.Hash) {
+     // Short circuit if the limit is 0.
+     if limit == 0 {
+         return nil, nil
+     }
+     var (
+         numbers []uint64
+         hashes  []common.Hash
+     )
+     // Construct the key prefix of start point.
+     start, end := headerHashKey(from), headerHashKey(to)
+     it := db.NewIterator(nil, start)
+     defer it.Release()
+
+     for it.Next() {
+         if bytes.Compare(it.Key(), end) >= 0 {
+             break
+         }
+         if key := it.Key(); len(key) == len(headerPrefix)+8+1 && bytes.Equal(key[len(key)-1:], headerHashSuffix) {
+             numbers = append(numbers, binary.BigEndian.Uint64(key[len(headerPrefix):len(headerPrefix)+8]))
+             hashes = append(hashes, common.BytesToHash(it.Value()))
+             // If the accumulated entries reaches the limit threshold, return.
+             if len(numbers) >= limit {
+                 break
+             }
+         }
+     }
+     return numbers, hashes
+ }

// ReadHeaderNumber returns the header number assigned to a hash.
func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 {
data, _ := db.Get(headerNumberKey(hash))
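
The len(headerPrefix)+8+1 check above filters out other records sharing the same prefix, such as the total-difficulty entries the new test below writes as deliberate interference. It relies on the canonical-hash key layout from rawdb/schema.go, which, as best I recall (a sketch, not a quotation), is:

    // headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix
    func headerHashKey(number uint64) []byte {
        return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...)
    }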
33 changes: 33 additions & 0 deletions core/rawdb/accessors_chain_test.go
@@ -23,6 +23,7 @@ import (
"io/ioutil"
"math/big"
"os"
"reflect"
"testing"

"github.com/ethereum/go-ethereum/common"
@@ -424,3 +425,35 @@ func TestAncientStorage(t *testing.T) {
t.Fatalf("invalid td returned")
}
}

+ func TestCanonicalHashIteration(t *testing.T) {
+     var cases = []struct {
+         from, to uint64
+         limit    int
+         expect   []uint64
+     }{
+         {1, 8, 0, nil},
+         {1, 8, 1, []uint64{1}},
+         {1, 8, 10, []uint64{1, 2, 3, 4, 5, 6, 7}},
+         {1, 9, 10, []uint64{1, 2, 3, 4, 5, 6, 7, 8}},
+         {2, 9, 10, []uint64{2, 3, 4, 5, 6, 7, 8}},
+         {9, 10, 10, nil},
+     }
+     // Test empty db iteration
+     db := NewMemoryDatabase()
+     numbers, _ := ReadAllCanonicalHashes(db, 0, 10, 10)
+     if len(numbers) != 0 {
+         t.Fatalf("No entry should be returned to iterate an empty db")
+     }
+     // Fill database with testing data.
+     for i := uint64(1); i <= 8; i++ {
+         WriteCanonicalHash(db, common.Hash{}, i)
+         WriteTd(db, common.Hash{}, i, big.NewInt(10)) // Write some interferential data
+     }
+     for i, c := range cases {
+         numbers, _ := ReadAllCanonicalHashes(db, c.from, c.to, c.limit)
+         if !reflect.DeepEqual(numbers, c.expect) {
+             t.Fatalf("Case %d failed, want %v, got %v", i, c.expect, numbers)
+         }
+     }
+ }
22 changes: 22 additions & 0 deletions core/rawdb/accessors_indexes.go
@@ -17,6 +17,7 @@
package rawdb

import (
"bytes"
"math/big"

"github.com/ethereum/go-ethereum/common"
@@ -151,3 +152,24 @@ func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head comm
log.Crit("Failed to store bloom bits", "err", err)
}
}

+ // DeleteBloombits removes all compressed bloom bits vector belonging to the
+ // given section range and bit index.
+ func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) {
+     start, end := bloomBitsKey(bit, from, common.Hash{}), bloomBitsKey(bit, to, common.Hash{})
+     it := db.NewIterator(nil, start)
+     defer it.Release()
+
+     for it.Next() {
+         if bytes.Compare(it.Key(), end) >= 0 {
+             break
+         }
+         if len(it.Key()) != len(bloomBitsPrefix)+2+8+32 {
+             continue
+         }
+         db.Delete(it.Key())
+     }
+     if it.Error() != nil {
+         log.Crit("Failed to delete bloom bits", "err", it.Error())
+     }
+ }
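
The fixed-length filter matches the bloom-bits key layout: prefix, 2-byte bit index, 8-byte section, 32-byte head hash. Because the head hash is part of the key, a section's entries cannot be removed with a single key delete and must be found by iteration, which is exactly what the range scan above does. From memory (a sketch assuming a 1-byte prefix, not a quotation of schema.go):

    // bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash
    func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte {
        key := append(append(bloomBitsPrefix, make([]byte, 10)...), hash.Bytes()...)
        binary.BigEndian.PutUint16(key[1:], uint16(bit))
        binary.BigEndian.PutUint64(key[3:], section)
        return key
    }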
45 changes: 45 additions & 0 deletions core/rawdb/accessors_indexes_test.go
@@ -17,12 +17,14 @@
package rawdb

import (
"bytes"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)

@@ -106,3 +108,46 @@ func TestLookupStorage(t *testing.T) {
})
}
}

+ func TestDeleteBloomBits(t *testing.T) {
+     // Prepare testing data
+     db := NewMemoryDatabase()
+     for i := uint(0); i < 2; i++ {
+         for s := uint64(0); s < 2; s++ {
+             WriteBloomBits(db, i, s, params.MainnetGenesisHash, []byte{0x01, 0x02})
+             WriteBloomBits(db, i, s, params.RinkebyGenesisHash, []byte{0x01, 0x02})
+         }
+     }
+     check := func(bit uint, section uint64, head common.Hash, exist bool) {
+         bits, _ := ReadBloomBits(db, bit, section, head)
+         if exist && !bytes.Equal(bits, []byte{0x01, 0x02}) {
+             t.Fatalf("Bloombits mismatch")
+         }
+         if !exist && len(bits) > 0 {
+             t.Fatalf("Bloombits should be removed")
+         }
+     }
+     // Check the existence of written data.
+     check(0, 0, params.MainnetGenesisHash, true)
+     check(0, 0, params.RinkebyGenesisHash, true)
+
+     // Check the existence of deleted data.
+     DeleteBloombits(db, 0, 0, 1)
+     check(0, 0, params.MainnetGenesisHash, false)
+     check(0, 0, params.RinkebyGenesisHash, false)
+     check(0, 1, params.MainnetGenesisHash, true)
+     check(0, 1, params.RinkebyGenesisHash, true)
+
+     // Check the existence of deleted data.
+     DeleteBloombits(db, 0, 0, 2)
+     check(0, 0, params.MainnetGenesisHash, false)
+     check(0, 0, params.RinkebyGenesisHash, false)
+     check(0, 1, params.MainnetGenesisHash, false)
+     check(0, 1, params.RinkebyGenesisHash, false)
+
+     // Bit1 shouldn't be affect.
+     check(1, 0, params.MainnetGenesisHash, true)
+     check(1, 0, params.RinkebyGenesisHash, true)
+     check(1, 1, params.MainnetGenesisHash, true)
+     check(1, 1, params.RinkebyGenesisHash, true)
+ }
8 changes: 4 additions & 4 deletions core/rawdb/freezer.go
@@ -287,12 +287,12 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
backoff = true
continue

- case *number < params.ImmutabilityThreshold:
-     log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold)
+ case *number < params.FullImmutabilityThreshold:
+     log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.FullImmutabilityThreshold)
backoff = true
continue

- case *number-params.ImmutabilityThreshold <= f.frozen:
+ case *number-params.FullImmutabilityThreshold <= f.frozen:
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
backoff = true
continue
@@ -304,7 +304,7 @@
continue
}
// Seems we have data ready to be frozen, process in usable batches
- limit := *number - params.ImmutabilityThreshold
+ limit := *number - params.FullImmutabilityThreshold
if limit-f.frozen > freezerBatchLimit {
limit = f.frozen + freezerBatchLimit
}
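
The params diff is not shown in this truncated listing, but the rename above implies ImmutabilityThreshold became FullImmutabilityThreshold and that light pruning received its own, shorter window. From memory, params gained roughly the following constants (values unverified against the actual commit):

    // FullImmutabilityThreshold is the number of blocks after which a full chain
    // segment is considered immutable (soft finality); used by the freezer,
    // clique checkpointing and deep-reorg protection.
    FullImmutabilityThreshold = 90000

    // LightImmutabilityThreshold is the equivalent window for light header
    // chains, bounding how much recent data the light pruner must retain.
    LightImmutabilityThreshold = 30000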
(Diff truncated: the remaining changed files in this commit are not shown.)