Skip to content

Commit

Permalink
support cbo (pingcap#1498)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Jul 27, 2016
1 parent 128af2e commit 2b4acb0
Show file tree
Hide file tree
Showing 22 changed files with 1,329 additions and 210 deletions.
9 changes: 4 additions & 5 deletions executor/aggregate_xapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ func (n *finalAggregater) update(count uint64, value types.Datum) error {
// GetContext gets aggregate evaluation context for the current group.
// If it is nil, add a new context into contextPerGroupMap.
func (n *finalAggregater) getContext() *ast.AggEvaluateContext {
if n.contextPerGroupMap == nil {
n.contextPerGroupMap = make(map[string](*ast.AggEvaluateContext))
}
if _, ok := n.contextPerGroupMap[string(n.currentGroup)]; !ok {
n.contextPerGroupMap[string(n.currentGroup)] = &ast.AggEvaluateContext{}
}
Expand Down Expand Up @@ -153,7 +150,8 @@ func (e *XAggregateExec) Next() (*Row, error) {
e.aggregaters = make([]*finalAggregater, len(e.AggFuncs))
for i, af := range e.AggFuncs {
agg := &finalAggregater{
name: strings.ToLower(af.F),
name: strings.ToLower(af.F),
contextPerGroupMap: make(map[string](*ast.AggEvaluateContext)),
}
e.aggregaters[i] = agg
}
Expand Down Expand Up @@ -298,7 +296,8 @@ func (e *NewXAggregateExec) Next() (*Row, error) {
e.aggregaters = make([]*finalAggregater, len(e.AggFuncs))
for i, af := range e.AggFuncs {
agg := &finalAggregater{
name: strings.ToLower(af.GetName()),
name: strings.ToLower(af.GetName()),
contextPerGroupMap: make(map[string](*ast.AggEvaluateContext)),
}
e.aggregaters[i] = agg
}
Expand Down
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,10 +675,12 @@ func (b *executorBuilder) buildNewUnionScanExec(src Executor, condition expressi
us := &UnionScanExec{ctx: b.ctx, Src: src}
switch x := src.(type) {
case *NewXSelectTableExec:
us.desc = x.desc
us.dirty = getDirtyDB(b.ctx).getDirtyTable(x.table.Meta().ID)
us.newCondition = condition
us.newBuildAndSortAddedRows(x.table, x.asName)
case *NewXSelectIndexExec:
us.desc = x.indexPlan.Desc
for _, ic := range x.indexPlan.Index.Columns {
for i, col := range x.indexPlan.GetSchema() {
if col.ColName.L == ic.Name.L {
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,7 @@ func (e *LimitExec) Next() (*Row, error) {

// Close implements Executor Close interface.
func (e *LimitExec) Close() error {
e.Idx = 0
return e.Src.Close()
}

Expand Down
7 changes: 5 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,20 +1342,21 @@ func (s *testSuite) TestDirtyTransaction(c *C) {
tk.MustExec("begin")
tk.MustQuery("select * from t").Check(testkit.Rows("2 3", "4 8", "6 8"))
tk.MustExec("insert t values (1, 5), (3, 4), (7, 6)")
tk.MustQuery("select * from t").Check(testkit.Rows("2 3", "3 4", "1 5", "7 6", "4 8", "6 8"))
tk.MustQuery("select * from t").Check(testkit.Rows("1 5", "2 3", "3 4", "4 8", "6 8", "7 6"))
tk.MustQuery("select * from t where a = 1").Check(testkit.Rows("1 5"))
tk.MustQuery("select * from t order by a desc").Check(testkit.Rows("7 6", "6 8", "4 8", "3 4", "2 3", "1 5"))
tk.MustQuery("select * from t order by b, a").Check(testkit.Rows("2 3", "3 4", "1 5", "7 6", "4 8", "6 8"))
tk.MustQuery("select * from t order by b desc, a desc").Check(testkit.Rows("6 8", "4 8", "7 6", "1 5", "3 4", "2 3"))
tk.MustQuery("select b from t where b = 8 order by b desc").Check(testkit.Rows("8", "8"))
// Delete a snapshot row and a dirty row.
tk.MustExec("delete from t where a = 2 or a = 3")
tk.MustQuery("select * from t").Check(testkit.Rows("1 5", "4 8", "6 8", "7 6"))
tk.MustQuery("select * from t order by a desc").Check(testkit.Rows("7 6", "6 8", "4 8", "1 5"))
tk.MustQuery("select * from t order by b, a").Check(testkit.Rows("1 5", "7 6", "4 8", "6 8"))
tk.MustQuery("select * from t order by b desc, a desc").Check(testkit.Rows("6 8", "4 8", "7 6", "1 5"))
// Add deleted row back.
tk.MustExec("insert t values (2, 3), (3, 4)")
tk.MustQuery("select * from t").Check(testkit.Rows("2 3", "3 4", "1 5", "7 6", "4 8", "6 8"))
tk.MustQuery("select * from t").Check(testkit.Rows("1 5", "2 3", "3 4", "4 8", "6 8", "7 6"))
tk.MustQuery("select * from t order by a desc").Check(testkit.Rows("7 6", "6 8", "4 8", "3 4", "2 3", "1 5"))
tk.MustQuery("select * from t order by b, a").Check(testkit.Rows("2 3", "3 4", "1 5", "7 6", "4 8", "6 8"))
tk.MustQuery("select * from t order by b desc, a desc").Check(testkit.Rows("6 8", "4 8", "7 6", "1 5", "3 4", "2 3"))
Expand Down Expand Up @@ -1700,6 +1701,8 @@ func (s *testSuite) TestAggregation(c *C) {
tk.MustExec("insert t values (4, 3)")
result := tk.MustQuery("select count(*) from t group by d")
result.Check(testkit.Rows("3", "2", "2"))
result = tk.MustQuery("select - c, c as d from t group by c having null not between c and avg(distinct d) - d")
result.Check(testkit.Rows())
result = tk.MustQuery("select count(*) from (select d, c from t) k where d != 0 group by d")
result.Check(testkit.Rows("3", "2", "2"))
result = tk.MustQuery("select c as a from t group by d having a < 0")
Expand Down
32 changes: 11 additions & 21 deletions executor/new_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,21 @@ func (b *executorBuilder) buildJoin(v *plan.PhysicalHashJoin) Executor {
ctx: b.ctx,
targetTypes: targetTypes,
}
switch v.JoinType {
case plan.LeftOuterJoin:
e.outter = true
e.leftSmall = false
if v.SmallTable == 1 {
e.smallFilter = expression.ComposeCNFCondition(v.RightConditions)
e.bigFilter = expression.ComposeCNFCondition(v.LeftConditions)
e.smallHashKey = rightHashKey
e.bigHashKey = leftHashKey
case plan.RightOuterJoin:
e.outter = true
e.leftSmall = false
} else {
e.leftSmall = true
e.smallFilter = expression.ComposeCNFCondition(v.LeftConditions)
e.bigFilter = expression.ComposeCNFCondition(v.RightConditions)
e.smallHashKey = leftHashKey
e.bigHashKey = rightHashKey
case plan.InnerJoin:
//TODO: assume right table is the small one before cbo is realized.
e.outter = false
e.leftSmall = false
e.smallFilter = expression.ComposeCNFCondition(v.RightConditions)
e.bigFilter = expression.ComposeCNFCondition(v.LeftConditions)
e.smallHashKey = rightHashKey
e.bigHashKey = leftHashKey
default:
b.err = ErrUnknownPlan.Gen("Unknown Join Type !!")
return nil
}
if v.JoinType == plan.LeftOuterJoin || v.JoinType == plan.RightOuterJoin {
e.outer = true
}
if e.leftSmall {
e.smallExec = b.build(v.GetChildByIndex(0))
Expand Down Expand Up @@ -272,6 +261,8 @@ func (b *executorBuilder) buildNewTableScan(v *plan.PhysicalTableScan, s *plan.S
schema: v.GetSchema(),
Columns: v.Columns,
ranges: v.Ranges,
desc: v.Desc,
limitCount: v.LimitCount,
}
ret = st
if !txn.IsReadOnly() {
Expand All @@ -298,8 +289,7 @@ func (b *executorBuilder) buildNewTableScan(v *plan.PhysicalTableScan, s *plan.S
ranges: v.Ranges,
}
if v.Desc {
b.err = errors.New("Not implement yet.")
return nil
return &ReverseExec{Src: ts}
}
return ts
}
Expand Down Expand Up @@ -403,9 +393,9 @@ func (b *executorBuilder) buildNewUnion(v *plan.NewUnion) Executor {
e := &NewUnionExec{
schema: v.GetSchema(),
fields: v.Fields(),
Srcs: make([]Executor, len(v.Selects)),
Srcs: make([]Executor, len(v.GetChildren())),
}
for i, sel := range v.Selects {
for i, sel := range v.GetChildren() {
selExec := b.build(sel)
e.Srcs[i] = selExec
}
Expand Down
7 changes: 3 additions & 4 deletions executor/new_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type HashJoinExec struct {
bigFilter expression.Expression
otherFilter expression.Expression
schema expression.Schema
outter bool
outer bool
leftSmall bool
matchedRows []*Row
cursor int
Expand Down Expand Up @@ -168,7 +168,6 @@ func (e *HashJoinExec) constructMatchedRows(bigRow *Row) (matchedRows []*Row, er
}
// match eq condition
for _, smallRow := range rows {
//TODO: remove result fields in order to reduce memory copy cost.
otherMatched := true
var matchedRow *Row
if e.leftSmall {
Expand Down Expand Up @@ -257,7 +256,7 @@ func (e *HashJoinExec) Next() (*Row, error) {
row, ok := e.returnRecord()
if ok {
return row, nil
} else if e.outter {
} else if e.outer {
row = e.fillNullRow(bigRow)
return row, nil
}
Expand Down Expand Up @@ -1333,7 +1332,7 @@ func (e *MaxOneRowExec) Next() (*Row, error) {
return nil, errors.Trace(err)
}
if srcRow == nil {
return &Row{Data: []types.Datum{types.NewDatum(nil)}}, nil
return &Row{Data: make([]types.Datum, len(e.schema))}, nil
}
srcRow1, err := e.Src.Next()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion executor/new_executor_xapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func (e *NewXSelectIndexExec) doIndexRequest() (*xapi.SelectResult, error) {
startTs := txn.StartTS()
selIdxReq.StartTs = &startTs
selIdxReq.IndexInfo = xapi.IndexToProto(e.table.Meta(), e.indexPlan.Index)
// Push limit to index request only if there is not filter conditions.
selIdxReq.Limit = e.indexPlan.LimitCount
if e.indexPlan.Desc {
selIdxReq.OrderBy = append(selIdxReq.OrderBy, &tipb.ByItem{Desc: &e.indexPlan.Desc})
Expand Down Expand Up @@ -394,6 +393,8 @@ type NewXSelectTableExec struct {
Columns []*model.ColumnInfo
schema expression.Schema
ranges []plan.TableRange
desc bool
limitCount *int64

/*
The following attributes are used for aggregation push down.
Expand Down Expand Up @@ -428,6 +429,10 @@ func (e *NewXSelectTableExec) doRequest() error {
selReq.TableInfo = &tipb.TableInfo{
TableId: proto.Int64(e.tableInfo.ID),
}
if e.supportDesc && e.desc {
selReq.OrderBy = append(selReq.OrderBy, &tipb.ByItem{Desc: &e.desc})
}
selReq.Limit = e.limitCount
selReq.TableInfo.Columns = xapi.ColumnsToProto(columns, e.tableInfo.PKIsHandle)
// Aggregate Info
selReq.Aggregates = e.aggFuncs
Expand Down
3 changes: 3 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ func (col *Column) Eval(row []types.Datum, _ context.Context) (types.Datum, erro

// DeepCopy implements Expression interface.
func (col *Column) DeepCopy() Expression {
if col.Correlated {
return col
}
newCol := *col
return &newCol
}
Expand Down
3 changes: 3 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,9 @@ func (m *Meta) GetTableStats(tableID int64) (*statistics.TablePB, error) {
if err != nil {
return nil, errors.Trace(err)
}
if len(data) == 0 {
return nil, nil
}
tpb := &statistics.TablePB{}
err = proto.Unmarshal(data, tpb)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions plan/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,12 @@ func (er *expressionRewriter) handleExistSubquery(v *ast.ExistsSubqueryExpr) (as
er.err = errors.Trace(err)
return v, true
}
phyPlan := np.Convert2PhysicalPlan()
if err = refine(phyPlan); err != nil {
_, res, _, err := np.convert2PhysicalPlan(nil)
if err != nil {
er.err = errors.Trace(err)
return v, true
}
phyPlan := res.p.PushLimit(nil)
d, err := EvalSubquery(phyPlan, er.b.is, er.b.ctx)
if err != nil {
er.err = errors.Trace(err)
Expand Down Expand Up @@ -368,11 +369,12 @@ func (er *expressionRewriter) handleScalarSubquery(v *ast.SubqueryExpr) (ast.Nod
er.err = errors.Trace(err)
return v, true
}
phyPlan := np.Convert2PhysicalPlan()
if err = refine(phyPlan); err != nil {
_, res, _, err := np.convert2PhysicalPlan(nil)
if err != nil {
er.err = errors.Trace(err)
return v, true
}
phyPlan := res.p.PushLimit(nil)
d, err := EvalSubquery(phyPlan, er.b.is, er.b.ctx)
if err != nil {
er.err = errors.Trace(err)
Expand Down Expand Up @@ -653,7 +655,7 @@ func (er *expressionRewriter) caseToScalarFunc(v *ast.CaseExpr) {
value := er.ctxStack[stkLen-argsLen-1]
args = make([]expression.Expression, 0, argsLen)
for i := stkLen - argsLen; i < stkLen-1; i += 2 {
arg, err := expression.NewFunction(ast.EQ, types.NewFieldType(mysql.TypeTiny), value, er.ctxStack[i])
arg, err := expression.NewFunction(ast.EQ, types.NewFieldType(mysql.TypeTiny), value.DeepCopy(), er.ctxStack[i])
if err != nil {
er.err = errors.Trace(err)
return
Expand Down
35 changes: 23 additions & 12 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan/statistics"
"github.com/pingcap/tidb/util/types"
)

Expand Down Expand Up @@ -313,13 +314,13 @@ func (b *planBuilder) buildNewDistinct(src LogicalPlan) LogicalPlan {
func (b *planBuilder) buildNewUnion(union *ast.UnionStmt) LogicalPlan {
u := &NewUnion{baseLogicalPlan: newBaseLogicalPlan(Un, b.allocator)}
u.initID()
u.Selects = make([]LogicalPlan, len(union.SelectList.Selects))
u.children = make([]Plan, len(union.SelectList.Selects))
for i, sel := range union.SelectList.Selects {
u.Selects[i] = b.buildNewSelect(sel)
u.correlated = u.correlated || u.Selects[i].IsCorrelated()
u.children[i] = b.buildNewSelect(sel)
u.correlated = u.correlated || u.children[i].IsCorrelated()
}
firstSchema := u.Selects[0].GetSchema().DeepCopy()
for _, sel := range u.Selects {
firstSchema := u.children[0].GetSchema().DeepCopy()
for _, sel := range u.children {
if len(firstSchema) != len(sel.GetSchema()) {
b.err = errors.New("The used SELECT statements have a different number of columns")
return nil
Expand All @@ -344,7 +345,7 @@ func (b *planBuilder) buildNewUnion(union *ast.UnionStmt) LogicalPlan {
firstSchema[i].RetType.Tp = col.RetType.Tp
}
}
addChild(u, sel)
sel.SetParents(u)
}
for _, v := range firstSchema {
v.FromID = u.id
Expand Down Expand Up @@ -401,10 +402,6 @@ func (b *planBuilder) buildNewLimit(src LogicalPlan, limit *ast.Limit) LogicalPl
}
li.initID()
li.correlated = src.IsCorrelated()
if s, ok := src.(*NewSort); ok {
s.ExecLimit = li
return s
}
addChild(li, src)
li.SetSchema(src.GetSchema().DeepCopy())
return li
Expand Down Expand Up @@ -440,6 +437,9 @@ func resolveFromSelectFields(v *ast.ColumnNameExpr, fields []*ast.SelectField) (
var matchedExpr ast.ExprNode
index = -1
for i, field := range fields {
if field.Auxiliary {
continue
}
if matchField(field, v) {
curCol, isCol := field.Expr.(*ast.ColumnNameExpr)
if !isCol {
Expand Down Expand Up @@ -821,11 +821,21 @@ func (b *planBuilder) buildNewTableDual() LogicalPlan {
return dual
}

func (b *planBuilder) getTableStats(table *model.TableInfo) *statistics.Table {
// TODO: Currently we always retrun a pseudo table for good performance. We will use a cache in future.
return statistics.PseudoTable(table)
}

func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
statisticTable := b.getTableStats(tn.TableInfo)
if b.err != nil {
return nil
}
p := &DataSource{
table: tn,
Table: tn.TableInfo,
baseLogicalPlan: newBaseLogicalPlan(Ts, b.allocator),
statisticTable: statisticTable,
}
p.initID()
// Equal condition contains a column from previous joined table.
Expand Down Expand Up @@ -975,7 +985,8 @@ func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onConditio
joinPlan.JoinType = SemiJoin
}
joinPlan.anti = not
addChild(joinPlan, outerPlan)
addChild(joinPlan, innerPlan)
joinPlan.SetChildren(outerPlan, innerPlan)
outerPlan.SetParents(joinPlan)
innerPlan.SetParents(joinPlan)
return joinPlan
}
6 changes: 4 additions & 2 deletions plan/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/plan/statistics"
)

// JoinType contains CrossJoin, InnerJoin, LeftOuterJoin, RightOuterJoin, FullOuterJoin, SemiJoin.
Expand Down Expand Up @@ -60,6 +61,7 @@ type Projection struct {
// Aggregation represents an aggregate plan.
type Aggregation struct {
baseLogicalPlan
// TODO: implement hash aggregation and streamed aggreagtion
AggFuncs []expression.AggregationFunction
GroupByItems []expression.Expression
}
Expand Down Expand Up @@ -113,6 +115,8 @@ type DataSource struct {
TableAsName *model.CIStr

LimitCount *int64

statisticTable *statistics.Table
}

// Trim trims child's rows.
Expand All @@ -123,8 +127,6 @@ type Trim struct {
// NewUnion represents Union plan.
type NewUnion struct {
baseLogicalPlan

Selects []LogicalPlan
}

// NewSort stands for the order by plan.
Expand Down
Loading

0 comments on commit 2b4acb0

Please sign in to comment.