Skip to content

Commit

Permalink
core, eth, trie: bloom filter for trie node dedup during fast sync (e…
Browse files Browse the repository at this point in the history
…thereum#19489)

* core, eth, trie: bloom filter for trie node dedup during fast sync

* eth/downloader, trie: address review comments

* core, ethdb, trie: restart fast-sync bloom construction now and again

* eth/downloader: initialize fast sync bloom on startup

* eth: reenable eth/62 until we properly remove it
  • Loading branch information
karalabe authored May 13, 2019
1 parent 40cdcf8 commit 9effd64
Show file tree
Hide file tree
Showing 46 changed files with 2,668 additions and 57 deletions.
14 changes: 10 additions & 4 deletions cmd/geth/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
"gopkg.in/urfave/cli.v1"
)

Expand Down Expand Up @@ -375,11 +376,16 @@ func copyDb(ctx *cli.Context) error {
defer stack.Close()

chain, chainDb := utils.MakeChain(ctx, stack)
syncmode := *utils.GlobalTextMarshaler(ctx, utils.SyncModeFlag.Name).(*downloader.SyncMode)
dl := downloader.New(syncmode, 0, chainDb, new(event.TypeMux), chain, nil, nil)
syncMode := *utils.GlobalTextMarshaler(ctx, utils.SyncModeFlag.Name).(*downloader.SyncMode)

var syncBloom *trie.SyncBloom
if syncMode == downloader.FastSync {
syncBloom = trie.NewSyncBloom(uint64(ctx.GlobalInt(utils.CacheFlag.Name)/2), chainDb)
}
dl := downloader.New(0, chainDb, syncBloom, new(event.TypeMux), chain, nil, nil)

// Create a source peer to satisfy downloader requests from
db, err := rawdb.NewLevelDBDatabase(ctx.Args().First(), ctx.GlobalInt(utils.CacheFlag.Name), 256, "")
db, err := rawdb.NewLevelDBDatabase(ctx.Args().First(), ctx.GlobalInt(utils.CacheFlag.Name)/2, 256, "")
if err != nil {
return err
}
Expand All @@ -395,7 +401,7 @@ func copyDb(ctx *cli.Context) error {
start := time.Now()

currentHeader := hc.CurrentHeader()
if err = dl.Synchronise("local", currentHeader.Hash(), hc.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64()), syncmode); err != nil {
if err = dl.Synchronise("local", currentHeader.Hash(), hc.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64()), syncMode); err != nil {
return err
}
for dl.Synchronising() {
Expand Down
7 changes: 7 additions & 0 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func (t *table) NewIterator() ethdb.Iterator {
return t.NewIteratorWithPrefix(nil)
}

// NewIteratorWithStart returns an iterator that walks the database content in
// binary-alphabetical order, beginning at the given start key, or at the first
// key after it when the start key itself does not exist.
//
// NOTE(review): start is handed to the underlying database unchanged; confirm
// this is intended if the table namespaces its keys.
func (t *table) NewIteratorWithStart(start []byte) ethdb.Iterator {
	it := t.db.NewIteratorWithStart(start)
	return it
}

// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix.
func (t *table) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
Expand Down
4 changes: 2 additions & 2 deletions core/state/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// NewStateSync creates a new state trie download scheduler.
func NewStateSync(root common.Hash, database ethdb.Reader) *trie.Sync {
func NewStateSync(root common.Hash, database ethdb.Reader, bloom *trie.SyncBloom) *trie.Sync {
var syncer *trie.Sync
callback := func(leaf []byte, parent common.Hash) error {
var obj Account
Expand All @@ -37,6 +37,6 @@ func NewStateSync(root common.Hash, database ethdb.Reader) *trie.Sync {
syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent)
return nil
}
syncer = trie.NewSync(root, database, callback)
syncer = trie.NewSync(root, database, callback, bloom)
return syncer
}
13 changes: 7 additions & 6 deletions core/state/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/trie"
)

Expand Down Expand Up @@ -125,7 +126,7 @@ func checkStateConsistency(db ethdb.Database, root common.Hash) error {
// Tests that an empty state is not scheduled for syncing.
func TestEmptyStateSync(t *testing.T) {
empty := common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
if req := NewStateSync(empty, rawdb.NewMemoryDatabase()).Missing(1); len(req) != 0 {
if req := NewStateSync(empty, rawdb.NewMemoryDatabase(), trie.NewSyncBloom(1, memorydb.New())).Missing(1); len(req) != 0 {
t.Errorf("content requested for empty state: %v", req)
}
}
Expand All @@ -141,7 +142,7 @@ func testIterativeStateSync(t *testing.T, batch int) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb)
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb))

queue := append([]common.Hash{}, sched.Missing(batch)...)
for len(queue) > 0 {
Expand Down Expand Up @@ -173,7 +174,7 @@ func TestIterativeDelayedStateSync(t *testing.T) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb)
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb))

