Skip to content

Commit

Permalink
executor: extend ADMIN CHECK INDEX statement. (pingcap#5955)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Mar 11, 2018
1 parent 0d623bd commit e38f406
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 2 deletions.
11 changes: 10 additions & 1 deletion ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,15 @@ const (
AdminShowDDLJobs
AdminCancelDDLJobs
AdminCheckIndex
AdminCheckIndexRange
)

// HandleRange represents a range where handle value >= Begin and < End.
type HandleRange struct {
Begin int64
End int64
}

// AdminStmt is the struct for Admin statement.
type AdminStmt struct {
stmtNode
Expand All @@ -600,9 +607,11 @@ type AdminStmt struct {
Index string
Tables []*TableName
JobIDs []int64

HandleRanges []HandleRange
}

// Accept implements Node Accpet interface.
// Accept implements Node Accept interface.
func (n *AdminStmt) Accept(v Visitor) (Node, bool) {
newNode, skipChildren := v.Enter(n)
if skipChildren {
Expand Down
139 changes: 139 additions & 0 deletions executor/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)

var _ Executor = new(CheckIndexRangeExec)

// CheckIndexRangeExec outputs the index values which has handle between begin and end.
type CheckIndexRangeExec struct {
baseExecutor

table *model.TableInfo
index *model.IndexInfo
is infoschema.InfoSchema
startKey []types.Datum

handleRanges []ast.HandleRange
srcChunk *chunk.Chunk

result distsql.SelectResult
cols []*model.ColumnInfo
}

// NextChunk implements the Executor NextChunk interface.
func (e *CheckIndexRangeExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
handleIdx := e.schema.Len() - 1
for {
err := e.result.NextChunk(ctx, e.srcChunk)
if err != nil {
return errors.Trace(err)
}
if e.srcChunk.NumRows() == 0 {
return nil
}
iter := chunk.NewIterator4Chunk(e.srcChunk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
handle := row.GetInt64(handleIdx)
for _, hr := range e.handleRanges {
if handle >= hr.Begin && handle < hr.End {
chk.AppendRow(row)
break
}
}
}
if chk.NumRows() > 0 {
return nil
}
}
}

// Open implements the Executor Open interface.
func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
tCols := e.table.Cols()
for _, ic := range e.index.Columns {
col := tCols[ic.Offset]
e.cols = append(e.cols, col)
}
e.cols = append(e.cols, &model.ColumnInfo{
ID: model.ExtraHandleID,
Name: model.ExtraHandleName,
})
e.srcChunk = e.newChunk()
dagPB, err := e.buildDAGPB()
if err != nil {
return errors.Trace(err)
}
sc := e.ctx.GetSessionVars().StmtCtx
var builder distsql.RequestBuilder
kvReq, err := builder.SetIndexRanges(sc, e.table.ID, e.index.ID, ranger.FullNewRange()).
SetDAGRequest(dagPB).
SetKeepOrder(true).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()

e.result, err = distsql.Select(ctx, e.ctx, kvReq, e.retFieldTypes)
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(ctx)
return nil
}

func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = e.ctx.Txn().StartTS()
dagReq.TimeZoneOffset = timeZoneOffset(e.ctx)
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.schema.Columns {
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}
execPB := e.constructIndexScanPB()
dagReq.Executors = append(dagReq.Executors, execPB)

err := plan.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.cols)
if err != nil {
return nil, errors.Trace(err)
}
return dagReq, nil
}

func (e *CheckIndexRangeExec) constructIndexScanPB() *tipb.Executor {
idxExec := &tipb.IndexScan{
TableId: e.table.ID,
IndexId: e.index.ID,
Columns: plan.ColumnsToProto(e.cols, e.table.PKIsHandle),
}
return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec}
}

// Close implements the Executor Close interface.
func (e *CheckIndexRangeExec) Close() error {
return nil
}
36 changes: 36 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testkit"
)

func (s *testSuite) TestAdminCheckIndexRange(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec(`drop table if exists check_index_test;`)
tk.MustExec(`create table check_index_test (a int, b varchar(10), index a_b (a, b), index b (b))`)
tk.MustExec(`insert check_index_test values (3, "ab"),(2, "cd"),(1, "ef"),(-1, "hi")`)
result := tk.MustQuery("admin check index check_index_test a_b (2, 4);")
result.Check(testkit.Rows("1 ef 3", "2 cd 2"))

result = tk.MustQuery("admin check index check_index_test a_b (3, 5);")
result.Check(testkit.Rows("-1 hi 4", "1 ef 3"))

tk.MustExec("use mysql")
result = tk.MustQuery("admin check index test.check_index_test a_b (2, 3), (4, 5);")
result.Check(testkit.Rows("-1 hi 4", "2 cd 2"))
}
26 changes: 26 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"math"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -71,6 +72,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildCheckTable(v)
case *plan.CheckIndex:
return b.buildCheckIndex(v)
case *plan.CheckIndexRange:
return b.buildCheckIndexRange(v)
case *plan.DDL:
return b.buildDDL(v)
case *plan.Deallocate:
Expand Down Expand Up @@ -230,6 +233,29 @@ func (b *executorBuilder) buildCheckTable(v *plan.CheckTable) Executor {
return e
}

