// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "table/compaction_merging_iterator.h"

namespace ROCKSDB_NAMESPACE {
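// CompactionMergingIterator merges the point keys from `children` with the
// start keys of the range tombstones in `range_tombstones`, in ascending
// internal key order. Range tombstone start keys are emitted as ordinary keys
// (with a dummy value) so that compaction sees point keys and range tombstone
// start keys in one sorted stream; file boundary sentinel keys from
// LevelIterator children are skipped (see FindNextVisibleKey()). Only forward
// iteration (SeekToFirst()/Seek()/Next()) is supported.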
class CompactionMergingIterator : public InternalIterator {
 public:
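  // `range_tombstones[i].first` is the range tombstone iterator for the
  // sorted run of children[i] (may be nullptr). If `range_tombstones[i].second`
  // is non-null, it receives the address of this iterator's internal slot
  // range_tombstone_iters_[i], so that, for LevelIterator children, the
  // tombstone iterator can be swapped as the LevelIterator moves between files.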
  CompactionMergingIterator(
      const InternalKeyComparator* comparator, InternalIterator** children,
      int n, bool is_arena_mode,
      std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                            std::unique_ptr<TruncatedRangeDelIterator>**>>&
          range_tombstones)
      : is_arena_mode_(is_arena_mode),
        comparator_(comparator),
        current_(nullptr),
        minHeap_(CompactionHeapItemComparator(comparator_)),
        pinned_iters_mgr_(nullptr) {
    children_.resize(n);
    for (int i = 0; i < n; i++) {
      children_[i].level = i;
      children_[i].iter.Set(children[i]);
      assert(children_[i].type == HeapItem::ITERATOR);
    }
    assert(range_tombstones.size() == static_cast<size_t>(n));
    for (auto& p : range_tombstones) {
      range_tombstone_iters_.push_back(std::move(p.first));
    }
    pinned_heap_item_.resize(n);
    for (int i = 0; i < n; ++i) {
      if (range_tombstones[i].second) {
        // for LevelIterator
        *range_tombstones[i].second = &range_tombstone_iters_[i];
      }
      pinned_heap_item_[i].level = i;
      pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
    }
  }

  void considerStatus(const Status& s) {
    if (!s.ok() && status_.ok()) {
      status_ = s;
    }
  }

  ~CompactionMergingIterator() override {
    range_tombstone_iters_.clear();
    for (auto& child : children_) {
      child.iter.DeleteIter(is_arena_mode_);
    }
    status_.PermitUncheckedError();
  }

  bool Valid() const override { return current_ != nullptr && status_.ok(); }

  Status status() const override { return status_; }

  void SeekToFirst() override;
  void Seek(const Slice& target) override;
  void Next() override;

  Slice key() const override {
    assert(Valid());
    return current_->key();
  }

  Slice value() const override {
    assert(Valid());
    if (LIKELY(current_->type == HeapItem::ITERATOR)) {
      return current_->iter.value();
    } else {
      return dummy_tombstone_val;
    }
  }

  // Here we simply relay the MayBeOutOfLowerBound()/UpperBoundCheckResult()
  // result from the current child iterator: as long as the current child does
  // not report that it may be out of bound, we know the current key is within
  // bound.
  bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START ||
           current_->iter.MayBeOutOfLowerBound();
  }

  IterBoundCheck UpperBoundCheckResult() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START
               ? IterBoundCheck::kUnknown
               : current_->iter.UpperBoundCheckResult();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    for (auto& child : children_) {
      child.iter.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsDeleteRangeSentinelKey() const override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START;
  }

  // Compaction uses the above subset of InternalIterator interface.
  void SeekToLast() override { assert(false); }
  void SeekForPrev(const Slice&) override { assert(false); }
  void Prev() override { assert(false); }

  bool NextAndGetResult(IterateResult*) override {
    assert(false);
    return false;
  }

  bool IsKeyPinned() const override {
    assert(false);
    return false;
  }

  bool IsValuePinned() const override {
    assert(false);
    return false;
  }

  bool PrepareValue() override {
    assert(false);
    return false;
  }

 private:
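  // Each heap entry is either a child point iterator (ITERATOR) or the current
  // range tombstone start key of one level (DELETE_RANGE_START), materialized
  // into `tombstone_str`.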
  struct HeapItem {
    HeapItem() = default;

    IteratorWrapper iter;
    size_t level = 0;
    std::string tombstone_str;
    enum Type { ITERATOR, DELETE_RANGE_START };
    Type type = ITERATOR;

    explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
        : level(_level), type(Type::ITERATOR) {
      iter.Set(_iter);
    }

    void SetTombstoneForCompaction(const ParsedInternalKey&& pik) {
      tombstone_str.clear();
      AppendInternalKey(&tombstone_str, pik);
    }

    [[nodiscard]] Slice key() const {
      return type == ITERATOR ? iter.key() : tombstone_str;
    }
  };

  class CompactionHeapItemComparator {
   public:
    explicit CompactionHeapItemComparator(
        const InternalKeyComparator* comparator)
        : comparator_(comparator) {}

    bool operator()(HeapItem* a, HeapItem* b) const {
      int r = comparator_->Compare(a->key(), b->key());
      // For each file, we assume all range tombstone start keys come before
      // its file boundary sentinel key (file's meta.largest key).
      // In the case when meta.smallest = meta.largest and range tombstone
      // start key is truncated at meta.smallest, the start key will have
      // op_type = kMaxValid to make it smaller (see TruncatedRangeDelIterator
      // constructor). The following assertion validates this assumption.
      assert(a->type == b->type || r != 0);
      return r > 0;
    }

   private:
    const InternalKeyComparator* comparator_;
  };

  using CompactionMinHeap =
      BinaryHeap<HeapItem*, CompactionHeapItemComparator>;

  bool is_arena_mode_;
  const InternalKeyComparator* comparator_;
  // HeapItem for all child point iterators.
  std::vector<HeapItem> children_;
  // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the
  // current range tombstone from range_tombstone_iters_[i].
  std::vector<HeapItem> pinned_heap_item_;
  // range_tombstone_iters_[i] contains range tombstones in the sorted run
  // that corresponds to children_[i]. range_tombstone_iters_[i] == nullptr
  // means the sorted run of children_[i] does not have range tombstones (or
  // the current SSTable does not have range tombstones in the case of
  // LevelIterator).
  std::vector<std::unique_ptr<TruncatedRangeDelIterator>>
      range_tombstone_iters_;
  // Used as value for range tombstone keys.
  std::string dummy_tombstone_val{};

  // Skip file boundary sentinel keys.
  void FindNextVisibleKey();

  // top of minHeap_
  HeapItem* current_;
  // If any of the children have non-ok status, this is one of them.
  Status status_;
  CompactionMinHeap minHeap_;
  PinnedIteratorsManager* pinned_iters_mgr_;

  // Process a child that is not in the min heap. If valid, add it to the min
  // heap. Otherwise, check its status.
  void AddToMinHeapOrCheckStatus(HeapItem*);

  HeapItem* CurrentForward() const {
    return !minHeap_.empty() ? minHeap_.top() : nullptr;
  }

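  // If range_tombstone_iters_[level] currently points at a tombstone,
  // materialize its start key into pinned_heap_item_[level] and push that
  // entry onto the min heap. The caller must ensure
  // range_tombstone_iters_[level] is non-null.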
  void InsertRangeTombstoneAtLevel(size_t level) {
    if (range_tombstone_iters_[level]->Valid()) {
      pinned_heap_item_[level].SetTombstoneForCompaction(
          range_tombstone_iters_[level]->start_key());
      minHeap_.push(&pinned_heap_item_[level]);
    }
  }
};

void CompactionMergingIterator::SeekToFirst() {
  minHeap_.clear();
  status_ = Status::OK();
  for (auto& child : children_) {
    child.iter.SeekToFirst();
    AddToMinHeapOrCheckStatus(&child);
  }

  for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
    if (range_tombstone_iters_[i]) {
      range_tombstone_iters_[i]->SeekToFirst();
      InsertRangeTombstoneAtLevel(i);
    }
  }

  FindNextVisibleKey();
  current_ = CurrentForward();
}

void CompactionMergingIterator::Seek(const Slice& target) {
  minHeap_.clear();
  status_ = Status::OK();
  for (auto& child : children_) {
    child.iter.Seek(target);
    AddToMinHeapOrCheckStatus(&child);
  }

  ParsedInternalKey pik;
  ParseInternalKey(target, &pik, false /* log_err_key */)
      .PermitUncheckedError();
  for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
    if (range_tombstone_iters_[i]) {
      range_tombstone_iters_[i]->Seek(pik.user_key);
      // For compaction, output keys should all be after seek target.
      while (range_tombstone_iters_[i]->Valid() &&
             comparator_->Compare(range_tombstone_iters_[i]->start_key(),
                                  pik) < 0) {
        range_tombstone_iters_[i]->Next();
      }
      InsertRangeTombstoneAtLevel(i);
    }
  }

  FindNextVisibleKey();
  current_ = CurrentForward();
}

void CompactionMergingIterator::Next() {
  assert(Valid());
  // For the heap modifications below to be correct, current_ must be the
  // current top of the heap.
  assert(current_ == CurrentForward());
  // current_ points to the current record; move that iterator forward.
  if (current_->type == HeapItem::ITERATOR) {
    current_->iter.Next();
    if (current_->iter.Valid()) {
      // current is still valid after the Next() call above. Call
      // replace_top() to restore the heap property. When the same child
      // iterator yields a sequence of keys, this is cheap.
      assert(current_->iter.status().ok());
      minHeap_.replace_top(current_);
    } else {
      // current stopped being valid, remove it from the heap.
      considerStatus(current_->iter.status());
      minHeap_.pop();
    }
  } else {
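    // current_ is the start key of a range tombstone: advance that level's
    // tombstone iterator and either refresh its heap entry with the next
    // tombstone's start key or drop the entry if the iterator is exhausted.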
    assert(current_->type == HeapItem::DELETE_RANGE_START);
    size_t level = current_->level;
    assert(range_tombstone_iters_[level]);
    range_tombstone_iters_[level]->Next();
    if (range_tombstone_iters_[level]->Valid()) {
      pinned_heap_item_[level].SetTombstoneForCompaction(
          range_tombstone_iters_[level]->start_key());
      minHeap_.replace_top(&pinned_heap_item_[level]);
    } else {
      minHeap_.pop();
    }
  }
  FindNextVisibleKey();
  current_ = CurrentForward();
}

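// Advance past file boundary sentinel keys at the top of the heap. A sentinel
// key marks the end of an SST file in a LevelIterator child; calling Next()
// on that child moves it into the next file, after which the new file's range
// tombstones (if any) are pushed onto the heap.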
void CompactionMergingIterator::FindNextVisibleKey() {
  while (!minHeap_.empty()) {
    HeapItem* current = minHeap_.top();
    // IsDeleteRangeSentinelKey() here means file boundary sentinel keys.
    if (current->type != HeapItem::ITERATOR ||
        !current->iter.IsDeleteRangeSentinelKey()) {
      return;
    }
    // Range tombstone start keys from the same SSTable should have been
    // exhausted.
    assert(!range_tombstone_iters_[current->level] ||
           !range_tombstone_iters_[current->level]->Valid());
    // current->iter is a LevelIterator, and it enters a new SST file in the
    // Next() call here.
    current->iter.Next();
    if (current->iter.Valid()) {
      assert(current->iter.status().ok());
      minHeap_.replace_top(current);
    } else {
      considerStatus(current->iter.status());
      minHeap_.pop();
    }
    if (range_tombstone_iters_[current->level]) {
      InsertRangeTombstoneAtLevel(current->level);
    }
  }
}

void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) {
  if (child->iter.Valid()) {
    assert(child->iter.status().ok());
    minHeap_.push(child);
  } else {
    considerStatus(child->iter.status());
  }
}

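// Factory for the compaction merging iterator. When `arena` is non-null the
// iterator is placement-new'd into arena-allocated memory (is_arena_mode is
// set accordingly), so it must not be freed with plain `delete`; with a null
// arena it is heap-allocated. With zero children an empty iterator is
// returned.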
InternalIterator* NewCompactionMergingIterator(
    const InternalKeyComparator* comparator, InternalIterator** children,
    int n,
    std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                          std::unique_ptr<TruncatedRangeDelIterator>**>>&
        range_tombstone_iters,
    Arena* arena) {
  assert(n >= 0);
  if (n == 0) {
    return NewEmptyInternalIterator<Slice>(arena);
  } else {
    if (arena == nullptr) {
      return new CompactionMergingIterator(comparator, children, n,
                                           false /* is_arena_mode */,
                                           range_tombstone_iters);
    } else {
      auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator));
      return new (mem) CompactionMergingIterator(comparator, children, n,
                                                 true /* is_arena_mode */,
                                                 range_tombstone_iters);
    }
  }
}

} // namespace ROCKSDB_NAMESPACE