Skip to content

Commit

Permalink
swarm/storage: fix garbage collector index skew (ethereum#18080)
Browse files Browse the repository at this point in the history
On file access LDBStore's tryAccessIdx() function created a faulty
GC Index Data entry, because not indexing the ikey correctly.
That caused the chunk addresses/hashes to start with '00' and the last
two digits were dropped. => Incorrect chunk address.

Besides the fix, the commit also contains a schema change which will
run the CleanGCIndex() function to clean the GC index from erroneous
entries.

Note: CleanGCIndex() rebuilds the index from scratch which can take
a really-really long time with a huge DB (possibly an hour).
  • Loading branch information
frncmx authored and nonsense committed Nov 13, 2018
1 parent 4fecc7a commit c41e1bd
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 54 deletions.
122 changes: 84 additions & 38 deletions swarm/storage/ldbstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ var (

var (
keyIndex = byte(0)
keyOldData = byte(1)
keyAccessCnt = []byte{2}
keyEntryCnt = []byte{3}
keyDataIdx = []byte{4}
Expand Down Expand Up @@ -285,6 +284,10 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
return val
}

func parseGCIdxKey(key []byte) (byte, []byte) {
return key[0], key[1:]
}

func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
index = &dpaDBIndex{
Idx: binary.BigEndian.Uint64(val[1:]),
Expand Down Expand Up @@ -504,7 +507,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) {
}
}

//Cleanup iterates over the database and deletes chunks if they pass the `f` condition
// Cleanup iterates over the database and deletes chunks if they pass the `f` condition
func (s *LDBStore) Cleanup(f func(*chunk) bool) {
var errorsFound, removed, total int

Expand Down Expand Up @@ -569,47 +572,90 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) {
log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
}

func (s *LDBStore) ReIndex() {
//Iterates over the database and checks that there are no faulty chunks
// CleanGCIndex rebuilds the garbage collector index from scratch, while
// removing inconsistent elements, e.g., indices with missing data chunks.
// WARN: it's a pretty heavy, long running function.
func (s *LDBStore) CleanGCIndex() error {
s.lock.Lock()
defer s.lock.Unlock()

batch := leveldb.Batch{}

var okEntryCount uint64
var totalEntryCount uint64

// throw out all gc indices, we will rebuild from cleaned index
it := s.db.NewIterator()
startPosition := []byte{keyOldData}
it.Seek(startPosition)
var key []byte
var errorsFound, total int
it.Seek([]byte{keyGCIdx})
var gcDeletes int
for it.Valid() {
rowType, _ := parseGCIdxKey(it.Key())
if rowType != keyGCIdx {
break
}
batch.Delete(it.Key())
gcDeletes++
it.Next()
}
log.Debug("gc", "deletes", gcDeletes)
if err := s.db.Write(&batch); err != nil {
return err
}

it.Seek([]byte{keyIndex})
var idx dpaDBIndex
var poPtrs [256]uint64
for it.Valid() {
key = it.Key()
if (key == nil) || (key[0] != keyOldData) {
rowType, chunkHash := parseGCIdxKey(it.Key())
if rowType != keyIndex {
break
}
data := it.Value()
hasher := s.hashfunc()
hasher.Write(data)
hash := hasher.Sum(nil)

newKey := make([]byte, 10)
oldCntKey := make([]byte, 2)
newCntKey := make([]byte, 2)
oldCntKey[0] = keyDistanceCnt
newCntKey[0] = keyDistanceCnt
key[0] = keyData
key[1] = s.po(Address(key[1:]))
oldCntKey[1] = key[1]
newCntKey[1] = s.po(Address(newKey[1:]))
copy(newKey[2:], key[1:])
newValue := append(hash, data...)

batch := new(leveldb.Batch)
batch.Delete(key)
s.bucketCnt[oldCntKey[1]]--
batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]]))
batch.Put(newKey, newValue)
s.bucketCnt[newCntKey[1]]++
batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]]))
s.db.Write(batch)
err := decodeIndex(it.Value(), &idx)
if err != nil {
return fmt.Errorf("corrupt index: %v", err)
}
po := s.po(chunkHash)

