Skip to content

Commit

Permalink
*: support collect and async load stats for mv index (pingcap#46731)
Browse files Browse the repository at this point in the history
  • Loading branch information
time-and-fate authored Sep 13, 2023
1 parent 762b2df commit fd7ff09
Show file tree
Hide file tree
Showing 12 changed files with 547 additions and 35 deletions.
17 changes: 13 additions & 4 deletions executor/analyze_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,34 @@ func analyzeIndexPushdown(idxExec *AnalyzeIndexExec) *statistics.AnalyzeResults
if idxExec.analyzePB.IdxReq.Version != nil {
statsVer = int(*idxExec.analyzePB.IdxReq.Version)
}
result := &statistics.AnalyzeResult{
idxResult := &statistics.AnalyzeResult{
Hist: []*statistics.Histogram{hist},
Cms: []*statistics.CMSketch{cms},
TopNs: []*statistics.TopN{topN},
Fms: []*statistics.FMSketch{fms},
IsIndex: 1,
}
if statsVer != statistics.Version2 {
idxResult.Cms = []*statistics.CMSketch{cms}
}
cnt := hist.NullCount
if hist.Len() > 0 {
cnt += hist.Buckets[hist.Len()-1].Count
}
if topN.TotalCount() > 0 {
cnt += int64(topN.TotalCount())
}
return &statistics.AnalyzeResults{
result := &statistics.AnalyzeResults{
TableID: idxExec.tableID,
Ars: []*statistics.AnalyzeResult{result},
Ars: []*statistics.AnalyzeResult{idxResult},
Job: idxExec.job,
StatsVer: statsVer,
Count: cnt,
Snapshot: idxExec.snapshot,
}
if idxExec.idxInfo.MVIndex {
result.ForMVIndex = true
}
return result
}

func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool) (hist *statistics.Histogram, cms *statistics.CMSketch, fms *statistics.FMSketch, topN *statistics.TopN, err error) {
Expand Down Expand Up @@ -224,6 +230,9 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
if needCMS && topn.TotalCount() > 0 {
hist.RemoveVals(topn.TopN)
}
if statsVer == statistics.Version2 {
hist.StandardizeForV2AnalyzeIndex()
}
if needCMS && cms != nil {
cms.CalcDefaultValForAnalyze(uint64(hist.NDV))
}
Expand Down
258 changes: 254 additions & 4 deletions executor/test/analyzetest/analyze_test.go

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion planner/cardinality/row_count_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,20 @@ func GetRowCountByIndexRanges(sctx sessionctx.Context, coll *statistics.HistColl
sc := sctx.GetSessionVars().StmtCtx
idx, ok := coll.Indices[idxID]
colNames := make([]string, 0, 8)
isMVIndex := false
if ok {
if idx.Info != nil {
name = idx.Info.Name.O
for _, col := range idx.Info.Columns {
colNames = append(colNames, col.Name.O)
}
isMVIndex = idx.Info.MVIndex
}
}
recordUsedItemStatsStatus(sctx, idx, coll.PhysicalID, idxID)
if !ok || idx.IsInvalid(sctx, coll.Pseudo) {
// For the mv index case, now we have supported collecting stats and async loading stats, but sync loading and
// estimation is not well-supported, so we keep mv index using pseudo estimation for this period of time.
if !ok || idx.IsInvalid(sctx, coll.Pseudo) || isMVIndex {
colsLen := -1
if idx != nil && idx.Info.Unique {
colsLen = len(idx.Info.Columns)
Expand Down
8 changes: 3 additions & 5 deletions planner/core/indexmerge_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestAnalyzeMVIndex(t *testing.T) {
func TestAnalyzeMVIndexWarnings(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -40,14 +40,12 @@ index idx2(a, b, (cast(j->'$.str' as char(10) array)), c))`)
tk.MustExec("analyze table t")
tk.MustQuery("show warnings").Sort().Check(testkit.Rows(
"Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t, reason to use this rate is \"use min(1, 110000/10000) as the sample-rate=1\"",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx2"))
))
tk.MustExec("analyze table t index idx")
tk.MustQuery("show warnings").Sort().Check(testkit.Rows(
"Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t, reason to use this rate is \"TiDB assumes that the table is empty, use sample-rate=1\"",
"Warning 1105 The version 2 would collect all statistics not only the selected indexes",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx",
"Warning 1105 analyzing multi-valued indexes is not supported, skip idx2"))
))

tk.MustExec("set tidb_analyze_version=1")
tk.MustExec("analyze table t")
Expand Down
40 changes: 25 additions & 15 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2439,14 +2439,17 @@ func getColOffsetForAnalyze(colsInfo []*model.ColumnInfo, colID int64) int {
// in tblInfo.Indices, index.Columns[i].Offset is set according to tblInfo.Columns. Since we decode row samples according to colsInfo rather than tbl.Columns
// in the execution phase of ANALYZE, we need to modify index.Columns[i].Offset according to colInfos.
// TODO: find a better way to find indexed columns in ANALYZE rather than use IndexColumn.Offset
func getModifiedIndexesInfoForAnalyze(sctx sessionctx.Context, tblInfo *model.TableInfo, allColumns bool, colsInfo []*model.ColumnInfo) []*model.IndexInfo {
// For multi-valued index, we need to collect it separately here and analyze it as independent index analyze task.
// See comments for AnalyzeResults.ForMVIndex for more details.
func getModifiedIndexesInfoForAnalyze(tblInfo *model.TableInfo, allColumns bool, colsInfo []*model.ColumnInfo) ([]*model.IndexInfo, []*model.IndexInfo) {
idxsInfo := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
independentIdxsInfo := make([]*model.IndexInfo, 0)
for _, originIdx := range tblInfo.Indices {
if originIdx.State != model.StatePublic {
continue
}
if originIdx.MVIndex {
sctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("analyzing multi-valued indexes is not supported, skip %s", originIdx.Name.L))
independentIdxsInfo = append(independentIdxsInfo, originIdx)
continue
}
if allColumns {
Expand All @@ -2462,7 +2465,7 @@ func getModifiedIndexesInfoForAnalyze(sctx sessionctx.Context, tblInfo *model.Ta
}
idxsInfo = append(idxsInfo, idx)
}
return idxsInfo
return idxsInfo, independentIdxsInfo
}

// filterSkipColumnTypes filters out columns whose types are in the skipTypes list.
Expand Down Expand Up @@ -2501,14 +2504,13 @@ func (b *PlanBuilder) filterSkipColumnTypes(origin []*model.ColumnInfo, tbl *ast

func (b *PlanBuilder) buildAnalyzeFullSamplingTask(
as *ast.AnalyzeTableStmt,
tasks []AnalyzeColumnsTask,
analyzePlan *Analyze,
physicalIDs []int64,
partitionNames []string,
tbl *ast.TableName,
version int,
persistOpts bool,
rsOptionsMap map[int64]V2AnalyzeOptions,
) ([]AnalyzeColumnsTask, error) {
) error {
// Version 2 doesn't support incremental analyze.
// And incremental analyze will be deprecated in the future.
if as.Incremental {
Expand All @@ -2517,12 +2519,12 @@ func (b *PlanBuilder) buildAnalyzeFullSamplingTask(

astOpts, err := handleAnalyzeOptionsV2(as.AnalyzeOpts)
if err != nil {
return nil, err
return err
}
// Get all column info which need to be analyzed.
astColList, err := getAnalyzeColumnList(as.ColumnNames, tbl)
if err != nil {
return nil, err
return err
}

var predicateCols, mustAnalyzedCols calcOnceMap
Expand All @@ -2533,15 +2535,15 @@ func (b *PlanBuilder) buildAnalyzeFullSamplingTask(

astColsInfo, _, err := b.getFullAnalyzeColumnsInfo(tbl, as.ColumnChoice, astColList, &predicateCols, &mustAnalyzedCols, mustAllColumns, true)
if err != nil {
return nil, err
return err
}
isAnalyzeTable := len(as.PartitionNames) == 0
optionsMap, colsInfoMap, err := b.genV2AnalyzeOptions(persistOpts, tbl, isAnalyzeTable, physicalIDs, astOpts, as.ColumnChoice, astColList, &predicateCols, &mustAnalyzedCols, mustAllColumns)
if err != nil {
return nil, err
return err
}
for physicalID, opts := range optionsMap {
rsOptionsMap[physicalID] = opts
analyzePlan.OptionsMap[physicalID] = opts
}

// Build tasks for each partition.
Expand All @@ -2567,7 +2569,7 @@ func (b *PlanBuilder) buildAnalyzeFullSamplingTask(
}
execColsInfo = b.filterSkipColumnTypes(execColsInfo, tbl, &mustAnalyzedCols)
allColumns := len(tbl.TableInfo.Columns) == len(execColsInfo)
indexes := getModifiedIndexesInfoForAnalyze(b.ctx, tbl.TableInfo, allColumns, execColsInfo)
indexes, independentIndexes := getModifiedIndexesInfoForAnalyze(tbl.TableInfo, allColumns, execColsInfo)
handleCols := BuildHandleColsForAnalyze(b.ctx, tbl.TableInfo, allColumns, execColsInfo)
newTask := AnalyzeColumnsTask{
HandleCols: handleCols,
Expand All @@ -2582,10 +2584,18 @@ func (b *PlanBuilder) buildAnalyzeFullSamplingTask(
newTask.ColsInfo = append(newTask.ColsInfo, extraCol)
newTask.HandleCols = &IntHandleCols{col: colInfoToColumn(extraCol, len(newTask.ColsInfo)-1)}
}
tasks = append(tasks, newTask)
analyzePlan.ColTasks = append(analyzePlan.ColTasks, newTask)
for _, indexInfo := range independentIndexes {
newIdxTask := AnalyzeIndexTask{
IndexInfo: indexInfo,
TblInfo: tbl.TableInfo,
AnalyzeInfo: info,
}
analyzePlan.IdxTasks = append(analyzePlan.IdxTasks, newIdxTask)
}
}

return tasks, nil
return nil
}

func (b *PlanBuilder) genV2AnalyzeOptions(
Expand Down Expand Up @@ -2791,7 +2801,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt, opts map[ast.A
}
var commonHandleInfo *model.IndexInfo
if version == statistics.Version2 {
p.ColTasks, err = b.buildAnalyzeFullSamplingTask(as, p.ColTasks, physicalIDs, partitionNames, tbl, version, usePersistedOptions, p.OptionsMap)
err = b.buildAnalyzeFullSamplingTask(as, p, physicalIDs, partitionNames, tbl, version, usePersistedOptions)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (ds *DataSource) initStats(colGroups [][]*expression.Column) {
tableStats := &property.StatsInfo{
RowCount: float64(ds.statisticTable.RealtimeCount),
ColNDVs: make(map[int64]float64, ds.schema.Len()),
HistColl: ds.statisticTable.GenerateHistCollFromColumnInfo(ds.tableInfo, ds.schema.Columns),
HistColl: ds.statisticTable.GenerateHistCollFromColumnInfo(ds.tableInfo, ds.TblCols),
StatsVersion: ds.statisticTable.Version,
}
if ds.statisticTable.Pseudo {
Expand Down
2 changes: 1 addition & 1 deletion statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ go_test(
data = glob(["testdata/**"]),
embed = [":statistics"],
flaky = True,
shard_count = 41,
shard_count = 42,
deps = [
"//config",
"//parser/ast",
Expand Down
16 changes: 16 additions & 0 deletions statistics/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,20 @@ type AnalyzeResults struct {
BaseCount int64
// BaseModifyCnt is the original modify_count in mysql.stats_meta at the beginning of analyze.
BaseModifyCnt int64
// For multi-valued index analyze, there are some very different behaviors, so we add this field to indicate it.
//
// The analyze result of a multi-valued index comes from an independent v2 analyze index task (AnalyzeIndexExec), and
// it's done by a scan on the index data and building stats. According to the original design rationale of v2 stats, we
// should use the same samples to build stats for all columns/indexes. We created an exceptional case here to avoid
// loading the samples of JSON columns to tidb, which may cost too much memory, and we can't handle such case very
// well now.
//
// As the definition of multi-valued index, the row count and NDV of this index may be higher than the table row
// count. So we can't use this result to update the table-level row count.
// The snapshot field is used by v2 analyze to check if there are concurrent analyze, so we also can't update it.
// The multi-valued index analyze task is always together with another normal v2 analyze table task, which will
// take care of those table-level fields.
// In conclusion, when saving the analyze result for mv index, we need to store the index stats, as for the
// table-level fields, we only need to update the version.
ForMVIndex bool
}
43 changes: 40 additions & 3 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,18 +1111,48 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
if len(rows) > 0 {
snapshot := rows[0].GetUint64(0)
// A newer version analyze result has been written, so skip this writing.
if snapshot >= results.Snapshot && results.StatsVer == statistics.Version2 {
// For multi-valued index analyze, this check is not needed because we expect there's another normal v2 analyze
// table task that may update the snapshot in stats_meta table (that task may finish before or after this task).
if snapshot >= results.Snapshot && results.StatsVer == statistics.Version2 && !results.ForMVIndex {
return nil
}
curCnt = int64(rows[0].GetUint64(1))
curModifyCnt = rows[0].GetInt64(2)
}

if len(rows) == 0 || results.StatsVer != statistics.Version2 {
if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, snapshot) values (%?, %?, %?, %?)", version, tableID, results.Count, results.Snapshot); err != nil {
// 1-1.
// a. There are no existing records we can update, so we must insert a new row. Or
// b. it's stats v1.
// In these cases, we use REPLACE INTO to directly insert/update the version, count and snapshot.
snapShot := results.Snapshot
count := results.Count
if results.ForMVIndex {
snapShot = 0
count = 0
}
if _, err = exec.ExecuteInternal(ctx,
"replace into mysql.stats_meta (version, table_id, count, snapshot) values (%?, %?, %?, %?)",
version,
tableID,
count,
snapShot,
); err != nil {
return err
}
statsVer = version
} else if results.ForMVIndex {
// 1-2. There's already an existing record for this table, and we are handling stats for mv index now.
// In this case, we only update the version. See comments for AnalyzeResults.ForMVIndex for more details.
if _, err = exec.ExecuteInternal(ctx,
"update mysql.stats_meta set version=%? where table_id=%?",
version,
tableID,
); err != nil {
return err
}
} else {
// 1-3. There's already an existing record for this table, and we are handling a normal v2 analyze.
modifyCnt := curModifyCnt - results.BaseModifyCnt
if modifyCnt < 0 {
modifyCnt = 0
Expand Down Expand Up @@ -1154,7 +1184,14 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
zap.Int64("results.Count", results.Count),
zap.Int64("count", cnt))
}
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?", version, modifyCnt, cnt, results.Snapshot, tableID); err != nil {
if _, err = exec.ExecuteInternal(ctx,
"update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?",
version,
modifyCnt,
cnt,
results.Snapshot,
tableID,
); err != nil {
return err
}
statsVer = version
Expand Down
39 changes: 39 additions & 0 deletions statistics/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,45 @@ func (hg *Histogram) RemoveVals(valCntPairs []TopNMeta) {
}
}

// StandardizeForV2AnalyzeIndex cleans up a Histogram built by the current v2 analyze index task.
// It performs two normalizations in place:
//  1. Buckets that are empty (bucket count <= 0 and Repeat <= 0) are dropped.
//  2. Bucket.NDV is reset to 0 on every bucket that is kept.
//
// Bounds is rebuilt afterwards so that it only holds the lower/upper bound of the kept buckets.
// Calling it on a nil Histogram or on a Histogram without buckets is a no-op.
func (hg *Histogram) StandardizeForV2AnalyzeIndex() {
	if hg == nil {
		return
	}
	total := len(hg.Buckets)
	if total == 0 {
		return
	}

	// hg.Buckets is a []Bucket (values, not pointers), so the compaction below just copies structs inside the
	// existing backing array and needs no extra Bucket allocations.
	//
	// keptSrc[k] is the position, in the ORIGINAL hg.Buckets, of the bucket that ends up at position k.
	// len(keptSrc) therefore doubles as the write cursor: it never runs ahead of src, so a bucket is never
	// overwritten before it has been examined.
	keptSrc := make([]int, 0, total)
	for src := 0; src < total; src++ {
		if hg.BucketCount(src) > 0 || hg.Buckets[src].Repeat > 0 {
			dst := len(keptSrc)
			keptSrc = append(keptSrc, src)
			if dst != src {
				hg.Buckets[dst] = hg.Buckets[src]
			}
			hg.Buckets[dst].NDV = 0
		}
	}
	hg.Buckets = hg.Buckets[:len(keptSrc)]

	// hg.Bounds has not been touched yet, so GetLower/GetUpper are still addressed by the original bucket
	// positions recorded in keptSrc.
	// NOTE(review): two datums are appended per kept bucket while the chunk is created with capacity
	// len(keptSrc) — confirm whether 2*len(keptSrc) was intended.
	rebuilt := chunk.NewChunkWithCapacity([]*types.FieldType{hg.Tp}, len(keptSrc))
	for _, src := range keptSrc {
		rebuilt.AppendDatum(0, hg.GetLower(src))
		rebuilt.AppendDatum(0, hg.GetUpper(src))
	}
	hg.Bounds = rebuilt
}

// AddIdxVals adds the given values to the histogram.
func (hg *Histogram) AddIdxVals(idxValCntPairs []TopNMeta) {
totalAddCnt := int64(0)
Expand Down
Loading

0 comments on commit fd7ff09

Please sign in to comment.