Skip to content

Commit

Permalink
*: refactor some functions for better reusability (pingcap#39133)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Nov 14, 2022
1 parent 4dd00bc commit d734cc8
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 49 deletions.
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ func (w *baseIndexWorker) getIndexRecord(idxInfo *model.IndexInfo, handle kv.Han
idxVal[j] = idxColumnVal
}

rsData := tables.TryGetHandleRestoredDataWrapper(w.table, nil, w.rowMap, idxInfo)
rsData := tables.TryGetHandleRestoredDataWrapper(w.table.Meta(), nil, w.rowMap, idxInfo)
idxRecord := &indexRecord{handle: handle, key: recordKey, vals: idxVal, rsData: rsData}
return idxRecord, nil
}
Expand Down
8 changes: 4 additions & 4 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
execPB := e.constructIndexScanPB()
dagReq.Executors = append(dagReq.Executors, execPB)

err := plannercore.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.cols)
err := tables.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.cols)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func (e *RecoverIndexExec) Open(ctx context.Context) error {
func (e *RecoverIndexExec) constructTableScanPB(tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) {
tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos)
tblScan.TableId = e.physicalID
err := plannercore.SetPBColumnsDefaultValue(e.ctx, tblScan.Columns, colInfos)
err := tables.SetPBColumnsDefaultValue(e.ctx, tblScan.Columns, colInfos)
return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err
}

Expand Down Expand Up @@ -380,7 +380,7 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists
}
idxVals := extractIdxVals(row, e.idxValsBufs[result.scanRowCount], e.colFieldTypes, idxValLen)
e.idxValsBufs[result.scanRowCount] = idxVals
rsData := tables.TryGetHandleRestoredDataWrapper(e.table, plannercore.GetCommonHandleDatum(e.handleCols, row), nil, e.index.Meta())
rsData := tables.TryGetHandleRestoredDataWrapper(e.table.Meta(), plannercore.GetCommonHandleDatum(e.handleCols, row), nil, e.index.Meta())
e.recoverRows = append(e.recoverRows, recoverRows{handle: handle, idxVals: idxVals, rsData: rsData, skip: false})
result.scanRowCount++
result.currentHandle = handle
Expand Down Expand Up @@ -790,7 +790,7 @@ func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest,

execPB := e.constructIndexScanPB()
dagReq.Executors = append(dagReq.Executors, execPB)
err := plannercore.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.columns)
err := tables.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.columns)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2559,7 +2559,7 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(task plannercore.AnalyzeC
e.analyzePB.ColReq.PrimaryPrefixColumnIds = tables.PrimaryPrefixColumnIDs(task.TblInfo)
}
}
b.err = plannercore.SetPBColumnsDefaultValue(b.ctx, e.analyzePB.ColReq.ColumnsInfo, task.ColsInfo)
b.err = tables.SetPBColumnsDefaultValue(b.ctx, e.analyzePB.ColReq.ColumnsInfo, task.ColsInfo)
return &analyzeTask{taskType: colTask, colExec: e, job: job}
}

Expand Down Expand Up @@ -2741,7 +2741,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
e.analyzePB.Tp = tipb.AnalyzeType_TypeMixed
e.commonHandle = task.CommonHandleInfo
}
b.err = plannercore.SetPBColumnsDefaultValue(b.ctx, e.analyzePB.ColReq.ColumnsInfo, cols)
b.err = tables.SetPBColumnsDefaultValue(b.ctx, e.analyzePB.ColReq.ColumnsInfo, cols)
return &analyzeTask{taskType: colTask, colExec: e, job: job}
}

Expand Down
36 changes: 2 additions & 34 deletions planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
)
Expand Down Expand Up @@ -206,7 +203,7 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType)
telemetry.CurrentTiflashTableScanWithFastScanCount.Inc()
}
}
err := SetPBColumnsDefaultValue(ctx, tsExec.Columns, p.Columns)
err := tables.SetPBColumnsDefaultValue(ctx, tsExec.Columns, p.Columns)
return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tsExec, ExecutorId: &executorID}, err
}

Expand All @@ -218,7 +215,7 @@ func (p *PhysicalTableScan) partitionTableScanToPBForFlash(ctx sessionctx.Contex
}
ptsExec.Desc = p.Desc
executorID := p.ExplainID().String()
err := SetPBColumnsDefaultValue(ctx, ptsExec.Columns, p.Columns)
err := tables.SetPBColumnsDefaultValue(ctx, ptsExec.Columns, p.Columns)
return &tipb.Executor{Tp: tipb.ExecType_TypePartitionTableScan, PartitionTableScan: ptsExec, ExecutorId: &executorID}, err
}

Expand Down Expand Up @@ -599,32 +596,3 @@ func (p *PhysicalSort) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*ti
FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize,
}, nil
}

// SetPBColumnsDefaultValue sets the default values of tipb.ColumnInfos.
func SetPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnInfo, columns []*model.ColumnInfo) error {
for i, c := range columns {
// For virtual columns, we set their default values to NULL so that TiKV will return NULL properly,
// They real values will be compute later.
if c.IsGenerated() && !c.GeneratedStored {
pbColumns[i].DefaultVal = []byte{codec.NilFlag}
}
if c.GetOriginDefaultValue() == nil {
continue
}

sessVars := ctx.GetSessionVars()
originStrict := sessVars.StrictSQLMode
sessVars.StrictSQLMode = false
d, err := table.GetColOriginDefaultValue(ctx, c)
sessVars.StrictSQLMode = originStrict
if err != nil {
return err
}

pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(sessVars.StmtCtx, nil, d)
if err != nil {
return err
}
}
return nil
}
2 changes: 1 addition & 1 deletion session/schema_amender.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (a *amendOperationAddIndexInfo) genIndexKeyValue(ctx context.Context, sctx
idxVals = append(idxVals, chk.GetRow(0).GetDatum(oldCol.Offset, &oldCol.FieldType))
}

