Skip to content

Commit

Permalink
Allow WriteBatch to have keys with different timestamp sizes (faceboo…
Browse files Browse the repository at this point in the history
…k#8725)

Summary:
In the past, we unnecessarily required all keys in the same write batch
to be from column families whose timestamps' formats are the same, for
simplicity. Specifically, we could not use the same write batch to write
to two column families, one of which enables timestamp while the other
disables it.

The limitation was due to the member `timestamp_size_` that used to exist
in each `WriteBatch` object. We passed a timestamp_size to the constructor
of `WriteBatch`. Therefore, users could simply use the old
`WriteBatch::Put()`, `WriteBatch::Delete()`, etc. APIs for writes, while
the internal implementation of `WriteBatch` would take care of memory
allocation for timestamps.

The above is not necessary.
On the one hand, users can set up a memory buffer to store the user key and
then contiguously append the timestamp to the user key. Then the user
can pass this buffer to the `WriteBatch::Put(Slice&)` API.
On the other hand, users can set up a SliceParts object, which is an
array of Slices, and let the last Slice point to the memory buffer
storing the timestamp. Then the user can pass the SliceParts object to the
`WriteBatch::Put(SliceParts&)` API.

Pull Request resolved: facebook#8725

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D30654499

Pulled By: riversand963

fbshipit-source-id: 9d848c77ad3c9dd629aa5fc4e2bc16fb0687b4a2
  • Loading branch information
riversand963 authored and facebook-github-bot committed Sep 12, 2021
1 parent 5f40b05 commit 2a2b3e0
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 138 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* Added properties for BlobDB: `rocksdb.num-blob-files`, `rocksdb.blob-stats`, `rocksdb.total-blob-file-size`, and `rocksdb.live-blob-file-size`. The existing property `rocksdb.estimate_live-data-size` was also extended to include live bytes residing in blob files.
* Added two new RateLimiter IOPriorities: `Env::IO_USER`,`Env::IO_MID`. `Env::IO_USER` will have superior priority over all other RateLimiter IOPriorities without being subject to fair scheduling constraint.
* `SstFileWriter` now supports `Put`s and `Delete`s with user-defined timestamps. Note that the ingestion logic itself is not timestamp-aware yet.
* Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp.

### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API
Expand Down
39 changes: 23 additions & 16 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1998,13 +1998,18 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
size_t ts_sz = ts->size();
assert(column_family->GetComparator());
assert(ts_sz == column_family->GetComparator()->timestamp_size());
WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
ts_sz);
Status s = batch.Put(column_family, key, value);
if (!s.ok()) {
return s;
WriteBatch batch;
Status s;
if (key.data() + key.size() == ts->data()) {
Slice key_with_ts = Slice(key.data(), key.size() + ts_sz);
s = batch.Put(column_family, key_with_ts, value);
} else {
std::array<Slice, 2> key_with_ts_slices{{key, *ts}};
SliceParts key_with_ts(key_with_ts_slices.data(), 2);
std::array<Slice, 1> value_slices{{value}};
SliceParts values(value_slices.data(), 1);
s = batch.Put(column_family, key_with_ts, values);
}
s = batch.AssignTimestamp(*ts);
if (!s.ok()) {
return s;
}
Expand All @@ -2023,17 +2028,19 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
}
const Slice* ts = opt.timestamp;
assert(ts != nullptr);
const size_t ts_sz = ts->size();
constexpr size_t kKeyAndValueLenSize = 11;
constexpr size_t kWriteBatchOverhead =
WriteBatchInternal::kHeader + sizeof(ValueType) + kKeyAndValueLenSize;
WriteBatch batch(key.size() + ts_sz + kWriteBatchOverhead, /*max_bytes=*/0,
ts_sz);
Status s = batch.Delete(column_family, key);
if (!s.ok()) {
return s;
size_t ts_sz = ts->size();
assert(column_family->GetComparator());
assert(ts_sz == column_family->GetComparator()->timestamp_size());
WriteBatch batch;
Status s;
if (key.data() + key.size() == ts->data()) {
Slice key_with_ts = Slice(key.data(), key.size() + ts_sz);
s = batch.Delete(column_family, key_with_ts);
} else {
std::array<Slice, 2> key_with_ts_slices{{key, *ts}};
SliceParts key_with_ts(key_with_ts_slices.data(), 2);
s = batch.Delete(column_family, key_with_ts);
}
s = batch.AssignTimestamp(*ts);
if (!s.ok()) {
return s;
}
Expand Down
10 changes: 4 additions & 6 deletions db/db_kv_checksum_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ class DbKvChecksumTest
corrupt_byte_addend_ = std::get<1>(GetParam());
}

