Skip to content

Commit

Permalink
fix unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
jyizheng committed Apr 20, 2021
1 parent 8f842c0 commit 8076d86
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 19 deletions.
28 changes: 11 additions & 17 deletions fusequery/query/src/optimizers/optimizer_projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use common_planners::AggregatorFinalPlan;
use common_planners::AggregatorPartialPlan;
use common_planners::EmptyPlan;
use common_planners::ExpressionPlan;
use common_planners::FilterPlan;
use common_planners::PlanNode;
use common_planners::ProjectionPlan;
use common_planners::ReadDataSourcePlan;
Expand Down Expand Up @@ -106,7 +107,6 @@ fn optimize_plan(
has_projection: bool,
) -> Result<PlanNode> {
let mut new_required_columns = required_columns.clone();
println!("(1) {:?}", required_columns);
match plan {
PlanNode::Projection(ProjectionPlan {
expr,
Expand All @@ -119,7 +119,6 @@ fn optimize_plan(
let mut new_expr = Vec::new();
let mut new_fields = Vec::new();
// Gather all columns needed
println!("Projection: (2) {:?}", schema.fields());
schema
.fields()
.iter()
Expand Down Expand Up @@ -147,6 +146,14 @@ fn optimize_plan(
}))
}
}
PlanNode::Filter(FilterPlan { predicate, input }) => {
expr_to_column_names(predicate, &mut new_required_columns)?;
let new_input =
optimize_plan(optimizer, &input, &new_required_columns, has_projection)?;
let mut cloned_plan = plan.clone();
cloned_plan.set_input(&new_input)?;
Ok(cloned_plan)
}
PlanNode::AggregatorFinal(AggregatorFinalPlan {
aggr_expr,
group_expr,
Expand Down Expand Up @@ -249,7 +256,6 @@ fn optimize_plan(
}
PlanNode::Empty(_) => Ok(plan.clone()),
_ => {
println!("others:{:?}", plan);
let input = plan.input();
let new_input = optimize_plan(optimizer, &input, &required_columns, has_projection)?;
let mut cloned_plan = plan.clone();
Expand All @@ -265,25 +271,13 @@ impl IOptimizer for ProjectionPushDownOptimizer {
}

fn optimize(&mut self, plan: &PlanNode) -> Result<PlanNode> {
let mut rewritten_node = PlanNode::Empty(EmptyPlan {
schema: Arc::new(DataSchema::empty()),
});

// set of all columns referred by the plan
println!("get required columns");
let required_columns = plan
.schema()
.fields()
.iter()
.map(|f| f.name().clone())
.collect::<HashSet<String>>();
println!("postorder");
plan.walk_postorder(|node| {
let mut new_node = optimize_plan(self, node, &required_columns, true)?;
new_node.set_input(&rewritten_node)?;
rewritten_node = new_node;
Ok(true)
})?;
Ok(rewritten_node)
let mut new_node = optimize_plan(self, plan, &required_columns, false)?;
Ok(new_node)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn test_projection_push_down_optimizer_2() -> anyhow::Result<()> {
.build()?;

let plan = PlanNode::Projection(ProjectionPlan {
expr: vec![col("a"), col("c")],
expr: vec![col("a")],
schema: Arc::new(DataSchema::new(vec![DataField::new(
"a",
DataType::Utf8,
Expand All @@ -91,7 +91,7 @@ fn test_projection_push_down_optimizer_2() -> anyhow::Result<()> {
let expect = "\
Projection: a:Utf8\
\n Filter: ((a > 6) and (b <= 10))\
\n ReadDataSource: scan partitions: [8], scan schema: [a:Utf8], statistics: [read_rows: 10000, read_bytes: 80000]";
\n ReadDataSource: scan partitions: [8], scan schema: [a:Utf8, b:Utf8], statistics: [read_rows: 10000, read_bytes: 80000]";
let actual = format!("{:?}", optimized);
assert_eq!(expect, actual);

Expand Down

0 comments on commit 8076d86

Please sign in to comment.