Skip to content

Commit

Permalink
add stream agg executor. (pingcap#1730)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Sep 18, 2016
1 parent 5caa0f5 commit 648bee5
Show file tree
Hide file tree
Showing 5 changed files with 514 additions and 19 deletions.
23 changes: 13 additions & 10 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,35 @@ func (s *testAggFuncSuite) SetUpSuite(c *C) {
func (s *testAggFuncSuite) TearDownSuite(c *C) {
}

type mockExec struct {
type MockExec struct {
fields []*ast.ResultField
rows []*Row
Rows []*Row
curRowIdx int
}

func (m *mockExec) Schema() expression.Schema {
func (m *MockExec) Schema() expression.Schema {
return nil
}

func (m *mockExec) Fields() []*ast.ResultField {
func (m *MockExec) Fields() []*ast.ResultField {
return m.fields
}

func (m *mockExec) Next() (*Row, error) {
if m.curRowIdx >= len(m.rows) {
func (m *MockExec) Next() (*Row, error) {
if m.curRowIdx >= len(m.Rows) {
return nil, nil
}
r := m.rows[m.curRowIdx]
r := m.Rows[m.curRowIdx]
m.curRowIdx++
for i, d := range r.Data {
m.fields[i].Expr.SetValue(d.GetValue())
if len(m.fields) > 0 {
for i, d := range r.Data {
m.fields[i].Expr.SetValue(d.GetValue())
}
}
return r, nil
}

func (m *mockExec) Close() error {
func (m *MockExec) Close() error {
m.curRowIdx = 0
return nil
}
119 changes: 119 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,125 @@ func (e *AggregationExec) innerNext() (ret bool, err error) {
return true, nil
}

// StreamAggExec deals with all the aggregate functions.
// It is built from Aggregate Plan. Each call to Next() reads rows from Src and
// feeds them to every item in AggFuncs until the group-by key changes or Src
// is exhausted, then returns one row holding the aggregate results.
// NOTE(review): a group boundary is only detected between consecutive rows, so
// this presumably expects Src to deliver rows ordered by GroupByItems — confirm
// against the planner.
type StreamAggExec struct {
// Src is the child executor that supplies the input rows.
Src Executor
// schema is the value returned by Schema().
schema expression.Schema
// ResultFields is the value returned by Fields().
ResultFields []*ast.ResultField
// executed is set once Src has returned a nil row; Next() then returns nil.
executed bool
// hasData records whether Src has produced at least one row.
hasData bool
// ctx is passed to group-by item evaluation and to StreamUpdate.
ctx context.Context
// AggFuncs are the aggregate functions; each contributes one column to the result row.
AggFuncs []expression.AggregationFunction
// GroupByItems are the expressions whose values form the group key.
// When empty, all rows belong to a single group.
GroupByItems []expression.Expression
// curGroupEncodedKey is the codec-encoded form of curGroupKey, rewritten whenever the key changes.
curGroupEncodedKey []byte
// curGroupKey holds the group-by values of the group currently being accumulated.
curGroupKey []types.Datum
// tmpGroupKey is scratch space for the group-by values of the row being examined.
tmpGroupKey []types.Datum
}

// Close implements Executor Close interface.
//
// It resets the executor so that it can be run again from scratch: the
// progress flags are cleared, every aggregate function is cleared, and the
// remembered group key is dropped. Dropping the key matters because
// meetNewGroup treats an empty curGroupKey as "no group seen yet"; a key left
// over from a previous run would make the first row of the next run look like
// a group boundary and produce a spurious row of empty aggregates.
//
// It returns the error, if any, reported by closing Src.
func (e *StreamAggExec) Close() error {
	e.executed = false
	e.hasData = false
	// Forget the previous run's group so the next run starts with its first group.
	e.curGroupKey = nil
	// Keep the encoded key consistent with curGroupKey while retaining its buffer.
	e.curGroupEncodedKey = e.curGroupEncodedKey[:0]
	for _, agg := range e.AggFuncs {
		agg.Clear()
	}
	return e.Src.Close()
}

// Schema implements Executor Schema interface.
// It returns the schema stored on the executor.
func (e *StreamAggExec) Schema() expression.Schema {
return e.schema
}

// Fields implements Executor Fields interface.
// It returns the executor's ResultFields.
func (e *StreamAggExec) Fields() []*ast.ResultField {
return e.ResultFields
}

// Next implements Executor Next interface.
//
// It pulls rows from Src and feeds them to every aggregate function until
// either the group-by key changes or Src runs dry, then returns a single row
// with one column per aggregate function. When the key changes, the finished
// group's results are collected before the row that opened the new group is
// fed to the aggregate functions, so that row is counted in the next group.
//
// It returns (nil, nil) once Src has been exhausted by an earlier call, and
// also when Src produced no rows at all while group-by items are present.
// Without group-by items an empty Src still yields one row.
func (e *StreamAggExec) Next() (*Row, error) {
	if e.executed {
		return nil, nil
	}
	result := &Row{Data: make([]types.Datum, 0, len(e.AggFuncs))}
	for {
		srcRow, err := e.Src.Next()
		if err != nil {
			return nil, errors.Trace(err)
		}
		if srcRow == nil {
			// Source exhausted: flush whatever has been accumulated and stop for good.
			e.executed = true
			for _, fn := range e.AggFuncs {
				result.Data = append(result.Data, fn.GetStreamResult())
			}
			break
		}
		e.hasData = true
		groupChanged, err := e.meetNewGroup(srcRow)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if groupChanged {
			// Collect the finished group's results before this row is accumulated.
			for _, fn := range e.AggFuncs {
				result.Data = append(result.Data, fn.GetStreamResult())
			}
		}
		for _, fn := range e.AggFuncs {
			if err = fn.StreamUpdate(srcRow.Data, e.ctx); err != nil {
				return nil, errors.Trace(err)
			}
		}
		if groupChanged {
			break
		}
	}
	if len(e.GroupByItems) > 0 && !e.hasData {
		return nil, nil
	}
	return result, nil
}

// meetNewGroup reports whether row starts a new group, i.e. whether its
// group-by values differ from those of the current group.
// It returns false when there are no group-by items, and also for the very
// first row seen: that row opens the first group, but there is no previous
// group to finish.
// As a side effect, whenever the key changes (including on the first row) it
// updates curGroupKey and re-encodes it into curGroupEncodedKey.
func (e *StreamAggExec) meetNewGroup(row *Row) (bool, error) {
if len(e.GroupByItems) == 0 {
return false, nil
}
// Reuse the scratch slice; truncating keeps its backing array.
e.tmpGroupKey = e.tmpGroupKey[:0]
matched, firstGroup := true, false
// An empty current key means no group has been seen yet.
if len(e.curGroupKey) == 0 {
matched, firstGroup = false, true
}
for i, item := range e.GroupByItems {
v, err := item.Eval(row.Data, e.ctx)
if err != nil {
return false, errors.Trace(err)
}
// Only keep comparing while every earlier item has matched.
// The comparison must happen before the append below: once curGroupKey has
// been assigned from tmpGroupKey they share a backing array, so the append
// overwrites slot i of curGroupKey.
if matched {
c, err := v.CompareDatum(e.curGroupKey[i])
if err != nil {
return false, errors.Trace(err)
}
matched = c == 0
}
e.tmpGroupKey = append(e.tmpGroupKey, v)
}
if matched {
return false, nil
}
// The key changed: adopt the freshly evaluated values as the current key.
e.curGroupKey = e.tmpGroupKey
var err error
// Re-encode into the existing buffer (length 0, full capacity) to avoid reallocating.
e.curGroupEncodedKey, err = codec.EncodeValue(e.curGroupEncodedKey[0:0:cap(e.curGroupEncodedKey)], e.curGroupKey...)
if err != nil {
return false, errors.Trace(err)
}
// The first row changes the key but does not close a previous group.
return !firstGroup, nil
}

// ProjectionExec represents a select fields executor.
type ProjectionExec struct {
Src Executor
Expand Down
89 changes: 89 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/inspectkv"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
Expand Down Expand Up @@ -2003,6 +2004,94 @@ func (s *testSuite) TestNewTableScan(c *C) {
result.Check(testkit.Rows(rowStr1, rowStr2))
}

// TestStreamAgg exercises StreamAggExec directly on top of a MockExec source.
// For every aggregate function it checks that:
//   - without group-by items and with no input, one row holding the
//     function's empty-input result is returned;
//   - with a group-by item and no input, no row is returned;
//   - with a group-by item and input rows, one row per group is returned and
//     the executor then reports exhaustion.
func (s *testSuite) TestStreamAgg(c *C) {
	col := &expression.Column{
		Index: 1,
	}
	gbyCol := &expression.Column{
		Index: 0,
	}
	sumAgg := expression.NewAggFunction(ast.AggFuncSum, []expression.Expression{col}, false)
	cntAgg := expression.NewAggFunction(ast.AggFuncCount, []expression.Expression{col}, false)
	avgAgg := expression.NewAggFunction(ast.AggFuncAvg, []expression.Expression{col}, false)
	maxAgg := expression.NewAggFunction(ast.AggFuncMax, []expression.Expression{col}, false)
	// Every case uses the same rows: column 0 is the group key, column 1 the
	// aggregated value.
	input := [][]interface{}{
		{0, 1}, {0, nil}, {1, 2}, {1, 3},
	}
	cases := []struct {
		aggFunc      expression.AggregationFunction
		emptyResult  string // result over an empty source without group-by
		input        [][]interface{}
		groupResults []string // one expected result per group, in input order
	}{
		{sumAgg, "<nil>", input, []string{"1", "5"}},
		{cntAgg, "0", input, []string{"1", "2"}},
		{avgAgg, "<nil>", input, []string{"1.0000", "2.5000"}},
		{maxAgg, "<nil>", input, []string{"1", "3"}},
	}
	for _, ca := range cases {
		mock := &executor.MockExec{}
		e := &executor.StreamAggExec{
			AggFuncs: []expression.AggregationFunction{ca.aggFunc},
			Src:      mock,
		}

		// No group-by, no input: a single row is still produced.
		row, err := e.Next()
		c.Assert(err, IsNil)
		// Assert, not Check: row is dereferenced on the next line.
		c.Assert(row, NotNil)
		c.Assert(fmt.Sprintf("%v", row.Data[0].GetValue()), Equals, ca.emptyResult)

		// Group-by, no input: nothing is produced.
		e.GroupByItems = append(e.GroupByItems, gbyCol)
		c.Assert(e.Close(), IsNil)
		row, err = e.Next()
		c.Assert(err, IsNil)
		c.Check(row, IsNil)

		// Group-by with input: one row per group.
		c.Assert(e.Close(), IsNil)
		for _, in := range ca.input {
			data := types.MakeDatums(in...)
			mock.Rows = append(mock.Rows, &executor.Row{Data: data})
		}
		for _, want := range ca.groupResults {
			row, err = e.Next()
			c.Assert(err, IsNil)
			c.Assert(row, NotNil)
			c.Assert(fmt.Sprintf("%v", row.Data[0].GetValue()), Equals, want)
		}

		// After the last group the executor must report exhaustion.
		row, err = e.Next()
		c.Assert(err, IsNil)
		c.Check(row, IsNil)
	}
}

func (s *testSuite) TestAggregation(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
Expand Down
Loading

0 comments on commit 648bee5

Please sign in to comment.