std::pair<WriteBatch, Status> GetWriteBatch(size_t ts_sz,
ColumnFamilyHandle* cf_handle) {
std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle) {
Status s;
WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */, ts_sz,
WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */,
8 /* protection_bytes_per_entry */);
switch (op_type_) {
case WriteBatchOpType::kPut:
Expand Down Expand Up @@ -151,8 +150,7 @@ TEST_P(DbKvChecksumTest, MemTableAddCorrupted) {
Reopen(options);

SyncPoint::GetInstance()->EnableProcessing();
auto batch_and_status =
GetWriteBatch(0 /* ts_sz */, nullptr /* cf_handle */);
auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */);
ASSERT_OK(batch_and_status.second);
ASSERT_TRUE(
db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
Expand Down Expand Up @@ -183,7 +181,7 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) {
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);

SyncPoint::GetInstance()->EnableProcessing();
auto batch_and_status = GetWriteBatch(0 /* ts_sz */, handles_[1]);
auto batch_and_status = GetWriteBatch(handles_[1]);
ASSERT_OK(batch_and_status.second);
ASSERT_TRUE(
db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
Expand Down
102 changes: 93 additions & 9 deletions db/db_with_timestamp_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,69 @@ class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
: DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
};

// Verifies that a single WriteBatch can carry keys for multiple column
// families with different timestamp formats: the default CF (no timestamp)
// and a "data" CF whose comparator expects a timestamp suffix on every key.
// Also checks the data survives a DB close/reopen cycle.
TEST_F(DBBasicTestWithTimestamp, MixedCfs) {
// Default CF options: no user-defined timestamp.
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
// Keep data in the memtable at shutdown so reopen must recover it from WAL.
options.avoid_flush_during_shutdown = true;
DestroyAndReopen(options);

// Second CF ("data") uses a timestamp-aware comparator.
Options options1 = CurrentOptions();
options1.env = env_;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options1.comparator = &test_cmp;
ColumnFamilyHandle* handle = nullptr;
Status s = db_->CreateColumnFamily(options1, "data", &handle);
ASSERT_OK(s);

WriteBatch wb;
// Key for the default CF: no timestamp appended.
ASSERT_OK(wb.Put("a", "value"));
{
// Key for the "data" CF: user key plus a placeholder timestamp, passed
// as SliceParts so the caller controls the key+ts layout.
std::string key("a");
std::string ts(kTimestampSize, '\0');
std::array<Slice, 2> key_with_ts_slices{{key, ts}};
SliceParts key_with_ts(key_with_ts_slices.data(), 2);
std::string value_str("value");
Slice value_slice(value_str.data(), value_str.size());
SliceParts value(&value_slice, 1);
ASSERT_OK(wb.Put(handle, key_with_ts, value));
}
{
// Assign per-entry timestamps: empty for the default-CF key, a real
// timestamp for the "data"-CF key, then commit the batch.
std::string ts = Timestamp(1, 0);
std::vector<Slice> ts_list({Slice(), ts});
ASSERT_OK(wb.AssignTimestamps(ts_list));
ASSERT_OK(db_->Write(WriteOptions(), &wb));
}

// Reads back "a" from both the default CF (plain Get) and the
// timestamp-enabled CF (Get with a read timestamp).
const auto verify_db = [this](ColumnFamilyHandle* h) {
ASSERT_EQ("value", Get("a"));
std::string ts = Timestamp(1, 0);
Slice read_ts_slice(ts);
ReadOptions read_opts;
read_opts.timestamp = &read_ts_slice;
std::string value;
ASSERT_OK(db_->Get(read_opts, h, "a", &value));
ASSERT_EQ("value", value);
};

verify_db(handle);

delete handle;
Close();

// Reopen with both CFs and verify the data was recovered.
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back("data", options1);
options.create_if_missing = false;
s = DB::Open(options, dbname_, cf_descs, &handles_, &db_);
ASSERT_OK(s);

verify_db(handles_[1]);

Close();
}

TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) {
Options options = CurrentOptions();
options.env = env_;
Expand Down Expand Up @@ -766,11 +829,15 @@ TEST_F(DBBasicTestWithTimestamp, ChangeIterationDirection) {
const std::vector<std::tuple<std::string, std::string>> kvs = {
std::make_tuple("aa", "value1"), std::make_tuple("ab", "value2")};
for (const auto& ts : timestamps) {
WriteBatch wb(0, 0, kTimestampSize);
WriteBatch wb;
for (const auto& kv : kvs) {
const std::string& key = std::get<0>(kv);
const std::string& value = std::get<1>(kv);
ASSERT_OK(wb.Put(key, value));
std::array<Slice, 2> key_with_ts_slices{{Slice(key), Slice(ts)}};
SliceParts key_with_ts(key_with_ts_slices.data(), 2);
std::array<Slice, 1> value_slices{{Slice(value)}};
SliceParts values(value_slices.data(), 1);
ASSERT_OK(wb.Put(key_with_ts, values));
}

ASSERT_OK(wb.AssignTimestamp(ts));
Expand Down Expand Up @@ -1072,9 +1139,20 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
}
{
std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
WriteBatch batch(0, 0, kTimestampSize);
ASSERT_OK(batch.Put("a", "new_value"));
ASSERT_OK(batch.Put("b", "new_value"));
WriteBatch batch;
const std::string dummy_ts(kTimestampSize, '\0');
{
std::array<Slice, 2> key_with_ts_slices{{"a", dummy_ts}};
SliceParts key_with_ts(key_with_ts_slices.data(), 2);
std::array<Slice, 1> value_slices{{"new_value"}};
SliceParts values(value_slices.data(), 1);
ASSERT_OK(batch.Put(key_with_ts, values));
}
{
std::string key_with_ts("b");
key_with_ts.append(dummy_ts);
ASSERT_OK(batch.Put(key_with_ts, "new_value"));
}
s = batch.AssignTimestamp(ts_str);
ASSERT_OK(s);
s = db_->Write(write_opts, &batch);
Expand Down Expand Up @@ -2615,17 +2693,23 @@ TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
}
};

const std::string dummy_ts(ts_sz, '\0');
for (size_t i = 0; i != kNumTimestamps; ++i) {
write_ts_list.push_back(Timestamp(i * 2, 0));
read_ts_list.push_back(Timestamp(1 + i * 2, 0));
const Slice& write_ts = write_ts_list.back();
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
WriteOptions wopts;
WriteBatch batch(0, 0, ts_sz);
WriteBatch batch;
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
ASSERT_OK(
batch.Put(handles_[cf], Key1(j),
"value_" + std::to_string(j) + "_" + std::to_string(i)));
const std::string key = Key1(j);
const std::string value =
"value_" + std::to_string(j) + "_" + std::to_string(i);
std::array<Slice, 2> key_with_ts_slices{{key, dummy_ts}};
SliceParts key_with_ts(key_with_ts_slices.data(), 2);
std::array<Slice, 1> value_slices{{value}};
SliceParts values(value_slices.data(), 1);
ASSERT_OK(batch.Put(handles_[cf], key_with_ts, values));
}
ASSERT_OK(batch.AssignTimestamp(write_ts));
ASSERT_OK(db_->Write(wopts, &batch));
Expand Down
23 changes: 9 additions & 14 deletions db/kv_checksum.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ class ProtectionInfo {

Status GetStatus() const;
ProtectionInfoKVOT<T> ProtectKVOT(const Slice& key, const Slice& value,
ValueType op_type,
const Slice& timestamp) const;
ValueType op_type) const;
ProtectionInfoKVOT<T> ProtectKVOT(const SliceParts& key,
const SliceParts& value, ValueType op_type,
const Slice& timestamp) const;
const SliceParts& value,
ValueType op_type) const;

