This repository was archived by the owner on Nov 4, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathwrite_batch_impl.hpp
338 lines (282 loc) · 9.26 KB
/
write_batch_impl.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
#pragma once
#include <x86intrin.h>

#include <cstring>
#include <unordered_set>
#include <utility>
#include <vector>

#include "alias.hpp"
#include "hash_table.hpp"
#include "kvdk/write_batch.hpp"
#include "utils/codec.hpp"
#include "utils/utils.hpp"
namespace KVDK_NAMESPACE {
// Collects the string, sorted and hash writes of one batch before they are
// applied. Each container keeps at most one op per identity (the key for
// string ops, collection+key for sorted/hash ops): a later Put/Delete on the
// same identity replaces the earlier one.
class WriteBatchImpl final : public WriteBatch {
 public:
  // A pending write to a string entry. `value` is empty for deletes.
  struct StringOp {
    StringOp(WriteOp o, const StringView& k, const StringView& v)
        : op(o), key(string_view_2_string(k)), value(string_view_2_string(v)) {}
    WriteOp op;
    std::string key;
    std::string value;
  };

  // A pending write to a key inside a sorted collection. `value` is empty
  // for deletes.
  struct SortedOp {
    SortedOp(WriteOp o, const StringView& c, const StringView& k,
             const StringView& v)
        : op(o),
          collection(string_view_2_string(c)),
          key(string_view_2_string(k)),
          value(string_view_2_string(v)) {}
    WriteOp op;
    std::string collection;
    std::string key;
    std::string value;
  };

  // A pending write to a key inside a hash collection. `value` is empty for
  // deletes.
  struct HashOp {
    HashOp(WriteOp o, const StringView& c, const StringView& k,
           const StringView& v)
        : op(o),
          collection(string_view_2_string(c)),
          key(string_view_2_string(k)),
          value(string_view_2_string(v)) {}
    WriteOp op;
    std::string collection;
    std::string key;
    std::string value;
  };

  // Acts as both the hasher and the key-equality predicate of the op sets.
  // Only the identity (key, or collection+key) takes part; `op` and `value`
  // are ignored, so ops on the same identity compare equal by design.
  struct HashEq {
    size_t operator()(StringOp const& string_op) const {
      return xxh_hash(string_op.key);
    }
    size_t operator()(SortedOp const& sorted_op) const {
      return Combine(xxh_hash(sorted_op.collection), xxh_hash(sorted_op.key));
    }
    size_t operator()(HashOp const& hash_op) const {
      return Combine(xxh_hash(hash_op.collection), xxh_hash(hash_op.key));
    }
    bool operator()(StringOp const& lhs, StringOp const& rhs) const {
      return lhs.key == rhs.key;
    }
    bool operator()(SortedOp const& lhs, SortedOp const& rhs) const {
      return lhs.collection == rhs.collection && lhs.key == rhs.key;
    }
    bool operator()(HashOp const& lhs, HashOp const& rhs) const {
      return lhs.collection == rhs.collection && lhs.key == rhs.key;
    }

   private:
    // Order-sensitive mix of two hashes (boost::hash_combine style). A plain
    // XOR is symmetric, so (c="a", k="b") and (c="b", k="a") would collide.
    // XOR also sends every op whose collection equals its key to one value.
    static size_t Combine(size_t seed, size_t value) {
      return seed ^ (value + static_cast<size_t>(0x9e3779b97f4a7c15ULL) +
                     (seed << 6) + (seed >> 2));
    }
  };

  void StringPut(const StringView key, const StringView value) final {
    ReplaceOp(string_ops_, StringOp{WriteOp::Put, key, value});
  }

  void StringDelete(const StringView key) final {
    ReplaceOp(string_ops_, StringOp{WriteOp::Delete, key, std::string{}});
  }

  void SortedPut(const StringView collection, const StringView key,
                 const StringView value) final {
    ReplaceOp(sorted_ops_, SortedOp{WriteOp::Put, collection, key, value});
  }

  void SortedDelete(const StringView collection, const StringView key) final {
    ReplaceOp(sorted_ops_,
              SortedOp{WriteOp::Delete, collection, key, std::string{}});
  }

  void HashPut(const StringView collection, const StringView key,
               const StringView value) final {
    ReplaceOp(hash_ops_, HashOp{WriteOp::Put, collection, key, value});
  }

  void HashDelete(const StringView collection, const StringView key) final {
    ReplaceOp(hash_ops_,
              HashOp{WriteOp::Delete, collection, key, std::string{}});
  }

  // Get the string op recorded in this batch for `key`.
  // Returns nullptr if the batch has no op for that key. The pointer refers
  // into the batch. It is invalidated when that op is replaced or on Clear().
  const StringOp* StringGet(const StringView key) {
    return FindOp(string_ops_, StringOp{WriteOp::Put, key, ""});
  }