queue := append([]common.Hash{}, sched.Missing(0)...)
for len(queue) > 0 {
Expand Down Expand Up @@ -210,7 +211,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb)
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb))

queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
Expand Down Expand Up @@ -250,7 +251,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb)
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb))

queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(0) {
Expand Down Expand Up @@ -297,7 +298,7 @@ func TestIncompleteStateSync(t *testing.T) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb)
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb))

added := []common.Hash{}
queue := append([]common.Hash{}, sched.Missing(1)...)
Expand Down
5 changes: 3 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)

if eth.protocolManager, err = NewProtocolManager(chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.Whitelist); err != nil {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit
if eth.protocolManager, err = NewProtocolManager(chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
return nil, err
}

eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))

Expand Down
36 changes: 29 additions & 7 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

var (
Expand Down Expand Up @@ -104,7 +105,9 @@ type Downloader struct {
genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database

stateDB ethdb.Database // Database to state sync into (and deduplicate via)
stateBloom *trie.SyncBloom // Bloom filter for fast trie node existence checks

rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
Expand Down Expand Up @@ -207,13 +210,13 @@ type BlockChain interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
mode: mode,
stateDB: stateDb,
stateBloom: stateBloom,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(),
Expand Down Expand Up @@ -255,13 +258,15 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
defer d.syncStatsLock.RUnlock()

current := uint64(0)
switch d.mode {
case FullSync:
switch {
case d.blockchain != nil && d.mode == FullSync:
current = d.blockchain.CurrentBlock().NumberU64()
case FastSync:
case d.blockchain != nil && d.mode == FastSync:
current = d.blockchain.CurrentFastBlock().NumberU64()
case LightSync:
case d.lightchain != nil:
current = d.lightchain.CurrentHeader().Number.Uint64()
default:
log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", d.mode)
}
return ethereum.SyncProgress{
StartingBlock: d.syncStatsChainOrigin,
Expand Down Expand Up @@ -363,6 +368,12 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
log.Info("Block synchronisation started")
}
// If we are already full syncing, but have a fast-sync bloom filter lying
// around, make sure it doesn't use memory any more. This is a special case
// when the user attempts to fast sync a new empty network.
if mode == FullSync && d.stateBloom != nil {
d.stateBloom.Close()
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
Expand Down Expand Up @@ -1662,13 +1673,24 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
func (d *Downloader) commitPivotBlock(result *fetchResult) error {
block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())

// Commit the pivot block as the new head, will require full sync from here on
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil {
return err
}
if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
return err
}
atomic.StoreInt32(&d.committed, 1)

// If we had a bloom filter for the state sync, deallocate it now. Note, we only
// deallocate internally, but keep the empty wrapper. This ensures that if we do
// a rollback after committing the pivot and restarting fast sync, we don't end
// up using a nil bloom. Empty bloom is fine, it just returns that it does not
// have the info we need, so reach down to the database instead.
if d.stateBloom != nil {
d.stateBloom.Close()
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func newTester() *downloadTester {
tester.stateDb = rawdb.NewMemoryDatabase()
tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})

tester.downloader = New(FullSync, 0, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
tester.downloader = New(0, tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer)
return tester
}

Expand Down
3 changes: 2 additions & 1 deletion eth/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type stateSyncStats struct {

// syncState starts downloading state with the given root hash.
func (d *Downloader) syncState(root common.Hash) *stateSync {
// Create the state sync
s := newStateSync(d, root)
select {
case d.stateSyncStart <- s:
Expand Down Expand Up @@ -239,7 +240,7 @@ type stateTask struct {
func newStateSync(d *Downloader, root common.Hash) *stateSync {
return &stateSync{
d: d,
sched: state.NewStateSync(root, d.stateDB),
sched: state.NewStateSync(root, d.stateDB, d.stateBloom),
keccak: sha3.NewLegacyKeccak256(),
tasks: make(map[common.Hash]*stateTask),
deliver: make(chan *stateReq),
Expand Down
25 changes: 15 additions & 10 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)

const (
Expand Down Expand Up @@ -105,7 +106,7 @@ type ProtocolManager struct {

// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
Expand All @@ -120,12 +121,8 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")
mode = downloader.FullSync
}
if mode == downloader.FastSync {
// If fast sync was requested and our database is empty, grant it
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() == 0 {
manager.fastSync = uint32(1)
}
// If we have trusted checkpoints, enforce them on the chain
Expand All @@ -137,7 +134,8 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// Skip protocol version if incompatible with the mode of operation
if mode == downloader.FastSync && version < eth63 {
// TODO(karalabe): hard-drop eth/62 from the code base
if atomic.LoadUint32(&manager.fastSync) == 1 && version < eth63 {
continue
}
// Compatible; initialise the sub-protocol
Expand Down Expand Up @@ -171,9 +169,16 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
if len(manager.SubProtocols) == 0 {
return nil, errIncompatibleConfig
}
// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, manager.checkpointNumber, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
// Construct the downloader (long sync) and its backing state bloom if fast
// sync is requested. The downloader is responsible for deallocating the state
// bloom when it's done.
var stateBloom *trie.SyncBloom
if atomic.LoadUint32(&manager.fastSync) == 1 {
stateBloom = trie.NewSyncBloom(uint64(cacheLimit), chaindb)
}
manager.downloader = downloader.New(manager.checkpointNumber, chaindb, stateBloom, manager.eventMux, blockchain, nil, manager.removePeer)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
}
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, nil)
pm, err := NewProtocolManager(config, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, 1, nil)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
Expand Down Expand Up @@ -615,7 +615,7 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) {
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, nil)
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, 1, nil)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
Expand Down
3 changes: 1 addition & 2 deletions eth/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
}

pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, nil)
pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, 1, nil)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions ethdb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ type Iteratee interface {
// contained within the key-value database.
NewIterator() Iterator

// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of
// database content starting at a particular initial key (or after, if it does
// not exist).
NewIteratorWithStart(start []byte) Iterator

// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix.
NewIteratorWithPrefix(prefix []byte) Iterator
Expand Down
9 changes: 8 additions & 1 deletion ethdb/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,14 @@ func (db *Database) NewBatch() ethdb.Batch {
// NewIterator creates a binary-alphabetical iterator over the entire keyspace
// contained within the leveldb database.
func (db *Database) NewIterator() ethdb.Iterator {
return db.NewIteratorWithPrefix(nil)
return db.db.NewIterator(new(util.Range), nil)
}

// NewIteratorWithStart returns an iterator over the leveldb content in
// binary-alphabetical order, beginning at the given start key, or at the first
// key after it when the start key itself does not exist.
func (db *Database) NewIteratorWithStart(start []byte) ethdb.Iterator {
	// Only the Start bound of the range is set.
	keyRange := &util.Range{Start: start}
	return db.db.NewIterator(keyRange, nil)
}

// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
Expand Down
Loading

0 comments on commit 9effd64

Please sign in to comment.