private:
friend class ProtectionInfoKVOT<T>;
Expand Down Expand Up @@ -222,25 +221,23 @@ Status ProtectionInfo<T>::GetStatus() const {
}

template <typename T>
ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(
const Slice& key, const Slice& value, ValueType op_type,
const Slice& timestamp) const {
ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(const Slice& key,
const Slice& value,
ValueType op_type) const {
T val = GetVal();
val = val ^ static_cast<T>(GetSliceNPHash64(key, ProtectionInfo<T>::kSeedK));
val =
val ^ static_cast<T>(GetSliceNPHash64(value, ProtectionInfo<T>::kSeedV));
val = val ^
static_cast<T>(NPHash64(reinterpret_cast<char*>(&op_type),
sizeof(op_type), ProtectionInfo<T>::kSeedO));
val = val ^
static_cast<T>(GetSliceNPHash64(timestamp, ProtectionInfo<T>::kSeedT));
return ProtectionInfoKVOT<T>(val);
}

template <typename T>
ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(
const SliceParts& key, const SliceParts& value, ValueType op_type,
const Slice& timestamp) const {
ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(const SliceParts& key,
const SliceParts& value,
ValueType op_type) const {
T val = GetVal();
val = val ^
static_cast<T>(GetSlicePartsNPHash64(key, ProtectionInfo<T>::kSeedK));
Expand All @@ -249,8 +246,6 @@ ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(
val = val ^
static_cast<T>(NPHash64(reinterpret_cast<char*>(&op_type),
sizeof(op_type), ProtectionInfo<T>::kSeedO));
val = val ^
static_cast<T>(GetSliceNPHash64(timestamp, ProtectionInfo<T>::kSeedT));
return ProtectionInfoKVOT<T>(val);
}

Expand Down
Loading

0 comments on commit 2a2b3e0

Please sign in to comment.