func (b *executorBuilder) buildCheckIndexRange(v *plan.CheckIndexRange) Executor {
tb, err := b.is.TableByName(v.Table.Schema, v.Table.Name)
if err != nil {
b.err = errors.Trace(err)
return nil
}
e := &CheckIndexRangeExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
handleRanges: v.HandleRanges,
table: tb.Meta(),
is: b.is,
}
idxName := strings.ToLower(v.IndexName)
for _, idx := range tb.Indices() {
if idx.Meta().Name.L == idxName {
e.index = idx.Meta()
e.startKey = make([]types.Datum, len(e.index.Columns))
break
}
}
return e
}

func (b *executorBuilder) buildDeallocate(v *plan.Deallocate) Executor {
e := &DeallocateExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()),
Expand Down
28 changes: 28 additions & 0 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ import (
GroupByClause "GROUP BY clause"
HashString "Hashed string"
HavingClause "HAVING clause"
HandleRange "handle range"
HandleRangeList "handle range list"
IfExists "If Exists"
IfNotExists "If Not Exists"
IgnoreOptional "IGNORE or empty"
Expand Down Expand Up @@ -4763,6 +4765,15 @@ AdminStmt:
Index: string($5),
}
}
| "ADMIN" "CHECK" "INDEX" TableName Identifier HandleRangeList
{
$$ = &ast.AdminStmt{
Tp: ast.AdminCheckIndexRange,
Tables: []*ast.TableName{$4.(*ast.TableName)},
Index: string($5),
HandleRanges: $6.([]ast.HandleRange),
}
}
| "ADMIN" "CANCEL" "DDL" "JOBS" NumList
{
$$ = &ast.AdminStmt{
Expand All @@ -4771,6 +4782,23 @@ AdminStmt:
}
}

HandleRangeList:
HandleRange
{
$$ = []ast.HandleRange{$1.(ast.HandleRange)}
}
| HandleRangeList ',' HandleRange
{
$$ = append($1.([]ast.HandleRange), $3.(ast.HandleRange))
}

HandleRange:
'(' NUM ',' NUM ')'
{
$$ = ast.HandleRange{Begin: $2.(int64), End: $4.(int64)}
}


NumList:
NUM
{
Expand Down
1 change: 1 addition & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func (s *testParserSuite) TestDMLStmt(c *C) {
{"admin show ddl jobs;", true},
{"admin check table t1, t2;", true},
{"admin check index tableName idxName;", true},
{"admin check index tableName idxName (1, 2), (4, 5);", true},
{"admin cancel ddl jobs 1", true},
{"admin cancel ddl jobs 1, 2", true},

Expand Down
10 changes: 10 additions & 0 deletions plan/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ type CheckIndex struct {
IdxName string
}

// CheckIndexRange is used for checking index data, output the index values that handle within begin and end.
type CheckIndexRange struct {
baseSchemaProducer

Table *ast.TableName
IndexName string

HandleRanges []ast.HandleRange
}

// CancelDDLJobs represents a cancel DDL jobs plan.
type CancelDDLJobs struct {
baseSchemaProducer
Expand Down
2 changes: 1 addition & 1 deletion plan/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error)
if col.ID == model.ExtraHandleID {
columns = append(columns, &model.ColumnInfo{
ID: model.ExtraHandleID,
Name: model.NewCIStr("_rowid"),
Name: model.ExtraHandleName,
})
} else {
columns = append(columns, tableColumns[col.Position])
Expand Down
45 changes: 45 additions & 0 deletions plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,12 +495,57 @@ func (b *planBuilder) buildAdmin(as *ast.AdminStmt) Plan {
p := &CancelDDLJobs{JobIDs: as.JobIDs}
p.SetSchema(buildCancelDDLJobsFields())
ret = p
case ast.AdminCheckIndexRange:
p := &CheckIndexRange{Table: as.Tables[0], IndexName: as.Index, HandleRanges: as.HandleRanges}
schema, err := buildCheckIndexSchema(as.Tables[0], as.Index)
if err != nil {
b.err = errors.Trace(err)
break
}
p.SetSchema(schema)
ret = p
default:
b.err = ErrUnsupportedType.Gen("Unsupported type %T", as)
}
return ret
}

func buildCheckIndexSchema(tn *ast.TableName, indexName string) (*expression.Schema, error) {
schema := expression.NewSchema()
indexName = strings.ToLower(indexName)
indicesInfo := tn.TableInfo.Indices
cols := tn.TableInfo.Cols()
for _, idxInfo := range indicesInfo {
if idxInfo.Name.L != indexName {
continue
}
for i, idxCol := range idxInfo.Columns {
col := cols[idxCol.Offset]
schema.Append(&expression.Column{
FromID: 1,
ColName: idxCol.Name,
TblName: tn.Name,
DBName: tn.Schema,
RetType: &col.FieldType,
Position: i,
ID: col.ID})
}
schema.Append(&expression.Column{
FromID: 1,
ColName: model.NewCIStr("extra_handle"),
TblName: tn.Name,
DBName: tn.Schema,
RetType: types.NewFieldType(mysql.TypeLonglong),
Position: len(idxInfo.Columns),
ID: -1,
})
}
if schema.Len() == 0 {
return nil, errors.Errorf("index %s not found", indexName)
}
return schema, nil
}

// getColsInfo returns the info of index columns, normal columns and primary key.
func getColsInfo(tn *ast.TableName) (indicesInfo []*model.IndexInfo, colsInfo []*model.ColumnInfo, pkCol *model.ColumnInfo) {
tbl := tn.TableInfo
Expand Down

0 comments on commit e38f406

Please sign in to comment.