domain: support plan_replayer_status system table (pingcap#38957)
Yisaer authored Nov 10, 2022
1 parent cfbe3c9 commit 066e9c8
Showing 7 changed files with 176 additions and 9 deletions.
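
For orientation: with this change, every `plan replayer dump` statement records its outcome in the new mysql.plan_replayer_status system table. A minimal test-style sketch of the user-visible flow, mirroring the test added to executor/executor_test.go below (the table name t1 and the scaffolding around tk are illustrative):

    tk.MustExec("create table t1(a int)")
    tk.MustQuery("plan replayer dump explain select * from t1")
    token := tk.Session().GetSessionVars().LastPlanReplayerToken
    rows := tk.MustQuery(fmt.Sprintf(
        "select origin_sql, token, instance from mysql.plan_replayer_status where token = '%v'", token)).Rows()
    require.Len(t, rows, 1) // one status record per dumped statement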
14 changes: 14 additions & 0 deletions domain/domain.go
@@ -114,6 +114,7 @@ type Domain struct {
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
planReplayerHandle *planReplayerHandle
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon

@@ -1530,6 +1531,19 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
}()
}

// SetupPlanReplayerHandle sets up the plan replayer handle
func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) {
do.planReplayerHandle = &planReplayerHandle{
sctx: ctx,
}
do.dumpFileGcChecker.setupPlanReplayerHandle(do.planReplayerHandle)
}

// GetPlanReplayerHandle returns the plan replayer handle
func (do *Domain) GetPlanReplayerHandle() *planReplayerHandle {
return do.planReplayerHandle
}

// DumpFileGcCheckerLoop creates a goroutine that handles `exit` and `gc`.
func (do *Domain) DumpFileGcCheckerLoop() {
do.wg.Add(1)
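A sketch of how code outside the domain package is expected to reach the new handle; this mirrors the call added in executor/plan_replayer.go further down (sctx, ctx, and the record values are illustrative):

    handle := domain.GetDomain(sctx).GetPlanReplayerHandle()
    handle.InsertPlanReplayerStatus(ctx, []domain.PlanReplayerStatusRecord{
        {OriginSQL: "select * from t1", Token: "replayer_single_xxx.zip", Internal: false},
    })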
98 changes: 95 additions & 3 deletions domain/plan_replayer.go
@@ -15,7 +15,8 @@
package domain

import (
"errors"
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@@ -24,17 +25,23 @@ import (
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

// dumpFileGcChecker is used to GC dump files periodically.
// For now it is used by the `plan replayer` and `trace plan` statements.
type dumpFileGcChecker struct {
sync.Mutex
-	gcLease time.Duration
-	paths   []string
+	gcLease            time.Duration
+	paths              []string
+	planReplayerHandle *planReplayerHandle
}

// GetPlanReplayerDirName returns plan replayer directory path.
@@ -44,6 +51,10 @@ func GetPlanReplayerDirName() string {
return filepath.Join(tidbLogDir, "replayer")
}

func parseType(s string) string {
return strings.Split(s, "_")[0]
}

func parseTime(s string) (time.Time, error) {
startIdx := strings.LastIndex(s, "_")
if startIdx == -1 {
@@ -68,6 +79,10 @@ func (p *dumpFileGcChecker) gcDumpFiles(t time.Duration) {
}
}

func (p *dumpFileGcChecker) setupPlanReplayerHandle(handle *planReplayerHandle) {
p.planReplayerHandle = handle
}

func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
files, err := ioutil.ReadDir(path)
if err != nil {
@@ -84,13 +99,90 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
logutil.BgLogger().Error("[dumpFileGcChecker] parseTime failed", zap.Error(err), zap.String("filename", fileName))
continue
}
isPlanReplayer := parseType(fileName) == "replayer"
if !createTime.After(gcTime) {
err := os.Remove(filepath.Join(path, f.Name()))
if err != nil {
logutil.BgLogger().Warn("[dumpFileGcChecker] remove file failed", zap.Error(err), zap.String("filename", fileName))
continue
}
logutil.BgLogger().Info("dumpFileGcChecker successful", zap.String("filename", fileName))
if isPlanReplayer && p.planReplayerHandle != nil {
p.planReplayerHandle.deletePlanReplayerStatus(context.Background(), fileName)
}
}
}
}

type planReplayerHandle struct {
sync.Mutex
sctx sessionctx.Context
}

// deletePlanReplayerStatus deletes the mysql.plan_replayer_status record for the given token
func (h *planReplayerHandle) deletePlanReplayerStatus(ctx context.Context, token string) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
h.Lock()
defer h.Unlock()
exec := h.sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx1, fmt.Sprintf("delete from mysql.plan_replayer_status where token = '%v'", token))
if err != nil {
logutil.BgLogger().Warn("delete mysql.plan_replayer_status record failed", zap.String("token", token), zap.Error(err))
}
}

// InsertPlanReplayerStatus inserts mysql.plan_replayer_status records
func (h *planReplayerHandle) InsertPlanReplayerStatus(ctx context.Context, records []PlanReplayerStatusRecord) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
var instance string
serverInfo, err := infosync.GetServerInfo()
if err != nil {
logutil.BgLogger().Error("failed to get server info", zap.Error(err))
instance = "unknown"
} else {
instance = fmt.Sprintf("%s:%d", serverInfo.IP, serverInfo.Port)
}
for _, record := range records {
if !record.Internal {
if len(record.FailedReason) > 0 {
h.insertExternalPlanReplayerErrorStatusRecord(ctx1, instance, record)
} else {
h.insertExternalPlanReplayerSuccessStatusRecord(ctx1, instance, record)
}
}
}
}

func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.Lock()
defer h.Unlock()
exec := h.sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')",
record.OriginSQL, record.FailedReason, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.Error(err))
}
}

func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.Lock()
defer h.Unlock()
exec := h.sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')",
record.OriginSQL, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.Error(err))
}
}

// PlanReplayerStatusRecord represents a record in mysql.plan_replayer_status
type PlanReplayerStatusRecord struct {
Internal bool
OriginSQL string
Token string
FailedReason string
}
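
The GC path above keys off the dump file name: parseType takes the segment before the first "_", while parseTime recovers the creation time from the segment after the last "_". A sketch of the naming convention this implies (the exact layout is an assumption; file names are generated outside this diff):

    // Hypothetical dump file name.
    fileName := "replayer_single_a1b2c3_1668067200000000000.zip"
    isPlanReplayer := parseType(fileName) == "replayer" // true
    // For such files, gcDumpFilesByPath compares the parsed creation time
    // against the GC deadline and, on removal, also deletes the matching
    // mysql.plan_replayer_status row via deletePlanReplayerStatus.
    _ = isPlanReplayer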
7 changes: 7 additions & 0 deletions executor/executor_test.go
@@ -165,6 +165,13 @@ func TestPlanReplayer(t *testing.T) {
tk.MustQuery("plan replayer dump explain select * from v1")
tk.MustQuery("plan replayer dump explain select * from v2")
require.True(t, len(tk.Session().GetSessionVars().LastPlanReplayerToken) > 0)

// clear the status table, dump again, and assert exactly one record is written
tk.MustExec("delete from mysql.plan_replayer_status")
tk.MustQuery("plan replayer dump explain select * from v2")
token := tk.Session().GetSessionVars().LastPlanReplayerToken
rows := tk.MustQuery(fmt.Sprintf("select * from mysql.plan_replayer_status where token = '%v'", token)).Rows()
require.Len(t, rows, 1)
}

func TestShow(t *testing.T) {
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
@@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
-	result := 37
+	result := 38
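// one more table under the mysql schema now: mysql.plan_replayer_status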
require.Len(t, rows, result)

// More tests about the privileges.
20 changes: 20 additions & 0 deletions executor/plan_replayer.go
@@ -290,6 +290,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
sessionVars := task.SessionVars
execStmts := task.ExecStmts
zw := zip.NewWriter(zf)
records := generateRecords(task)
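// These records are flushed to mysql.plan_replayer_status by the deferred
// function below; if closing the zip file fails, the error is first copied
// into every record's FailedReason.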
defer func() {
err = zw.Close()
if err != nil {
@@ -298,7 +299,12 @@
err = zf.Close()
if err != nil {
logutil.BgLogger().Error("Closing zip file failed", zap.Error(err), zap.String("filename", fileName))
for i, record := range records {
record.FailedReason = err.Error()
records[i] = record
}
}
domain.GetDomain(sctx).GetPlanReplayerHandle().InsertPlanReplayerStatus(ctx, records)
}()
// Dump config
if err = dumpConfig(zw); err != nil {
@@ -367,6 +373,20 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
return dumpExplain(sctx, zw, execStmts, task.Analyze)
}

func generateRecords(task *PlanReplayerDumpTask) []domain.PlanReplayerStatusRecord {
records := make([]domain.PlanReplayerStatusRecord, 0)
if len(task.ExecStmts) > 0 {
for _, execStmt := range task.ExecStmts {
records = append(records, domain.PlanReplayerStatusRecord{
OriginSQL: execStmt.Text(),
Token: task.FileName,
Internal: false,
})
}
}
return records
}

func dumpConfig(zw *zip.Writer) error {
cf, err := zw.Create(configFile)
if err != nil {
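generateRecords emits one status record per statement in the task, all sharing the task's file name as the token. A sketch of the expected shape (values illustrative; stmts stands for two parsed statements):

    task := &PlanReplayerDumpTask{
        FileName:  "replayer_single_xxx.zip",
        ExecStmts: stmts,
    }
    records := generateRecords(task)
    // len(records) == 2; every record has Token == task.FileName,
    // Internal == false, and OriginSQL set to the statement text.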
24 changes: 23 additions & 1 deletion session/bootstrap.go
@@ -430,6 +430,16 @@ const (
CreateMDLView = `CREATE OR REPLACE VIEW mysql.tidb_mdl_view as (
select JOB_ID, DB_NAME, TABLE_NAME, QUERY, SESSION_ID, TxnStart, TIDB_DECODE_SQL_DIGESTS(ALL_SQL_DIGESTS, 4096) AS SQL_DIGESTS from information_schema.ddl_jobs, information_schema.CLUSTER_TIDB_TRX, information_schema.CLUSTER_PROCESSLIST where ddl_jobs.STATE = 'running' and find_in_set(ddl_jobs.table_id, CLUSTER_TIDB_TRX.RELATED_TABLE_IDS) and CLUSTER_TIDB_TRX.SESSION_ID=CLUSTER_PROCESSLIST.ID
);`

// CreatePlanReplayerStatusTable is the SQL statement that creates the mysql.plan_replayer_status table
CreatePlanReplayerStatusTable = `CREATE TABLE IF NOT EXISTS mysql.plan_replayer_status (
sql_digest VARCHAR(128),
plan_digest VARCHAR(128),
origin_sql TEXT,
token VARCHAR(128),
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
fail_reason TEXT,
instance VARCHAR(512) NOT NULL comment 'address of the TiDB instance executing the plan replayer job');`
)

// bootstrap initiates system DB for a store.
@@ -644,11 +654,13 @@ const (
version99 = 99
// version100 converts server-memory-quota to a sysvar
version100 = 100
// version101 adds the mysql.plan_replayer_status table
version101 = 101
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
-var currentBootstrapVersion int64 = version100
+var currentBootstrapVersion int64 = version101

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
@@ -753,6 +765,7 @@ var (
upgradeToVer97,
upgradeToVer98,
upgradeToVer100,
upgradeToVer101,
}
)

@@ -1987,6 +2000,13 @@ func upgradeToVer98(s Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.user ADD COLUMN IF NOT EXISTS `Token_issuer` varchar(255)")
}

func upgradeToVer101(s Session, ver int64) {
if ver >= version101 {
return
}
doReentrantDDL(s, CreatePlanReplayerStatusTable)
}

func upgradeToVer99Before(s Session, ver int64) bool {
if ver >= version99 {
return false
@@ -2122,6 +2142,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateAdvisoryLocks)
// Create mdl view.
mustExecute(s, CreateMDLView)
// Create the plan_replayer_status table.
mustExecute(s, CreatePlanReplayerStatusTable)
}

// inTestSuite checks if we are bootstrapping in the context of tests.
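A quick way to sanity-check the bootstrap change, in the same testkit style as the tests above (a sketch, not part of this commit):

    tk.MustExec("use mysql")
    // Present on fresh bootstrap (doDDLWorks) and after upgrade to version101.
    tk.MustQuery("show create table plan_replayer_status")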
20 changes: 16 additions & 4 deletions session/session.go
@@ -2895,7 +2895,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {

analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency)
-	ses, err := createSessions(store, 7+concurrency+analyzeConcurrencyQuota)
+	ses, err := createSessions(store, 7)
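// ses[6] is handed below to the plan replayer handle / dump file GC checker.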
if err != nil {
return nil, err
}
@@ -2969,21 +2969,33 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
}()
}

// Set up the plan replayer handle and start the dump file GC checker.
dom.SetupPlanReplayerHandle(ses[6])
dom.DumpFileGcCheckerLoop()

// One sub context for updating table stats, plus `concurrency` contexts for concurrent stats loading.
cnt := 1 + concurrency
syncStatsCtxs, err := createSessions(store, cnt)
if err != nil {
return nil, err
}
subCtxs := make([]sessionctx.Context, cnt)
for i := 0; i < cnt; i++ {
-		subCtxs[i] = sessionctx.Context(ses[6+i])
+		subCtxs[i] = sessionctx.Context(syncStatsCtxs[i])
}
if err = dom.LoadAndUpdateStatsLoop(subCtxs); err != nil {
return nil, err
}

analyzeCtxs, err := createSessions(store, analyzeConcurrencyQuota)
if err != nil {
return nil, err
}
subCtxs2 := make([]sessionctx.Context, analyzeConcurrencyQuota)
for i := 0; i < analyzeConcurrencyQuota; i++ {
-		subCtxs2[i] = ses[7+concurrency+i]
+		subCtxs2[i] = analyzeCtxs[i]
}
dom.SetupAnalyzeExec(subCtxs2)
-	dom.DumpFileGcCheckerLoop()
dom.LoadSigningCertLoop(cfg.Security.SessionTokenSigningCert, cfg.Security.SessionTokenSigningKey)

if raw, ok := store.(kv.EtcdBackend); ok {
