forked from XiaoMi/soar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.go
1167 lines (1002 loc) · 37.3 KB
/
index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright 2018 Xiaomi, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package advisor
import (
"fmt"
"sort"
"strings"
"github.com/XiaoMi/soar/ast"
"github.com/XiaoMi/soar/common"
"github.com/XiaoMi/soar/database"
"github.com/XiaoMi/soar/env"
"github.com/dchest/uniuri"
"vitess.io/vitess/go/vt/sqlparser"
)
const (
// IndexNameMaxLength Ref. https://dev.mysql.com/doc/refman/8.0/en/identifiers.html
IndexNameMaxLength = 64
)
// IndexAdvisor 索引建议需要使用到的所有信息
type IndexAdvisor struct {
vEnv *env.VirtualEnv // 线下虚拟测试环境(测试环境)
rEnv database.Connector // 线上真实环境
Ast sqlparser.Statement // Vitess Parser生成的抽象语法树
where []*common.Column // 所有where条件中用到的列
whereEQ []*common.Column // where条件中可以加索引的等值条件列
whereINEQ []*common.Column // where条件中可以加索引的非等值条件列
groupBy []*common.Column // group by可以加索引列
orderBy []*common.Column // order by可以加索引列
joinCond [][]*common.Column // 由于join condition跨层级间索引不可共用,需要多一个维度用来维护层级关系
IndexMeta map[string]map[string]*database.TableIndexInfo
}
// IndexInfo 创建一条索引需要的信息
type IndexInfo struct {
Name string `json:"name"` // 索引名称
Database string `json:"database"` // 数据库名
Table string `json:"table"` // 表名
DDL string `json:"ddl"` // ALTER, CREATE 等类型的 DDL 语句
ColumnDetails []*common.Column `json:"column_details"` // 列详情
}
// IndexAdvises IndexAdvises列表
type IndexAdvises []IndexInfo
// mergeAdvices 合并索引建议
func mergeAdvices(dst []IndexInfo, src ...IndexInfo) IndexAdvises {
if len(src) == 0 {
return dst
}
for _, newIdx := range src {
has := false
for _, idx := range dst {
if newIdx.DDL == idx.DDL {
common.Log.Debug("merge index %s and %s", idx.Name, newIdx.Name)
has = true
}
}
if !has {
dst = append(dst, newIdx)
}
}
return dst
}
// NewAdvisor 构造一个 IndexAdvisor 的时候就会对其本身结构初始化
// 获取 condition 中的等值条件、非等值条件,以及group by 、 order by信息
func NewAdvisor(env *env.VirtualEnv, rEnv database.Connector, q Query4Audit) (*IndexAdvisor, error) {
common.Log.Debug("Enter: NewAdvisor(), Caller: %s", common.Caller())
if common.Config.TestDSN.Disable {
return nil, fmt.Errorf("TestDSN is Disabled: %s", common.Config.TestDSN.Addr)
}
// DDL 检测
switch stmt := q.Stmt.(type) {
case *sqlparser.DDL:
// 获取ast中用到的库表
sqlMeta := ast.GetMeta(q.Stmt, nil)
for db := range sqlMeta {
dbRef := db
if db == "" {
dbRef = rEnv.Database
}
// DDL 在 Env 初始化的时候已经执行过了
if _, ok := env.TableMap[dbRef]; !ok {
env.TableMap[dbRef] = make(map[string]string)
}
for _, tb := range sqlMeta[db].Table {
env.TableMap[dbRef][tb.TableName] = tb.TableName
}
}
return &IndexAdvisor{
vEnv: env,
rEnv: rEnv,
Ast: q.Stmt,
}, nil
case *sqlparser.DBDDL:
// 忽略建库语句
return nil, nil
case *sqlparser.Use:
// 如果是use,切基础环境
env.Database = env.DBHash(stmt.DBName.String())
return nil, nil
}
return &IndexAdvisor{
vEnv: env,
rEnv: rEnv,
Ast: q.Stmt,
// 所有的FindXXXXCols尽最大可能先排除不需要加索引的列,但由于元数据在此阶段尚未补齐,给出的列有可能也无法添加索引
// 后续需要通过CompleteColumnsInfo + calcCardinality补全后再进一步判断
joinCond: ast.FindJoinCols(q.Stmt),
whereEQ: ast.FindWhereEQ(q.Stmt),
whereINEQ: ast.FindWhereINEQ(q.Stmt),
groupBy: ast.FindGroupByCols(q.Stmt),
orderBy: ast.FindOrderByCols(q.Stmt),
where: ast.FindAllCols(q.Stmt, ast.WhereExpression),
IndexMeta: make(map[string]map[string]*database.TableIndexInfo),
}, nil
}
/*
关于如何添加索引:
在《Relational Database Index Design and the Optimizers》一书中,作者提出著名的的三星索引理论(Three-Star Index)
To Qualify for the First Star:
Pick the columns from all equal predicates (WHERE COL = . . .).
Make these the first columns of the index—in any order. For CURSOR41, the three-star index will begin with
columns LNAME, CITY or CITY, LNAME. In both cases the index slice that must be scanned will be as thin as possible.
To Qualify for the Second Star:
Add the ORDER BY columns. Do not change the order of these columns, but ignore columns that were already
picked in step 1. For example, if CURSOR41 had redundant columns in the ORDER BY, say ORDER BY LNAME,
FNAME or ORDER BY FNAME, CITY, only FNAME would be added in this step. When FNAME is the third index column,
the result table will be in the right order without sorting. The first FETCH call will return the row with
the smallest FNAME value.
To Qualify for the Third Star:
Add all the remaining columns from the SELECT statement. The order of the columns added in this step
has no impact on the performance of the SELECT, but the cost of updates should be reduced by placing volatile
columns at the end. Now the index contains all the columns required for an index-only access path.
索引添加算法正是以这个理想化索策略添为基础,尽可能的给予"三星"索引建议。
但又如《High Performance MySQL》一书中所说,索引并不总是最好的工具。只有当索引帮助存储引擎快速查找到记录带来的好处大于其
带来的额外工作时,索引才是有效的。
因此,在三星索引理论的基础上引入启发式索引算法,在第二颗星的实现上做了部分改进,对于非等值条件只会添加散粒度最高的一列到索引中,
并基于总体列的使用情况作出判断,按需对order by、group by添加索引,由此来想`增强索引建议的通用性。
*/
// IndexAdvise 索引优化建议算法入口主函数
// TODO 索引顺序该如何确定
func (idxAdv *IndexAdvisor) IndexAdvise() IndexAdvises {
// 支持不依赖DB的索引建议分析
if common.Config.TestDSN.Disable {
// 未开启Env原数据依赖,信息不全的情况下可能会给予错误的索引建议,请人工进行核查。
common.Log.Warn("TestDSN.Disable = true")
}
// 检查否是否含有子查询
subQueries := ast.FindSubquery(0, idxAdv.Ast)
var subQueryAdvises []IndexInfo
// 含有子查询对子查询进行单独评审,子查询评审建议报错忽略
if len(subQueries) > 0 {
for _, subSQL := range subQueries {
stmt, err := sqlparser.Parse(subSQL)
if err != nil {
continue
}
q := Query4Audit{
Query: subSQL,
Stmt: stmt,
}
subIdxAdv, _ := NewAdvisor(idxAdv.vEnv, idxAdv.rEnv, q)
subQueryAdvises = append(subQueryAdvises, subIdxAdv.IndexAdvise()...)
}
}
// 变量初始化,用于存放索引信息,按照db.tb.[cols]组织
indexList := make(map[string]map[string][]*common.Column)
// 为用到的每一列填充库名,表名等信息
var joinCond [][]*common.Column
for _, joinCols := range idxAdv.joinCond {
joinCond = append(joinCond, CompleteColumnsInfo(idxAdv.Ast, joinCols, idxAdv.vEnv))
}
idxAdv.joinCond = joinCond
idxAdv.where = CompleteColumnsInfo(idxAdv.Ast, idxAdv.where, idxAdv.vEnv)
idxAdv.whereEQ = CompleteColumnsInfo(idxAdv.Ast, idxAdv.whereEQ, idxAdv.vEnv)
idxAdv.whereINEQ = CompleteColumnsInfo(idxAdv.Ast, idxAdv.whereINEQ, idxAdv.vEnv)
idxAdv.groupBy = CompleteColumnsInfo(idxAdv.Ast, idxAdv.groupBy, idxAdv.vEnv)
idxAdv.orderBy = CompleteColumnsInfo(idxAdv.Ast, idxAdv.orderBy, idxAdv.vEnv)
// 只要在开启使用env元数据的时候才会计算散粒度
if !common.Config.TestDSN.Disable {
// 计算joinCond, whereEQ, whereINEQ用到的每一列的散粒度,并排序,方便后续添加复合索引
// groupBy, orderBy列按书写顺序给索引建议,不需要按散粒度排序
idxAdv.calcCardinality(idxAdv.whereEQ)
idxAdv.calcCardinality(idxAdv.whereINEQ)
idxAdv.calcCardinality(idxAdv.orderBy)
idxAdv.calcCardinality(idxAdv.groupBy)
for i, joinCols := range idxAdv.joinCond {
idxAdv.calcCardinality(joinCols)
joinCols = common.ColumnSort(joinCols)
idxAdv.joinCond[i] = joinCols
}
// 根据散粒度进行排序
// 对所有列进行排序,按散粒度由大到小排序
idxAdv.whereEQ = common.ColumnSort(idxAdv.whereEQ)
idxAdv.whereINEQ = common.ColumnSort(idxAdv.whereINEQ)
idxAdv.orderBy = common.ColumnSort(idxAdv.orderBy)
idxAdv.groupBy = common.ColumnSort(idxAdv.groupBy)
}
// 是否指定Where条件,打标签
hasWhere := false
err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
switch where := node.(type) {
case *sqlparser.Subquery:
return false, nil
case *sqlparser.Where:
if where != nil {
hasWhere = true
}
}
return true, nil
}, idxAdv.Ast)
common.LogIfError(err, "")
// 获取哪些列被忽略
var ignore []*common.Column
usedCols := append(idxAdv.whereINEQ, idxAdv.whereEQ...)
for _, whereCol := range idxAdv.where {
isUsed := false
for _, used := range usedCols {
if whereCol.Equal(used) {
isUsed = true
}
}
if !isUsed {
common.Log.Debug("column %s in `%s`.`%s` will ignore when adding index", whereCol.DB, whereCol.Table, whereCol.Name)
ignore = append(ignore, whereCol)
}
}
// 索引优化算法入口,从这里开始放大招
if hasWhere {
// 有Where条件的先分析 等值条件
for _, index := range idxAdv.whereEQ {
// 对应列在前面已经按散粒度由大到小排序好了
idxAdv.mergeIndex(indexList, index)
}
// 若存在非等值查询条件,可以给第一个非等值条件添加索引
if len(idxAdv.whereINEQ) > 0 {
idxAdv.mergeIndex(indexList, idxAdv.whereINEQ[0])
}
// 有WHERE条件,但 WHERE 条件未能给出索引建议就不能再加 GROUP BY 和 ORDER BY 建议了
if len(ignore) == 0 {
// 没有非等值查询条件时可以再为 GroupBy 和 OrderBy 添加索引
for _, index := range idxAdv.groupBy {
idxAdv.mergeIndex(indexList, index)
}
// OrderBy
// 没有 GroupBy 时可以为 OrderBy 加索引
if len(idxAdv.groupBy) == 0 {
for _, index := range idxAdv.orderBy {
idxAdv.mergeIndex(indexList, index)
}
}
}
} else {
// 未指定 Where 条件的,只需要 GroupBy 和 OrderBy 的索引建议
for _, index := range idxAdv.groupBy {
idxAdv.mergeIndex(indexList, index)
}
// OrderBy
// 没有GroupBy 时可以为 OrderBy 加索引
// 没有 where 条件时 OrderBy 的索引仅能够在索引覆盖的情况下被使用
// if len(idxAdv.groupBy) == 0 {
// for _, index := range idxAdv.orderBy {
// mergeIndex(indexList, index)
// }
// }
}
// 开始整合索引信息,添加索引
var indexes []IndexInfo
// 为join添加索引
// 获取 join condition 中需要加索引的表有哪些
defaultDB := ""
if !common.Config.TestDSN.Disable {
defaultDB = idxAdv.vEnv.RealDB(idxAdv.vEnv.Database)
}
if !common.Config.OnlineDSN.Disable {
defaultDB = idxAdv.rEnv.Database
}
// 根据join table的信息给予优化建议
joinTableMeta := ast.FindJoinTable(idxAdv.Ast, nil).SetDefault(idxAdv.rEnv.Database).SetDefault(defaultDB)
indexes = mergeAdvices(indexes, idxAdv.buildJoinIndex(joinTableMeta)...)
if common.Config.TestDSN.Disable || common.Config.OnlineDSN.Disable {
// 无 env 环境下只提供单列索引,无法确定 table 时不给予优化建议
// 仅有 table 信息时给出的建议不包含 DB 信息
indexes = mergeAdvices(indexes, idxAdv.buildIndexWithNoEnv(indexList)...)
} else {
// 给出尽可能详细的索引建议
indexes = mergeAdvices(indexes, idxAdv.buildIndex(indexList)...)
}
indexes = mergeAdvices(indexes, subQueryAdvises...)
// 在开启 env 的情况下,检查数据库版本,字段类型,索引总长度
indexes = idxAdv.idxColsTypeCheck(indexes)
// 在开启 env 的情况下,会对索引进行检查,对全索引进行过滤
// 在前几步都不会对 idx 生成 DDL 语句,DDL语句在这里生成
return idxAdv.mergeIndexes(indexes)
}
// idxColsTypeCheck 对超长的字段添加前缀索引,剔除无法添索引字段的列
// TODO: 暂不支持 fulltext 索引,
func (idxAdv *IndexAdvisor) idxColsTypeCheck(idxList []IndexInfo) []IndexInfo {
if common.Config.TestDSN.Disable {
return rmSelfDupIndex(idxList)
}
var indexes []IndexInfo
for _, idx := range idxList {
var newCols []*common.Column
var newColInfo []string
// 索引总长度
idxBytesTotal := 0
isOverFlow := false
for _, col := range idx.ColumnDetails {
// 获取字段 bytes
bytes := col.GetDataBytes(common.Config.OnlineDSN.Version)
tmpCol := col.Name
overFlow := 0
// 加上该列后是否索引长度过长
if bytes < 0 {
// bytes < 0 说明字段的长度是无法计算的
common.Log.Warning("%s.%s data type not support %s, can't add index",
col.Table, col.Name, col.DataType)
continue
}
// idx bytes over flow
if total := idxBytesTotal + bytes; total > common.Config.MaxIdxBytes {
common.Log.Debug("bytes: %d, idxBytesTotal: %d, total: %d, common.Config.MaxIdxBytes: %d",
bytes, idxBytesTotal, total, common.Config.MaxIdxBytes)
overFlow = total - common.Config.MaxIdxBytes
isOverFlow = true
} else {
idxBytesTotal = total
}
// common.Config.MaxIdxColBytes 默认大小 767
if bytes > common.Config.MaxIdxBytesPerColumn || isOverFlow {
// In 5.6, you may not include a column that equates to
// bigger than 767 bytes: VARCHAR(255) CHARACTER SET utf8 or VARCHAR(191) CHARACTER SET utf8mb4.
// In 5.7 you may not include a column that equates to
// bigger than 3072 bytes.
// v : 在 col.Character 字符集下每个字符占用 v bytes
v, ok := common.CharSets[strings.ToLower(col.Character)]
if !ok {
// 找不到对应字符集,不添加索引
// 如果出现不认识的字符集,认为每个字符占用4个字节
common.Log.Warning("%s.%s(%s) charset not support yet %s, use default 4 bytes length",
col.Table, col.Name, col.DataType, col.Character)
v = 4
}
// 保留两个字节的安全余量
length := (common.Config.MaxIdxBytesPerColumn - 2) / v
if isOverFlow {
// 在索引中添加该列会导致索引长度过长,建议根据需求转换为合理的前缀索引
// _OPR_SPLIT_ 是自定的用于后续处理的特殊分隔符
common.Log.Warning("adding index '%s(%s)' to table '%s' causes the index to be too long, overflow is %d",
col.Name, col.DataType, col.Table, overFlow)
tmpCol += fmt.Sprintf("_OPR_SPLIT_(N)")
} else {
// 索引没有过长,可以加一个最长的前缀索引
common.Log.Warning("index column too large: %s.%s --> %s.%s(%d), data type: %s",
col.Table, col.Name, col.Table, tmpCol, length, col.DataType)
tmpCol += fmt.Sprintf("_OPR_SPLIT_(%d)", length)
}
}
newCols = append(newCols, col)
newColInfo = append(newColInfo, tmpCol)
}
// 为新索引重建索引语句
idxName := "idx_"
idxCols := ""
for i, newCol := range newColInfo {
// 对名称和可能存在的长度进行拼接
// 用等号进行分割
tmp := strings.Split(newCol, "_OPR_SPLIT_")
idxName += tmp[0]
if len(tmp) > 1 {
idxCols += tmp[0] + "`" + tmp[1]
} else {
idxCols += tmp[0] + "`"
}
if i+1 < len(newColInfo) {
idxName += "_"
idxCols += ",`"
}
}
// 索引名称最大长度64
if len(idxName) > IndexNameMaxLength {
common.Log.Warn("index '%s' name large than IndexNameMaxLength", idxName)
idxName = strings.TrimRight(idxName[:IndexNameMaxLength], "_")
}
// 新的alter语句
newDDL := fmt.Sprintf("alter table `%s`.`%s` add index `%s` (`%s)", idxAdv.vEnv.RealDB(idx.Database),
idx.Table, idxName, idxCols)
// 将筛选改造后的索引信息信息加入到新的索引列表中
idx.ColumnDetails = newCols
idx.DDL = newDDL
indexes = append(indexes, idx)
}
return indexes
}
// mergeIndexes 与线上环境对比,将给出的索引建议进行去重
func (idxAdv *IndexAdvisor) mergeIndexes(idxList []IndexInfo) []IndexInfo {
// TODO 暂不支持前缀索引去重
if common.Config.TestDSN.Disable {
return rmSelfDupIndex(idxList)
}
var indexes []IndexInfo
for _, idx := range idxList {
// 将 DB 替换成 vEnv 中的数据库名称
dbInVEnv := idx.Database
if _, ok := idxAdv.vEnv.DBRef[idx.Database]; ok {
dbInVEnv = idxAdv.vEnv.DBRef[idx.Database]
}
// 检测索引添加的表是否是视图
if idxAdv.vEnv.IsView(idx.Table) {
common.Log.Info("%s.%s is a view. no need indexed", idx.Database, idx.Table)
continue
}
// 检测是否存在重复索引
indexMeta := idxAdv.IndexMeta[dbInVEnv][idx.Table]
isExisted := false
// 检测无索引列的情况
if len(idx.ColumnDetails) < 1 {
continue
}
if existedIndexes := indexMeta.FindIndex(database.IndexColumnName, idx.ColumnDetails[0].Name); len(existedIndexes) > 0 {
for _, existedIdx := range existedIndexes {
// flag: 用于标记已存在的索引是否是约束条件
isConstraint := false
var cols []string
var colsDetail []*common.Column
// 把已经存在的 key 摘出来遍历一遍对比是否是包含关系
for _, col := range indexMeta.FindIndex(database.IndexKeyName, existedIdx.KeyName) {
cols = append(cols, col.ColumnName)
colsDetail = append(colsDetail, &common.Column{
Name: col.ColumnName,
Table: idx.Table,
DB: idx.ColumnDetails[0].DB,
})
}
// 判断已存在的索引是否属于约束条件(唯一索引、主键)
// 这里可以忽略是否含有外键的情况,因为索引已经重复了,添加了新索引后原先重复的索引是可以删除的。
if existedIdx.NonUnique == 0 {
common.Log.Debug("%s.%s表%s为约束条件", dbInVEnv, idx.Table, existedIdx.KeyName)
isConstraint = true
}
// 如果已存在的索引与索引建议存在重叠,则说明无需添加新索引或可能需要给出删除索引的建议
if common.IsColsPart(colsDetail, idx.ColumnDetails) {
idxName := existedIdx.KeyName
// 如果已经存在的索引包含需要添加的索引,则无需添加
if len(colsDetail) >= len(idx.ColumnDetails) {
common.Log.Info(" `%s`.`%s` %s already had a index `%s`",
idx.Database, idx.Table, strings.Join(cols, ","), idxName)
isExisted = true
continue
}
// 库、表、列名需要用反撇转义
// TODO: 关于外键索引去重的优雅解决方案
if !isConstraint {
if common.Config.AllowDropIndex {
alterSQL := fmt.Sprintf("alter table `%s`.`%s` drop index `%s`", idx.Database, idx.Table, idxName)
indexes = append(indexes, IndexInfo{
Name: idxName,
Database: idx.Database,
Table: idx.Table,
DDL: alterSQL,
ColumnDetails: colsDetail,
})
} else {
common.Log.Warning("In table `%s`, the new index of column `%s` contains index `%s`,"+
" maybe you could drop one of them.", existedIdx.Table,
strings.Join(cols, ","), idxName)
}
}
}
}
}
if !isExisted {
// 检测索引名称是否重复?
if existedIndexes := indexMeta.FindIndex(database.IndexKeyName, idx.Name); len(existedIndexes) > 0 {
var newName string
idxSuffix := getRandomIndexSuffix()
if len(idx.Name) < IndexNameMaxLength-len(idxSuffix) {
newName = idx.Name + idxSuffix
} else {
newName = idx.Name[:IndexNameMaxLength-len(idxSuffix)] + idxSuffix
}
common.Log.Warning("duplicate index name '%s', new name is '%s'", idx.Name, newName)
idx.DDL = strings.Replace(idx.DDL, idx.Name, newName, -1)
idx.Name = newName
}
// 添加合并
indexes = mergeAdvices(indexes, idx)
}
}
// 对索引进行去重
return rmSelfDupIndex(indexes)
}
// getRandomIndexSuffix format: _xxxx, length: 5
func getRandomIndexSuffix() string {
return fmt.Sprintf("_%s", uniuri.New()[:4])
}
// rmSelfDupIndex 去重传入的[]IndexInfo中重复的索引
func rmSelfDupIndex(indexes []IndexInfo) []IndexInfo {
var resultIndex []IndexInfo
tmpIndexList := indexes
for _, a := range indexes {
tmp := a
for i, b := range tmpIndexList {
if common.IsColsPart(tmp.ColumnDetails, b.ColumnDetails) && tmp.Name != b.Name {
if len(b.ColumnDetails) > len(tmp.ColumnDetails) {
common.Log.Debug("remove duplicate index: %s", tmp.Name)
tmp = b
}
if i < len(tmpIndexList) {
tmpIndexList = append(tmpIndexList[:i], tmpIndexList[i+1:]...)
} else {
tmpIndexList = tmpIndexList[:i]
}
}
}
resultIndex = mergeAdvices(resultIndex, tmp)
}
return resultIndex
}
// buildJoinIndex 检查Join中使用的库表是否需要添加索引并给予索引建议
func (idxAdv *IndexAdvisor) buildJoinIndex(meta common.Meta) []IndexInfo {
var indexes []IndexInfo
for _, IndexCols := range idxAdv.joinCond {
// 如果该列的库表为join condition中需要添加索引的库表
indexColsList := make(map[string]map[string][]*common.Column)
for _, col := range IndexCols {
idxAdv.mergeIndex(indexColsList, col)
}
if common.Config.TestDSN.Disable || common.Config.OnlineDSN.Disable {
indexes = mergeAdvices(indexes, idxAdv.buildIndexWithNoEnv(indexColsList)...)
continue
}
indexes = mergeAdvices(indexes, idxAdv.buildIndex(indexColsList)...)
}
return indexes
}
// buildIndex 尽可能的将 map[string]map[string][]*common.Column 转换成 []IndexInfo
// 此处不判断索引是否重复
func (idxAdv *IndexAdvisor) buildIndex(idxList map[string]map[string][]*common.Column) []IndexInfo {
var indexes []IndexInfo
for db, tbs := range idxList {
for tb, cols := range tbs {
// 单个索引中含有的列收 config 中参数限制
if len(cols) > common.Config.MaxIdxColsCount {
cols = cols[:common.Config.MaxIdxColsCount]
}
var colNames []string
for _, col := range cols {
if col.DB == "" || col.Table == "" {
common.Log.Warn("can not get the meta info of column '%s'", col.Name)
continue
}
colNames = append(colNames, col.Name)
}
if len(colNames) == 0 {
continue
}
idxName := common.Config.IdxPrefix + strings.Join(colNames, "_")
// 索引名称最大长度64
if len(idxName) > IndexNameMaxLength {
common.Log.Warn("index '%s' name large than IndexNameMaxLength", idxName)
idxName = strings.TrimRight(idxName[:IndexNameMaxLength], "_")
}
alterSQL := fmt.Sprintf("alter table `%s`.`%s` add index `%s` (`%s`)", idxAdv.vEnv.RealDB(db), tb,
idxName, strings.Join(colNames, "`,`"))
indexes = append(indexes, IndexInfo{
Name: idxName,
Database: idxAdv.vEnv.RealDB(db),
Table: tb,
DDL: alterSQL,
ColumnDetails: cols,
})
}
}
return indexes
}
// buildIndexWithNoEnv 忽略原数据,给予最基础的索引
func (idxAdv *IndexAdvisor) buildIndexWithNoEnv(indexList map[string]map[string][]*common.Column) []IndexInfo {
// 如果不获取数据库原信息,则不去判断索引是否重复,且只给单列加索引
var indexes []IndexInfo
for _, tableIndex := range indexList {
for _, indexCols := range tableIndex {
for _, col := range indexCols {
if col.Table == "" {
common.Log.Warn("can not get the meta info of column '%s'", col.Name)
continue
}
idxName := common.Config.IdxPrefix + col.Name
// 库、表、列名需要用反撇转义
alterSQL := fmt.Sprintf("alter table `%s`.`%s` add index `%s` (`%s`)", idxAdv.vEnv.RealDB(col.DB), col.Table, idxName, col.Name)
if col.DB == "" {
alterSQL = fmt.Sprintf("alter table `%s` add index `%s` (`%s`)", col.Table, idxName, col.Name)
}
indexes = append(indexes, IndexInfo{
Name: idxName,
Database: idxAdv.vEnv.RealDB(col.DB),
Table: col.Table,
DDL: alterSQL,
ColumnDetails: []*common.Column{col},
})
}
}
}
return indexes
}
// mergeIndex 将索引用到的列去重后合并到一起
func (idxAdv *IndexAdvisor) mergeIndex(idxList map[string]map[string][]*common.Column, column *common.Column) {
// 散粒度低于阈值将不会添加索引
if common.Config.MinCardinality/100 > column.Cardinality {
return
}
db := column.DB
tb := column.Table
if idxList[db] == nil {
idxList[db] = make(map[string][]*common.Column)
}
if idxList[db][tb] == nil {
idxList[db][tb] = make([]*common.Column, 0)
}
// 去除重复列Append
exist := false
for _, cl := range idxList[db][tb] {
if cl.Name == column.Name {
exist = true
}
}
// 将 DB 替换成 vEnv 中的数据库名称
dbInVEnv := db
if _, ok := idxAdv.vEnv.DBRef[db]; ok {
dbInVEnv = idxAdv.vEnv.DBRef[db]
}
indexMeta := idxAdv.IndexMeta[dbInVEnv][tb]
// 主键列不需要追加
pr := indexMeta.FindIndex(database.IndexKeyName, "PRIMARY")
for _, c := range pr {
if c.ColumnName == column.Name {
exist = true
}
}
if !exist {
idxList[db][tb] = append(idxList[db][tb], column)
}
}
// CompleteColumnsInfo 补全索引可能会用到列的所属库名、表名等信息
func CompleteColumnsInfo(stmt sqlparser.Statement, cols []*common.Column, env *env.VirtualEnv) []*common.Column {
// 如果传过来的列是空的,没必要跑逻辑
if len(cols) == 0 {
return cols
}
// 从 Ast 中拿到 DBStructure,包含所有表的相关信息
dbs := ast.GetMeta(stmt, nil)
// 此处生成的 meta 信息中不应该含有""db的信息,若 DB 为空则认为是已传入的 db 为默认 db 并进行信息补全
// BUG Fix:
// 修补 dbs 中空 DB 的导致后续补全列信息时无法获取正确 table 名称的问题
if _, ok := dbs[""]; ok {
dbs[env.Database] = dbs[""]
delete(dbs, "")
}
tableCount := 0
for db := range dbs {
for tb := range dbs[db].Table {
if tb != "" {
tableCount++
}
}
}
var noEnvTmp []*common.Column
for _, col := range cols {
for db := range dbs {
// 对每一列进行比对,将别名转换为正确的名称
find := false
for _, tb := range dbs[db].Table {
for _, tbAlias := range tb.TableAliases {
if col.Table != "" && col.Table == tbAlias {
common.Log.Debug("column '%s' prefix change: %s --> %s", col.Name, col.Table, tb.TableName)
find = true
col.Table = tb.TableName
col.DB = db
break
}
}
if find {
break
}
}
// 如果不依赖env环境,利用ast中包含的信息推理列的库表信息
if common.Config.TestDSN.Disable {
if tableCount == 1 {
for _, tb := range dbs[db].Table {
col.Table = tb.TableName
// 因为tableMeta是按照库表组织的树状结构,db变量贯穿全局
// 只有在最终赋值前才能根据逻辑变更补全
if db == "" {
db = env.Database
}
col.DB = db
}
}
// 如果SQL中含有的表大于一个,则使用的列中必须含有前缀,不然无法判断该列属于哪个表
// 如果某一列未含有前缀信息,则认为两张表中都含有该列,需要由人去判断
if tableCount > 1 {
if col.Table == "" {
for _, tb := range dbs[db].Table {
if tb.TableName == "" {
common.Log.Warn("can not get the meta info of column '%s'", col.Name)
}
if db == "" {
db = env.RealDB(env.Database)
}
col.Table = tb.TableName
col.DB = db
tmp := *col
tmp.Table = tb.TableName
tmp.DB = db
noEnvTmp = append(noEnvTmp, &tmp)
}
}
if col.DB == "" {
if db == "" {
db = env.Database
}
col.DB = db
}
}
break
}
// 将已经获取到正确表信息的列信息带入到env中,利用show columns where table 获取库表信息
// 此出会传入之前从ast中,该 db 下获取的所有表来作为where限定条件,
// 防止与SQL无关的库表信息干扰准确性
// 此处传入的是测试环境,DB 是经过变换的,所以在寻找列名的时候需要将 DB 名称转换成测试环境中经过 hash 的 DB 名称
// 不然会找不到col的信息
realCols, err := env.FindColumn(col.Name, env.DBHash(db), dbs.Tables(db)...)
if err != nil {
common.Log.Warn("%v", err)
continue
}
// 对比 column 信息中的表名与从 env 中获取的库表名的一致性
for _, realCol := range realCols {
if col.Name == realCol.Name {
// 如果查询到了列名一致,但从 ast 中获取的列的前缀与 env 中的表信息不符
// 1.存在一个同名列,但不同表,该情况下忽略
// 2.存在一个未正确转换的别名(如表名为),该情况下修正,大概率是正确的
if col.Table != "" && col.Table != realCol.Table {
has, _ := env.FindColumn(col.Name, env.DBHash(db), col.Table)
if len(has) > 0 {
realCol = has[0]
}
}
col.DataType = realCol.DataType
col.Table = realCol.Table
col.DB = env.RealDB(realCol.DB)
col.Character = realCol.Character
col.Collation = realCol.Collation
}
}
}
}
// 如果不依赖env环境,将可能存在的列也加入到索引预处理列表中
if common.Config.TestDSN.Disable {
cols = append(cols, noEnvTmp...)
}
return cols
}
// calcCardinality 计算每一列的散粒度
// 这个函数需要在补全列的库表信息之后再调用,否则无法确定要计算列的归属
func (idxAdv *IndexAdvisor) calcCardinality(cols []*common.Column) []*common.Column {
common.Log.Debug("Enter: calcCardinality(), Caller: %s", common.Caller())
tmpDB := *idxAdv.vEnv
for _, col := range cols {
// 补全对应列的库->表->索引信息到IndexMeta
// 这将在后面用于判断某一列是否为主键或单列唯一索引,快速返回散粒度
if col.DB == "" {
col.DB = idxAdv.vEnv.Database
}
realDB := idxAdv.vEnv.DBHash(col.DB)
if idxAdv.IndexMeta[realDB] == nil {
idxAdv.IndexMeta[realDB] = make(map[string]*database.TableIndexInfo)
}
if idxAdv.IndexMeta[realDB][col.Table] == nil {
tmpDB.Database = realDB
indexInfo, err := tmpDB.ShowIndex(col.Table)
if err != nil {
// 如果是不存在的表就会报错,报错的可能性有三个:
// 1.数据库错误 2.表不存在 3.临时表
// 而这三种错误都是不需要在这一层关注的,直接跳过
common.Log.Warn("calcCardinality error: %v", err)
continue
}
// 将获取的索引信息以db.tb 维度组织到 IndexMeta 中
idxAdv.IndexMeta[realDB][col.Table] = indexInfo
}
// 检查对应列是否为主键或单列唯一索引,如果满足直接返回1,不再重复计算,提高效率
// 多列复合唯一索引不能跳过计算,单列普通索引不能跳过计算
for _, index := range idxAdv.IndexMeta[realDB][col.Table].Rows {
// 根据索引的名称判断该索引包含的列数,列数大于1即为复合索引
columnCount := len(idxAdv.IndexMeta[realDB][col.Table].FindIndex(database.IndexKeyName, index.KeyName))
if col.Name == index.ColumnName {
// 主键、唯一键 无需计算散粒度
if (index.KeyName == "PRIMARY" || index.NonUnique == 0) && columnCount == 1 {
common.Log.Debug("column '%s' is PK or UK, no need to calculate cardinality.", col.Name)
col.Cardinality = 1
break
}
}
}
// 给非 PRIMARY、UNIQUE 的列计算散粒度
if col.Cardinality != 1 {
col.Cardinality = idxAdv.vEnv.ColumnCardinality(col.Table, col.Name)
}
}
return cols
}
// Format 用于格式化输出索引建议
func (idxAdvs IndexAdvises) Format() map[string]Rule {
rulesMap := make(map[string]Rule)
number := 1
rules := make(map[string]*Rule)
sqls := make(map[string][]string)
for _, advise := range idxAdvs {
advKey := advise.Database + advise.Table
if _, ok := sqls[advKey]; !ok {
sqls[advKey] = make([]string, 0)
}
sqls[advKey] = append(sqls[advKey], advise.DDL)
if _, ok := rules[advKey]; !ok {
summary := fmt.Sprintf("为%s库的%s表添加索引", advise.Database, advise.Table)
if advise.Database == "" {
summary = fmt.Sprintf("为%s表添加索引", advise.Table)
}
rules[advKey] = &Rule{
Summary: summary,
Content: "",
Severity: "L2",
}
}
for _, col := range advise.ColumnDetails {
// 为了更好地显示效果
if common.Config.Sampling {
cardinal := fmt.Sprintf("%0.2f", col.Cardinality*100)
if cardinal != "0.00" {
rules[advKey].Content += fmt.Sprintf("为列%s添加索引,散粒度为: %s%%; ",
col.Name, cardinal)
}
} else {