Merge function should process items in order (dgraph-io#841)
With this commit, the merge operator processes items in the order in
which they were inserted. Earlier, they were passed to the merge
function in reverse order.

For instance, if `A`, `B`, and `C` were inserted, and the merge function
was a simple `append` operation, they would be passed to the merge
function as
```
mergeFunc(C, B) => result CB
mergeFunc(CB, A) => result CBA
```

With this change, they're passed to the merge function as
```
mergeFunc(B, C) => result BC
mergeFunc(A, BC) => result ABC
```
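The new behavior amounts to folding from the newest version backwards while keeping the older value as the left operand, so operands reach the merge function in insertion order. A minimal standalone sketch of that fold (the `mergeInOrder` helper and the sample values are illustrative, not Badger's API):

```go
package main

import "fmt"

// mergeInOrder folds versions (given newest-first, as Badger's iterator
// yields them) so operands reach the merge function in insertion order.
func mergeInOrder(versions [][]byte, f func(existingVal, newVal []byte) []byte) []byte {
	newVal := versions[0] // latest version
	for _, oldVal := range versions[1:] {
		// Older value is the left operand: f(B, C) => BC, then f(A, BC) => ABC.
		newVal = f(oldVal, newVal)
	}
	return newVal
}

func main() {
	appendFunc := func(existingVal, newVal []byte) []byte {
		return append(existingVal, newVal...)
	}
	// Versions newest-first: C, B, A (A was inserted first).
	res := mergeInOrder([][]byte{[]byte("C"), []byte("B"), []byte("A")}, appendFunc)
	fmt.Println(string(res)) // ABC
}
```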
Ibrahim Jarif authored Jun 4, 2019
1 parent 633a8fa commit 18f8a33
Showing 3 changed files with 51 additions and 30 deletions.
48 changes: 35 additions & 13 deletions README.md
@@ -278,11 +278,41 @@ for {
```

### Merge Operations
-Badger provides support for unordered merge operations. You can define a func
+Badger provides support for ordered merge operations. You can define a func
of type `MergeFunc` which takes in an existing value, and a value to be
_merged_ with it. It returns a new value which is the result of the _merge_
operation. All values are specified in byte arrays. For e.g., here is a merge
-function (`add`) which adds a `uint64` value to an existing `uint64` value.
+function (`add`) which appends a `[]byte` value to an existing `[]byte` value.

```Go
// Merge function to append one byte slice to another
func add(originalValue, newValue []byte) []byte {
return append(originalValue, newValue...)
}
```

This function can then be passed to the `DB.GetMergeOperator()` method, along
with a key, and a duration value. The duration specifies how often the merge
function is run on values that have been added using the `MergeOperator.Add()`
method.

`MergeOperator.Get()` method can be used to retrieve the cumulative value of the key
associated with the merge operation.

```Go
key := []byte("merge")

m := db.GetMergeOperator(key, add, 200*time.Millisecond)
defer m.Stop()

m.Add([]byte("A"))
m.Add([]byte("B"))
m.Add([]byte("C"))

res, _ := m.Get() // res should have value ABC encoded
```

Example: Merge operator which increments a counter

```Go
func uint64ToBytes(i uint64) []byte {
@@ -300,26 +330,18 @@ func add(existing, new []byte) {
return uint64ToBytes(bytesToUint64(existing) + bytesToUint64(new))
}
```

-This function can then be passed to the `DB.GetMergeOperator()` method, along
-with a key, and a duration value. The duration specifies how often the merge
-function is run on values that have been added using the `MergeOperator.Add()`
-method.
-
-`MergeOperator.Get()` method can be used to retrieve the cumulative value of the key
-associated with the merge operation.
-
-It can be used as
```Go
key := []byte("merge")

m := db.GetMergeOperator(key, add, 200*time.Millisecond)
defer m.Stop()

m.Add(uint64ToBytes(1))
m.Add(uint64ToBytes(2))
m.Add(uint64ToBytes(3))

-res, err := m.Get() // res should have value 6 encoded
-fmt.Println(bytesToUint64(res))
+res, _ := m.Get() // res should have value 6 encoded
```
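Worth noting: because `uint64` addition is commutative, this counter merge yields the same result under either operand ordering; the ordering fix matters only for non-commutative merges such as append. A quick standalone check, with helpers mirroring the README's (big-endian encoding assumed here):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// uint64ToBytes encodes a uint64 as 8 big-endian bytes.
func uint64ToBytes(i uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], i)
	return buf[:]
}

// bytesToUint64 decodes 8 big-endian bytes back into a uint64.
func bytesToUint64(b []byte) uint64 {
	return binary.BigEndian.Uint64(b)
}

// add mirrors the README's counter merge function.
func add(existing, new []byte) []byte {
	return uint64ToBytes(bytesToUint64(existing) + bytesToUint64(new))
}

func main() {
	a, b := uint64ToBytes(1), uint64ToBytes(2)
	// Addition is commutative, so operand order does not change the result.
	fmt.Println(bytesToUint64(add(a, b)), bytesToUint64(add(b, a))) // 3 3
}
```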

### Setting Time To Live(TTL) and User Metadata on Keys
21 changes: 11 additions & 10 deletions merge.go
@@ -37,10 +37,8 @@ type MergeOperator struct {
// another representing a new value that needs to be ‘merged’ into it. MergeFunc
// contains the logic to perform the ‘merge’ and return an updated value.
// MergeFunc could perform operations like integer addition, list appends etc.
-// Note that the ordering of the operands is unspecified, so the merge func
-// should either be agnostic to ordering or do additional handling if ordering
-// is required.
-type MergeFunc func(existing, val []byte) []byte
+// Note that the ordering of the operands is maintained.
+type MergeFunc func(existingVal, newVal []byte) []byte

// GetMergeOperator creates a new MergeOperator for a given key and returns a
// pointer to it. It also fires off a goroutine that performs a compaction using
@@ -60,7 +58,7 @@ func (db *DB) GetMergeOperator(key []byte,

var errNoMerge = errors.New("No need for merge")

-func (op *MergeOperator) iterateAndMerge() (val []byte, latest uint64, err error) {
+func (op *MergeOperator) iterateAndMerge() (newVal []byte, latest uint64, err error) {
txn := op.db.NewTransaction(false)
defer txn.Discard()
opt := DefaultIteratorOptions
@@ -73,14 +71,17 @@ func (op *MergeOperator) iterateAndMerge() (val []byte, latest uint64, err error
item := it.Item()
numVersions++
if numVersions == 1 {
-val, err = item.ValueCopy(val)
+// This should be the newVal, considering this is the latest version.
+newVal, err = item.ValueCopy(newVal)
if err != nil {
return nil, 0, err
}
latest = item.Version()
} else {
-if err := item.Value(func(newVal []byte) error {
-val = op.f(val, newVal)
+if err := item.Value(func(oldVal []byte) error {
+// The merge should always be on the newVal considering it has the merge result of
+// the latest version. The value read should be the oldVal.
+newVal = op.f(oldVal, newVal)
return nil
}); err != nil {
return nil, 0, err
@@ -93,9 +94,9 @@ func (op *MergeOperator) iterateAndMerge() (val []byte, latest uint64, err error
if numVersions == 0 {
return nil, latest, ErrKeyNotFound
} else if numVersions == 1 {
-return val, latest, errNoMerge
+return newVal, latest, errNoMerge
}
-return val, latest, nil
+return newVal, latest, nil
}

func (op *MergeOperator) compact() error {
12 changes: 5 additions & 7 deletions merge_test.go
@@ -59,21 +59,19 @@ func TestGetMergeOperator(t *testing.T) {
t.Run("Add and Get slices", func(t *testing.T) {
// Merge function to merge two byte slices
add := func(originalValue, newValue []byte) []byte {
-// We append original value to new value because the values
-// are retrieved in reverse order (Last insertion will be the first value)
-return append(newValue, originalValue...)
+return append(originalValue, newValue...)
}
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
m := db.GetMergeOperator([]byte("fooprefix"), add, 2*time.Millisecond)
defer m.Stop()

-require.Nil(t, m.Add([]byte("1")))
-require.Nil(t, m.Add([]byte("2")))
-require.Nil(t, m.Add([]byte("3")))
+require.Nil(t, m.Add([]byte("A")))
+require.Nil(t, m.Add([]byte("B")))
+require.Nil(t, m.Add([]byte("C")))

value, err := m.Get()
require.Nil(t, err)
-require.Equal(t, "123", string(value))
+require.Equal(t, "ABC", string(value))
})
})
t.Run("Get Before Compact", func(t *testing.T) {