// if we don't find the data key, remove the entry
dataKey := getDataKey(idx.Idx, po)
_, err = s.db.Get(dataKey)
if err != nil {
log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
batch.Delete(it.Key())
} else {
gcIdxKey := getGCIdxKey(&idx)
gcIdxData := getGCIdxValue(&idx, po, chunkHash)
batch.Put(gcIdxKey, gcIdxData)
log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData)
okEntryCount++
if idx.Idx > poPtrs[po] {
poPtrs[po] = idx.Idx
}
}
totalEntryCount++
it.Next()
}

it.Release()
log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())

var entryCount [8]byte
binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
batch.Put(keyEntryCnt, entryCount[:])
var poKey [2]byte
poKey[0] = keyDistanceCnt
for i, poPtr := range poPtrs {
poKey[1] = uint8(i)
if poPtr == 0 {
batch.Delete(poKey[:])
} else {
var idxCount [8]byte
binary.BigEndian.PutUint64(idxCount[:], poPtr)
batch.Put(poKey[:], idxCount[:])
}
}

return s.db.Write(&batch)
}

// Delete is removes a chunk and updates indices.
Expand Down Expand Up @@ -826,7 +872,7 @@ func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
s.accessCnt++
s.batch.Put(ikey, idata)
newGCIdxKey := getGCIdxKey(index)
newGCIdxData := getGCIdxValue(index, po, ikey)
newGCIdxData := getGCIdxValue(index, po, ikey[1:])
s.batch.Delete(oldGCIdxKey)
s.batch.Put(newGCIdxKey, newGCIdxData)
select {
Expand All @@ -844,7 +890,7 @@ func (s *LDBStore) GetSchema() (string, error) {
data, err := s.db.Get(keySchema)
if err != nil {
if err == leveldb.ErrNotFound {
return "", nil
return DbSchemaNone, nil
}
return "", err
}
Expand Down
140 changes: 140 additions & 0 deletions swarm/storage/ldbstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -623,6 +624,145 @@ func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) {
log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
}

func TestCleanIndex(t *testing.T) {
capacity := 5000
n := 3

ldb, cleanup := newLDBStore(t)
ldb.setCapacity(uint64(capacity))
defer cleanup()

chunks, err := mputRandomChunks(ldb, n, 4096)
if err != nil {
t.Fatal(err)
}

// remove the data of the first chunk
po := ldb.po(chunks[0].Address()[:])
dataKey := make([]byte, 10)
dataKey[0] = keyData
dataKey[1] = byte(po)
// dataKey[2:10] = first chunk has storageIdx 0 on [2:10]
if _, err := ldb.db.Get(dataKey); err != nil {
t.Fatal(err)
}
if err := ldb.db.Delete(dataKey); err != nil {
t.Fatal(err)
}

// remove the gc index row for the first chunk
gcFirstCorrectKey := make([]byte, 9)
gcFirstCorrectKey[0] = keyGCIdx
if err := ldb.db.Delete(gcFirstCorrectKey); err != nil {
t.Fatal(err)
}

// warp the gc data of the second chunk
// this data should be correct again after the clean
gcSecondCorrectKey := make([]byte, 9)
gcSecondCorrectKey[0] = keyGCIdx
binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(1))
gcSecondCorrectVal, err := ldb.db.Get(gcSecondCorrectKey)
if err != nil {
t.Fatal(err)
}
warpedGCVal := make([]byte, len(gcSecondCorrectVal)+1)
copy(warpedGCVal[1:], gcSecondCorrectVal)
if err := ldb.db.Delete(gcSecondCorrectKey); err != nil {
t.Fatal(err)
}
if err := ldb.db.Put(gcSecondCorrectKey, warpedGCVal); err != nil {
t.Fatal(err)
}

if err := ldb.CleanGCIndex(); err != nil {
t.Fatal(err)
}

// the index without corresponding data should have been deleted
idxKey := make([]byte, 33)
idxKey[0] = keyIndex
copy(idxKey[1:], chunks[0].Address())
if _, err := ldb.db.Get(idxKey); err == nil {
t.Fatalf("expected chunk 0 idx to be pruned: %v", idxKey)
}

// the two other indices should be present
copy(idxKey[1:], chunks[1].Address())
if _, err := ldb.db.Get(idxKey); err != nil {
t.Fatalf("expected chunk 1 idx to be present: %v", idxKey)
}

copy(idxKey[1:], chunks[2].Address())
if _, err := ldb.db.Get(idxKey); err != nil {
t.Fatalf("expected chunk 2 idx to be present: %v", idxKey)
}

// first gc index should still be gone
if _, err := ldb.db.Get(gcFirstCorrectKey); err == nil {
t.Fatalf("expected gc 0 idx to be pruned: %v", idxKey)
}

// second gc index should still be fixed
if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
t.Fatalf("expected gc 1 idx to be present: %v", idxKey)
}

