Skip to content

Commit

Permalink
*: trace the execution of the insert operation (pingcap#11667)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and sre-bot committed Aug 9, 2019
1 parent 1595c01 commit d210889
Show file tree
Hide file tree
Showing 52 changed files with 301 additions and 177 deletions.
2 changes: 1 addition & 1 deletion ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Tab
if err != nil {
return errors.Trace(err)
}
data, err := txn.Get(key)
data, err := txn.Get(context.TODO(), key)
if !isExist {
if terror.ErrorEqual(err, kv.ErrNotExist) {
return nil
Expand Down
7 changes: 7 additions & 0 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
Expand All @@ -33,6 +34,12 @@ const (
// Select sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (SelectResult, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("distsql.Select", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

// For testing purpose.
if hook := ctx.Value("CheckSelectRequestHook"); hook != nil {
hook.(func(*kv.Request))(kvReq)
Expand Down
16 changes: 11 additions & 5 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (b *batchChecker) encodeNewRow(ctx sessionctx.Context, t table.Table, row [

// getKeysNeedCheck gets keys converted from to-be-insert rows to record keys and unique index keys,
// which need to be checked whether they are duplicate keys.
func (b *batchChecker) getKeysNeedCheck(ctx sessionctx.Context, t table.Table, rows [][]types.Datum) ([]toBeCheckedRow, error) {
func (b *batchChecker) getKeysNeedCheck(ctx context.Context, sctx sessionctx.Context, t table.Table, rows [][]types.Datum) ([]toBeCheckedRow, error) {
nUnique := 0
for _, v := range t.WritableIndices() {
if v.Meta().Unique {
Expand All @@ -111,7 +112,7 @@ func (b *batchChecker) getKeysNeedCheck(ctx sessionctx.Context, t table.Table, r

var err error
for _, row := range rows {
toBeCheckRows, err = b.getKeysNeedCheckOneRow(ctx, t, row, nUnique, handleCol, toBeCheckRows)
toBeCheckRows, err = b.getKeysNeedCheckOneRow(sctx, t, row, nUnique, handleCol, toBeCheckRows)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func (b *batchChecker) getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Ta
// batchGetInsertKeys uses batch-get to fetch all key-value pairs to be checked for ignore or duplicate key update.
func (b *batchChecker) batchGetInsertKeys(ctx context.Context, sctx sessionctx.Context, t table.Table, newRows [][]types.Datum) (err error) {
// Get keys need to be checked.
b.toBeCheckedRows, err = b.getKeysNeedCheck(sctx, t, newRows)
b.toBeCheckedRows, err = b.getKeysNeedCheck(ctx, sctx, t, newRows)
if err != nil {
return err
}
Expand Down Expand Up @@ -251,6 +252,11 @@ func (b *batchChecker) initDupOldRowFromUniqueKey(ctx context.Context, sctx sess

// initDupOldRowValue initializes dupOldRowValues which contain the to-be-updated rows from storage.
func (b *batchChecker) initDupOldRowValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newRows [][]types.Datum) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("batchCheck.initDupOldRowValue", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
b.dupOldRowValues = make(map[string][]byte, len(newRows))
b.initDupOldRowFromHandleKey()
return b.initDupOldRowFromUniqueKey(ctx, sctx, newRows)
Expand All @@ -270,8 +276,8 @@ func (b *batchChecker) fillBackKeys(t table.Table, row toBeCheckedRow, handle in
}

// deleteDupKeys picks primary/unique key-value pairs from rows and remove them from the dupKVs
func (b *batchChecker) deleteDupKeys(ctx sessionctx.Context, t table.Table, rows [][]types.Datum) error {
cleanupRows, err := b.getKeysNeedCheck(ctx, t, rows)
func (b *batchChecker) deleteDupKeys(ctx context.Context, sctx sessionctx.Context, t table.Table, rows [][]types.Datum) error {
cleanupRows, err := b.getKeysNeedCheck(ctx, sctx, t, rows)
if err != nil {
return err
}
Expand Down
32 changes: 20 additions & 12 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package executor
import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -76,7 +78,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
}
} else {
for _, row := range rows {
if _, err := e.addRecord(row); err != nil {
if _, err := e.addRecord(ctx, row); err != nil {
return err
}
}
Expand Down Expand Up @@ -104,7 +106,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
if err != nil {
return err
}
err = e.updateDupRow(r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
if err != nil {
return err
}
Expand All @@ -117,7 +119,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
if err != nil {
return err
}
err = e.updateDupRow(r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
if err != nil {
return err
}
Expand All @@ -130,7 +132,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
newHandle, err := e.addRecord(newRows[i])
newHandle, err := e.addRecord(ctx, newRows[i])
if err != nil {
return err
}
Expand Down Expand Up @@ -169,26 +171,32 @@ func (e *InsertExec) Open(ctx context.Context) error {
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
func (e *InsertExec) updateDupRow(ctx context.Context, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("InsertExec.updateDupRow", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

oldRow, err := e.getOldRow(e.ctx, row.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when insert on dup", zap.Int64("handle", handle), zap.String("toBeInsertedRow", types.DatumsToStrNoErr(row.row)))
return err
}
// Do update row.
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(handle, oldRow, row.row, onDuplicate)
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(ctx, handle, oldRow, row.row, onDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
if err != nil {
return err
}
return e.updateDupKeyValues(handle, newHandle, handleChanged, oldRow, updatedRow)
return e.updateDupKeyValues(ctx, handle, newHandle, handleChanged, oldRow, updatedRow)
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow []types.Datum,
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []types.Datum, newRow []types.Datum,
cols []*expression.Assignment) ([]types.Datum, bool, int64, error) {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
Expand All @@ -212,23 +220,23 @@ func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow [
}

newData := row4Update[:len(oldRow)]
_, handleChanged, newHandle, err := updateRecord(e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
_, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
if err != nil {
return nil, false, 0, err
}
return newData, handleChanged, newHandle, nil
}

// updateDupKeyValues updates the dupKeyValues for further duplicate key check.
func (e *InsertExec) updateDupKeyValues(oldHandle int64, newHandle int64,
func (e *InsertExec) updateDupKeyValues(ctx context.Context, oldHandle int64, newHandle int64,
handleChanged bool, oldRow []types.Datum, updatedRow []types.Datum) error {
// There is only one row per update.
fillBackKeysInRows, err := e.getKeysNeedCheck(e.ctx, e.Table, [][]types.Datum{updatedRow})
fillBackKeysInRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, [][]types.Datum{updatedRow})
if err != nil {
return err
}
// Delete old keys and fill back new key-values of the updated row.
err = e.deleteDupKeys(e.ctx, e.Table, [][]types.Datum{oldRow})
err = e.deleteDupKeys(ctx, e.ctx, e.Table, [][]types.Datum{oldRow})
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (e *InsertValues) handleWarning(err error) {

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(row []types.Datum) (int64, error)) error {
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) (int64, error)) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, rows)
Expand Down Expand Up @@ -611,7 +611,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
// There may be duplicate keys inside the insert statement.
if !skip {
e.ctx.GetSessionVars().StmtCtx.AddCopiedRows(1)
_, err = addRecord(rows[i])
_, err = addRecord(ctx, rows[i])
if err != nil {
return err
}
Expand All @@ -626,15 +626,15 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return nil
}

func (e *InsertValues) addRecord(row []types.Datum) (int64, error) {
func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) (int64, error) {
txn, err := e.ctx.Txn(true)
if err != nil {
return 0, err
}
if !e.ctx.GetSessionVars().ConstraintCheckInPlace {
txn.SetOption(kv.PresumeKeyNotExists, nil)
}
h, err := e.Table.AddRecord(e.ctx, row)
h, err := e.Table.AddRecord(e.ctx, row, table.WithCtx(ctx))
txn.DelOption(kv.PresumeKeyNotExists)
if err != nil {
return 0, err
Expand Down
4 changes: 2 additions & 2 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,11 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []field) []types.Datu
return row
}

func (e *LoadDataInfo) addRecordLD(row []types.Datum) (int64, error) {
func (e *LoadDataInfo) addRecordLD(ctx context.Context, row []types.Datum) (int64, error) {
if row == nil {
return 0, nil
}
h, err := e.addRecord(row)
h, err := e.addRecord(ctx, row)
if err != nil {
e.handleWarning(err)
}
Expand Down
10 changes: 5 additions & 5 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err1
}

handleVal, err1 := e.get(idxKey)
handleVal, err1 := e.get(ctx, idxKey)
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return err1
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}

key := tablecodec.EncodeRowKeyWithHandle(e.tblInfo.ID, e.handle)
val, err := e.get(key)
val, err := e.get(ctx, key)
if err != nil && !kv.ErrNotExist.Equal(err) {
return err
}
Expand Down Expand Up @@ -175,15 +175,15 @@ func (e *PointGetExecutor) encodeIndexKey() (_ []byte, err error) {
return tablecodec.EncodeIndexSeekKey(e.tblInfo.ID, e.idxInfo.ID, encodedIdxVals), nil
}

func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) {
func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) (val []byte, err error) {
txn, err := e.ctx.Txn(true)
if err != nil {
return nil, err
}
if txn != nil && txn.Valid() && !txn.IsReadOnly() {
// We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be
// different for pessimistic transaction.
val, err = txn.GetMemBuffer().Get(key)
val, err = txn.GetMemBuffer().Get(ctx, key)
if err == nil {
return val, err
}
Expand All @@ -192,7 +192,7 @@ func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) {
}
// fallthrough to snapshot get.
}
return e.snapshot.Get(key)
return e.snapshot.Get(ctx, key)
}

func (e *PointGetExecutor) decodeRowValToChunk(rowVal []byte, chk *chunk.Chunk) error {
Expand Down
18 changes: 9 additions & 9 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error {

// removeRow removes the duplicate row and cleanup its keys in the key-value map,
// but if the to-be-removed row equals to the to-be-added row, no remove or add things to do.
func (e *ReplaceExec) removeRow(handle int64, r toBeCheckedRow) (bool, error) {
func (e *ReplaceExec) removeRow(ctx context.Context, handle int64, r toBeCheckedRow) (bool, error) {
newRow := r.row
oldRow, err := e.batchChecker.getOldRow(e.ctx, r.t, handle, e.GenExprs)
if err != nil {
Expand All @@ -75,22 +75,22 @@ func (e *ReplaceExec) removeRow(handle int64, r toBeCheckedRow) (bool, error) {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)

// Cleanup keys map, because the record was removed.
err = e.deleteDupKeys(e.ctx, r.t, [][]types.Datum{oldRow})
err = e.deleteDupKeys(ctx, e.ctx, r.t, [][]types.Datum{oldRow})
if err != nil {
return false, err
}
return false, nil
}

// replaceRow removes all duplicate rows for one row, then inserts it.
func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
if r.handleKey != nil {
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}
rowUnchanged, err := e.removeRow(handle, r)
rowUnchanged, err := e.removeRow(ctx, handle, r)
if err != nil {
return err
}
Expand All @@ -102,7 +102,7 @@ func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {

// Keep on removing duplicated rows.
for {
rowUnchanged, foundDupKey, err := e.removeIndexRow(r)
rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, r)
if err != nil {
return err
}
Expand All @@ -116,7 +116,7 @@ func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
}

// No duplicated rows now, insert the row.
newHandle, err := e.addRecord(r.row)
newHandle, err := e.addRecord(ctx, r.row)
if err != nil {
return err
}
Expand All @@ -130,14 +130,14 @@ func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
// 2. bool: true when found the duplicated key. This only means that duplicated key was found,
// and the row was removed.
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(r toBeCheckedRow) (bool, bool, error) {
func (e *ReplaceExec) removeIndexRow(ctx context.Context, r toBeCheckedRow) (bool, bool, error) {
for _, uk := range r.uniqueKeys {
if val, found := e.dupKVs[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return false, found, err
}
rowUnchanged, err := e.removeRow(handle, r)
rowUnchanged, err := e.removeRow(ctx, handle, r)
if err != nil {
return false, found, err
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
for _, r := range e.toBeCheckedRows {
err = e.replaceRow(r)
err = e.replaceRow(ctx, r)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -83,6 +84,12 @@ type TableReaderExecutor struct {

// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("TableReaderExecutor.Open", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

Expand Down
Loading

0 comments on commit d210889

Please sign in to comment.