Skip to content

Commit

Permalink
executor: remove Next function for executor/write.go (pingcap#5995)
Browse files Browse the repository at this point in the history
remove Next function for DeleteExec && LoadData && InsertExec && ReplaceExec && UpdateExec
  • Loading branch information
yangwenmai authored and shenli committed Mar 18, 2018
1 parent 11616c4 commit daa866c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 198 deletions.
1 change: 0 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,6 @@ type TableScanExec struct {
baseExecutor

t table.Table
asName *model.CIStr
ranges []ranger.IntColumnRange
seekHandle int64
iter kv.Iterator
Expand Down
205 changes: 8 additions & 197 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,6 @@ type DeleteExec struct {
finished bool
}

// Next implements the Executor Next interface.
func (e *DeleteExec) Next(ctx context.Context) (Row, error) {
if e.finished {
return nil, nil
}
defer func() {
e.finished = true
}()

if e.IsMultiTable {
return nil, e.deleteMultiTables(ctx)
}
return nil, e.deleteSingleTable(ctx)
}

// NextChunk implements the Executor NextChunk interface.
func (e *DeleteExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
Expand Down Expand Up @@ -225,29 +210,6 @@ type tblColPosInfo struct {
// the value in map[int64]Row is the deleting row.
type tableRowMapType map[int64]map[int64]Row

func (e *DeleteExec) deleteMultiTables(ctx context.Context) error {
if len(e.Tables) == 0 {
return nil
}

e.initialMultiTableTblMap()
colPosInfos := e.getColPosInfos(e.children[0].Schema())
tblRowMap := make(tableRowMapType)
for {
joinedRow, err := e.SelectExec.Next(ctx)
if err != nil {
return errors.Trace(err)
}
if joinedRow == nil {
break
}

e.composeTblRowMap(tblRowMap, colPosInfos, joinedRow)
}

return errors.Trace(e.removeRowsInTblRowMap(tblRowMap))
}

// matchingDeletingTable checks whether this column is from the table which is in the deleting list.
func (e *DeleteExec) matchingDeletingTable(tableID int64, col *expression.Column) bool {
names, ok := e.tblMap[tableID]
Expand All @@ -262,46 +224,6 @@ func (e *DeleteExec) matchingDeletingTable(tableID int64, col *expression.Column
return false
}

func (e *DeleteExec) deleteSingleTable(ctx context.Context) error {
var (
id int64
tbl table.Table
handleCol *expression.Column
rowCount int
)
for i, t := range e.tblID2Table {
id, tbl = i, t
handleCol = e.SelectExec.Schema().TblID2Handle[id][0]
break
}
// If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode.
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn()
batchSize := e.ctx.GetSessionVars().DMLBatchSize
for {
if batchDelete && rowCount >= batchSize {
e.ctx.StmtCommit()
if err := e.ctx.NewTxn(); err != nil {
// We should return a special error for batch insert.
return ErrBatchInsertFail.Gen("BatchDelete failed with error: %v", err)
}
rowCount = 0
}
row, err := e.SelectExec.Next(ctx)
if err != nil {
return errors.Trace(err)
}
if row == nil {
break
}
err = e.deleteOneRow(tbl, handleCol, row)
if err != nil {
return errors.Trace(err)
}
rowCount++
}
return nil
}

func (e *DeleteExec) deleteOneRow(tbl table.Table, handleCol *expression.Column, row Row) error {
end := len(row)
if handleIsExtra(handleCol) {
Expand Down Expand Up @@ -789,40 +711,30 @@ func (k loadDataVarKeyType) String() string {
// LoadDataVarKey is a variable key for load data.
const LoadDataVarKey loadDataVarKeyType = 0

func (e *LoadData) exec(ctx context.Context) (Row, error) {
// NextChunk implements the Executor NextChunk interface.
func (e *LoadData) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
// TODO: support load data without local field.
if !e.IsLocal {
return nil, errors.New("Load Data: don't support load data without local field")
return errors.New("Load Data: don't support load data without local field")
}
// TODO: support lines terminated is "".
if len(e.loadDataInfo.LinesInfo.Terminated) == 0 {
return nil, errors.New("Load Data: don't support load data terminated is nil")
return errors.New("Load Data: don't support load data terminated is nil")
}

sctx := e.loadDataInfo.insertVal.ctx
val := sctx.Value(LoadDataVarKey)
if val != nil {
sctx.SetValue(LoadDataVarKey, nil)
return nil, errors.New("Load Data: previous load data option isn't closed normal")
return errors.New("Load Data: previous load data option isn't closed normal")
}
if e.loadDataInfo.Path == "" {
return nil, errors.New("Load Data: infile path is empty")
return errors.New("Load Data: infile path is empty")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)

return nil, nil
}

// Next implements the Executor Next interface.
func (e *LoadData) Next(ctx context.Context) (Row, error) {
return e.exec(ctx)
}

// NextChunk implements the Executor NextChunk interface.
func (e *LoadData) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
_, err := e.exec(ctx)
return errors.Trace(err)
return nil
}

// Close implements the Executor Close interface.
Expand Down Expand Up @@ -1083,29 +995,6 @@ func batchMarkDupRows(ctx sessionctx.Context, t table.Table, rows [][]types.Datu
return rows, nil
}

// Next implements the Executor Next interface.
func (e *InsertExec) Next(ctx context.Context) (Row, error) {
if e.finished {
return nil, nil
}
cols, err := e.getColumns(e.Table.Cols())
if err != nil {
return nil, errors.Trace(err)
}

var rows [][]types.Datum
if e.SelectExec != nil {
rows, err = e.getRowsSelect(ctx, cols, e.IgnoreErr)
} else {
rows, err = e.getRows(cols, e.IgnoreErr)
}
if err != nil {
return nil, errors.Trace(err)
}

return e.exec(ctx, rows)
}

// NextChunk implements Exec NextChunk interface.
func (e *InsertExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
Expand Down Expand Up @@ -1347,30 +1236,6 @@ func (e *InsertValues) fillDefaultValues(row []types.Datum, hasValue []bool, ign
return nil
}

func (e *InsertValues) getRowsSelect(ctx context.Context, cols []*table.Column, ignoreErr bool) ([][]types.Datum, error) {
// process `insert|replace into ... select ... from ...`
if e.SelectExec.Schema().Len() != len(cols) {
return nil, ErrWrongValueCountOnRow.GenByArgs(1)
}
var rows [][]types.Datum
for {
innerRow, err := e.SelectExec.Next(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if innerRow == nil {
break
}
e.rowCount = uint64(len(rows))
row, err := e.fillRowData(cols, innerRow, ignoreErr)
if err != nil {
return nil, errors.Trace(err)
}
rows = append(rows, row)
}
return rows, nil
}

func (e *InsertValues) getRowsSelectChunk(ctx context.Context, cols []*table.Column, ignoreErr bool) ([][]types.Datum, error) {
// process `insert|replace into ... select ... from ...`
selectExec := e.children[0]
Expand Down Expand Up @@ -1695,29 +1560,6 @@ func (e *ReplaceExec) exec(ctx context.Context, rows [][]types.Datum) (Row, erro
return nil, nil
}

// Next implements the Executor Next interface.
func (e *ReplaceExec) Next(ctx context.Context) (Row, error) {
if e.finished {
return nil, nil
}
cols, err := e.getColumns(e.Table.Cols())
if err != nil {
return nil, errors.Trace(err)
}

var rows [][]types.Datum
if e.SelectExec != nil {
rows, err = e.getRowsSelect(ctx, cols, false)
} else {
rows, err = e.getRows(cols, false)
}
if err != nil {
return nil, errors.Trace(err)
}

return e.exec(ctx, rows)
}

// NextChunk implements the Executor NextChunk interface.
func (e *ReplaceExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
Expand Down Expand Up @@ -1811,19 +1653,6 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema) (Row,
return Row{}, nil
}

// Next implements the Executor Next interface.
func (e *UpdateExec) Next(ctx context.Context) (Row, error) {
if !e.fetched {
err := e.fetchRows(ctx)
if err != nil {
return nil, errors.Trace(err)
}
e.fetched = true
}

return e.exec(ctx, e.SelectExec.Schema())
}

// NextChunk implements the Executor NextChunk interface.
func (e *UpdateExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
Expand Down Expand Up @@ -1899,24 +1728,6 @@ func (e *UpdateExec) composeNewRow(oldRow Row) (Row, error) {
return newRowData, nil
}

func (e *UpdateExec) fetchRows(ctx context.Context) error {
for {
row, err := e.SelectExec.Next(ctx)
if err != nil {
return errors.Trace(err)
}
if row == nil {
return nil
}
newRowData, err := e.composeNewRow(row)
if err != nil {
return errors.Trace(err)
}
e.rows = append(e.rows, row)
e.newRowsData = append(e.newRowsData, newRowData)
}
}

func getTableOffset(schema *expression.Schema, handleCol *expression.Column) int {
for i, col := range schema.Columns {
if col.DBName.L == handleCol.DBName.L && col.TblName.L == handleCol.TblName.L {
Expand Down

0 comments on commit daa866c

Please sign in to comment.