Skip to content

Commit

Permalink
move refresh agg index to enterprise (databendlabs#11980)
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil authored Jul 5, 2023
1 parent 1fc5f68 commit 72f145e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use common_meta_app::schema::DropIndexReply;
use common_meta_app::schema::DropIndexReq;
use common_meta_app::schema::GetIndexReply;
use common_meta_app::schema::GetIndexReq;
use common_meta_app::schema::UpdateIndexReply;
use common_meta_app::schema::UpdateIndexReq;

#[async_trait::async_trait]
pub trait AggregatingIndexHandler: Sync + Send {
Expand All @@ -43,6 +45,12 @@ pub trait AggregatingIndexHandler: Sync + Send {
catalog: Arc<dyn Catalog>,
req: GetIndexReq,
) -> Result<GetIndexReply>;

async fn do_update_index(
&self,
catalog: Arc<dyn Catalog>,
req: UpdateIndexReq,
) -> Result<UpdateIndexReply>;
}

pub struct AggregatingIndexHandlerWrapper {
Expand Down Expand Up @@ -80,6 +88,15 @@ impl AggregatingIndexHandlerWrapper {
) -> Result<GetIndexReply> {
self.handler.do_get_index(catalog, req).await
}

#[async_backtrace::framed]
pub async fn do_update_index(
&self,
catalog: Arc<dyn Catalog>,
req: UpdateIndexReq,
) -> Result<UpdateIndexReply> {
self.handler.do_update_index(catalog, req).await
}
}

pub fn get_agg_index_handler() -> Arc<AggregatingIndexHandlerWrapper> {
Expand Down
11 changes: 11 additions & 0 deletions src/query/ee/src/aggregating_index/aggregating_index_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use common_meta_app::schema::DropIndexReply;
use common_meta_app::schema::DropIndexReq;
use common_meta_app::schema::GetIndexReply;
use common_meta_app::schema::GetIndexReq;
use common_meta_app::schema::UpdateIndexReply;
use common_meta_app::schema::UpdateIndexReq;

pub struct RealAggregatingIndexHandler {}

Expand Down Expand Up @@ -56,6 +58,15 @@ impl AggregatingIndexHandler for RealAggregatingIndexHandler {
) -> Result<GetIndexReply> {
catalog.get_index(req).await
}

#[async_backtrace::framed]
async fn do_update_index(
&self,
catalog: Arc<dyn Catalog>,
req: UpdateIndexReq,
) -> Result<UpdateIndexReply> {
catalog.update_index(req).await
}
}

impl RealAggregatingIndexHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Interpreter for DropIndexInterpreter {
let license_manager = get_license_manager();
license_manager.manager.check_enterprise_enabled(
&self.ctx.get_settings(),
self.ctx.get_tenant(),
tenant.clone(),
Feature::AggregateIndex,
)?;

Expand Down
12 changes: 11 additions & 1 deletion src/query/service/src/interpreters/interpreter_index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use aggregating_index::get_agg_index_handler;
use common_base::runtime::GlobalIORuntime;
use common_catalog::plan::DataSourcePlan;
use common_catalog::plan::Partitions;
Expand All @@ -24,6 +25,8 @@ use common_expression::infer_table_schema;
use common_expression::DataField;
use common_expression::DataSchemaRefExt;
use common_expression::BLOCK_NAME_COL_NAME;
use common_license::license::Feature;
use common_license::license_manager::get_license_manager;
use common_meta_app::schema::IndexMeta;
use common_meta_app::schema::UpdateIndexReq;
use common_pipeline_core::processors::processor::ProcessorPtr;
Expand Down Expand Up @@ -178,6 +181,12 @@ impl Interpreter for RefreshIndexInterpreter {

#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
let license_manager = get_license_manager();
license_manager.manager.check_enterprise_enabled(
&self.ctx.get_settings(),
self.ctx.get_tenant(),
Feature::AggregateIndex,
)?;
let (mut query_plan, output_schema, select_columns) = match self.plan.query_plan.as_ref() {
Plan::Query {
s_expr,
Expand Down Expand Up @@ -320,6 +329,7 @@ impl Interpreter for RefreshIndexInterpreter {

async fn modify_last_update(ctx: Arc<QueryContext>, req: UpdateIndexReq) -> Result<()> {
let catalog = ctx.get_catalog(&ctx.get_current_catalog())?;
catalog.update_index(req).await?;
let handler = get_agg_index_handler();
let _ = handler.do_update_index(catalog, req).await?;
Ok(())
}

0 comments on commit 72f145e

Please sign in to comment.