forked from dgraph-io/badger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmerge.go
173 lines (156 loc) · 4.33 KB
/
merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"sync"
"time"
"github.com/dgraph-io/badger/y"
"github.com/pkg/errors"
)
// MergeOperator represents a Badger merge operator.
//
// It ties one key in a DB to a MergeFunc. Values recorded with Add are stored
// as separate versions of that key; Get and the background compaction
// goroutine fold those versions together with the MergeFunc.
type MergeOperator struct {
// Embedded lock: compact holds it for writing, Get holds it for reading.
sync.RWMutex
// f is the user-supplied merge function.
f MergeFunc
// db is the database that holds the key.
db *DB
// key is the single key this operator reads, writes and compacts.
key []byte
// closer is signalled by Stop; runCompactions watches it and calls Done on
// exit.
closer *y.Closer
}
// MergeFunc accepts two byte slices, one representing an existing value, and
// another representing a new value that needs to be "merged" into it. MergeFunc
// contains the logic to perform the "merge" and return an updated value.
// MergeFunc could perform operations like integer addition, list appends etc.
// Note that the ordering of the operands is unspecified, so the merge func
// should either be agnostic to ordering or do additional handling if ordering
// is required.
//
// NOTE(review): iterateAndMerge passes the slice it receives inside an
// Item.Value callback as the second argument; it is presumably only valid for
// the duration of the call, so implementations should copy it rather than
// retain it -- confirm against the Item.Value contract.
type MergeFunc func(existing, val []byte) []byte
// GetMergeOperator builds a MergeOperator bound to key and hands back a
// pointer to it. Before returning, it launches a background goroutine that
// compacts the values recorded for key, using f, once every dur. Call Stop on
// the returned operator to shut that goroutine down.
//
// The key slice is stored as-is (not copied).
func (db *DB) GetMergeOperator(key []byte,
	f MergeFunc, dur time.Duration) *MergeOperator {
	op := new(MergeOperator)
	op.db = db
	op.key = key
	op.f = f
	// A single unit is registered on the closer; runCompactions releases it
	// with Done when it exits.
	op.closer = y.NewCloser(1)

	go op.runCompactions(dur)
	return op
}
// errNoMerge is returned by iterateAndMerge when exactly one version of the
// key was found, so there is nothing to merge. compact ignores it and Get
// converts it into a successful return of that single value.
var errNoMerge = errors.New("No need for merge")
// iterateAndMerge reads the stored versions of op.key through txn and folds
// them into a single value with the merge function.
//
// The first version the iterator yields seeds the result; every following
// version is combined as op.f(accumulated, version). Iteration stops after a
// version that reports DiscardEarlierVersions, so anything older than a
// previously compacted value is ignored.
//
// Only versions of exactly op.key are considered: the iterator is positioned
// with Seek(op.key) and the loop stops at the first item whose key is not
// op.key itself.
//
// It returns:
//   - (nil, ErrKeyNotFound) if no version of op.key was found,
//   - (val, errNoMerge) if exactly one version was found (val is a copy of it),
//   - (val, nil) if two or more versions were merged,
//   - (nil, err) if reading a value failed.
func (op *MergeOperator) iterateAndMerge(txn *Txn) (val []byte, err error) {
	opt := DefaultIteratorOptions
	opt.AllVersions = true
	it := txn.NewIterator(opt)
	defer it.Close()

	var numVersions int
	// Seek directly to op.key. Rewinding instead would place the iterator on
	// the first key in the store, and the prefix check below would then end
	// the loop immediately unless op.key happened to sort first.
	for it.Seek(op.key); it.ValidForPrefix(op.key); it.Next() {
		item := it.Item()
		// ValidForPrefix guarantees op.key is a prefix of the item's key, so
		// a differing length means this is a longer, unrelated key that
		// merely shares the prefix. Its versions must not be merged in.
		if len(item.Key()) != len(op.key) {
			break
		}
		numVersions++
		if numVersions == 1 {
			// Copy the first value: it is the accumulator and must outlive
			// the iterator's current position.
			val, err = item.ValueCopy(val)
			if err != nil {
				return nil, err
			}
		} else {
			err = item.Value(func(other []byte) error {
				val = op.f(val, other)
				return nil
			})
			if err != nil {
				return nil, err
			}
		}
		if item.DiscardEarlierVersions() {
			break
		}
	}

	switch numVersions {
	case 0:
		return nil, ErrKeyNotFound
	case 1:
		return val, errNoMerge
	}
	return val, nil
}
// compact merges the stored versions of op.key into one value and writes that
// value back with SetWithDiscard, all inside a single update transaction. The
// operator's write lock is held for the whole call, which excludes concurrent
// Get calls.
//
// ErrKeyNotFound and errNoMerge coming out of the transaction mean there was
// nothing to compact and are reported as success; any other error is returned
// unchanged.
func (op *MergeOperator) compact() error {
	op.Lock()
	defer op.Unlock()

	mergeAndStore := func(txn *Txn) error {
		merged, err := op.iterateAndMerge(txn)
		if err != nil {
			return err
		}
		// Write value back to db
		return txn.SetWithDiscard(op.key, merged, 0)
	}

	switch err := op.db.Update(mergeAndStore); err {
	case nil, ErrKeyNotFound, errNoMerge:
		return nil
	default:
		return err
	}
}
// runCompactions is the body of the background goroutine started by
// GetMergeOperator. It runs compact once per tick of a ticker with period dur.
// When the closer reports that it has been closed, one final compaction is
// performed, the ticker is stopped and the goroutine exits, calling Done on
// the closer on the way out. Compaction failures are logged through the DB's
// options and do not stop the loop.
func (op *MergeOperator) runCompactions(dur time.Duration) {
	ticker := time.NewTicker(dur)
	defer op.closer.Done()
	// Deferred last so it runs first: the ticker is stopped before Done.
	defer ticker.Stop()

	for closing := false; !closing; {
		select {
		case <-op.closer.HasBeenClosed():
			// Still fall through to compact once more before leaving.
			closing = true
		case <-ticker.C: // wait for tick
		}
		if err := op.compact(); err != nil {
			op.db.opt.Errorf("failure while running merge operation: %s", err)
		}
	}
}
// Add stores val under the operator's key in its own update transaction. The
// value is not merged immediately; the background compaction routine (and Get)
// combine it with the values recorded by earlier calls to Add.
func (op *MergeOperator) Add(val []byte) error {
	record := func(txn *Txn) error {
		return txn.Set(op.key, val)
	}
	return op.db.Update(record)
}
// Get computes the current value of the merge operator by applying the merge
// function across all values added so far, inside a read-only transaction and
// under the operator's read lock.
//
// If Add has not been called even once, Get will return ErrKeyNotFound. When
// only a single value is stored, that value is returned as-is with a nil
// error.
func (op *MergeOperator) Get() ([]byte, error) {
	op.RLock()
	defer op.RUnlock()

	var merged []byte
	viewErr := op.db.View(func(txn *Txn) error {
		var err error
		merged, err = op.iterateAndMerge(txn)
		return err
	})
	// A lone version is not a failure for readers.
	if viewErr == errNoMerge {
		viewErr = nil
	}
	return merged, viewErr
}
// Stop waits for any pending merge to complete and then stops the background
// goroutine.
//
// It signals the operator's closer; runCompactions reacts to that signal by
// running one last compaction and then calling Done on the closer.
// NOTE(review): SignalAndWait presumably blocks until that Done call -- confirm
// against y.Closer.
func (op *MergeOperator) Stop() {
op.closer.SignalAndWait()
}