Add a new SetWithDiscard method. (dgraph-io#475)
Add a new SetWithDiscard method, which marks a write as discarding all earlier versions of the key. Compaction uses this information to discard those versions, and value log GC can then discard key-value pairs which are no longer in the LSM tree (because compactions removed them). Thus, this simple change removes the need for the PurgeVersionsBelow and PurgeOlderVersions methods. Note that PurgeOlderVersions had already been made obsolete by the NumVersionsToKeep option.
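
A minimal usage sketch follows, assuming the v1-era API in this repository and the call shape visible in this diff (txn.SetWithDiscard(key, val, 0)); the third argument is taken to be a user meta byte as in SetWithMeta, and the paths, key, and value are illustrative:

package main

import (
	"log"

	"github.com/dgraph-io/badger"
)

func main() {
	opts := badger.DefaultOptions
	opts.Dir, opts.ValueDir = "/tmp/badger", "/tmp/badger"
	// NumVersionsToKeep (default 1) already bounds how many versions
	// compaction retains, which is what made PurgeOlderVersions obsolete.
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.Update(func(txn *badger.Txn) error {
		// Instead of Set followed by a later purge, mark this write as
		// discarding all earlier versions of the key.
		return txn.SetWithDiscard([]byte("answer"), []byte("42"), 0)
	})
	if err != nil {
		log.Fatal(err)
	}
}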

On the flip side, this makes it harder to predict which value log files can be removed. Compaction can tackle that by updating the discard stats for the value log, though perhaps in another PR.

Also simplifies MergeOperator code.
manishrjain authored May 5, 2018
1 parent 6d2a20b commit 79c98fc
Showing 8 changed files with 77 additions and 471 deletions.
197 changes: 32 additions & 165 deletions db.go
@@ -17,7 +17,6 @@
package badger

import (
"bytes"
"encoding/binary"
"expvar"
"log"
@@ -932,132 +931,6 @@ func (db *DB) updateSize(lc *y.Closer) {
}
}

// PurgeVersionsBelow will delete all versions of a key below the specified version
func (db *DB) PurgeVersionsBelow(key []byte, ts uint64) error {
txn := db.NewTransaction(false)
defer txn.Discard()
return db.purgeVersionsBelow(txn, key, ts)
}

func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {
opts := DefaultIteratorOptions
opts.AllVersions = true
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()

var entries []*Entry

for it.Seek(key); it.ValidForPrefix(key); it.Next() {
item := it.Item()
if !bytes.Equal(key, item.Key()) || item.Version() >= ts {
continue
}
if isDeletedOrExpired(item.meta, item.ExpiresAt()) {
continue
}

// Found an older version. Mark for deletion
entries = append(entries,
&Entry{
Key: y.KeyWithTs(key, item.version),
meta: bitDelete,
})
db.vlog.updateGCStats(item)
}
return db.batchSet(entries)
}

// PurgeOlderVersions deletes older versions of all keys.
//
// This function could be called prior to doing garbage collection to clean up
// older versions that are no longer needed. The caller must make sure that
// there are no long-running read transactions running before this function is
// called, otherwise they will not work as expected.
func (db *DB) PurgeOlderVersions() error {
return db.View(func(txn *Txn) error {
opts := DefaultIteratorOptions
opts.AllVersions = true
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()

var entries []*Entry
var lastKey []byte
var count, size int
var wg sync.WaitGroup
errChan := make(chan error, 1)

// func to check for pending error before sending off a batch for writing
batchSetAsyncIfNoErr := func(entries []*Entry) error {
select {
case err := <-errChan:
return err
default:
wg.Add(1)
return txn.db.batchSetAsync(entries, func(err error) {
defer wg.Done()
if err != nil {
select {
case errChan <- err:
default:
}
}
})
}
}

for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
if !bytes.Equal(lastKey, item.Key()) {
lastKey = y.SafeCopy(lastKey, item.Key())
continue
}
if isDeletedOrExpired(item.meta, item.ExpiresAt()) {
continue
}
// Found an older version. Mark for deletion
e := &Entry{
Key: y.KeyWithTs(lastKey, item.version),
meta: bitDelete,
}
db.vlog.updateGCStats(item)
curSize := e.estimateSize(db.opt.ValueThreshold)

// Batch up min(1000, maxBatchCount) entries at a time and write
// Ensure that total batch size doesn't exceed maxBatchSize
if count == 1000 || count+1 >= int(db.opt.maxBatchCount) ||
size+curSize >= int(db.opt.maxBatchSize) {
if err := batchSetAsyncIfNoErr(entries); err != nil {
return err
}
count = 0
size = 0
entries = []*Entry{}
}
size += curSize
count++
entries = append(entries, e)
}

// Write last batch pending deletes
if count > 0 {
if err := batchSetAsyncIfNoErr(entries); err != nil {
return err
}
}

wg.Wait()

select {
case err := <-errChan:
return err
default:
return nil
}
})
}

// RunValueLogGC triggers a value log garbage collection.
//
// It picks value log files to perform GC based on statistics that are collected
@@ -1220,11 +1093,10 @@ func (db *DB) Tables() []TableInfo {
// MergeOperator represents a Badger merge operator.
type MergeOperator struct {
sync.RWMutex
f MergeFunc
db *DB
key []byte
skipAtOrBelow uint64
closer *y.Closer
f MergeFunc
db *DB
key []byte
closer *y.Closer
}

// MergeFunc accepts two byte slices, one representing an existing value, and
@@ -1252,66 +1124,65 @@ func (db *DB) GetMergeOperator(key []byte,
return op
}

func (op *MergeOperator) iterateAndMerge(txn *Txn) (maxVersion uint64, val []byte, err error) {
var errNoMerge = errors.New("No need for merge")

func (op *MergeOperator) iterateAndMerge(txn *Txn) (val []byte, err error) {
opt := DefaultIteratorOptions
opt.AllVersions = true
it := txn.NewIterator(opt)
var first bool
var numVersions int
for it.Rewind(); it.ValidForPrefix(op.key); it.Next() {
item := it.Item()
if item.Version() <= op.skipAtOrBelow {
continue
}
if item.Version() > maxVersion {
maxVersion = item.Version()
}
if !first {
first = true
numVersions++
if numVersions == 1 {
val, err = item.ValueCopy(val)
if err != nil {
return 0, nil, err
return nil, err
}
} else {
newVal, err := item.Value()
if err != nil {
return 0, nil, err
return nil, err
}
val = op.f(val, newVal)
}
if item.DiscardEarlierVersions() {
break
}
}
if !first {
return 0, nil, ErrKeyNotFound
if numVersions == 0 {
return nil, ErrKeyNotFound
} else if numVersions == 1 {
return val, errNoMerge
}
return maxVersion, val, nil
return val, nil
}

func (op *MergeOperator) compact() error {
op.Lock()
defer op.Unlock()
var maxVersion uint64
err := op.db.Update(func(txn *Txn) error {
var (
val []byte
err error
)
maxVersion, val, err = op.iterateAndMerge(txn)
val, err = op.iterateAndMerge(txn)
if err != nil {
return err
}

// Write value back to db
if maxVersion > op.skipAtOrBelow {
if err := txn.Set(op.key, val); err != nil {
return err
}
if err := txn.SetWithDiscard(op.key, val, 0); err != nil {
return err
}
return nil
})
if err != nil && err != ErrKeyNotFound { // Ignore ErrKeyNotFound errors during compaction

if err == ErrKeyNotFound || err == errNoMerge {
// pass.
} else if err != nil {
return err
}
// Update version
op.skipAtOrBelow = maxVersion
return nil
}

@@ -1325,16 +1196,9 @@ func (op *MergeOperator) runCompactions(dur time.Duration) {
stop = true
case <-ticker.C: // wait for tick
}
oldSkipVersion := op.skipAtOrBelow
if err := op.compact(); err != nil {
log.Printf("Error while running merge operation: %s", err)
}
// Purge older versions if version has updated
if op.skipAtOrBelow > oldSkipVersion {
if err := op.db.PurgeVersionsBelow(op.key, op.skipAtOrBelow+1); err != nil {
log.Printf("Error purging merged keys: %s", err)
}
}
if stop {
ticker.Stop()
break
@@ -1353,15 +1217,18 @@ func (op *MergeOperator) Add(val []byte) error {
// Get returns the latest value for the merge operator, which is derived by
// applying the merge function to all the values added so far.
//
// If Add has not been called even once, Get will return ErrKeyNotFound
// If Add has not been called even once, Get will return ErrKeyNotFound.
func (op *MergeOperator) Get() ([]byte, error) {
op.RLock()
defer op.RUnlock()
var existing []byte
err := op.db.View(func(txn *Txn) (err error) {
_, existing, err = op.iterateAndMerge(txn)
existing, err = op.iterateAndMerge(txn)
return err
})
if err == errNoMerge {
return existing, nil
}
return existing, err
}
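
For reference, a hedged sketch of driving the simplified MergeOperator end to end, in the style of Badger's uint64-counter merge example. It assumes imports of encoding/binary, fmt, time, and the badger package; demoMerge, the key, and the tick interval are illustrative. An order-insensitive merge function is used because iterateAndMerge folds over versions in iterator order.

func uint64ToBytes(i uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], i)
	return buf[:]
}

func bytesToUint64(b []byte) uint64 {
	return binary.BigEndian.Uint64(b)
}

// add sums two encoded counters; summing is order-insensitive, so the
// fold order inside iterateAndMerge does not change the result.
func add(existing, latest []byte) []byte {
	return uint64ToBytes(bytesToUint64(existing) + bytesToUint64(latest))
}

func demoMerge(db *badger.DB) error {
	m := db.GetMergeOperator([]byte("counter"), add, 200*time.Millisecond)
	defer m.Stop() // stops the runCompactions goroutine shown above

	for i := uint64(1); i <= 3; i++ {
		if err := m.Add(uint64ToBytes(i)); err != nil {
			return err
		}
	}

	res, err := m.Get() // folds add over all live versions: 1+2+3
	if err != nil {
		return err
	}
	fmt.Println(bytesToUint64(res)) // 6
	return nil
}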