// third gc index should be unchanged
binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(2))
if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
}

c, err := ldb.db.Get(keyEntryCnt)
if err != nil {
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
}

// entrycount should now be one less
entryCount := binary.BigEndian.Uint64(c)
if entryCount != 2 {
t.Fatalf("expected entrycnt to be 2, was %d", c)
}

// the chunks might accidentally be in the same bin
// if so that bin counter will now be 2 - the highest added index.
// if not, the total of them will be 3
poBins := []uint8{ldb.po(chunks[1].Address()), ldb.po(chunks[2].Address())}
if poBins[0] == poBins[1] {
poBins = poBins[:1]
}

var binTotal uint64
var currentBin [2]byte
currentBin[0] = keyDistanceCnt
if len(poBins) == 1 {
currentBin[1] = poBins[0]
c, err := ldb.db.Get(currentBin[:])
if err != nil {
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
}
binCount := binary.BigEndian.Uint64(c)
if binCount != 2 {
t.Fatalf("expected entrycnt to be 2, was %d", binCount)
}
} else {
for _, bin := range poBins {
currentBin[1] = bin
c, err := ldb.db.Get(currentBin[:])
if err != nil {
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
}
binCount := binary.BigEndian.Uint64(c)
binTotal += binCount

}
if binTotal != 3 {
t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal)
}
}
}

func waitGc(ctx context.Context, ldb *LDBStore) {
<-ldb.gc.runC
ldb.gc.runC <- struct{}{}
Expand Down
47 changes: 32 additions & 15 deletions swarm/storage/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,31 +196,48 @@ func (ls *LocalStore) Close() {

// Migrate checks the datastore schema vs the runtime schema, and runs migrations if they don't match
func (ls *LocalStore) Migrate() error {
schema, err := ls.DbStore.GetSchema()
actualDbSchema, err := ls.DbStore.GetSchema()
if err != nil {
log.Error(err.Error())
return err
}

log.Debug("found schema", "schema", schema, "runtime-schema", CurrentDbSchema)
if schema != CurrentDbSchema {
// run migrations
log.Debug("running migrations for", "schema", actualDbSchema, "runtime-schema", CurrentDbSchema)

if schema == "" {
log.Debug("running migrations for", "schema", schema, "runtime-schema", CurrentDbSchema)
if actualDbSchema == CurrentDbSchema {
return nil
}

if actualDbSchema == DbSchemaNone {
ls.migrateFromNoneToPurity()
actualDbSchema = DbSchemaPurity
}

// delete chunks that are not valid, i.e. chunks that do not pass any of the ls.Validators
ls.DbStore.Cleanup(func(c *chunk) bool {
return !ls.isValid(c)
})
if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
return err
}

err := ls.DbStore.PutSchema(DbSchemaPurity)
if err != nil {
log.Error(err.Error())
return err
}
if actualDbSchema == DbSchemaPurity {
if err := ls.migrateFromPurityToHalloween(); err != nil {
return err
}
actualDbSchema = DbSchemaHalloween
}

if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
return err
}
return nil
}

func (ls *LocalStore) migrateFromNoneToPurity() {
// delete chunks that are not valid, i.e. chunks that do not pass
// any of the ls.Validators
ls.DbStore.Cleanup(func(c *chunk) bool {
return !ls.isValid(c)
})
}

func (ls *LocalStore) migrateFromPurityToHalloween() error {
return ls.DbStore.CleanGCIndex()
}
Loading

0 comments on commit c41e1bd

Please sign in to comment.