  // Get the sorted op recorded in this batch for (collection, key).
  // Returns nullptr if absent. Same pointer lifetime as StringGet().
  const SortedOp* SortedGet(const StringView collection, const StringView key) {
    return FindOp(sorted_ops_, SortedOp{WriteOp::Put, collection, key, ""});
  }

  // Get the hash op recorded in this batch for (collection, key).
  // Returns nullptr if absent. Same pointer lifetime as StringGet().
  const HashOp* HashGet(const StringView collection, const StringView key) {
    return FindOp(hash_ops_, HashOp{WriteOp::Put, collection, key, ""});
  }

  void Clear() final {
    string_ops_.clear();
    sorted_ops_.clear();
    hash_ops_.clear();
  }

  // Number of distinct ops currently held, across all three kinds.
  size_t Size() const final {
    return string_ops_.size() + sorted_ops_.size() + hash_ops_.size();
  }

  using StringOpBatch = std::unordered_set<StringOp, HashEq, HashEq>;
  using SortedOpBatch = std::unordered_set<SortedOp, HashEq, HashEq>;
  using HashOpBatch = std::unordered_set<HashOp, HashEq, HashEq>;

  StringOpBatch const& StringOps() const { return string_ops_; }
  SortedOpBatch const& SortedOps() const { return sorted_ops_; }
  HashOpBatch const& HashOps() const { return hash_ops_; }

 private:
  // Stores `op`, first dropping any op with the same identity. Elements of
  // an unordered_set are immutable, so replacement is erase + insert. The op
  // is moved in so its strings are not copied a second time.
  template <typename OpBatch, typename OpType>
  static void ReplaceOp(OpBatch& batch, OpType op) {
    batch.erase(op);
    batch.insert(std::move(op));
  }

  // Looks up the op sharing `probe`'s identity; nullptr if none.
  template <typename OpBatch, typename OpType>
  static const OpType* FindOp(const OpBatch& batch, const OpType& probe) {
    auto iter = batch.find(probe);
    return iter == batch.end() ? nullptr : &(*iter);
  }

  StringOpBatch string_ops_;
  SortedOpBatch sorted_ops_;
  HashOpBatch hash_ops_;
};
// Working state for writing a single string op of a batch.
// Assign() fills key/value/op from a WriteBatchImpl::StringOp. The remaining
// fields are not touched by Assign(); presumably the batch-write path fills
// them in (allocated space, timestamp, hash-table lookup result, new record)
// — TODO confirm against callers. Every field is value-initialized, so a
// default-constructed instance never holds an indeterminate value or a wild
// `new_rec` pointer.
struct StringWriteArgs {
  // NOTE: views. After Assign() they point into the StringOp's std::strings,
  // so that op must outlive any use of `key`/`value`.
  StringView key;
  StringView value;
  WriteOp op{};
  SpaceEntry space{};
  TimestampType ts{};
  HashTable::LookupResult res{};
  StringRecord* new_rec{nullptr};

  // Copy the op kind and make key/value view the op's strings.
  void Assign(WriteBatchImpl::StringOp const& string_op) {
    key = string_op.key;
    value = string_op.value;
    op = string_op.op;
  }
};
// In-memory form of a batch's write log. It holds the batch timestamp plus
// the PMem offsets of the records the batch wrote, grouped by data type and
// tagged Put/Delete. EncodeTo()/DecodeFrom() (defined elsewhere) convert it
// to and from the persisted layout documented above EncodeTo. The static
// Mark* helpers update the persisted stage in place.
class BatchWriteLog {
 public:
  // The batch is first persisted to PMem with Stage::Initializing.
  // After persisting is done, it enters Stage::Processing.
  // When all batches are executed, it is marked as Stage::Committed
  // and then purged from PMem.
  // During recovery, a batch in
  // Stage::Initializing is directly discarded and purged.
  // Stage::Processing is rolled back.
  // Stage::Committed is directly purged.
  enum class Stage : size_t {
    // Initializing must be 0 so that empty file can be skipped.
    Initializing = 0,
    Processing,
    Committed,
  };

  // Kind of change a log entry records (ListEmplace is logged as Put).
  enum class Op : size_t { Put, Delete };

  // Each entry pairs the op kind with the PMem offset of the affected record.
  struct StringLogEntry {
    Op op;
    PMemOffsetType offset;
  };

  struct SortedLogEntry {
    Op op;
    PMemOffsetType offset;
  };

  struct HashLogEntry {
    Op op;
    PMemOffsetType offset;
  };

  struct ListLogEntry {
    Op op;
    PMemOffsetType offset;
  };

  explicit BatchWriteLog() = default;

  void SetTimestamp(TimestampType ts) { timestamp_ = ts; }

  void StringPut(PMemOffsetType offset) {
    string_logs_.emplace_back(StringLogEntry{Op::Put, offset});
  }