rsData := tables.TryGetHandleRestoredDataWrapper(a.tblInfoAtCommit, getCommonHandleDatum(a.tblInfoAtCommit, chk.GetRow(0)), nil, a.indexInfoAtCommit.Meta())
rsData := tables.TryGetHandleRestoredDataWrapper(a.tblInfoAtCommit.Meta(), getCommonHandleDatum(a.tblInfoAtCommit, chk.GetRow(0)), nil, a.indexInfoAtCommit.Meta())

// Generate index key buf.
newIdxKey, distinct, err := tablecodec.GenIndexKey(sctx.GetSessionVars().StmtCtx,
Expand Down
2 changes: 1 addition & 1 deletion table/tables/mutation_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func buildIndexKeyValue(index table.Index, rowToInsert []types.Datum, sessVars *
if err != nil {
return nil, nil, err
}
rsData := TryGetHandleRestoredDataWrapper(table, rowToInsert, nil, indexInfo)
rsData := TryGetHandleRestoredDataWrapper(table.meta, rowToInsert, nil, indexInfo)
value, err := tablecodec.GenIndexValuePortal(
sessVars.StmtCtx, &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns),
distinct, false, indexedValues, handle, 0, rsData,
Expand Down
41 changes: 35 additions & 6 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ func (t *TableCommon) addIndices(sctx sessionctx.Context, recordID kv.Handle, r
}
dupErr = kv.ErrKeyExists.FastGenByArgs(entryKey, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String()))
}
rsData := TryGetHandleRestoredDataWrapper(t, r, nil, v.Meta())
rsData := TryGetHandleRestoredDataWrapper(t.meta, r, nil, v.Meta())
if dupHandle, err := v.Create(sctx, txn, indexVals, recordID, rsData, opts...); err != nil {
if kv.ErrKeyExists.Equal(err) {
return dupHandle, dupErr
Expand Down Expand Up @@ -1346,7 +1346,7 @@ func (t *TableCommon) buildIndexForRow(ctx sessionctx.Context, h kv.Handle, vals
if untouched {
opts = append(opts, table.IndexIsUntouched)
}
rsData := TryGetHandleRestoredDataWrapper(t, newData, nil, idx.Meta())
rsData := TryGetHandleRestoredDataWrapper(t.meta, newData, nil, idx.Meta())
if _, err := idx.Create(ctx, txn, vals, h, rsData, opts...); err != nil {
if kv.ErrKeyExists.Equal(err) {
// Make error message consistent with MySQL.
Expand Down Expand Up @@ -1867,14 +1867,14 @@ func (t *TableCommon) GetSequenceCommon() *sequenceCommon {
}

// TryGetHandleRestoredDataWrapper tries to get the restored data for handle if needed. The argument can be a slice or a map.
func TryGetHandleRestoredDataWrapper(t table.Table, row []types.Datum, rowMap map[int64]types.Datum, idx *model.IndexInfo) []types.Datum {
if !collate.NewCollationEnabled() || !t.Meta().IsCommonHandle || t.Meta().CommonHandleVersion == 0 {
func TryGetHandleRestoredDataWrapper(tblInfo *model.TableInfo, row []types.Datum, rowMap map[int64]types.Datum, idx *model.IndexInfo) []types.Datum {
if !collate.NewCollationEnabled() || !tblInfo.IsCommonHandle || tblInfo.CommonHandleVersion == 0 {
return nil
}
rsData := make([]types.Datum, 0, 4)
pkIdx := FindPrimaryIndex(t.Meta())
pkIdx := FindPrimaryIndex(tblInfo)
for _, pkIdxCol := range pkIdx.Columns {
pkCol := t.Meta().Columns[pkIdxCol.Offset]
pkCol := tblInfo.Columns[pkIdxCol.Offset]
if !types.NeedRestoredData(&pkCol.FieldType) {
continue
}
Expand Down Expand Up @@ -1958,6 +1958,35 @@ func BuildPartitionTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []
return tsExec
}

// SetPBColumnsDefaultValue sets the default values of tipb.ColumnInfo.
func SetPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnInfo, columns []*model.ColumnInfo) error {
for i, c := range columns {
// For virtual columns, we set their default values to NULL so that TiKV will return NULL properly,
// They real values will be computed later.
if c.IsGenerated() && !c.GeneratedStored {
pbColumns[i].DefaultVal = []byte{codec.NilFlag}
}
if c.GetOriginDefaultValue() == nil {
continue
}

sessVars := ctx.GetSessionVars()
originStrict := sessVars.StrictSQLMode
sessVars.StrictSQLMode = false
d, err := table.GetColOriginDefaultValue(ctx, c)
sessVars.StrictSQLMode = originStrict
if err != nil {
return err
}

pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(sessVars.StmtCtx, nil, d)
if err != nil {
return err
}
}
return nil
}

// TemporaryTable is used to store transaction-specific or session-specific information for global / local temporary tables.
// For example, stats and autoID should have their own copies of data, instead of being shared by all sessions.
type TemporaryTable struct {
Expand Down

0 comments on commit d734cc8

Please sign in to comment.