From 2b4acb0caf5c098fab7e44bead2978ed84d5ee09 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 27 Jul 2016 11:10:52 +0800 Subject: [PATCH] support cbo (#1498) --- executor/aggregate_xapi.go | 9 +- executor/builder.go | 2 + executor/executor.go | 1 + executor/executor_test.go | 7 +- executor/new_builder.go | 32 +- executor/new_executor.go | 7 +- executor/new_executor_xapi.go | 7 +- expression/expression.go | 3 + meta/meta.go | 3 + plan/expression_rewriter.go | 12 +- plan/logical_plan_builder.go | 35 +- plan/logical_plans.go | 6 +- plan/match_property.go | 196 +++++++++++ plan/new_plan_test.go | 117 ++++++- plan/optimizer.go | 9 +- plan/physical_plan_builder.go | 628 +++++++++++++++++++++++++++------- plan/physical_plans.go | 120 ++++++- plan/plan.go | 40 ++- plan/predicate_push_down.go | 8 +- plan/push_limit.go | 247 +++++++++++++ plan/refiner.go | 17 +- plan/stringer.go | 33 +- 22 files changed, 1329 insertions(+), 210 deletions(-) create mode 100644 plan/match_property.go create mode 100644 plan/push_limit.go diff --git a/executor/aggregate_xapi.go b/executor/aggregate_xapi.go index e2bc6961119d1..eb5a878d30239 100644 --- a/executor/aggregate_xapi.go +++ b/executor/aggregate_xapi.go @@ -56,9 +56,6 @@ func (n *finalAggregater) update(count uint64, value types.Datum) error { // GetContext gets aggregate evaluation context for the current group. // If it is nil, add a new context into contextPerGroupMap. 
func (n *finalAggregater) getContext() *ast.AggEvaluateContext { - if n.contextPerGroupMap == nil { - n.contextPerGroupMap = make(map[string](*ast.AggEvaluateContext)) - } if _, ok := n.contextPerGroupMap[string(n.currentGroup)]; !ok { n.contextPerGroupMap[string(n.currentGroup)] = &ast.AggEvaluateContext{} } @@ -153,7 +150,8 @@ func (e *XAggregateExec) Next() (*Row, error) { e.aggregaters = make([]*finalAggregater, len(e.AggFuncs)) for i, af := range e.AggFuncs { agg := &finalAggregater{ - name: strings.ToLower(af.F), + name: strings.ToLower(af.F), + contextPerGroupMap: make(map[string](*ast.AggEvaluateContext)), } e.aggregaters[i] = agg } @@ -298,7 +296,8 @@ func (e *NewXAggregateExec) Next() (*Row, error) { e.aggregaters = make([]*finalAggregater, len(e.AggFuncs)) for i, af := range e.AggFuncs { agg := &finalAggregater{ - name: strings.ToLower(af.GetName()), + name: strings.ToLower(af.GetName()), + contextPerGroupMap: make(map[string](*ast.AggEvaluateContext)), } e.aggregaters[i] = agg } diff --git a/executor/builder.go b/executor/builder.go index d35f3f3daed79..2fb17864f4498 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -675,10 +675,12 @@ func (b *executorBuilder) buildNewUnionScanExec(src Executor, condition expressi us := &UnionScanExec{ctx: b.ctx, Src: src} switch x := src.(type) { case *NewXSelectTableExec: + us.desc = x.desc us.dirty = getDirtyDB(b.ctx).getDirtyTable(x.table.Meta().ID) us.newCondition = condition us.newBuildAndSortAddedRows(x.table, x.asName) case *NewXSelectIndexExec: + us.desc = x.indexPlan.Desc for _, ic := range x.indexPlan.Index.Columns { for i, col := range x.indexPlan.GetSchema() { if col.ColName.L == ic.Name.L { diff --git a/executor/executor.go b/executor/executor.go index df065fe9ac06b..0da8349f758ab 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -941,6 +941,7 @@ func (e *LimitExec) Next() (*Row, error) { // Close implements Executor Close interface. 
func (e *LimitExec) Close() error { + e.Idx = 0 return e.Src.Close() } diff --git a/executor/executor_test.go b/executor/executor_test.go index bdf875c5c658d..c39031db46d79 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -1342,7 +1342,7 @@ func (s *testSuite) TestDirtyTransaction(c *C) { tk.MustExec("begin") tk.MustQuery("select * from t").Check(testkit.Rows("2 3", "4 8", "6 8")) tk.MustExec("insert t values (1, 5), (3, 4), (7, 6)") - tk.MustQuery("select * from t").Check(testkit.Rows("2 3", "3 4", "1 5", "7 6", "4 8", "6 8")) + tk.MustQuery("select * from t").Check(testkit.Rows("1 5", "2 3", "3 4", "4 8", "6 8", "7 6")) tk.MustQuery("select * from t where a = 1").Check(testkit.Rows("1 5")) tk.MustQuery("select * from t order by a desc").Check(testkit.Rows("7 6", "6 8", "4 8", "3 4", "2 3", "1 5")) tk.MustQuery("select * from t order by b, a").Check(testkit.Rows("2 3", "3 4", "1 5", "7 6", "4 8", "6 8")) @@ -1350,12 +1350,13 @@ func (s *testSuite) TestDirtyTransaction(c *C) { tk.MustQuery("select b from t where b = 8 order by b desc").Check(testkit.Rows("8", "8")) // Delete a snapshot row and a dirty row. tk.MustExec("delete from t where a = 2 or a = 3") + tk.MustQuery("select * from t").Check(testkit.Rows("1 5", "4 8", "6 8", "7 6")) tk.MustQuery("select * from t order by a desc").Check(testkit.Rows("7 6", "6 8", "4 8", "1 5")) tk.MustQuery("select * from t order by b, a").Check(testkit.Rows("1 5", "7 6", "4 8", "6 8")) tk.MustQuery("select * from t order by b desc, a desc").Check(testkit.Rows("6 8", "4 8", "7 6", "1 5")) // Add deleted row back. 
tk.MustExec("insert t values (2, 3), (3, 4)") - tk.MustQuery("select * from t").Check(testkit.Rows("2 3", "3 4", "1 5", "7 6", "4 8", "6 8")) + tk.MustQuery("select * from t").Check(testkit.Rows("1 5", "2 3", "3 4", "4 8", "6 8", "7 6")) tk.MustQuery("select * from t order by a desc").Check(testkit.Rows("7 6", "6 8", "4 8", "3 4", "2 3", "1 5")) tk.MustQuery("select * from t order by b, a").Check(testkit.Rows("2 3", "3 4", "1 5", "7 6", "4 8", "6 8")) tk.MustQuery("select * from t order by b desc, a desc").Check(testkit.Rows("6 8", "4 8", "7 6", "1 5", "3 4", "2 3")) @@ -1700,6 +1701,8 @@ func (s *testSuite) TestAggregation(c *C) { tk.MustExec("insert t values (4, 3)") result := tk.MustQuery("select count(*) from t group by d") result.Check(testkit.Rows("3", "2", "2")) + result = tk.MustQuery("select - c, c as d from t group by c having null not between c and avg(distinct d) - d") + result.Check(testkit.Rows()) result = tk.MustQuery("select count(*) from (select d, c from t) k where d != 0 group by d") result.Check(testkit.Rows("3", "2", "2")) result = tk.MustQuery("select c as a from t group by d having a < 0") diff --git a/executor/new_builder.go b/executor/new_builder.go index 767e4652a7c5f..5e3d3d1c60562 100644 --- a/executor/new_builder.go +++ b/executor/new_builder.go @@ -47,32 +47,21 @@ func (b *executorBuilder) buildJoin(v *plan.PhysicalHashJoin) Executor { ctx: b.ctx, targetTypes: targetTypes, } - switch v.JoinType { - case plan.LeftOuterJoin: - e.outter = true - e.leftSmall = false + if v.SmallTable == 1 { e.smallFilter = expression.ComposeCNFCondition(v.RightConditions) e.bigFilter = expression.ComposeCNFCondition(v.LeftConditions) e.smallHashKey = rightHashKey e.bigHashKey = leftHashKey - case plan.RightOuterJoin: - e.outter = true + e.leftSmall = false + } else { e.leftSmall = true e.smallFilter = expression.ComposeCNFCondition(v.LeftConditions) e.bigFilter = expression.ComposeCNFCondition(v.RightConditions) e.smallHashKey = leftHashKey e.bigHashKey = 
rightHashKey - case plan.InnerJoin: - //TODO: assume right table is the small one before cbo is realized. - e.outter = false - e.leftSmall = false - e.smallFilter = expression.ComposeCNFCondition(v.RightConditions) - e.bigFilter = expression.ComposeCNFCondition(v.LeftConditions) - e.smallHashKey = rightHashKey - e.bigHashKey = leftHashKey - default: - b.err = ErrUnknownPlan.Gen("Unknown Join Type !!") - return nil + } + if v.JoinType == plan.LeftOuterJoin || v.JoinType == plan.RightOuterJoin { + e.outer = true } if e.leftSmall { e.smallExec = b.build(v.GetChildByIndex(0)) @@ -272,6 +261,8 @@ func (b *executorBuilder) buildNewTableScan(v *plan.PhysicalTableScan, s *plan.S schema: v.GetSchema(), Columns: v.Columns, ranges: v.Ranges, + desc: v.Desc, + limitCount: v.LimitCount, } ret = st if !txn.IsReadOnly() { @@ -298,8 +289,7 @@ func (b *executorBuilder) buildNewTableScan(v *plan.PhysicalTableScan, s *plan.S ranges: v.Ranges, } if v.Desc { - b.err = errors.New("Not implement yet.") - return nil + return &ReverseExec{Src: ts} } return ts } @@ -403,9 +393,9 @@ func (b *executorBuilder) buildNewUnion(v *plan.NewUnion) Executor { e := &NewUnionExec{ schema: v.GetSchema(), fields: v.Fields(), - Srcs: make([]Executor, len(v.Selects)), + Srcs: make([]Executor, len(v.GetChildren())), } - for i, sel := range v.Selects { + for i, sel := range v.GetChildren() { selExec := b.build(sel) e.Srcs[i] = selExec } diff --git a/executor/new_executor.go b/executor/new_executor.go index 84ab8b3c38ef9..1befdbf14d47b 100644 --- a/executor/new_executor.go +++ b/executor/new_executor.go @@ -43,7 +43,7 @@ type HashJoinExec struct { bigFilter expression.Expression otherFilter expression.Expression schema expression.Schema - outter bool + outer bool leftSmall bool matchedRows []*Row cursor int @@ -168,7 +168,6 @@ func (e *HashJoinExec) constructMatchedRows(bigRow *Row) (matchedRows []*Row, er } // match eq condition for _, smallRow := range rows { - //TODO: remove result fields in order to 
reduce memory copy cost. otherMatched := true var matchedRow *Row if e.leftSmall { @@ -257,7 +256,7 @@ func (e *HashJoinExec) Next() (*Row, error) { row, ok := e.returnRecord() if ok { return row, nil - } else if e.outter { + } else if e.outer { row = e.fillNullRow(bigRow) return row, nil } @@ -1333,7 +1332,7 @@ func (e *MaxOneRowExec) Next() (*Row, error) { return nil, errors.Trace(err) } if srcRow == nil { - return &Row{Data: []types.Datum{types.NewDatum(nil)}}, nil + return &Row{Data: make([]types.Datum, len(e.schema))}, nil } srcRow1, err := e.Src.Next() if err != nil { diff --git a/executor/new_executor_xapi.go b/executor/new_executor_xapi.go index a91a675ee6a4e..0d8a9b2d34ea5 100644 --- a/executor/new_executor_xapi.go +++ b/executor/new_executor_xapi.go @@ -191,7 +191,6 @@ func (e *NewXSelectIndexExec) doIndexRequest() (*xapi.SelectResult, error) { startTs := txn.StartTS() selIdxReq.StartTs = &startTs selIdxReq.IndexInfo = xapi.IndexToProto(e.table.Meta(), e.indexPlan.Index) - // Push limit to index request only if there is not filter conditions. selIdxReq.Limit = e.indexPlan.LimitCount if e.indexPlan.Desc { selIdxReq.OrderBy = append(selIdxReq.OrderBy, &tipb.ByItem{Desc: &e.indexPlan.Desc}) @@ -394,6 +393,8 @@ type NewXSelectTableExec struct { Columns []*model.ColumnInfo schema expression.Schema ranges []plan.TableRange + desc bool + limitCount *int64 /* The following attributes are used for aggregation push down. 
@@ -428,6 +429,10 @@ func (e *NewXSelectTableExec) doRequest() error { selReq.TableInfo = &tipb.TableInfo{ TableId: proto.Int64(e.tableInfo.ID), } + if e.supportDesc && e.desc { + selReq.OrderBy = append(selReq.OrderBy, &tipb.ByItem{Desc: &e.desc}) + } + selReq.Limit = e.limitCount selReq.TableInfo.Columns = xapi.ColumnsToProto(columns, e.tableInfo.PKIsHandle) // Aggregate Info selReq.Aggregates = e.aggFuncs diff --git a/expression/expression.go b/expression/expression.go index d9ca6fe6bf3c0..83e761bd44540 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -120,6 +120,9 @@ func (col *Column) Eval(row []types.Datum, _ context.Context) (types.Datum, erro // DeepCopy implements Expression interface. func (col *Column) DeepCopy() Expression { + if col.Correlated { + return col + } newCol := *col return &newCol } diff --git a/meta/meta.go b/meta/meta.go index 1227a8d93403f..5e48cd92a48b2 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -682,6 +682,9 @@ func (m *Meta) GetTableStats(tableID int64) (*statistics.TablePB, error) { if err != nil { return nil, errors.Trace(err) } + if len(data) == 0 { + return nil, nil + } tpb := &statistics.TablePB{} err = proto.Unmarshal(data, tpb) if err != nil { diff --git a/plan/expression_rewriter.go b/plan/expression_rewriter.go index f309c19d3f897..246021207414d 100644 --- a/plan/expression_rewriter.go +++ b/plan/expression_rewriter.go @@ -250,11 +250,12 @@ func (er *expressionRewriter) handleExistSubquery(v *ast.ExistsSubqueryExpr) (as er.err = errors.Trace(err) return v, true } - phyPlan := np.Convert2PhysicalPlan() - if err = refine(phyPlan); err != nil { + _, res, _, err := np.convert2PhysicalPlan(nil) + if err != nil { er.err = errors.Trace(err) return v, true } + phyPlan := res.p.PushLimit(nil) d, err := EvalSubquery(phyPlan, er.b.is, er.b.ctx) if err != nil { er.err = errors.Trace(err) @@ -368,11 +369,12 @@ func (er *expressionRewriter) handleScalarSubquery(v *ast.SubqueryExpr) (ast.Nod er.err = 
errors.Trace(err) return v, true } - phyPlan := np.Convert2PhysicalPlan() - if err = refine(phyPlan); err != nil { + _, res, _, err := np.convert2PhysicalPlan(nil) + if err != nil { er.err = errors.Trace(err) return v, true } + phyPlan := res.p.PushLimit(nil) d, err := EvalSubquery(phyPlan, er.b.is, er.b.ctx) if err != nil { er.err = errors.Trace(err) @@ -653,7 +655,7 @@ func (er *expressionRewriter) caseToScalarFunc(v *ast.CaseExpr) { value := er.ctxStack[stkLen-argsLen-1] args = make([]expression.Expression, 0, argsLen) for i := stkLen - argsLen; i < stkLen-1; i += 2 { - arg, err := expression.NewFunction(ast.EQ, types.NewFieldType(mysql.TypeTiny), value, er.ctxStack[i]) + arg, err := expression.NewFunction(ast.EQ, types.NewFieldType(mysql.TypeTiny), value.DeepCopy(), er.ctxStack[i]) if err != nil { er.err = errors.Trace(err) return diff --git a/plan/logical_plan_builder.go b/plan/logical_plan_builder.go index 7b17024c54665..3bd465d4d3ffe 100644 --- a/plan/logical_plan_builder.go +++ b/plan/logical_plan_builder.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/plan/statistics" "github.com/pingcap/tidb/util/types" ) @@ -313,13 +314,13 @@ func (b *planBuilder) buildNewDistinct(src LogicalPlan) LogicalPlan { func (b *planBuilder) buildNewUnion(union *ast.UnionStmt) LogicalPlan { u := &NewUnion{baseLogicalPlan: newBaseLogicalPlan(Un, b.allocator)} u.initID() - u.Selects = make([]LogicalPlan, len(union.SelectList.Selects)) + u.children = make([]Plan, len(union.SelectList.Selects)) for i, sel := range union.SelectList.Selects { - u.Selects[i] = b.buildNewSelect(sel) - u.correlated = u.correlated || u.Selects[i].IsCorrelated() + u.children[i] = b.buildNewSelect(sel) + u.correlated = u.correlated || u.children[i].IsCorrelated() } - firstSchema := u.Selects[0].GetSchema().DeepCopy() - for _, sel := range u.Selects { + firstSchema := 
u.children[0].GetSchema().DeepCopy() + for _, sel := range u.children { if len(firstSchema) != len(sel.GetSchema()) { b.err = errors.New("The used SELECT statements have a different number of columns") return nil @@ -344,7 +345,7 @@ func (b *planBuilder) buildNewUnion(union *ast.UnionStmt) LogicalPlan { firstSchema[i].RetType.Tp = col.RetType.Tp } } - addChild(u, sel) + sel.SetParents(u) } for _, v := range firstSchema { v.FromID = u.id @@ -401,10 +402,6 @@ func (b *planBuilder) buildNewLimit(src LogicalPlan, limit *ast.Limit) LogicalPl } li.initID() li.correlated = src.IsCorrelated() - if s, ok := src.(*NewSort); ok { - s.ExecLimit = li - return s - } addChild(li, src) li.SetSchema(src.GetSchema().DeepCopy()) return li @@ -440,6 +437,9 @@ func resolveFromSelectFields(v *ast.ColumnNameExpr, fields []*ast.SelectField) ( var matchedExpr ast.ExprNode index = -1 for i, field := range fields { + if field.Auxiliary { + continue + } if matchField(field, v) { curCol, isCol := field.Expr.(*ast.ColumnNameExpr) if !isCol { @@ -821,11 +821,21 @@ func (b *planBuilder) buildNewTableDual() LogicalPlan { return dual } +func (b *planBuilder) getTableStats(table *model.TableInfo) *statistics.Table { + // TODO: Currently we always retrun a pseudo table for good performance. We will use a cache in future. + return statistics.PseudoTable(table) +} + func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan { + statisticTable := b.getTableStats(tn.TableInfo) + if b.err != nil { + return nil + } p := &DataSource{ table: tn, Table: tn.TableInfo, baseLogicalPlan: newBaseLogicalPlan(Ts, b.allocator), + statisticTable: statisticTable, } p.initID() // Equal condition contains a column from previous joined table. 
@@ -975,7 +985,8 @@ func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onConditio joinPlan.JoinType = SemiJoin } joinPlan.anti = not - addChild(joinPlan, outerPlan) - addChild(joinPlan, innerPlan) + joinPlan.SetChildren(outerPlan, innerPlan) + outerPlan.SetParents(joinPlan) + innerPlan.SetParents(joinPlan) return joinPlan } diff --git a/plan/logical_plans.go b/plan/logical_plans.go index d15dd42a91cb6..7f202411fdec8 100644 --- a/plan/logical_plans.go +++ b/plan/logical_plans.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/model" + "github.com/pingcap/tidb/plan/statistics" ) // JoinType contains CrossJoin, InnerJoin, LeftOuterJoin, RightOuterJoin, FullOuterJoin, SemiJoin. @@ -60,6 +61,7 @@ type Projection struct { // Aggregation represents an aggregate plan. type Aggregation struct { baseLogicalPlan + // TODO: implement hash aggregation and streamed aggreagtion AggFuncs []expression.AggregationFunction GroupByItems []expression.Expression } @@ -113,6 +115,8 @@ type DataSource struct { TableAsName *model.CIStr LimitCount *int64 + + statisticTable *statistics.Table } // Trim trims child's rows. @@ -123,8 +127,6 @@ type Trim struct { // NewUnion represents Union plan. type NewUnion struct { baseLogicalPlan - - Selects []LogicalPlan } // NewSort stands for the order by plan. diff --git a/plan/match_property.go b/plan/match_property.go new file mode 100644 index 0000000000000..a08a202ed6c49 --- /dev/null +++ b/plan/match_property.go @@ -0,0 +1,196 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package plan + +import ( + "github.com/pingcap/tidb/util/types" + "math" +) + +// matchProperty implements PhysicalPlan matchProperty interface. +func (ts *PhysicalTableScan) matchProperty(prop requiredProperty, rowCounts []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + rowCount := float64(rowCounts[0]) + cost := rowCount * netWorkFactor + if len(prop) == 0 { + return &physicalPlanInfo{p: ts, cost: cost} + } + if len(prop) == 1 && ts.pkCol != nil && ts.pkCol == prop[0].col { + sortedTs := *ts + sortedTs.Desc = prop[0].desc + return &physicalPlanInfo{p: &sortedTs, cost: cost} + } + return &physicalPlanInfo{p: ts, cost: math.MaxFloat64} +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (is *PhysicalIndexScan) matchProperty(prop requiredProperty, rowCounts []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + rowCount := float64(rowCounts[0]) + // currently index read from kv 2 times. 
+ cost := rowCount * netWorkFactor * 2 + if len(prop) == 0 { + return &physicalPlanInfo{p: is, cost: cost} + } + matched := 0 + allDesc, allAsc := true, true + for i, indexCol := range is.Index.Columns { + if indexCol.Length != types.UnspecifiedLength { + break + } + if prop[matched].col.ColName.L != indexCol.Name.L { + if matched == 0 && i < is.accessEqualCount { + continue + } + break + } + if prop[matched].desc { + allAsc = false + } else { + allDesc = false + } + matched++ + if matched == len(prop) { + break + } + } + if matched == len(prop) { + sortedCost := cost + rowCount*math.Log2(rowCount)*cpuFactor + if allDesc { + sortedIs := *is + sortedIs.Desc = true + sortedIs.OutOfOrder = false + return &physicalPlanInfo{p: &sortedIs, cost: sortedCost} + } + if allAsc { + sortedIs := *is + sortedIs.OutOfOrder = false + return &physicalPlanInfo{p: &sortedIs, cost: sortedCost} + } + } + return &physicalPlanInfo{p: is, cost: math.MaxFloat64} +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *PhysicalHashSemiJoin) matchProperty(prop requiredProperty, _ []uint64, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { + lRes, rRes := childPlanInfo[0], childPlanInfo[1] + np := *p + np.SetChildren(lRes.p, rRes.p) + cost := lRes.cost + rRes.cost + return &physicalPlanInfo{p: &np, cost: cost} +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *PhysicalApply) matchProperty(prop requiredProperty, rowCounts []uint64, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { + np := *p + np.SetChildren(childPlanInfo[0].p) + return &physicalPlanInfo{p: &np, cost: childPlanInfo[0].cost} +} + +// matchProperty implements PhysicalPlan matchProperty interface. 
+func (p *PhysicalHashJoin) matchProperty(prop requiredProperty, rowCounts []uint64, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { + lRes, rRes := childPlanInfo[0], childPlanInfo[1] + lCount, rCount := float64(rowCounts[0]), float64(rowCounts[1]) + np := *p + np.SetChildren(lRes.p, rRes.p) + cost := lRes.cost + rRes.cost + if p.SmallTable == 1 { + cost += lCount + memoryFactor*rCount + } else { + cost += rCount + memoryFactor*lCount + } + return &physicalPlanInfo{p: &np, cost: cost} +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *NewUnion) matchProperty(prop requiredProperty, _ []uint64, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { + np := *p + children := make([]Plan, 0, len(childPlanInfo)) + cost := float64(0) + for _, res := range childPlanInfo { + children = append(children, res.p) + cost += res.cost + } + np.SetChildren(children...) + return &physicalPlanInfo{p: &np, cost: cost} +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *Selection) matchProperty(prop requiredProperty, rowCounts []uint64, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { + if len(childPlanInfo) == 0 { + res := p.GetChildByIndex(0).(PhysicalPlan).matchProperty(prop, rowCounts) + sel := *p + sel.SetChildren(res.p) + res.p = &sel + return res + } + np := *p + np.SetChildren(childPlanInfo[0].p) + return &physicalPlanInfo{p: &np, cost: childPlanInfo[0].cost} +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *Projection) matchProperty(_ requiredProperty, _ []uint64, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { + np := *p + np.SetChildren(childPlanInfo[0].p) + return &physicalPlanInfo{p: &np, cost: childPlanInfo[0].cost} +} + +// matchProperty implements PhysicalPlan matchProperty interface. 
+func (p *MaxOneRow) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *Exists) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *Trim) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *Aggregation) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *Limit) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *Distinct) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *NewTableDual) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *NewSort) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + +// matchProperty implements PhysicalPlan matchProperty interface. 
+func (p *Insert) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + +// matchProperty implements PhysicalPlan matchProperty interface. +func (p *SelectLock) matchProperty(_ requiredProperty, _ []uint64, _ ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} diff --git a/plan/new_plan_test.go b/plan/new_plan_test.go index 79b97742c4af1..73eff36701d2e 100644 --- a/plan/new_plan_test.go +++ b/plan/new_plan_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testleak" + "github.com/pingcap/tidb/util/types" ) func newMockResolve(node ast.Node) error { @@ -32,13 +33,16 @@ func newMockResolve(node ast.Node) error { Name: model.NewCIStr("c_d_e"), Columns: []*model.IndexColumn{ { - Name: model.NewCIStr("c"), + Name: model.NewCIStr("c"), + Length: types.UnspecifiedLength, }, { - Name: model.NewCIStr("d"), + Name: model.NewCIStr("d"), + Length: types.UnspecifiedLength, }, { - Name: model.NewCIStr("e"), + Name: model.NewCIStr("e"), + Length: types.UnspecifiedLength, }, }, }, @@ -171,6 +175,7 @@ func (s *testPlanSuite) TestPredicatePushDown(c *C) { builder := &planBuilder{ allocator: new(idAllocator), + ctx: mock.NewContext(), } p := builder.build(stmt) c.Assert(builder.err, IsNil) @@ -186,6 +191,88 @@ func (s *testPlanSuite) TestPredicatePushDown(c *C) { UseNewPlanner = false } +func (s *testPlanSuite) TestCBO(c *C) { + UseNewPlanner = true + defer testleak.AfterTest(c)() + cases := []struct { + sql string + best string + }{ + { + sql: "select * from t a where a.c = 1 order by a.d limit 2", + best: "Index(t.c_d_e)[[1,1]]->Projection", + }, + { + sql: "select * from t a where 1 = a.c and a.d > 1 order by a.d desc limit 2", + best: "Index(t.c_d_e)[(1 1,1 ]]->Projection", + }, + { + sql: "select * from t a where a.c < 10000 order by a.a limit 2", + best: 
"Table(t)->Selection->Limit->Projection", + }, + { + sql: "select * from (select * from t) a left outer join (select * from t) b on 1 order by a.c", + best: "LeftHashJoin{Index(t.c_d_e)[[,]]->Projection->Table(t)->Projection}->Projection", + }, + { + sql: "select * from (select * from t) a left outer join (select * from t) b on 1 order by b.c", + best: "LeftHashJoin{Table(t)->Projection->Table(t)->Projection}->Projection->Sort", + }, + { + sql: "select * from (select * from t) a right outer join (select * from t) b on 1 order by a.c", + best: "RightHashJoin{Table(t)->Projection->Table(t)->Projection}->Projection->Sort", + }, + { + sql: "select * from (select * from t) a right outer join (select * from t) b on 1 order by b.c", + best: "RightHashJoin{Table(t)->Projection->Index(t.c_d_e)[[,]]->Projection}->Projection", + }, + { + sql: "select * from t a where exists(select * from t b where a.a = b.a) and a.c = 1 order by a.d limit 3", + best: "SemiJoin{Index(t.c_d_e)[[1,1]]->Table(t)}->Limit->Projection", + }, + { + sql: "select exists(select * from t b where a.a = b.a and b.c = 1) from t a order by a.c limit 3", + best: "SemiJoinWithAux{Index(t.c_d_e)[[,]]->Index(t.c_d_e)[[1,1]]}->Projection->Trim", + }, + { + sql: "select * from (select t.a from t union select t.d from t where t.c = 1 union select t.c from t) k order by a limit 1", + best: "UnionAll{Table(t)->Projection->Index(t.c_d_e)[[1,1]]->Projection->Index(t.c_d_e)[[,]]->Projection}->Distinct->Limit->Projection", + }, + { + sql: "select * from (select t.a from t union select t.d from t union select t.c from t) k order by a limit 1", + best: "UnionAll{Table(t)->Projection->Table(t)->Projection->Table(t)->Projection}->Distinct->Projection->Sort + Limit(1) + Offset(0)", + }, + } + for _, ca := range cases { + comment := Commentf("for %s", ca.sql) + stmt, err := s.ParseOneStmt(ca.sql, "", "") + c.Assert(err, IsNil, comment) + ast.SetFlag(stmt) + + err = newMockResolve(stmt) + c.Assert(err, IsNil) + + builder := 
&planBuilder{ + allocator: new(idAllocator), + ctx: mock.NewContext(), + colMapper: make(map[*ast.ColumnNameExpr]int), + } + p := builder.build(stmt) + c.Assert(builder.err, IsNil) + lp := p.(LogicalPlan) + + _, lp, err = lp.PredicatePushDown(nil) + c.Assert(err, IsNil) + _, err = lp.PruneColumnsAndResolveIndices(lp.GetSchema()) + c.Assert(err, IsNil) + _, res, _, err := lp.convert2PhysicalPlan(nil) + c.Assert(err, IsNil) + p = res.p.PushLimit(nil) + c.Assert(ToString(p), Equals, ca.best, Commentf("for %s", ca.sql)) + } + UseNewPlanner = false +} + func (s *testPlanSuite) TestRefine(c *C) { UseNewPlanner = true defer testleak.AfterTest(c)() @@ -195,11 +282,11 @@ func (s *testPlanSuite) TestRefine(c *C) { }{ { sql: "select a from t where c = 4 and d = 5 and e = 6", - best: "Index(t.c_d_e)[[4 5 6,4 5 6]]->Selection->Projection", + best: "Index(t.c_d_e)[[4 5 6,4 5 6]]->Projection", }, { sql: "select a from t where d = 4 and c = 5", - best: "Index(t.c_d_e)[[5 4,5 4]]->Selection->Projection", + best: "Index(t.c_d_e)[[5 4,5 4]]->Projection", }, { sql: "select a from t where c = 4 and e < 5", @@ -207,11 +294,11 @@ func (s *testPlanSuite) TestRefine(c *C) { }, { sql: "select a from t where c = 4 and d <= 5 and d > 3", - best: "Index(t.c_d_e)[[4 3,4 5]]->Selection->Projection", + best: "Index(t.c_d_e)[(4 3,4 5]]->Projection", }, { sql: "select a from t where d <= 5 and d > 3", - best: "Index(t.c_d_e)[[,]]->Selection->Projection", + best: "Table(t)->Selection->Projection", }, { sql: "select a from t where c <= 5 and c >= 3 and d = 1", @@ -229,6 +316,7 @@ func (s *testPlanSuite) TestRefine(c *C) { builder := &planBuilder{ allocator: new(idAllocator), + ctx: mock.NewContext(), } p := builder.build(stmt).(LogicalPlan) c.Assert(builder.err, IsNil) @@ -237,9 +325,9 @@ func (s *testPlanSuite) TestRefine(c *C) { c.Assert(err, IsNil) _, err = p.PruneColumnsAndResolveIndices(p.GetSchema()) c.Assert(err, IsNil) - np := p.Convert2PhysicalPlan() - err = refine(np) + _, res, _, err := 
p.convert2PhysicalPlan(nil) c.Assert(err, IsNil) + np := res.p.PushLimit(nil) c.Assert(ToString(np), Equals, ca.best, Commentf("for %s", ca.sql)) } UseNewPlanner = false @@ -366,7 +454,9 @@ func (s *testPlanSuite) TestColumnPruning(c *C) { builder := &planBuilder{ colMapper: make(map[*ast.ColumnNameExpr]int), - allocator: new(idAllocator)} + allocator: new(idAllocator), + ctx: mock.NewContext(), + } p := builder.build(stmt).(LogicalPlan) c.Assert(builder.err, IsNil, comment) @@ -380,9 +470,9 @@ func (s *testPlanSuite) TestColumnPruning(c *C) { } func (s *testPlanSuite) TestAllocID(c *C) { - pA := &PhysicalTableScan{baseLogicalPlan: newBaseLogicalPlan(Ts, new(idAllocator))} + pA := &DataSource{baseLogicalPlan: newBaseLogicalPlan(Ts, new(idAllocator))} - pB := &PhysicalTableScan{baseLogicalPlan: newBaseLogicalPlan(Ts, new(idAllocator))} + pB := &DataSource{baseLogicalPlan: newBaseLogicalPlan(Ts, new(idAllocator))} pA.initID() pB.initID() @@ -553,7 +643,8 @@ func (s *testPlanSuite) TestNewRangeBuilder(c *C) { err = newMockResolve(stmt) c.Assert(err, IsNil) - p, err := BuildPlan(stmt, nil) + builder := &planBuilder{allocator: new(idAllocator), ctx: mock.NewContext()} + p := builder.build(stmt) c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, ca.exprStr)) var selection *Selection for _, child := range p.GetChildren() { diff --git a/plan/optimizer.go b/plan/optimizer.go index 090e0dabaedb6..0ce81846fd71e 100644 --- a/plan/optimizer.go +++ b/plan/optimizer.go @@ -15,6 +15,7 @@ package plan import ( "github.com/juju/errors" + "github.com/ngaut/log" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/infoschema" @@ -52,7 +53,13 @@ func Optimize(ctx context.Context, node ast.Node, sb SubQueryBuilder, is infosch if err != nil { return nil, errors.Trace(err) } - p = logic.Convert2PhysicalPlan() + _, res, _, err := logic.convert2PhysicalPlan(nil) + if err != nil { + return nil, errors.Trace(err) + } + p = 
res.p.PushLimit(nil) + log.Debugf("[PLAN] %s", ToString(p)) + return p, nil } err := Refine(p) if err != nil { diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index e8eaff4fe0aa3..c5dc6b6b965fa 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -13,46 +13,307 @@ package plan -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *DataSource) Convert2PhysicalPlan() (np PhysicalPlan) { - indices, includeTableScan := availableIndices(p.table) - if includeTableScan && len(indices) == 0 { - np = &PhysicalTableScan{ - Table: p.Table, - Columns: p.Columns, - TableAsName: p.TableAsName, - DBName: p.DBName, +import ( + "github.com/juju/errors" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/model" + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/plan/statistics" + "github.com/pingcap/tidb/util/types" + "math" +) + +const ( + netWorkFactor = 1.5 + memoryFactor = 5.0 + selectionFactor = 0.8 + distinctFactor = 0.7 + cpuFactor = 0.9 +) + +func getRowCountByIndexRange(table *statistics.Table, indexRange *IndexRange, indexInfo *model.IndexInfo) (uint64, error) { + count := float64(table.Count) + for i := 0; i < len(indexRange.LowVal); i++ { + l := indexRange.LowVal[i] + r := indexRange.HighVal[i] + var rowCount int64 + var err error + offset := indexInfo.Columns[i].Offset + if l.Kind() == types.KindMinNotNull && r.Kind() == types.KindMaxValue { + break + } else if l.Kind() == types.KindMinNotNull { + rowCount, err = table.Columns[offset].LessRowCount(r) + } else if r.Kind() == types.KindMaxValue { + rowCount, err = table.Columns[offset].LessRowCount(r) + rowCount = table.Count - rowCount + } else { + rowCount, err = table.Columns[offset].BetweenRowCount(l, r) + } + if err != nil { + return 0, errors.Trace(err) + } + count = count / float64(table.Count) * float64(rowCount) + } + return uint64(count), nil +} + +func (p *DataSource) handleTableScan(prop 
requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, error) { + statsTbl := p.statisticTable + table := p.Table + var resultPlan PhysicalPlan + ts := &PhysicalTableScan{ + Table: p.Table, + Columns: p.Columns, + TableAsName: p.TableAsName, + DBName: p.DBName, + } + ts.SetSchema(p.GetSchema()) + resultPlan = ts + if sel, ok := p.GetParentByIndex(0).(*Selection); ok { + newSel := *sel + conds := make([]expression.Expression, 0, len(sel.Conditions)) + for _, cond := range sel.Conditions { + conds = append(conds, cond.DeepCopy()) + } + ts.AccessCondition, newSel.Conditions = detachTableScanConditions(conds, table) + err := buildNewTableRange(ts) + if err != nil { + return nil, nil, errors.Trace(err) + } + if len(newSel.Conditions) > 0 { + newSel.SetChildren(ts) + resultPlan = &newSel } } else { - // TODO: Temporily we choose a random index. - np = &PhysicalIndexScan{ - Index: indices[0], - Table: p.Table, - Columns: p.Columns, - TableAsName: p.TableAsName, - DBName: p.DBName, + ts.Ranges = []TableRange{{math.MinInt64, math.MaxInt64}} + } + rowCount := uint64(statsTbl.Count) + if table.PKIsHandle { + for i, colInfo := range ts.Columns { + if mysql.HasPriKeyFlag(colInfo.Flag) { + ts.pkCol = p.GetSchema()[i] + break + } + } + var offset int + for _, colInfo := range table.Columns { + if mysql.HasPriKeyFlag(colInfo.Flag) { + offset = colInfo.Offset + break + } + } + rowCount = 0 + for _, rg := range ts.Ranges { + var cnt int64 + var err error + if rg.LowVal == math.MinInt64 && rg.HighVal == math.MaxInt64 { + cnt = statsTbl.Count + } else if rg.LowVal == math.MinInt64 { + cnt, err = statsTbl.Columns[offset].LessRowCount(types.NewDatum(rg.HighVal)) + } else if rg.HighVal == math.MaxInt64 { + cnt, err = statsTbl.Columns[offset].LessRowCount(types.NewDatum(rg.HighVal)) + cnt = statsTbl.Count - cnt + } else { + cnt, err = statsTbl.Columns[offset].BetweenRowCount(types.NewDatum(rg.LowVal), types.NewDatum(rg.HighVal)) + } + if err != nil { + return nil, nil, errors.Trace(err) 
+ } + rowCount += uint64(cnt) } } - np.SetSchema(p.GetSchema()) - return np + rowCounts := []uint64{rowCount} + return resultPlan.matchProperty(prop, rowCounts), resultPlan.matchProperty(nil, rowCounts), nil +} + +func (p *DataSource) handleIndexScan(prop requiredProperty, index *model.IndexInfo) (*physicalPlanInfo, *physicalPlanInfo, error) { + statsTbl := p.statisticTable + var resultPlan PhysicalPlan + is := &PhysicalIndexScan{ + Index: index, + Table: p.Table, + Columns: p.Columns, + TableAsName: p.TableAsName, + OutOfOrder: true, + DBName: p.DBName, + } + is.SetSchema(p.schema) + rowCount := uint64(statsTbl.Count) + resultPlan = is + if sel, ok := p.GetParentByIndex(0).(*Selection); ok { + rowCount = 0 + newSel := *sel + conds := make([]expression.Expression, 0, len(sel.Conditions)) + for _, cond := range sel.Conditions { + conds = append(conds, cond.DeepCopy()) + } + is.AccessCondition, newSel.Conditions = detachIndexScanConditions(conds, is) + err := buildNewIndexRange(is) + if err != nil { + return nil, nil, errors.Trace(err) + } + for _, idxRange := range is.Ranges { + cnt, err := getRowCountByIndexRange(statsTbl, idxRange, is.Index) + if err != nil { + return nil, nil, errors.Trace(err) + } + rowCount += cnt + } + if len(newSel.Conditions) > 0 { + newSel.SetChildren(is) + resultPlan = &newSel + } + } else { + rb := rangeBuilder{} + is.Ranges = rb.buildIndexRanges(fullRange) + } + rowCounts := []uint64{rowCount} + return resultPlan.matchProperty(prop, rowCounts), resultPlan.matchProperty(nil, rowCounts), nil +} + +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. 
+func (p *DataSource) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + statsTbl := p.statisticTable + indices, includeTableScan := availableIndices(p.table) + var sortedRes, unsortedRes *physicalPlanInfo + var err error + if includeTableScan { + sortedRes, unsortedRes, err = p.handleTableScan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + } + for _, index := range indices { + sortedIsRes, unsortedIsRes, err := p.handleIndexScan(prop, index) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + if sortedRes == nil || sortedIsRes.cost < sortedRes.cost { + sortedRes = sortedIsRes + } + if unsortedRes == nil || unsortedIsRes.cost < unsortedRes.cost { + unsortedRes = unsortedIsRes + } + } + return sortedRes, unsortedRes, uint64(statsTbl.Count), nil +} + +func addPlanToResponse(p PhysicalPlan, res *physicalPlanInfo) *physicalPlanInfo { + np := p.Copy() + np.SetChildren(res.p) + return &physicalPlanInfo{p: np, cost: res.cost} +} + +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. 
+func (p *Limit) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + res0, res1, count, err := p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + if p.Offset+p.Count < count { + count = p.Offset + p.Count + } + return addPlanToResponse(p, res0), addPlanToResponse(p, res1), count, nil +} + +func estimateJoinCount(lc uint64, rc uint64) uint64 { + return lc * rc / 3 +} + +func (p *Join) handleLeftJoin(prop requiredProperty, innerJoin bool) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + lChild := p.GetChildByIndex(0).(LogicalPlan) + rChild := p.GetChildByIndex(1).(LogicalPlan) + allLeft := true + for _, col := range prop { + if lChild.GetSchema().GetIndex(col.col) == -1 { + allLeft = false + } + } + join := &PhysicalHashJoin{ + EqualConditions: p.EqualConditions, + LeftConditions: p.LeftConditions, + RightConditions: p.RightConditions, + OtherConditions: p.OtherConditions, + SmallTable: 1, + } + join.SetSchema(p.schema) + if innerJoin { + join.JoinType = InnerJoin + } else { + join.JoinType = LeftOuterJoin + } + if !allLeft { + prop = nil + } + lRes0, lRes1, lCount, err := lChild.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + if !allLeft { + lRes0.cost = math.MaxFloat64 + } + rRes0, rRes1, rCount, err := rChild.convert2PhysicalPlan(nil) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + res0 := join.matchProperty(prop, []uint64{lCount, rCount}, lRes0, rRes0) + res1 := join.matchProperty(prop, []uint64{lCount, rCount}, lRes1, rRes1) + return res0, res1, estimateJoinCount(lCount, rCount), nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. 
-func (p *Limit) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +func (p *Join) handleRightJoin(prop requiredProperty, innerJoin bool) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + lChild := p.GetChildByIndex(0).(LogicalPlan) + rChild := p.GetChildByIndex(1).(LogicalPlan) + allRight := true + for _, col := range prop { + if rChild.GetSchema().GetIndex(col.col) == -1 { + allRight = false + } + } + join := &PhysicalHashJoin{ + EqualConditions: p.EqualConditions, + LeftConditions: p.LeftConditions, + RightConditions: p.RightConditions, + OtherConditions: p.OtherConditions, + } + join.SetSchema(p.schema) + if innerJoin { + join.JoinType = InnerJoin + } else { + join.JoinType = RightOuterJoin + } + lRes0, lRes1, lCount, err := lChild.convert2PhysicalPlan(nil) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + if !allRight { + prop = nil + } + rRes0, rRes1, rCount, err := rChild.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + if !allRight { + rRes0.cost = math.MaxFloat64 + } + res0 := join.matchProperty(prop, []uint64{lCount, rCount}, lRes0, rRes0) + res1 := join.matchProperty(prop, []uint64{lCount, rCount}, lRes1, rRes1) + return res0, res1, estimateJoinCount(lCount, rCount), nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *Join) Convert2PhysicalPlan() PhysicalPlan { - l := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - r := p.GetChildByIndex(1).(LogicalPlan).Convert2PhysicalPlan() - var physicalPlan PhysicalPlan +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. 
+func (p *Join) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { switch p.JoinType { case SemiJoin, SemiJoinWithAux: - physicalPlan = &PhysicalHashSemiJoin{ + lChild := p.GetChildByIndex(0).(LogicalPlan) + rChild := p.GetChildByIndex(1).(LogicalPlan) + allLeft := true + for _, col := range prop { + if lChild.GetSchema().GetIndex(col.col) == -1 { + allLeft = false + } + } + join := &PhysicalHashSemiJoin{ WithAux: SemiJoinWithAux == p.JoinType, EqualConditions: p.EqualConditions, LeftConditions: p.LeftConditions, @@ -60,132 +321,257 @@ func (p *Join) Convert2PhysicalPlan() PhysicalPlan { OtherConditions: p.OtherConditions, Anti: p.anti, } + join.SetSchema(p.schema) + if !allLeft { + prop = nil + } + lRes0, lRes1, lCount, err := lChild.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + rRes0, rRes1, rCount, err := rChild.convert2PhysicalPlan(nil) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + res0 := join.matchProperty(prop, []uint64{lCount, rCount}, lRes0, rRes0) + res1 := join.matchProperty(prop, []uint64{lCount, rCount}, lRes1, rRes1) + if p.JoinType == SemiJoin { + lCount = uint64(float64(lCount) * selectionFactor) + } + if !allLeft { + res0.cost = math.MaxFloat64 + } + return res0, res1, lCount, err + case LeftOuterJoin: + return p.handleLeftJoin(prop, false) + case RightOuterJoin: + return p.handleRightJoin(prop, false) default: - physicalPlan = &PhysicalHashJoin{ - JoinType: p.JoinType, - EqualConditions: p.EqualConditions, - LeftConditions: p.LeftConditions, - RightConditions: p.RightConditions, - OtherConditions: p.OtherConditions, + lres0, lres1, count, err := p.handleLeftJoin(prop, true) + if err != nil { + return nil, nil, 0, errors.Trace(err) } + rres0, rres1, _, err := p.handleRightJoin(prop, true) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + if rres0.cost < lres0.cost { + lres0 = rres0 + } + if rres1.cost < lres1.cost { + lres1 
= rres1 + } + return lres0, lres1, count, errors.Trace(err) } - physicalPlan.SetChildren(l, r) - physicalPlan.SetSchema(p.schema) - l.SetParents(physicalPlan) - r.SetParents(physicalPlan) - return physicalPlan } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *Aggregation) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. +func (p *Aggregation) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + _, res, cnt, err := p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(nil) + if len(prop) != 0 { + return &physicalPlanInfo{cost: math.MaxFloat64}, addPlanToResponse(p, res), cnt / 3, errors.Trace(err) + } + res = addPlanToResponse(p, res) + return res, res, cnt / 3, errors.Trace(err) } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *NewUnion) Convert2PhysicalPlan() PhysicalPlan { - newChildren := make([]Plan, len(p.GetChildren())) - for i, child := range p.GetChildren() { - newChildren[i] = child.(LogicalPlan).Convert2PhysicalPlan() - child.SetParents(p) +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. 
+func (p *NewUnion) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + count := uint64(0) + var res0Collection, res1Collection []*physicalPlanInfo + for _, child := range p.GetChildren() { + newProp := make(requiredProperty, 0, len(prop)) + for _, c := range prop { + idx := p.GetSchema().GetIndex(c.col) + newProp = append(newProp, &columnProp{col: child.GetSchema()[idx], desc: c.desc}) + } + res0, res1, cnt, err := child.(LogicalPlan).convert2PhysicalPlan(newProp) + count += cnt + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + res0Collection = append(res0Collection, res0) + res1Collection = append(res1Collection, res1) } - p.SetChildren(newChildren...) - return p + return p.matchProperty(prop, nil, res0Collection...), p.matchProperty(prop, nil, res1Collection...), count, nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *Selection) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. +func (p *Selection) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + res0, res1, count, err := p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + if _, ok := p.GetChildByIndex(0).(*DataSource); ok { + count = uint64(float64(count) * selectionFactor) + return res0, res1, count, nil + } + return p.matchProperty(prop, nil, res0), p.matchProperty(prop, nil, res1), count / 3, nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. 
-func (p *Projection) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. +func (p *Projection) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + newProp := make(requiredProperty, 0, len(prop)) + childSchema := p.GetChildByIndex(0).GetSchema() + usedCols := make([]bool, len(childSchema)) + canPassSort := true +loop: + for _, c := range prop { + idx := p.schema.GetIndex(c.col) + switch v := p.Exprs[idx].(type) { + case *expression.Column: + childIdx := childSchema.GetIndex(v) + if !usedCols[childIdx] { + usedCols[childIdx] = true + newProp = append(newProp, &columnProp{col: v, desc: c.desc}) + } + case *expression.ScalarFunction: + newProp = nil + canPassSort = false + break loop + } + } + res0, res1, count, err := p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(newProp) + if err != nil { + return nil, nil, count, errors.Trace(err) + } + res1 = addPlanToResponse(p, res1) + if !canPassSort { + return &physicalPlanInfo{cost: math.MaxFloat64}, res1, count, nil + } + + return addPlanToResponse(p, res0), res1, count, nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *NewSort) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +func matchProp(target, new requiredProperty) bool { + if len(target) > len(new) { + return false + } + for i := 0; i < len(target); i++ { + if target[i].desc != new[i].desc || + target[i].col.FromID != new[i].col.FromID || + target[i].col.Index != new[i].col.Index { + return false + } + } + return true } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. 
-func (p *Apply) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. +func (p *NewSort) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + selfProp := make(requiredProperty, 0, len(p.ByItems)) + for _, by := range p.ByItems { + if col, ok := by.Expr.(*expression.Column); ok { + selfProp = append(selfProp, &columnProp{col: col, desc: by.Desc}) + } else { + selfProp = nil + break + } + } + res0, res1, count, err := p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(selfProp) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + cnt := float64(count) + sortCost := cnt*math.Log2(cnt)*cpuFactor + memoryFactor*cnt + if len(selfProp) == 0 { + res0 = addPlanToResponse(p, res1) + } else if sortCost+res1.cost < res0.cost { + res0.cost = sortCost + res1.cost + res0 = addPlanToResponse(p, res1) + } + if matchProp(prop, selfProp) { + return res0, res0, count, nil + } + return &physicalPlanInfo{cost: math.MaxFloat64}, res0, count, nil +} + +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. 
+func (p *Apply) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + child := p.GetChildByIndex(0).(LogicalPlan) + _, innerRes, _, err := p.InnerPlan.convert2PhysicalPlan(nil) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } np := &PhysicalApply{ OuterSchema: p.OuterSchema, Checker: p.Checker, - InnerPlan: p.InnerPlan.Convert2PhysicalPlan(), + InnerPlan: innerRes.p, } np.SetSchema(p.GetSchema()) - np.SetChildren(child) - child.SetParents(np) - return np + res0, res1, count, err := child.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + return addPlanToResponse(np, res0), addPlanToResponse(np, res1), count, nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *Distinct) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. +func (p *Distinct) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + child := p.GetChildByIndex(0).(LogicalPlan) + res0, res1, count, err := child.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + return addPlanToResponse(p, res0), addPlanToResponse(p, res1), uint64(float64(count) * distinctFactor), nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *NewTableDual) Convert2PhysicalPlan() PhysicalPlan { - return p +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. +func (p *NewTableDual) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + res := &physicalPlanInfo{p: p, cost: 1.0} + return res, res, 1, nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. 
-func (p *MaxOneRow) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. +func (p *MaxOneRow) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + child := p.GetChildByIndex(0).(LogicalPlan) + res0, res1, count, err := child.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + return addPlanToResponse(p, res0), addPlanToResponse(p, res1), count, nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *Exists) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. +func (p *Exists) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + child := p.GetChildByIndex(0).(LogicalPlan) + res0, res1, count, err := child.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + return addPlanToResponse(p, res0), addPlanToResponse(p, res1), count, nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *Trim) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. 
+func (p *Trim) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + child := p.GetChildByIndex(0).(LogicalPlan) + res0, res1, count, err := child.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + return addPlanToResponse(p, res0), addPlanToResponse(p, res1), count, nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *SelectLock) Convert2PhysicalPlan() PhysicalPlan { - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - return p +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. +func (p *SelectLock) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { + child := p.GetChildByIndex(0).(LogicalPlan) + res0, res1, count, err := child.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + return addPlanToResponse(p, res0), addPlanToResponse(p, res1), count, nil } -// Convert2PhysicalPlan implements LogicalPlan Convert2PhysicalPlan interface. -func (p *Insert) Convert2PhysicalPlan() PhysicalPlan { +// convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. 
+func (p *Insert) convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) { if len(p.GetChildren()) == 0 { - return p + res := &physicalPlanInfo{p: p} + return res, res, 0, nil + } + child := p.GetChildByIndex(0).(LogicalPlan) + res0, res1, count, err := child.convert2PhysicalPlan(prop) + if err != nil { + return nil, nil, 0, errors.Trace(err) } - child := p.GetChildByIndex(0).(LogicalPlan).Convert2PhysicalPlan() - p.SetChildren(child) - child.SetParents(p) - p.SelectPlan = child - return p + return addPlanToResponse(p, res0), addPlanToResponse(p, res1), count, nil } diff --git a/plan/physical_plans.go b/plan/physical_plans.go index 9212992b63072..98e8b53372b1e 100644 --- a/plan/physical_plans.go +++ b/plan/physical_plans.go @@ -20,7 +20,7 @@ import ( // PhysicalIndexScan represents an index scan plan. type PhysicalIndexScan struct { - basePhysicalPlan + basePlan Table *model.TableInfo Index *model.IndexInfo @@ -40,13 +40,14 @@ type PhysicalIndexScan struct { // PhysicalTableScan represents a table scan plan. type PhysicalTableScan struct { - baseLogicalPlan + basePlan Table *model.TableInfo Columns []*model.ColumnInfo DBName *model.CIStr Desc bool Ranges []TableRange + pkCol *expression.Column AccessCondition []expression.Expression @@ -57,7 +58,7 @@ type PhysicalTableScan struct { // PhysicalApply represents apply plan, only used for subquery. type PhysicalApply struct { - basePhysicalPlan + basePlan InnerPlan PhysicalPlan OuterSchema expression.Schema @@ -66,7 +67,7 @@ type PhysicalApply struct { // PhysicalHashJoin represents hash join for inner/ outer join. type PhysicalHashJoin struct { - basePhysicalPlan + basePlan JoinType JoinType @@ -74,11 +75,12 @@ type PhysicalHashJoin struct { LeftConditions []expression.Expression RightConditions []expression.Expression OtherConditions []expression.Expression + SmallTable int } // PhysicalHashSemiJoin represents hash join for semi join. 
type PhysicalHashSemiJoin struct { - basePhysicalPlan + basePlan WithAux bool Anti bool @@ -88,3 +90,111 @@ type PhysicalHashSemiJoin struct { RightConditions []expression.Expression OtherConditions []expression.Expression } + +// Copy implements the PhysicalPlan Copy interface. +func (p *PhysicalIndexScan) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *PhysicalTableScan) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *PhysicalApply) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *PhysicalHashSemiJoin) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *PhysicalHashJoin) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *Distinct) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *Selection) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *Projection) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *Exists) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *MaxOneRow) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *Insert) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *Limit) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *NewUnion) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. 
+func (p *NewSort) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *NewTableDual) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *Trim) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *SelectLock) Copy() PhysicalPlan { + np := *p + return &np +} + +// Copy implements the PhysicalPlan Copy interface. +func (p *Aggregation) Copy() PhysicalPlan { + np := *p + return &np +} diff --git a/plan/plan.go b/plan/plan.go index 6ce0630dfbeb0..4a6e1c0472665 100644 --- a/plan/plan.go +++ b/plan/plan.go @@ -102,6 +102,18 @@ type Plan interface { SetChildren(...Plan) } +type requiredProperty []*columnProp + +type physicalPlanInfo struct { + p PhysicalPlan + cost float64 +} + +type columnProp struct { + col *expression.Column + desc bool +} + // LogicalPlan is a tree of logical operators. // We can do a lot of logical optimization to it, like predicate push down and column pruning. type LogicalPlan interface { @@ -118,20 +130,28 @@ type LogicalPlan interface { // how many columns referenced by inner plan exactly. PruneColumnsAndResolveIndices([]*expression.Column) ([]*expression.Column, error) - // Convert2PhysicalPlan converts logical plan to physical plan. - Convert2PhysicalPlan() PhysicalPlan + // convert2PhysicalPlan converts logical plan to physical plan. The arg prop means the required sort property. + // This function returns two response. The first one is the best plan that matches the required property strictly. + // The second one is the best plan that needn't matches the required property. + convert2PhysicalPlan(prop requiredProperty) (*physicalPlanInfo, *physicalPlanInfo, uint64, error) } // PhysicalPlan is a tree of physical operators. 
type PhysicalPlan interface { Plan -} -type baseLogicalPlan struct { - basePlan + // matchProperty means that this physical plan will try to return the best plan that matches the required property. + // rowCounts means the child row counts, and childPlanInfo means the plan infos returned by children. + matchProperty(prop requiredProperty, rowCounts []uint64, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo + + // Copy copies the current plan. The implementations added alongside this interface copy the plan struct by value, so the copy is shallow. + Copy() PhysicalPlan + + // PushLimit tries to push down limit as deeply as possible. l is nil when there is no limit to push; the returned plan takes the receiver's place in the tree. + PushLimit(l *Limit) PhysicalPlan } -type basePhysicalPlan struct { +type baseLogicalPlan struct { basePlan } @@ -147,7 +167,7 @@ func newBaseLogicalPlan(tp string, a *idAllocator) baseLogicalPlan { // PredicatePushDown implements LogicalPlan PredicatePushDown interface. func (p *baseLogicalPlan) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan, error) { if len(p.GetChildren()) == 0 { - return predicates, p, nil + return predicates, nil, nil } child := p.GetChildByIndex(0).(LogicalPlan) rest, _, err := child.PredicatePushDown(predicates) @@ -160,11 +180,7 @@ func (p *baseLogicalPlan) PredicatePushDown(predicates []expression.Expression) return nil, nil, errors.Trace(err) } } - return nil, p, nil -} - -func (p *baseLogicalPlan) Convert2PhysicalPlan() PhysicalPlan { - return p + return nil, nil, nil } // PruneColumnsAndResolveIndices implements LogicalPlan PruneColumnsAndResolveIndices interface.
diff --git a/plan/predicate_push_down.go b/plan/predicate_push_down.go index 74f862c68e427..568ad9b1e0b02 100644 --- a/plan/predicate_push_down.go +++ b/plan/predicate_push_down.go @@ -18,7 +18,7 @@ import ( "github.com/pingcap/tidb/expression" ) -func addSelection(p LogicalPlan, child LogicalPlan, conditions []expression.Expression, allocator *idAllocator) error { +func addSelection(p Plan, child LogicalPlan, conditions []expression.Expression, allocator *idAllocator) error { selection := &Selection{ Conditions: conditions, baseLogicalPlan: newBaseLogicalPlan(Sel, allocator)} @@ -165,18 +165,20 @@ func (p *Projection) PredicatePushDown(predicates []expression.Expression) (ret // PredicatePushDown implements LogicalPlan PredicatePushDown interface. func (p *NewUnion) PredicatePushDown(predicates []expression.Expression) (ret []expression.Expression, retPlan LogicalPlan, err error) { retPlan = p - for _, proj := range p.Selects { + for _, proj := range p.children { newExprs := make([]expression.Expression, 0, len(predicates)) for _, cond := range predicates { newCond := columnSubstitute(cond.DeepCopy(), p.GetSchema(), expression.Schema2Exprs(proj.GetSchema())) newExprs = append(newExprs, newCond) } - retCond, _, err := proj.PredicatePushDown(newExprs) + retCond, _, err := proj.(LogicalPlan).PredicatePushDown(newExprs) if err != nil { return nil, nil, errors.Trace(err) } if len(retCond) != 0 { - addSelection(p, proj, retCond, p.allocator) + if err = addSelection(p, proj.(LogicalPlan), retCond, p.allocator); err != nil { + return nil, nil, errors.Trace(err) + } } } return diff --git a/plan/push_limit.go b/plan/push_limit.go new file mode 100644 index 0000000000000..eee68641bdae0 --- /dev/null +++ b/plan/push_limit.go @@ -0,0 +1,247 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package plan + +func insertLimit(p PhysicalPlan, l *Limit) *Limit { + l.SetSchema(p.GetSchema()) + l.SetChildren(p) + p.SetParents(l) + return l +} + +// PushLimit implements PhysicalPlan PushLimit interface. p pushes itself to its child; with an outer limit l, p stays in the tree and takes over l's Count and Offset, otherwise the plan returned by the child is returned in p's place. NOTE(review): a child may return p itself (the scans re-insert a limit whose Offset is non-zero), in which case SetChildren(newChild) would presumably make p its own child - confirm. +func (p *Limit) PushLimit(l *Limit) PhysicalPlan { + child := p.GetChildByIndex(0).(PhysicalPlan) + newChild := child.PushLimit(p) + if l != nil { + p.Count = l.Count + p.Offset = l.Offset + p.SetChildren(newChild) + newChild.SetParents(p) + return p + } + return newChild +} + +// PushLimit implements PhysicalPlan PushLimit interface. The limit is stored on the sort as ExecLimit and nil is pushed to the child. +func (p *NewSort) PushLimit(l *Limit) PhysicalPlan { + child := p.GetChildByIndex(0).(PhysicalPlan) + newChild := child.PushLimit(nil) + p.ExecLimit = l + p.SetChildren(newChild) + newChild.SetParents(p) + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. nil is pushed to the child and the returned child is linked under p first; a non-nil l is then re-inserted above p. +func (p *Selection) PushLimit(l *Limit) PhysicalPlan { + child := p.GetChildByIndex(0).(PhysicalPlan) + newChild := child.PushLimit(nil) + p.SetChildren(newChild) + newChild.SetParents(p) + if l != nil { + return insertLimit(p, l) + } + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface.
+func (p *PhysicalHashSemiJoin) PushLimit(l *Limit) PhysicalPlan { + lChild := p.GetChildByIndex(0).(PhysicalPlan) + rChild := p.GetChildByIndex(1).(PhysicalPlan) + var newLChild, newRChild PhysicalPlan + if p.WithAux { + newLChild = lChild.PushLimit(l) + newRChild = rChild.PushLimit(nil) + } else { + newLChild = lChild.PushLimit(nil) + newRChild = rChild.PushLimit(nil) + } + p.SetChildren(newLChild, newRChild) + newLChild.SetParents(p) + newRChild.SetParents(p) + if l != nil && !p.WithAux { // with WithAux, l was already handed to the left child above + return insertLimit(p, l) + } + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. For a left (right) outer join a copy of l with Count+Offset and a zero Offset is pushed to the left (right) child; every other child gets nil. A non-nil l is always re-inserted above the join. +func (p *PhysicalHashJoin) PushLimit(l *Limit) PhysicalPlan { + lChild := p.GetChildByIndex(0).(PhysicalPlan) + rChild := p.GetChildByIndex(1).(PhysicalPlan) + var newLChild, newRChild PhysicalPlan + if p.JoinType == LeftOuterJoin { + if l != nil { + limit2Push := *l + limit2Push.Count += limit2Push.Offset + limit2Push.Offset = 0 + newLChild = lChild.PushLimit(&limit2Push) + } else { + newLChild = lChild.PushLimit(nil) + } + newRChild = rChild.PushLimit(nil) + } else if p.JoinType == RightOuterJoin { + if l != nil { + limit2Push := *l + limit2Push.Count += limit2Push.Offset + limit2Push.Offset = 0 + newRChild = rChild.PushLimit(&limit2Push) + } else { + newRChild = rChild.PushLimit(nil) + } + newLChild = lChild.PushLimit(nil) + } else { + newLChild = lChild.PushLimit(nil) + newRChild = rChild.PushLimit(nil) + } + p.SetChildren(newLChild, newRChild) + newLChild.SetParents(p) + newRChild.SetParents(p) + if l != nil { + return insertLimit(p, l) + } + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface.
+func (p *NewUnion) PushLimit(l *Limit) PhysicalPlan { + for i, child := range p.GetChildren() { + if l != nil { + p.children[i] = child.(PhysicalPlan).PushLimit(&Limit{Count: l.Count + l.Offset}) + } else { + p.children[i] = child.(PhysicalPlan).PushLimit(nil) + } + p.children[i].SetParents(p) + } + if l != nil { + return insertLimit(p, l) + } + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. l is passed through to the child unchanged. +func (p *Projection) PushLimit(l *Limit) PhysicalPlan { + newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(l) + p.SetChildren(newChild) + newChild.SetParents(p) + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. l is passed through to the child unchanged. +func (p *Trim) PushLimit(l *Limit) PhysicalPlan { + newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(l) + p.SetChildren(newChild) + newChild.SetParents(p) + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. l is passed through to the child unchanged. +func (p *SelectLock) PushLimit(l *Limit) PhysicalPlan { + newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(l) + p.SetChildren(newChild) + newChild.SetParents(p) + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. nil is pushed into InnerPlan, while l is passed through to the first child. +func (p *PhysicalApply) PushLimit(l *Limit) PhysicalPlan { + p.InnerPlan = p.InnerPlan.PushLimit(nil) + newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(l) + p.SetChildren(newChild) + newChild.SetParents(p) + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. nil is pushed to the child; a non-nil l is re-inserted above the aggregation. +func (p *Aggregation) PushLimit(l *Limit) PhysicalPlan { + newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(nil) + p.SetChildren(newChild) + newChild.SetParents(p) + if l != nil { + return insertLimit(p, l) + } + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface.
+func (p *Distinct) PushLimit(l *Limit) PhysicalPlan { + newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(nil) + p.SetChildren(newChild) + newChild.SetParents(p) + if l != nil { + return insertLimit(p, l) + } + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. The incoming limit is ignored; a fresh Limit{Count: 2} is pushed to the child (presumably two rows are enough to detect more than one row - confirm). +func (p *MaxOneRow) PushLimit(_ *Limit) PhysicalPlan { + newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(&Limit{Count: 2}) + p.SetChildren(newChild) + newChild.SetParents(p) + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. The incoming limit is ignored; a fresh Limit{Count: 1} is pushed to the child. +func (p *Exists) PushLimit(_ *Limit) PhysicalPlan { + newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(&Limit{Count: 1}) + p.SetChildren(newChild) + newChild.SetParents(p) + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. LimitCount is set to Offset+Count; the Limit node is re-inserted above the scan only when Offset is non-zero. +func (p *PhysicalIndexScan) PushLimit(l *Limit) PhysicalPlan { + if l != nil { + count := int64(l.Offset + l.Count) + p.LimitCount = &count + if l.Offset != 0 { + return insertLimit(p, l) + } + } + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. LimitCount is set to Offset+Count; the Limit node is re-inserted above the scan only when Offset is non-zero. +func (p *PhysicalTableScan) PushLimit(l *Limit) PhysicalPlan { + if l != nil { + count := int64(l.Offset + l.Count) + p.LimitCount = &count + if l.Offset != 0 { + return insertLimit(p, l) + } + } + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface. The incoming limit is ignored; when there is a child, nil is pushed to it and SelectPlan is updated to the returned plan. +func (p *Insert) PushLimit(_ *Limit) PhysicalPlan { + if len(p.GetChildren()) == 0 { + return p + } + np := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(nil) + p.SetChildren(np) + p.SelectPlan = np + np.SetParents(p) + return p +} + +// PushLimit implements PhysicalPlan PushLimit interface.
+func (p *NewTableDual) PushLimit(l *Limit) PhysicalPlan { + if l == nil { + return p + } + return insertLimit(p, l) +} diff --git a/plan/refiner.go b/plan/refiner.go index 4b1b72da4679c..42ffdc4666b31 100644 --- a/plan/refiner.go +++ b/plan/refiner.go @@ -217,11 +217,23 @@ func detachIndexScanConditions(conditions []expression.Expression, indexScan *Ph indexScan.accessEqualCount = i break } + if indexScan.Index.Columns[i].Length != types.UnspecifiedLength { + filterConds = append(filterConds, cond) + } if i == len(accessConds)-1 { indexScan.accessEqualCount = len(accessConds) } } for _, cond := range conditions { + isAccess := false + for _, acCond := range accessConds { + if cond == acCond { + isAccess = true + } + } + if isAccess { + continue // cond is already in accessConds + } if indexScan.accessEqualCount < len(indexScan.Index.Columns) { checker := &conditionChecker{ tableName: indexScan.Table.Name, @@ -230,6 +242,9 @@ func detachIndexScanConditions(conditions []expression.Expression, indexScan *Ph } if checker.newCheck(cond) { accessConds = append(accessConds, cond) + if indexScan.Index.Columns[indexScan.accessEqualCount].Length != types.UnspecifiedLength { // column has an explicit index length: cond is kept as a filter as well + filterConds = append(filterConds, cond) + } } else { filterConds = append(filterConds, cond) } @@ -432,7 +447,7 @@ func (c *conditionChecker) checkColumn(expr expression.Expression) bool { if !ok { return false } - if col.TblName.L != c.tableName.L { + if col.Correlated { return false } if c.pkName.L != "" { diff --git a/plan/stringer.go b/plan/stringer.go index e87887ffb59af..d4409a01b1535 100644 --- a/plan/stringer.go +++ b/plan/stringer.go @@ -27,7 +27,7 @@ func ToString(p Plan) string { func toString(in Plan, strs []string, idxs []int) ([]string, []int) { switch in.(type) { - case *JoinOuter, *JoinInner, *Join, *Union, *NewUnion: + case *JoinOuter, *JoinInner, *Join, *Union, *NewUnion, *PhysicalHashJoin, *PhysicalHashSemiJoin: idxs = append(idxs, len(strs)) } @@ -46,8 +46,34 @@ func toString(in Plan, strs []string, idxs []int)
([]string, []int) { } case *PhysicalIndexScan: str = fmt.Sprintf("Index(%s.%s)%v", x.Table.Name.L, x.Index.Name.L, x.Ranges) + case *PhysicalTableScan: + str = fmt.Sprintf("Table(%s)", x.Table.Name.L) + case *PhysicalHashJoin: + last := len(idxs) - 1 + idx := idxs[last] + children := strs[idx:] + strs = strs[:idx] + idxs = idxs[:last] + if x.SmallTable == 0 { + str = "RightHashJoin{" + strings.Join(children, "->") + "}" + } else { + str = "LeftHashJoin{" + strings.Join(children, "->") + "}" + } + case *PhysicalHashSemiJoin: + last := len(idxs) - 1 + idx := idxs[last] + children := strs[idx:] + strs = strs[:idx] + idxs = idxs[:last] + if x.WithAux { + str = "SemiJoinWithAux{" + strings.Join(children, "->") + "}" + } else { + str = "SemiJoin{" + strings.Join(children, "->") + "}" + } case *Apply: str = fmt.Sprintf("Apply(%s)", ToString(x.InnerPlan)) + case *PhysicalApply: + str = fmt.Sprintf("Apply(%s)", ToString(x.InnerPlan)) case *Exists: str = "Exists" case *MaxOneRow: @@ -67,6 +93,11 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) { if x.ExecLimit != nil { str += fmt.Sprintf(" + Limit(%v) + Offset(%v)", x.ExecLimit.Count, x.ExecLimit.Offset) } + case *NewSort: + str = "Sort" + if x.ExecLimit != nil { + str += fmt.Sprintf(" + Limit(%v) + Offset(%v)", x.ExecLimit.Count, x.ExecLimit.Offset) + } case *TableScan: if len(x.Ranges) > 0 { ran := x.Ranges[0]