Skip to content

Commit

Permalink
Optimize the prefix performance of RocksDB iterator after delete range (
Browse files Browse the repository at this point in the history
vesoft-inc#5525)

* Optimize the prefix performance of RocksDB iterator after delete range

* fix the ci

* fix the ci

* resolve the comments

* fix the ci

---------

Co-authored-by: haowen <[email protected]>
  • Loading branch information
luyade and wenhaocs authored May 18, 2023
1 parent 3c1a8af commit 4707b71
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 6 deletions.
16 changes: 12 additions & 4 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,
const std::string& end,
std::unique_ptr<KVIterator>* storageIter) {
memory::MemoryCheckOffGuard guard;
storageIter->reset(new RocksRangeIter(start, end));
rocksdb::ReadOptions options;
options.iterate_upper_bound = dynamic_cast<RocksRangeIter*>(storageIter->get())->upperBound();
if (!isPlainTable_) {
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
} else {
Expand All @@ -196,8 +198,8 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,
std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(options));
if (iter) {
iter->Seek(rocksdb::Slice(start));
dynamic_cast<RocksRangeIter*>(storageIter->get())->reset(std::move(iter));
}
storageIter->reset(new RocksRangeIter(std::move(iter), start, end));
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand All @@ -218,23 +220,27 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref
const void* snapshot,
std::unique_ptr<KVIterator>* storageIter) {
memory::MemoryCheckOffGuard guard;
storageIter->reset(new RocksPrefixIter(prefix));
rocksdb::ReadOptions options;
options.iterate_upper_bound = dynamic_cast<RocksPrefixIter*>(storageIter->get())->upperBound();
if (UNLIKELY(snapshot != nullptr)) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
options.prefix_same_as_start = true;
std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(options));
if (iter) {
iter->Seek(rocksdb::Slice(prefix));
dynamic_cast<RocksPrefixIter*>(storageIter->get())->reset(std::move(iter));
}
storageIter->reset(new RocksPrefixIter(std::move(iter), prefix));
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor(
const std::string& prefix, const void* snapshot, std::unique_ptr<KVIterator>* storageIter) {
memory::MemoryCheckOffGuard guard;
storageIter->reset(new RocksPrefixIter(prefix));
rocksdb::ReadOptions options;
options.iterate_upper_bound = dynamic_cast<RocksPrefixIter*>(storageIter->get())->upperBound();
if (snapshot != nullptr) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
Expand All @@ -243,16 +249,18 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor(
std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(options));
if (iter) {
iter->Seek(rocksdb::Slice(prefix));
dynamic_cast<RocksPrefixIter*>(storageIter->get())->reset(std::move(iter));
}
storageIter->reset(new RocksPrefixIter(std::move(iter), prefix));
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start,
const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter) {
memory::MemoryCheckOffGuard guard;
storageIter->reset(new RocksPrefixIter(prefix));
rocksdb::ReadOptions options;
options.iterate_upper_bound = dynamic_cast<RocksPrefixIter*>(storageIter->get())->upperBound();
if (!isPlainTable_) {
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
} else {
Expand All @@ -261,8 +269,8 @@ nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start,
std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(options));
if (iter) {
iter->Seek(rocksdb::Slice(start));
dynamic_cast<RocksPrefixIter*>(storageIter->get())->reset(std::move(iter));
}
storageIter->reset(new RocksPrefixIter(std::move(iter), prefix));
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down
45 changes: 43 additions & 2 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <memory>

#include "common/base/Base.h"
#include "common/utils/NebulaKeyUtils.h"
#include "kvstore/KVEngine.h"
#include "kvstore/KVIterator.h"
#include "kvstore/RocksEngineConfig.h"
Expand All @@ -32,6 +33,9 @@ class RocksRangeIter : public KVIterator {
RocksRangeIter(std::unique_ptr<rocksdb::Iterator> iter, rocksdb::Slice start, rocksdb::Slice end)
: iter_(std::move(iter)), start_(start), end_(end) {}

RocksRangeIter(rocksdb::Slice start, rocksdb::Slice end)
: start_(start), end_(end), upperBound_(end) {}

~RocksRangeIter() = default;

bool valid() const override {
Expand All @@ -54,10 +58,25 @@ class RocksRangeIter : public KVIterator {
return folly::StringPiece(iter_->value().data(), iter_->value().size());
}

const rocksdb::Slice* upperBound() {
return &upperBound_;
}

void reset(rocksdb::Iterator* iter) {
iter_.reset(iter);
}

void reset(std::unique_ptr<rocksdb::Iterator> iter) {
iter_ = std::move(iter);
}

private:
std::unique_ptr<rocksdb::Iterator> iter_;
std::unique_ptr<rocksdb::Iterator> iter_{nullptr};
rocksdb::Slice start_;
rocksdb::Slice end_;
// Make sure that the lifetime of iterate_upper_bound in rocksdb::ReadOptions
// is the same as RocksRangeIter object
rocksdb::Slice upperBound_;
};

/**
Expand All @@ -70,6 +89,11 @@ class RocksPrefixIter : public KVIterator {
RocksPrefixIter(std::unique_ptr<rocksdb::Iterator> iter, rocksdb::Slice prefix)
: iter_(std::move(iter)), prefix_(prefix) {}

explicit RocksPrefixIter(rocksdb::Slice prefix) : prefix_(prefix) {
scratch_ = NebulaKeyUtils::lastKey(prefix_.ToString(), 128);
upperBound_ = rocksdb::Slice(scratch_);
}

~RocksPrefixIter() = default;

bool valid() const override {
Expand All @@ -92,9 +116,26 @@ class RocksPrefixIter : public KVIterator {
return folly::StringPiece(iter_->value().data(), iter_->value().size());
}

const rocksdb::Slice* upperBound() {
return &upperBound_;
}

void reset(rocksdb::Iterator* iter) {
iter_.reset(iter);
}

void reset(std::unique_ptr<rocksdb::Iterator> iter) {
iter_ = std::move(iter);
}

protected:
std::unique_ptr<rocksdb::Iterator> iter_;
std::unique_ptr<rocksdb::Iterator> iter_{nullptr};
rocksdb::Slice prefix_;
// Make sure that the lifetime of iterate_upper_bound in rocksdb::ReadOptions
// is the same as RocksPrefixIter object
rocksdb::Slice upperBound_;
// use scratch_ as temporary storage to make sure upperBound_ valid during its lifetime
std::string scratch_;
};

/**
Expand Down

0 comments on commit 4707b71

Please sign in to comment.