Skip to content

Commit

Permalink
use range in delete edges (vesoft-inc#2404)
Browse files Browse the repository at this point in the history
Co-authored-by: Yee <[email protected]>
  • Loading branch information
critical27 and yixinglu authored Dec 2, 2020
1 parent 740909d commit f163ebc
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 75 deletions.
8 changes: 4 additions & 4 deletions src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,15 +520,15 @@ class NebulaKeyUtils final {
// See KeyType enum
static constexpr uint32_t kTypeMask = 0x000000FF;

// The Tag/Edge type bit Mask
// The most significant bit is sign bit, tag is always 0
// The second most significant bit is tag/edge type bit Mask
// 0 for Tag, 1 for Edge
// 0x40 - 0b0100,0000
static constexpr uint32_t kTagEdgeMask = 0x40000000;
// For extract Tag/Edge value
static constexpr uint32_t kTagEdgeValueMask = ~kTagEdgeMask;
// Write edge by &=
// Write edge by |= 0x40000000
static constexpr uint32_t kEdgeMaskSet = kTagEdgeMask;
// Write Tag by |=
// Write Tag by &= 0xbfffffff
static constexpr uint32_t kTagMaskSet = ~kTagEdgeMask;
};

Expand Down
46 changes: 23 additions & 23 deletions src/kvstore/EventListner.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ class EventListener : public rocksdb::EventListener {
// A callback function to RocksDB which will be called
// before a RocksDB starts to flush memtables.
void OnFlushBegin(rocksdb::DB*, const rocksdb::FlushJobInfo& info) override {
LOG(INFO) << "Rocksdb start flush column family: " << info.cf_name
<< " because of " << flushReasonString(info.flush_reason)
<< " the newly created file: " << info.file_path
<< " the smallest sequence number is " << info.smallest_seqno
<< " the largest sequence number is " << info.largest_seqno
<< " the properties of the table: " << info.table_properties.ToString();
VLOG(1) << "Rocksdb start flush column family: " << info.cf_name
<< " because of " << flushReasonString(info.flush_reason)
<< ", the newly created file: " << info.file_path
<< ", the smallest sequence number is " << info.smallest_seqno
<< ", the largest sequence number is " << info.largest_seqno
<< ", the properties of the table: " << info.table_properties.ToString();
}

// A callback function to RocksDB which will be called
// whenever a registered RocksDB flushes a file.
void OnFlushCompleted(rocksdb::DB*, const rocksdb::FlushJobInfo& info) override {
LOG(INFO) << "Rocksdb flush completed column family: " << info.cf_name
<< " because of " << flushReasonString(info.flush_reason)
<< " the newly created file: " << info.file_path
<< " the smallest sequence number is " << info.smallest_seqno
<< " the largest sequence number is " << info.largest_seqno
<< " the properties of the table: " << info.table_properties.ToString();
VLOG(1) << "Rocksdb flush completed column family: " << info.cf_name
<< " because of " << flushReasonString(info.flush_reason)
<< " the newly created file: " << info.file_path
<< " the smallest sequence number is " << info.smallest_seqno
<< " the largest sequence number is " << info.largest_seqno
<< " the properties of the table: " << info.table_properties.ToString();
}

// A callback function for RocksDB which will be called whenever a SST file is created.
Expand All @@ -71,18 +71,18 @@ class EventListener : public rocksdb::EventListener {

// A callback function for RocksDB which will be called before a SST file is being created.
void OnTableFileCreationStarted(const rocksdb::TableFileCreationBriefInfo& info) override {
LOG(INFO) << " database's name is " << info.db_name
<< " column family's name is " << info.cf_name
<< " the created file is " << info.file_path
<< " because of " << tableFileCreationReasonString(info.reason);
VLOG(3) << " database's name is " << info.db_name
<< ", column family's name is " << info.cf_name
<< ", the created file is " << info.file_path
<< ", because of " << tableFileCreationReasonString(info.reason);
}

// A callback function for RocksDB which will be called before
// a memtable is made immutable.
void OnMemTableSealed(const rocksdb::MemTableInfo& info) override {
VLOG(3) << "MemTable Sealed column family: " << info.cf_name
<< " the total number of entries: " << info.num_entries
<< " the total number of deletes: " << info.num_deletes;
<< ", the total number of entries: " << info.num_entries
<< ", the total number of deletes: " << info.num_deletes;
}

// A callback function for RocksDB which will be called before
Expand All @@ -94,9 +94,9 @@ class EventListener : public rocksdb::EventListener {
void OnExternalFileIngested(rocksdb::DB*,
const rocksdb::ExternalFileIngestionInfo& info) override {
LOG(INFO) << "Ingest external SST file: column family " << info.cf_name
<< " the external file path " << info. external_file_path
<< " the internal file path " << info.internal_file_path
<< " the properties of the table: " << info.table_properties.ToString();
<< ", the external file path " << info. external_file_path
<< ", the internal file path " << info.internal_file_path
<< ", the properties of the table: " << info.table_properties.ToString();
}

// A callback function for RocksDB which will be called before setting the
Expand All @@ -110,8 +110,8 @@ class EventListener : public rocksdb::EventListener {
// of superversion triggers a change of the stall conditions.
void OnStallConditionsChanged(const rocksdb::WriteStallInfo& info) override {
LOG(INFO) << "Stall conditions changed column family: " << info.cf_name
<< " current condition: " << writeStallConditionString(info.condition.cur)
<< " previous condition: " << writeStallConditionString(info.condition.prev);
<< ", current condition: " << writeStallConditionString(info.condition.cur)
<< ", previous condition: " << writeStallConditionString(info.condition.prev);
}

// A callback function for RocksDB which will be called whenever a file read
Expand Down
5 changes: 5 additions & 0 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class BaseProcessor {

void doRemove(GraphSpaceID spaceId, PartitionID partId, std::vector<std::string> keys);

void doRemoveRange(GraphSpaceID spaceId,
PartitionID partId,
const std::string& start,
const std::string& end);

kvstore::ResultCode doRange(GraphSpaceID spaceId, PartitionID partId, const std::string& start,
const std::string& end, std::unique_ptr<kvstore::KVIterator>* iter);

Expand Down
11 changes: 11 additions & 0 deletions src/storage/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ void BaseProcessor<RESP>::doRemove(GraphSpaceID spaceId,
});
}

template <typename RESP>
void BaseProcessor<RESP>::doRemoveRange(GraphSpaceID spaceId,
PartitionID partId,
const std::string& start,
const std::string& end) {
this->kvstore_->asyncRemoveRange(
spaceId, partId, start, end, [spaceId, partId, this](kvstore::ResultCode code) {
handleAsync(spaceId, partId, code);
});
}

template<typename RESP>
kvstore::ResultCode BaseProcessor<RESP>::doRange(GraphSpaceID spaceId,
PartitionID partId,
Expand Down
71 changes: 37 additions & 34 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <algorithm>
#include <limits>
#include "utils/NebulaKeyUtils.h"
#include "storage/StorageFlags.h"

namespace nebula {
namespace storage {
Expand All @@ -20,42 +21,44 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
}

if (indexes_.empty()) {
std::for_each(req.parts.begin(), req.parts.end(), [this](auto &partEdges) {
this->callingNum_ += partEdges.second.size();
});
std::vector<std::string> keys;
keys.reserve(16);
for (auto& partEdges : req.parts) {
auto partId = partEdges.first;
for (auto& edgeKey : partEdges.second) {
auto start = NebulaKeyUtils::edgeKey(partId,
edgeKey.src,
edgeKey.edge_type,
edgeKey.ranking,
edgeKey.dst,
0);
auto end = NebulaKeyUtils::edgeKey(partId,
edgeKey.src,
edgeKey.edge_type,
edgeKey.ranking,
edgeKey.dst,
std::numeric_limits<int64_t>::max());
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = this->kvstore_->range(spaceId, partId, start, end, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
VLOG(3) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceID " << spaceId;
this->handleErrorCode(ret, spaceId, partId);
this->onFinished();
return;
if (FLAGS_enable_multi_versions) {
std::for_each(req.parts.begin(), req.parts.end(), [this](const auto &partEdges) {
this->callingNum_ += partEdges.second.size();
});
for (auto& partEdges : req.parts) {
auto partId = partEdges.first;
for (auto& edgeKey : partEdges.second) {
auto start = NebulaKeyUtils::edgeKey(partId,
edgeKey.src,
edgeKey.edge_type,
edgeKey.ranking,
edgeKey.dst,
0);
auto end = NebulaKeyUtils::edgeKey(partId,
edgeKey.src,
edgeKey.edge_type,
edgeKey.ranking,
edgeKey.dst,
std::numeric_limits<int64_t>::max());
doRemoveRange(spaceId, partId, start, end);
}
keys.clear();
while (iter && iter->valid()) {
auto key = iter->key();
keys.emplace_back(key.data(), key.size());
iter->next();
}
} else {
callingNum_ = req.parts.size();
std::vector<std::string> keys;
keys.reserve(16);
for (auto& partEdges : req.parts) {
auto partId = partEdges.first;
for (auto& edgeKey : partEdges.second) {
auto key = NebulaKeyUtils::edgeKey(partId,
edgeKey.src,
edgeKey.edge_type,
edgeKey.ranking,
edgeKey.dst,
0L);
keys.emplace_back(std::move(key));
}
doRemove(spaceId, partId, keys);
doRemove(spaceId, partId, std::move(keys));
}
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/query/ScanEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void ScanEdgeProcessor::process(const cpp2::ScanEdgeRequest& req) {
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = doRangeWithPrefix(spaceId_, partId_, start, prefix, &iter);
if (kvRet != kvstore::ResultCode::SUCCEEDED) {
pushResultCode(to(kvRet), partId_);
handleErrorCode(kvRet, spaceId_, partId_);
onFinished();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/query/ScanVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void ScanVertexProcessor::process(const cpp2::ScanVertexRequest& req) {
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = doRangeWithPrefix(spaceId_, partId_, start, prefix, &iter);
if (kvRet != kvstore::ResultCode::SUCCEEDED) {
pushResultCode(to(kvRet), partId_);
handleErrorCode(kvRet, spaceId_, partId_);
onFinished();
return;
}
Expand Down
Loading

0 comments on commit f163ebc

Please sign in to comment.