Skip to content

Commit

Permalink
fix: update ttl if new ttl greater than current ttl when depoly sql (4…
Browse files Browse the repository at this point in the history
  • Loading branch information
keyu813 authored May 10, 2022
1 parent 9121617 commit d963b5a
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 41 deletions.
77 changes: 76 additions & 1 deletion src/cmd/sql_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,81 @@ TEST_P(DBSDKTest, Deploy) {
ASSERT_FALSE(status.IsOK());
}

TEST_P(DBSDKTest, DeployWithSameIndex) {
auto cli = GetParam();
cs = cli->cs;
sr = cli->sr;
HandleSQL("create database test1;");
HandleSQL("use test1;");
std::string create_sql =
"create table trans (c1 string, c3 int, c4 bigint, c5 float, c6 double, c7 timestamp, "
"c8 date, index(key=c1, ts=c7, ttl=1, ttl_type=latest));";

HandleSQL(create_sql);
if (!cs->IsClusterMode()) {
HandleSQL("insert into trans values ('aaa', 11, 22, 1.2, 1.3, 1635247427000, \"2021-05-20\");");
}

// origin index
std::string msg;
auto ns_client = cs->GetNsClient();
std::vector<::openmldb::nameserver::TableInfo> tables;
ASSERT_TRUE(ns_client->ShowTable("trans", "test1", false, tables, msg));
::openmldb::nameserver::TableInfo table = tables[0];

ASSERT_EQ(table.column_key_size(), 1);
::openmldb::common::ColumnKey column_key = table.column_key(0);
ASSERT_EQ(column_key.col_name_size(), 1);
ASSERT_EQ(column_key.col_name(0), "c1");
ASSERT_EQ(column_key.ts_name(), "c7");
ASSERT_TRUE(column_key.has_ttl());
ASSERT_EQ(column_key.ttl().ttl_type(), ::openmldb::type::TTLType::kLatestTime);
ASSERT_EQ(column_key.ttl().lat_ttl(), 1);

std::string deploy_sql =
"deploy demo SELECT c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM trans "
" WINDOW w1 AS (PARTITION BY trans.c1 ORDER BY trans.c7 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);";
hybridse::sdk::Status status;
sr->ExecuteSQL(deploy_sql, &status);
ASSERT_TRUE(status.IsOK());

// new index, update ttl
tables.clear();
ASSERT_TRUE(ns_client->ShowTable("trans", "test1", false, tables, msg));
table = tables[0];

ASSERT_EQ(table.column_key_size(), 1);
column_key = table.column_key(0);
ASSERT_EQ(column_key.col_name_size(), 1);
ASSERT_EQ(column_key.col_name(0), "c1");
ASSERT_EQ(column_key.ts_name(), "c7");
ASSERT_TRUE(column_key.has_ttl());
ASSERT_EQ(column_key.ttl().ttl_type(), ::openmldb::type::TTLType::kLatestTime);
ASSERT_EQ(column_key.ttl().lat_ttl(), 2);

// type mismatch case
create_sql =
"create table trans1 (c1 string, c3 int, c4 bigint, c5 float, c6 double, c7 timestamp, "
"c8 date, index(key=c1, ts=c7, ttl=1m, ttl_type=absolute));";
HandleSQL(create_sql);
if (!cs->IsClusterMode()) {
HandleSQL("insert into trans1 values ('aaa', 11, 22, 1.2, 1.3, 1635247427000, \"2021-05-20\");");
}
deploy_sql =
"deploy demo SELECT c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM trans1 "
" WINDOW w1 AS (PARTITION BY trans1.c1 ORDER BY trans1.c7 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);";
sr->ExecuteSQL(deploy_sql, &status);
ASSERT_FALSE(status.IsOK());
ASSERT_EQ(status.msg, "new ttl type kLatestTime doesn't match the old ttl type kAbsoluteTime");


ASSERT_FALSE(cs->GetNsClient()->DropTable("test1", "trans", msg));
ASSERT_TRUE(cs->GetNsClient()->DropProcedure("test1", "demo", msg));
ASSERT_TRUE(cs->GetNsClient()->DropTable("test1", "trans", msg));
ASSERT_TRUE(cs->GetNsClient()->DropTable("test1", "trans1", msg));
ASSERT_TRUE(cs->GetNsClient()->DropDatabase("test1", msg));
}

TEST_P(DBSDKTest, DeployCol) {
auto cli = GetParam();
cs = cli->cs;
Expand Down Expand Up @@ -464,7 +539,7 @@ TEST_P(DBSDKTest, DeployLongWindows) {
HandleSQL("use test2;");
std::string create_sql =
"create table trans (c1 string, c3 int, c4 bigint, c5 float, c6 double, c7 timestamp, "
"c8 date, index(key=c1, ts=c4, abs_ttl=0, ttl_type=absolute));";
"c8 date, index(key=c1, ts=c4, ttl=0, ttl_type=latest));";
HandleSQL(create_sql);
if (!cs->IsClusterMode()) {
HandleSQL("insert into trans values ('aaa', 11, 22, 1.2, 1.3, 1635247427000, \"2021-05-20\");");
Expand Down
162 changes: 125 additions & 37 deletions src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2912,6 +2912,31 @@ hybridse::sdk::Status SQLClusterRouter::HandleDeploy(const hybridse::node::Deplo
sp_info.set_sql(str_stream.str());
sp_info.set_type(::openmldb::type::ProcedureType::kReqDeployment);

auto index_status = HandleIndex(table_pair, select_sql);
if (!index_status.IsOK()) {
return index_status;
}

auto lw_status = HandleLongWindows(deploy_node, table_pair, select_sql);
if (!lw_status.IsOK()) {
return lw_status;
}
for (const auto& o : *deploy_node->Options()) {
auto option = sp_info.add_options();
option->set_name(o.first);
option->mutable_value()->set_value(o.second->GetExprString());
}

auto status = cluster_sdk_->GetNsClient()->CreateProcedure(sp_info, options_.request_timeout);
if (!status.OK()) {
return {::hybridse::common::StatusCode::kCmdError, status.msg};
}
return {};
}

hybridse::sdk::Status SQLClusterRouter::HandleIndex(const std::set<std::pair<std::string, std::string>>& table_pair,
const std::string& select_sql) {
std::string db = GetDatabase();
// extract index from sql
std::vector<::openmldb::nameserver::TableInfo> tables;
auto ns = cluster_sdk_->GetNsClient();
Expand All @@ -2933,60 +2958,138 @@ hybridse::sdk::Status SQLClusterRouter::HandleDeploy(const hybridse::node::Deplo
}
auto index_map = base::DDLParser::ExtractIndexes(select_sql, table_schema_map);
std::map<std::string, std::vector<::openmldb::common::ColumnKey>> new_index_map;
auto get_index_status = GetNewIndex(table_map, index_map, &new_index_map);
if (!get_index_status.IsOK()) {
return get_index_status;
}

auto add_index_status = AddNewIndex(table_map, new_index_map);
if (!add_index_status.IsOK()) {
return add_index_status;
}
return {};
}

hybridse::sdk::Status SQLClusterRouter::GetNewIndex(
const std::map<std::string, ::openmldb::nameserver::TableInfo>& table_map,
const std::map<std::string, std::vector<::openmldb::common::ColumnKey>>& index_map,
std::map<std::string, std::vector<::openmldb::common::ColumnKey>>* new_index_map) {
auto ns = cluster_sdk_->GetNsClient();
for (auto& kv : index_map) {
auto it = table_map.find(kv.first);
std::string table_name = kv.first;
std::vector<::openmldb::common::ColumnKey> extract_column_keys = kv.second;
auto it = table_map.find(table_name);
if (it == table_map.end()) {
return {::hybridse::common::StatusCode::kCmdError, "table " + kv.first + "is not exist"};
return {::hybridse::common::StatusCode::kCmdError, "table " + table_name + "is not exist"};
}
auto& table = it->second;
std::set<std::string> col_set;
for (const auto& column_desc : it->second.column_desc()) {
for (const auto& column_desc : table.column_desc()) {
col_set.insert(column_desc.name());
}
std::set<std::string> index_id_set;
for (const auto& column_key : it->second.column_key()) {
index_id_set.insert(openmldb::schema::IndexUtil::GetIDStr(column_key));
std::map<std::string, ::openmldb::common::ColumnKey> id_columnkey_map;
for (const auto& column_key : table.column_key()) {
if (id_columnkey_map.find(openmldb::schema::IndexUtil::GetIDStr(column_key)) != id_columnkey_map.end()) {
LOG(WARNING) << "exist two indexes which are the same id "
<< openmldb::schema::IndexUtil::GetIDStr(column_key) << " in table " << table_name;
}
id_columnkey_map.emplace(openmldb::schema::IndexUtil::GetIDStr(column_key), column_key);
}
int cur_index_num = it->second.column_key_size();
int cur_index_num = table.column_key_size();
int add_index_num = 0;
std::vector<::openmldb::common::ColumnKey> new_indexs;
for (auto& column_key : kv.second) {
for (auto& column_key : extract_column_keys) {
if (!column_key.has_ttl()) {
return {::hybridse::common::StatusCode::kCmdError, "table " + kv.first + " index has not ttl"};
return {::hybridse::common::StatusCode::kCmdError, "table " + table_name + " index has not ttl"};
}
if (!column_key.ts_name().empty() && col_set.count(column_key.ts_name()) == 0) {
return {::hybridse::common::StatusCode::kCmdError,
"ts col " + column_key.ts_name() + " is not exist in table " + kv.first};
"ts col " + column_key.ts_name() + " is not exist in table " + table_name};
}
for (const auto& col : column_key.col_name()) {
if (col_set.count(col) == 0) {
return {::hybridse::common::StatusCode::kCmdError,
"col " + col + " is not exist in table " + kv.first};
"col " + col + " is not exist in table " + table_name};
}
}
if (index_id_set.count(openmldb::schema::IndexUtil::GetIDStr(column_key)) > 0) {
// skip exist index
continue;
std::string index_id = openmldb::schema::IndexUtil::GetIDStr(column_key);
// index exist, if type match && new ttl greater than old ttl && old ttl != 0, update ttl, else skip
auto it = id_columnkey_map.find(index_id);
if (it != id_columnkey_map.end()) {
auto& old_column_key = it->second;
// type mismatch, return here and deploy failed
if (old_column_key.ttl().ttl_type() != column_key.ttl().ttl_type()) {
return {::hybridse::common::StatusCode::kCmdError,
"new ttl type " + ::openmldb::type::TTLType_Name(column_key.ttl().ttl_type()) +
" doesn't match the old ttl type " +
::openmldb::type::TTLType_Name(old_column_key.ttl().ttl_type())};
} else {
// type match, if old ttl == 0, won't update
::openmldb::type::TTLType type = column_key.ttl().ttl_type();
uint64_t old_abs_ttl = 0;
uint64_t old_lat_ttl = 0;
uint64_t new_abs_ttl = 0;
uint64_t new_lat_ttl = 0;
if (type == ::openmldb::type::TTLType::kAbsoluteTime) {
if (old_column_key.ttl().abs_ttl() != 0) {
old_abs_ttl = old_column_key.ttl().abs_ttl();
new_abs_ttl = column_key.ttl().abs_ttl();
}
} else if (type == ::openmldb::type::TTLType::kLatestTime) {
if (old_column_key.ttl().lat_ttl() != 0) {
old_lat_ttl = old_column_key.ttl().lat_ttl();
new_lat_ttl = column_key.ttl().lat_ttl();
}
} else {
// absandlat && absorlat should check abs_ttl and lat_ttl
if (old_column_key.ttl().abs_ttl() != 0) {
old_abs_ttl = old_column_key.ttl().abs_ttl();
new_abs_ttl = column_key.ttl().abs_ttl();
}
if (old_column_key.ttl().lat_ttl() != 0) {
old_lat_ttl = old_column_key.ttl().lat_ttl();
new_lat_ttl = column_key.ttl().lat_ttl();
}
}
if (new_abs_ttl > old_abs_ttl || new_lat_ttl > old_lat_ttl) {
// update ttl
auto ns_ptr = cluster_sdk_->GetNsClient();
std::string err;
bool ok = ns_ptr->UpdateTTL(table_name, type, new_abs_ttl, new_lat_ttl, column_key.index_name(), err);
if (!ok) {
return {::hybridse::common::StatusCode::kCmdError, "update ttl failed"};
}
}
}
} else {
column_key.set_index_name("INDEX_" + std::to_string(cur_index_num + add_index_num) + "_" +
std::to_string(::baidu::common::timer::now_time()));
add_index_num++;
new_indexs.emplace_back(column_key);
}
column_key.set_index_name("INDEX_" + std::to_string(cur_index_num + add_index_num) + "_" +
std::to_string(::baidu::common::timer::now_time()));
add_index_num++;
new_indexs.emplace_back(column_key);
}
if (!new_indexs.empty()) {
if (cluster_sdk_->IsClusterMode()) {
uint64_t record_cnt = 0;
for (int idx = 0; idx < it->second.table_partition_size(); idx++) {
record_cnt += it->second.table_partition(idx).record_cnt();
for (int idx = 0; idx < table.table_partition_size(); idx++) {
record_cnt += table.table_partition(idx).record_cnt();
}
if (record_cnt > 0) {
return {::hybridse::common::StatusCode::kCmdError,
"table " + kv.first +
"table " + table_name +
" has online data, cannot deploy. please drop this table and create a new one"};
}
}
new_index_map.emplace(kv.first, std::move(new_indexs));
new_index_map->emplace(table_name, std::move(new_indexs));
}
}
return {};
}

hybridse::sdk::Status SQLClusterRouter::AddNewIndex(
const std::map<std::string, ::openmldb::nameserver::TableInfo>& table_map,
const std::map<std::string, std::vector<::openmldb::common::ColumnKey>>& new_index_map) {
auto ns = cluster_sdk_->GetNsClient();
if (cluster_sdk_->IsClusterMode()) {
for (auto& kv : new_index_map) {
auto status = ns->AddMultiIndex(kv.first, kv.second);
Expand Down Expand Up @@ -3036,21 +3139,6 @@ hybridse::sdk::Status SQLClusterRouter::HandleDeploy(const hybridse::node::Deplo
}
}
}

auto lw_status = HandleLongWindows(deploy_node, table_pair, select_sql);
if (!lw_status.IsOK()) {
return lw_status;
}
for (const auto& o : *deploy_node->Options()) {
auto option = sp_info.add_options();
option->set_name(o.first);
option->mutable_value()->set_value(o.second->GetExprString());
}

auto status = ns->CreateProcedure(sp_info, options_.request_timeout);
if (!status.OK()) {
return {::hybridse::common::StatusCode::kCmdError, status.msg};
}
return {};
}

Expand Down
12 changes: 12 additions & 0 deletions src/sdk/sql_cluster_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,18 @@ class SQLClusterRouter : public SQLRouter {

hybridse::sdk::Status HandleDeploy(const hybridse::node::DeployPlanNode* deploy_node);

hybridse::sdk::Status HandleIndex(const std::set<std::pair<std::string, std::string>>& table_pair,
const std::string& select_sql);

hybridse::sdk::Status GetNewIndex(
const std::map<std::string, ::openmldb::nameserver::TableInfo>& table_map,
const std::map<std::string, std::vector<::openmldb::common::ColumnKey>>& index_map,
std::map<std::string, std::vector<::openmldb::common::ColumnKey>>* new_index_map);

hybridse::sdk::Status AddNewIndex(
const std::map<std::string, ::openmldb::nameserver::TableInfo>& table_map,
const std::map<std::string, std::vector<::openmldb::common::ColumnKey>>& new_index_map);

hybridse::sdk::Status HandleCreateFunction(const hybridse::node::CreateFunctionPlanNode* node);

hybridse::sdk::Status HandleLongWindows(const hybridse::node::DeployPlanNode* deploy_node,
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/sql_cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ TEST_F(SQLClusterTest, CreatePreAggrTable) {
"("
"col1 string, col2 bigint, col3 int,"
" index(key=col1, ts=col2,"
" TTL_TYPE=latest, TTL=1)) options(partitionnum=8, replicanum=2);";
" TTL_TYPE=absolute, TTL=1m)) options(partitionnum=8, replicanum=2);";
ok = router->ExecuteDDL(base_db, ddl, &status);
ASSERT_TRUE(ok);
ASSERT_TRUE(router->RefreshCatalog());
Expand Down Expand Up @@ -626,7 +626,7 @@ TEST_F(SQLClusterTest, PreAggrTableExist) {
"("
" col1 string, col2 bigint, col3 int,"
" index(key=col1, ts=col2,"
" TTL_TYPE=latest, TTL=1)) options(partitionnum=8);";
" TTL_TYPE=absolute, TTL=1m)) options(partitionnum=8);";
ok = router->ExecuteDDL(base_db, ddl, &status);
ASSERT_TRUE(ok);
ASSERT_TRUE(router->RefreshCatalog());
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/sql_standalone_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ TEST_F(SQLSDKTest, CreatePreAggrTable) {
std::string deploy_sql = "deploy test1 options(long_windows='w1:1000') select col1,"
" sum(col3) over w1 as w1_sum_col3 from " + base_table +
" WINDOW w1 AS (PARTITION BY col1 ORDER BY col2"
" ROWS_RANGE BETWEEN 20s PRECEDING AND CURRENT ROW);";
" ROWS BETWEEN 1 PRECEDING AND CURRENT ROW);";
router->ExecuteSQL(base_db, "use " + base_db + ";", &status);
router->ExecuteSQL(base_db, deploy_sql, &status);

Expand Down

0 comments on commit d963b5a

Please sign in to comment.