Skip to content

Commit

Permalink
LOOKUP sentence supports DISTINCT and supports yield EdgeType._src/._…
Browse files Browse the repository at this point in the history
…dst (vesoft-inc#2415)

* lookup sentence supports distinct; lookup edge sentence supports ._src/._dst

* change dynamic_cast to static_cast

Co-authored-by: bright-starry-sky <[email protected]>
  • Loading branch information
LuconYang and bright-starry-sky authored Jan 10, 2021
1 parent 4ebcec1 commit 753234d
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 20 deletions.
127 changes: 108 additions & 19 deletions src/graph/LookupExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ Status LookupExecutor::prepareClauses() {
if (!status.ok()) {
break;
}
status = prepareDistinct();
if (!status.ok()) {
break;
}
} while (false);

if (!status.ok()) {
Expand Down Expand Up @@ -126,15 +130,21 @@ Status LookupExecutor::prepareYield() {
return Status::SyntaxError("Not supported yet");
}
auto* prop = col->expr();
if (prop->kind() != Expression::kAliasProp) {
return Status::SyntaxError("Expressions other than AliasProp are not supported");
const AliasPropertyExpression* aExpr;
if (prop->kind() == Expression::kAliasProp ||
(isEdge_ && prop->kind() == Expression::kEdgeSrcId) ||
(isEdge_ && prop->kind() == Expression::kEdgeDstId)) {
aExpr = static_cast<const AliasPropertyExpression*>(prop);
} else {
return Status::SyntaxError("Expressions other than "
"AliasProp/kEdgeSrcId/kEdgeDstId are not supported");
}
auto* aExpr = dynamic_cast<const AliasPropertyExpression*>(prop);
auto st = checkAliasProperty(aExpr);
if (!st.ok()) {
return st;
}
if (schema->getFieldIndex(aExpr->prop()->c_str()) < 0) {
if (prop->kind() == Expression::kAliasProp &&
schema->getFieldIndex(aExpr->prop()->c_str()) < 0) {
LOG(ERROR) << "Unknown column " << aExpr->prop()->c_str();
return Status::Error("Unknown column `%s' in schema", aExpr->prop()->c_str());
}
Expand All @@ -144,6 +154,14 @@ Status LookupExecutor::prepareYield() {
return Status::OK();
}

Status LookupExecutor::prepareDistinct() {
auto *clause = sentence_->yieldClause();
if (clause != nullptr) {
distinct_ = clause->isDistinct();
}
return Status::OK();
}

Status LookupExecutor::checkAliasProperty(const AliasPropertyExpression* aExpr) {
if (*aExpr->alias() != *from_) {
return Status::SyntaxError("Edge or Tag name error : %s ", aExpr->alias()->c_str());
Expand Down Expand Up @@ -491,7 +509,14 @@ std::vector<std::shared_ptr<nebula::cpp2::IndexItem>> LookupExecutor::findIndexF
void LookupExecutor::lookUp() {
auto *sc = ectx()->getStorageClient();
auto filter = Expression::encode(sentence_->whereClause()->filter());
auto future = sc->lookUpIndex(spaceId_, index_, filter, returnCols_, isEdge_);
// Get the columns that really need to be requested
std::vector<std::string> requestCols_;
for (auto& column : returnCols_) {
if (column != _SRC && column != _DST) {
requestCols_.emplace_back(column);
}
}
auto future = sc->lookUpIndex(spaceId_, index_, filter, requestCols_, isEdge_);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&result) {
auto completeness = result.completeness();
Expand Down Expand Up @@ -764,6 +789,10 @@ bool LookupExecutor::processFinalEdgeResult(RpcResponse &rpcResp, const Callback
std::vector<nebula::cpp2::SupportedType> colTypes;
std::vector<VariantType> record;
record.reserve(returnCols_.size() + 3);
// Store values specified by user within yield clause
std::vector<VariantType> yieldRecord;
yieldRecord.reserve(returnCols_.size());
auto uniqResult = std::make_unique<std::unordered_set<size_t>>();
std::shared_ptr<ResultSchemaProvider> schema = nullptr;
for (auto &resp : all) {
if (!resp.__isset.edges || resp.get_edges() == nullptr || resp.get_edges()->empty()) {
Expand All @@ -774,27 +803,61 @@ bool LookupExecutor::processFinalEdgeResult(RpcResponse &rpcResp, const Callback
colTypes.emplace_back(nebula::cpp2::SupportedType::VID);
colTypes.emplace_back(nebula::cpp2::SupportedType::INT);
if (!returnCols_.empty()) {
schema = std::make_shared<ResultSchemaProvider>(*(resp.get_schema()));
std::for_each(returnCols_.begin(), returnCols_.end(), [&](auto& col) {
colTypes.emplace_back(schema->getFieldType(col).get_type());
});
// when the columns really requested to client (in lookup())are empty,
// resp.get_schema() will be nullptr
if (resp.get_schema() != nullptr) {
schema = std::make_shared<ResultSchemaProvider>(*(resp.get_schema()));
}
for (auto& col : returnCols_) {
if (col == _SRC || col == _DST) {
colTypes.emplace_back(nebula::cpp2::SupportedType::VID);
} else {
if (schema == nullptr) {
LOG(ERROR) << "Schema is null unexpected, "
"get fieldType failed for " << col;
return false;
}
colTypes.emplace_back(schema->getFieldType(col).get_type());
}
}
}
}
for (const auto& data : *resp.get_edges()) {
const auto& edge = data.get_key();
record.emplace_back(edge.get_src());
record.emplace_back(edge.get_dst());
record.emplace_back(edge.get_ranking());
yieldRecord.clear();
for (auto& column : returnCols_) {
auto reader = RowReader::getRowReader(data.get_props(), schema);
auto res = RowReader::getPropByName(reader.get(), column);
if (!ok(res)) {
LOG(ERROR) << "Skip the bad value for prop " << column;
continue;
VariantType v;
if (column == _SRC) {
v = edge.get_src();
} else if (column == _DST) {
v = edge.get_dst();
} else {
if (schema == nullptr) {
LOG(ERROR) << "Schema is null unexpected, get value failed for " << column;
return false;
}
auto reader = RowReader::getRowReader(data.get_props(), schema);
auto res = RowReader::getPropByName(reader.get(), column);
if (!ok(res)) {
LOG(ERROR) << "Skip the bad value for prop " << column;
continue;
}
v = value(std::move(res));
}
auto&& v = value(std::move(res));
yieldRecord.emplace_back(v);
record.emplace_back(std::move(v));
}
// Check if duplicate, only for user-specified columns within yield clause
if (distinct_) {
auto ret = uniqResult->emplace(boost::hash_range(yieldRecord.begin(),
yieldRecord.end()));
if (!ret.second) {
continue;
}
}
auto cbStatus = cb(std::move(record), colTypes);
if (!cbStatus.ok()) {
LOG(ERROR) << cbStatus;
Expand All @@ -812,6 +875,10 @@ bool LookupExecutor::processFinalVertexResult(RpcResponse &rpcResp,
std::vector<nebula::cpp2::SupportedType> colTypes;
std::vector<VariantType> record;
record.reserve(returnCols_.size() + 1);
// Store values specified by user within yield clause
std::vector<VariantType> yieldRecord;
yieldRecord.reserve(returnCols_.size());
auto uniqResult = std::make_unique<std::unordered_set<size_t>>();
std::shared_ptr<ResultSchemaProvider> schema = nullptr;
for (auto &resp : all) {
if (!resp.__isset.vertices ||
Expand All @@ -822,24 +889,46 @@ bool LookupExecutor::processFinalVertexResult(RpcResponse &rpcResp,
if (colTypes.empty()) {
colTypes.emplace_back(nebula::cpp2::SupportedType::VID);
if (!returnCols_.empty()) {
schema = std::make_shared<ResultSchemaProvider>(*(resp.get_schema()));
std::for_each(returnCols_.begin(), returnCols_.end(), [&](auto& col) {
// when the columns really requested to client (in lookup())are empty,
// resp.get_schema() will be nullptr
if (resp.get_schema() != nullptr) {
schema = std::make_shared<ResultSchemaProvider>(*(resp.get_schema()));
}
for (auto& col : returnCols_) {
if (schema == nullptr) {
LOG(ERROR) << "Schema is null unexpected, get fieldType failed for " << col;
return false;
}
colTypes.emplace_back(schema->getFieldType(col).get_type());
});
}
}
}
for (const auto& data : *resp.get_vertices()) {
const auto& vertexId = data.get_vertex_id();
record.emplace_back(vertexId);
yieldRecord.clear();
for (auto& column : returnCols_) {
if (schema == nullptr) {
LOG(ERROR) << "Schema is null unexpected, get value failed for " << column;
return false;
}
auto reader = RowReader::getRowReader(data.get_props(), schema);
auto res = RowReader::getPropByName(reader.get(), column);
if (!ok(res)) {
LOG(ERROR) << "Skip the bad value for prop " << column;
continue;
}
auto&& v = value(std::move(res));
record.emplace_back(v);
yieldRecord.emplace_back(v);
record.emplace_back(std::move(v));
}
// Check if duplicate, only for user-specified columns within yield clause
if (distinct_) {
auto ret = uniqResult->emplace(boost::hash_range(yieldRecord.begin(),
yieldRecord.end()));
if (!ret.second) {
continue;
}
}
auto cbStatus = cb(std::move(record), colTypes);
if (!cbStatus.ok()) {
Expand Down
3 changes: 3 additions & 0 deletions src/graph/LookupExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class LookupExecutor final : public TraverseExecutor {

Status prepareYield();

Status prepareDistinct();

Status checkAliasProperty(const AliasPropertyExpression* aExpr);

Status optimize();
Expand Down Expand Up @@ -97,6 +99,7 @@ class LookupExecutor final : public TraverseExecutor {
const std::string *from_{nullptr};
std::vector<YieldColumn*> yields_;
std::unique_ptr<YieldClauseWrapper> yieldClauseWrapper_;
bool distinct_{false};
std::unique_ptr<ExpressionContext> expCtx_;
int32_t tagOrEdge_;
bool isEdge_{false};
Expand Down
Loading

0 comments on commit 753234d

Please sign in to comment.