  void StringDelete(PMemOffsetType offset) {
    string_logs_.emplace_back(StringLogEntry{Op::Delete, offset});
  }

  void SortedPut(PMemOffsetType offset) {
    sorted_logs_.emplace_back(SortedLogEntry{Op::Put, offset});
  }

  void SortedDelete(PMemOffsetType offset) {
    sorted_logs_.emplace_back(SortedLogEntry{Op::Delete, offset});
  }

  void HashPut(PMemOffsetType offset) {
    hash_logs_.emplace_back(HashLogEntry{Op::Put, offset});
  }

  void HashDelete(PMemOffsetType offset) {
    hash_logs_.emplace_back(HashLogEntry{Op::Delete, offset});
  }

  void ListEmplace(PMemOffsetType offset) {
    list_logs_.emplace_back(ListLogEntry{Op::Put, offset});
  }

  void ListDelete(PMemOffsetType offset) {
    list_logs_.emplace_back(ListLogEntry{Op::Delete, offset});
  }

  // Drops all recorded entries. The timestamp and stage are left unchanged.
  void Clear() {
    string_logs_.clear();
    sorted_logs_.clear();
    hash_logs_.clear();
    list_logs_.clear();
  }

  // Total number of entries across the four logs.
  size_t Size() const {
    return string_logs_.size() + sorted_logs_.size() + hash_logs_.size() +
           list_logs_.size();
  }

  // Total number of entries (summed over all four logs) that MaxBytes()
  // reserves room for.
  static size_t Capacity() { return (1UL << 20); }

  // Upper bound on the encoded size of a log with at most Capacity() entries
  // in total. It covers the fixed header, one count field per section, and
  // Capacity() entries each sized as the largest entry type.
  static size_t MaxBytes() {
    static_assert(sizeof(HashLogEntry) >= sizeof(StringLogEntry), "");
    static_assert(sizeof(HashLogEntry) >= sizeof(SortedLogEntry), "");
    static_assert(sizeof(HashLogEntry) >= sizeof(ListLogEntry), "");
    // total_bytes | timestamp | stage
    const size_t header_bytes =
        sizeof(size_t) + sizeof(TimestampType) + sizeof(Stage);
    // N, M, K and L: one size_t count per section. Reserving only a single
    // count let a full log overrun a MaxBytes() buffer by 3 * sizeof(size_t).
    const size_t kNumSections = 4;
    const size_t count_bytes = kNumSections * sizeof(size_t);
    return header_bytes + count_bytes + Capacity() * sizeof(HashLogEntry);
  }

  // Format of the BatchWriteLog
  // total_bytes | timestamp | stage |
  // N | StringLogEntry*N |
  // M | SortedLogEntry*M |
  // K | HashLogEntry*K |
  // L | ListLogEntry*L
  // dst is expected to have capacity of MaxBytes().
  void EncodeTo(char* dst);
  void DecodeFrom(char const* src);

  // The Mark* helpers overwrite the stage of an encoded log at `dst` and
  // flush it to PMem.
  static void MarkProcessing(char* dst) {
    PersistStage(dst, Stage::Processing);
  }

  static void MarkCommitted(char* dst) { PersistStage(dst, Stage::Committed); }

  // For rollback
  static void MarkInitializing(char* dst) {
    PersistStage(dst, Stage::Initializing);
  }

  using StringLog = std::vector<StringLogEntry>;
  using SortedLog = std::vector<SortedLogEntry>;
  using HashLog = std::vector<HashLogEntry>;
  using ListLog = std::vector<ListLogEntry>;

  StringLog const& StringLogs() const { return string_logs_; }
  SortedLog const& SortedLogs() const { return sorted_logs_; }
  HashLog const& HashLogs() const { return hash_logs_; }
  ListLog const& ListLogs() const { return list_logs_; }
  TimestampType Timestamp() const { return timestamp_; }

 private:
  // Writes `new_stage` into the stage slot of the encoded log at `dst`
  // (just past total_bytes and timestamp). It then flushes that cache line
  // and fences so the update is persisted before returning. The store stays
  // a single whole-Stage write. NOTE(review): this assumes `dst` keeps the
  // stage slot suitably aligned for that — confirm against the allocation.
  static void PersistStage(char* dst, Stage new_stage) {
    char* stage_slot = dst + sizeof(size_t) + sizeof(TimestampType);
    *reinterpret_cast<Stage*>(stage_slot) = new_stage;
    _mm_clflush(stage_slot);
    _mm_mfence();
  }

  Stage stage{Stage::Initializing};
  TimestampType timestamp_{0};
  StringLog string_logs_;
  SortedLog sorted_logs_;
  HashLog hash_logs_;
  ListLog list_logs_;
};
} // namespace KVDK_NAMESPACE