Skip to content

Commit

Permalink
Add bitMergeEntry to entries inserted by merge operator (dgraph-io#794)
Browse files Browse the repository at this point in the history
When too many entries are added to the merge operator, the compactions running
in background might accidentally delete entries before they're merged since
every entry added by the merge operator has the same key but different version.
If a user has set NumVersionsToKeep to a low value then the compactions would
delete the older values before they're merged.

With this commit, we add a new bit bitMergeEntry to all the entries inserted by
the merge operator except the entry which stores the result of merge operation.
The compactions would ignore all the entries which have bitMergeEntry set but
these entries will eventually be deleted because we set the bitDiscardEarlier
versions flag for the entry which holds the result of the merge operation.
  • Loading branch information
Ibrahim Jarif authored May 10, 2019
1 parent 3374ec2 commit 82e4521
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 6 deletions.
3 changes: 2 additions & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,8 @@ func (s *levelsController) compactBuildTables(

vs := it.Value()
version := y.ParseTs(it.Key())
if version <= discardTs {
// Do not discard entries inserted by merge operator. These entries will be discarded once they're merged
if version <= discardTs && vs.Meta&bitMergeEntry == 0 {
// Keep track of the number of versions encountered for this key. Only consider the
// versions which are below the minReadTs, otherwise, we might end up discarding the
// only valid version for a running transaction.
Expand Down
6 changes: 3 additions & 3 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func (op *MergeOperator) compact() error {
if err != nil {
return err
}

// Write value back to db
// Write value back to the DB. It is important that we do not set the bitMergeEntry bit
// here. When compaction happens, all the older merged entries will be removed.
return txn.SetWithDiscard(op.key, val, 0)
})

Expand Down Expand Up @@ -144,7 +144,7 @@ func (op *MergeOperator) runCompactions(dur time.Duration) {
// routine into the values that were recorded by previous invocations to Add().
func (op *MergeOperator) Add(val []byte) error {
return op.db.Update(func(txn *Txn) error {
return txn.Set(op.key, val)
return txn.setMergeEntry(op.key, val)
})
}

Expand Down
43 changes: 43 additions & 0 deletions merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package badger

import (
"encoding/binary"
"io/ioutil"
"os"
"testing"
"time"

Expand Down Expand Up @@ -111,6 +113,47 @@ func TestGetMergeOperator(t *testing.T) {
require.Equal(t, uint64(6), bytesToUint64(res))
})
})
t.Run("Old keys should be removed after compaction", func(t *testing.T) {
dir, err := ioutil.TempDir(".", "badger-test")
require.NoError(t, err)
defer os.RemoveAll(dir)

opts := getTestOptions(dir)
db, err := Open(opts)
require.NoError(t, err)
mergeKey := []byte("foo")
m := db.GetMergeOperator(mergeKey, add, 2*time.Millisecond)

count := 5000 // This will cause compaction from L0->L1
for i := 0; i < count; i++ {
require.NoError(t, m.Add(uint64ToBytes(1)))
}
value, err := m.Get()
require.Nil(t, err)
require.Equal(t, uint64(count), bytesToUint64(value))
m.Stop()

// Force compaction by closing DB. The compaction should discard all the old merged values
require.Nil(t, db.Close())
db, err = Open(opts)
require.NoError(t, err)
defer db.Close()

keyCount := 0
txn := db.NewTransaction(false)
defer txn.Discard()
iopt := DefaultIteratorOptions
iopt.AllVersions = true
it := txn.NewKeyIterator(mergeKey, iopt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
keyCount++
}
// We should have only one key in badger. All the other keys should've been removed by
// compaction
require.Equal(t, 1, keyCount)
})

}

func uint64ToBytes(i uint64) []byte {
Expand Down
8 changes: 7 additions & 1 deletion txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (txn *Txn) SetWithDiscard(key, val []byte, meta byte) error {

// SetWithTTL adds a key-value pair to the database, along with a time-to-live
// (TTL) setting. A key stored with a TTL would automatically expire after the
// time has elapsed , and be eligible for garbage collection.
// time has elapsed, and be eligible for garbage collection.
//
// The current transaction keeps a reference to the key and val byte slice
// arguments. Users must not modify key and val until the end of the
Expand All @@ -370,6 +370,12 @@ func (txn *Txn) SetWithTTL(key, val []byte, dur time.Duration) error {
return txn.SetEntry(e)
}

// setMergeEntry is similar to SetEntry but it sets the bitMergeEntry flag
func (txn *Txn) setMergeEntry(key, val []byte) error {
e := &Entry{Key: key, Value: val, meta: bitMergeEntry}
return txn.SetEntry(e)
}

func exceedsSize(prefix string, max int64, key []byte) error {
return errors.Errorf("%s with size %d exceeded %d limit. %s:\n%s",
prefix, len(key), max, prefix, hex.Dump(key[:1<<10]))
Expand Down
3 changes: 2 additions & 1 deletion value.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ const (
bitDelete byte = 1 << 0 // Set if the key has been deleted.
bitValuePointer byte = 1 << 1 // Set if the value is NOT stored directly next to key.
bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.

// Set if item shouldn't be discarded via compactions (used by merge operator)
bitMergeEntry byte = 1 << 3
// The MSB 2 bits are for transactions.
bitTxn byte = 1 << 6 // Set if the entry is part of a txn.
bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.
Expand Down

0 comments on commit 82e4521

Please sign in to comment.