forked from dgraph-io/badger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanaged_db.go
193 lines (173 loc) · 5.51 KB
/
managed_db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"math"
"sync"
"sync/atomic"
"time"
"github.com/dgraph-io/badger/y"
"github.com/pkg/errors"
)
// ManagedDB allows end users to manage the transactions themselves. Transaction
// start (read) and commit timestamps are set by the end user, through
// NewTransactionAt and Txn.CommitAt respectively.
//
// ManagedDB embeds *DB, so the DB methods are promoted onto it. NewTransaction
// and GetSequence are redefined on ManagedDB so that calling them panics.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
//
// WARNING: This is an experimental feature and may be changed significantly in
// a future release. So please proceed with caution.
type ManagedDB struct {
	*DB
}
// OpenManaged opens a database exactly like Open, except that the options are
// first flagged for managed transactions, and wraps the result in a
// ManagedDB. Callers then control transaction timestamps themselves.
//
// Any error from Open is returned unchanged, with a nil *ManagedDB.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func OpenManaged(opts Options) (*ManagedDB, error) {
	// opts is a copy, so flipping the flag does not affect the caller's value.
	opts.managedTxns = true

	inner, err := Open(opts)
	if err != nil {
		return nil, err
	}
	return &ManagedDB{DB: inner}, nil
}
// NewTransaction shadows the NewTransaction method promoted from the embedded
// *DB. It is not usable on a ManagedDB and always panics; use
// NewTransactionAt, which takes an explicit read timestamp, instead.
// The update argument is ignored.
func (db *ManagedDB) NewTransaction(update bool) {
	const msg = "Cannot use NewTransaction() for ManagedDB. Use NewTransactionAt() instead."
	panic(msg)
}
// NewTransactionAt builds a transaction the same way DB.NewTransaction does,
// then replaces its read timestamp with readTs. The returned transaction
// therefore reads at the caller-chosen timestamp. Pass update=true for a
// read-write transaction.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func (db *ManagedDB) NewTransactionAt(readTs uint64, update bool) *Txn {
	t := db.DB.NewTransaction(update)
	// Overwrite whatever read timestamp DB.NewTransaction assigned.
	t.readTs = readTs
	return t
}
// CommitAt commits the transaction, following the same logic as Commit(), but
// at the given commit timestamp.
//
// It must only be used on transactions of a ManagedDB. If the database was not
// opened with managed transactions, nothing is committed and ErrManagedTxn is
// returned; it does not panic.
//
// commitTs is recorded on the transaction and callback is then passed straight
// to Commit, so callback handling and the returned error follow Commit's rules.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func (txn *Txn) CommitAt(commitTs uint64, callback func(error)) error {
	if !txn.db.opt.managedTxns {
		return ErrManagedTxn
	}
	txn.commitTs = commitTs
	return txn.Commit(callback)
}
// GetSequence is not supported on a ManagedDB. It ignores both arguments and
// panics unconditionally, so it never actually returns.
func (db *ManagedDB) GetSequence(_ []byte, _ uint64) (*Sequence, error) {
	const msg = "Cannot use GetSequence for ManagedDB."
	panic(msg)
}
// SetDiscardTs records ts as the point at or below which invalid or deleted
// versions may be discarded from the LSM tree, and thence from the value log,
// to reclaim disk space. The value is forwarded to the embedded DB's orc.
func (db *ManagedDB) SetDiscardTs(ts uint64) {
	db.DB.orc.setDiscardTs(ts)
}
// errDone is a package-level sentinel error.
// NOTE(review): it is not referenced by any of the code shown in this file;
// confirm whether another file in the package uses it before removing it.
var errDone = errors.New("Done deleting keys")
// DropAll drops all the data stored in Badger. It does this in the following way:
//   - Stop accepting new writes and wait (polling every 100ms) for writeCh to
//     drain.
//   - Pause the compactions.
//   - Pick all tables from all levels, create a changeset to delete all these
//     tables and apply it to the manifest. Do not pick up the latest table from
//     level 0, to preserve the (persistent) badgerHead key.
//   - Re-enable writes and restart compactions.
//   - Iterate over the KVs that remain (those left in level 0) and run deletes
//     on them via transactions. Each deletion is committed at the same
//     timestamp as the latest version of the key, so the keys could be written
//     back at the same timestamp as before.
//
// DropAll always waits for every delete it issued to finish before returning,
// even when it stops early because of an error. It returns the first
// synchronous error (from Delete or CommitAt) if there was one. Otherwise it
// returns the first error reported by a commit callback, or nil.
//
// The caller must ensure no other writes happen concurrently with DropAll.
func (db *ManagedDB) DropAll() error {
	// Stop accepting new writes.
	atomic.StoreInt32(&db.blockWrites, 1)

	// Wait for writeCh to reach size of zero. This is not ideal, but a very
	// simple way to allow writeCh to flush out, before we proceed.
	tick := time.NewTicker(100 * time.Millisecond)
	for range tick.C {
		if len(db.writeCh) == 0 {
			break
		}
	}
	tick.Stop()

	// Stop the compactions.
	if db.closers.compactors != nil {
		db.closers.compactors.SignalAndWait()
	}

	_, err := db.lc.deleteLSMTree()
	// Allow writes so that we can run transactions. Ideally, the user must ensure that they're not
	// doing more writes concurrently while this operation is happening.
	atomic.StoreInt32(&db.blockWrites, 0)
	// Need compactions to happen so deletes below can be flushed out.
	if db.closers.compactors != nil {
		db.closers.compactors = y.NewCloser(1)
		db.lc.startCompact(db.closers.compactors)
	}
	if err != nil {
		return err
	}

	type KV struct {
		key     []byte
		version uint64
	}
	var kvs []KV
	// getKeys collects each visible key (copied) and the version the iterator
	// reports for it, reading at the maximum timestamp.
	getKeys := func() error {
		txn := db.NewTransactionAt(math.MaxUint64, false)
		defer txn.Discard()
		opts := DefaultIteratorOptions
		opts.PrefetchValues = false // only keys and versions are needed
		itr := txn.NewIterator(opts)
		defer itr.Close()
		for itr.Rewind(); itr.Valid(); itr.Next() {
			item := itr.Item()
			kvs = append(kvs, KV{item.KeyCopy(nil), item.Version()})
		}
		return nil
	}
	if err := getKeys(); err != nil {
		return err
	}

	var (
		wg      sync.WaitGroup
		syncErr error                 // first error returned synchronously; stops the loop
		asyncCh = make(chan error, 1) // first error reported by a commit callback
	)
	// recordAsync keeps the first callback error and drops later ones without blocking.
	recordAsync := func(err error) {
		select {
		case asyncCh <- err:
		default:
		}
	}
	for _, kv := range kvs {
		txn := db.NewTransactionAt(math.MaxUint64, true)
		if err := txn.Delete(kv.key); err != nil {
			// Never committed, so release the transaction explicitly.
			txn.Discard()
			syncErr = err
			break
		}

		// Exactly one wg.Done per issued commit. CommitAt is presumably not
		// expected to run the callback when it fails synchronously. sync.Once
		// keeps the counter correct (and avoids a negative-counter panic)
		// whichever path ends up calling done.
		var once sync.Once
		done := func() { once.Do(wg.Done) }
		wg.Add(1)
		if err := txn.CommitAt(kv.version, func(rerr error) {
			if rerr != nil {
				recordAsync(rerr)
			}
			done()
		}); err != nil {
			done()
			txn.Discard() // Discard is safe to call more than once.
			syncErr = err
			break
		}
	}

	// Never return while commits issued above are still in flight.
	wg.Wait()
	if syncErr != nil {
		return syncErr
	}
	select {
	case err := <-asyncCh:
		return err
	default:
		return nil
	}
}