*: refactor mpp err recovery logic (pingcap#50234)
guo-shaoge authored Jan 11, 2024
1 parent 190488d commit 02e912c
Showing 9 changed files with 337 additions and 282 deletions.
4 changes: 2 additions & 2 deletions pkg/distsql/distsql.go
@@ -36,8 +36,8 @@ import (
"google.golang.org/grpc/metadata"
)

-// GenSelectResultFromResponse generates an iterator from response.
-func GenSelectResultFromResponse(sctx sessionctx.Context, fieldTypes []*types.FieldType, planIDs []int, rootID int, resp kv.Response) SelectResult {
+// GenSelectResultFromMPPResponse generates an iterator from response.
+func GenSelectResultFromMPPResponse(sctx sessionctx.Context, fieldTypes []*types.FieldType, planIDs []int, rootID int, resp kv.Response) SelectResult {
// TODO: Add metric label and set open tracing.
return &selectResult{
label: "mpp",
2 changes: 0 additions & 2 deletions pkg/executor/BUILD.bazel
@@ -129,8 +129,6 @@ go_library(
"//pkg/executor/internal/vecgroupchecker",
"//pkg/executor/lockstats",
"//pkg/executor/metrics",
"//pkg/executor/mppcoordmanager",
"//pkg/executor/mpperr",
"//pkg/executor/sortexec",
"//pkg/expression",
"//pkg/expression/aggregation",
8 changes: 7 additions & 1 deletion pkg/executor/internal/mpp/BUILD.bazel
@@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "mpp",
-    srcs = ["local_mpp_coordinator.go"],
+    srcs = [
+        "executor_with_retry.go",
+        "local_mpp_coordinator.go",
+        "recovery_handler.go",
+    ],
importpath = "github.com/pingcap/tidb/pkg/executor/internal/mpp",
visibility = ["//pkg/executor:__subpackages__"],
deps = [
@@ -11,6 +15,7 @@ go_library(
"//pkg/executor/internal/builder",
"//pkg/executor/internal/util",
"//pkg/executor/metrics",
"//pkg/executor/mppcoordmanager",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/planner/core",
@@ -23,6 +28,7 @@ go_library(
"//pkg/util/execdetails",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/tiflashcompute",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/mpp",
252 changes: 252 additions & 0 deletions pkg/executor/internal/mpp/executor_with_retry.go
@@ -0,0 +1,252 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mpp

import (
"context"
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/executor/mppcoordmanager"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
)

// ExecutorWithRetry receives mppResponse from localMppCoordinator
// and tries to recover from MPP errors if necessary.
// The abstraction layers for reading an MPP response:
// 1. MPPGather: As part of the TiDB Volcano model executor, it is equivalent to a TableReader.
// 2. selectResult: Decodes the select result (mppResponse) into chunks. Also records runtime info.
// 3. ExecutorWithRetry: Recovers from MPP errors if possible and retries the MPP tasks.
// 4. localMppCoordinator: Generates MPP fragments and dispatches MPPTasks.
//    Also receives MPP status for better error messages and correct stats for Limit.
// 5. mppIterator: Sends or receives MPP RPCs.
type ExecutorWithRetry struct {
coord kv.MppCoordinator
sctx sessionctx.Context
is infoschema.InfoSchema
plan plannercore.PhysicalPlan
ctx context.Context
memTracker *memory.Tracker
	// mppErrRecovery is designed for the recovery of MPP errors.
	// Basic idea:
	// 1. It attempts to hold the results of MPP. During the holding process, if an error occurs, it starts error recovery.
	//    If the recovery is successful, it discards the held results and reconstructs the respIter, then re-executes the MPP tasks.
	//    If the recovery fails, the error is reported directly.
	// 2. If the held MPP results exceed the capacity, it starts returning results to the caller.
	//    Once results start being returned, error recovery can no longer be performed.
mppErrRecovery *RecoveryHandler
planIDs []int
	// Exposed to let MPPGather access it.
KVRanges []kv.KeyRange
queryID kv.MPPQueryID
startTS uint64
gatherID uint64
nodeCnt int
}

var _ kv.Response = &ExecutorWithRetry{}

// NewExecutorWithRetry creates an ExecutorWithRetry.
func NewExecutorWithRetry(ctx context.Context, sctx sessionctx.Context, parentTracker *memory.Tracker, planIDs []int,
plan plannercore.PhysicalPlan, startTS uint64, queryID kv.MPPQueryID,
is infoschema.InfoSchema) (*ExecutorWithRetry, error) {
	// TODO: After adding row info to tipb.DataPacket, we can use the row count as the capacity.
	// For now, use the number of tipb.DataPackets as the capacity.
const holdCap = 2

disaggTiFlashWithAutoScaler := config.GetGlobalConfig().DisaggregatedTiFlash && config.GetGlobalConfig().UseAutoScaler
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]

	// 1. For now, MPP error recovery only supports MemLimit errors, which is only useful when AutoScaler is used.
	// 2. When fallback to TiKV is enabled, the returned MPP error will be ErrTiFlashServerTimeout,
	//    which we cannot handle for now. Also, there is no need to recover because TiKV will retry the query.
	// 3. For cached tables, tasks are not dispatched to TiFlash, so there is no need to recover.
enableMPPRecovery := disaggTiFlashWithAutoScaler && !allowTiFlashFallback

failpoint.Inject("mpp_recovery_test_mock_enable", func() {
if !allowTiFlashFallback {
enableMPPRecovery = true
}
})

recoveryHandler := NewRecoveryHandler(disaggTiFlashWithAutoScaler,
uint64(holdCap), enableMPPRecovery, parentTracker)
memTracker := memory.NewTracker(parentTracker.Label(), 0)
memTracker.AttachTo(parentTracker)
retryer := &ExecutorWithRetry{
ctx: ctx,
sctx: sctx,
memTracker: memTracker,
planIDs: planIDs,
is: is,
plan: plan,
startTS: startTS,
queryID: queryID,
mppErrRecovery: recoveryHandler,
}

var err error
retryer.KVRanges, err = retryer.setupMPPCoordinator(ctx, false)
return retryer, err
}

// Next implements kv.Response interface.
func (r *ExecutorWithRetry) Next(ctx context.Context) (resp kv.ResultSubset, err error) {
if err = r.nextWithRecovery(ctx); err != nil {
return nil, err
}

if r.mppErrRecovery.NumHoldResp() != 0 {
if resp, err = r.mppErrRecovery.PopFrontResp(); err != nil {
return nil, err
}
} else if resp, err = r.coord.Next(ctx); err != nil {
return nil, err
}
return resp, nil
}

// Close implements kv.Response interface.
func (r *ExecutorWithRetry) Close() error {
r.mppErrRecovery.ResetHolder()
r.memTracker.Detach()
mppcoordmanager.InstanceMPPCoordinatorManager.Unregister(r.getCoordUniqueID())
return r.coord.Close()
}

func (r *ExecutorWithRetry) setupMPPCoordinator(ctx context.Context, recoverying bool) ([]kv.KeyRange, error) {
if recoverying {
// Sanity check.
if r.coord == nil {
return nil, errors.New("mpp coordinator should not be nil when recoverying")
}
// Only report runtime stats when there is no error.
r.coord.(*localMppCoordinator).closeWithoutReport()
mppcoordmanager.InstanceMPPCoordinatorManager.Unregister(r.getCoordUniqueID())
}

	// Make sure gatherID is updated before building the coordinator.
r.gatherID = allocMPPGatherID(r.sctx)

r.coord = r.buildCoordinator()
if err := mppcoordmanager.InstanceMPPCoordinatorManager.Register(r.getCoordUniqueID(), r.coord); err != nil {
return nil, err
}

_, kvRanges, err := r.coord.Execute(ctx)
if err != nil {
return nil, err
}

if r.nodeCnt = r.coord.GetNodeCnt(); r.nodeCnt <= 0 {
return nil, errors.Errorf("tiflash node count should be greater than zero: %v", r.nodeCnt)
}
return kvRanges, err
}

func (r *ExecutorWithRetry) nextWithRecovery(ctx context.Context) error {
if !r.mppErrRecovery.Enabled() {
return nil
}

for r.mppErrRecovery.CanHoldResult() {
resp, mppErr := r.coord.Next(ctx)

// Mock recovery n times.
failpoint.Inject("mpp_recovery_test_max_err_times", func(forceErrCnt failpoint.Value) {
forceErrCntInt := forceErrCnt.(int)
if r.mppErrRecovery.RecoveryCnt() < uint32(forceErrCntInt) {
mppErr = errors.New("mock mpp error")
}
})

if mppErr != nil {
recoveryErr := r.mppErrRecovery.Recovery(&RecoveryInfo{
MPPErr: mppErr,
NodeCnt: r.nodeCnt,
})

			// Mock a successful recovery; ignore the "no handler to recovery" err.
failpoint.Inject("mpp_recovery_test_ignore_recovery_err", func() {
if recoveryErr == nil {
panic("mocked mpp err should got recovery err")
}
if strings.Contains(mppErr.Error(), "mock mpp error") && strings.Contains(recoveryErr.Error(), "no handler to recovery") {
recoveryErr = nil
}
})

if recoveryErr != nil {
logutil.BgLogger().Error("recovery mpp error failed", zap.Any("mppErr", mppErr),
zap.Any("recoveryErr", recoveryErr))
return mppErr
}

logutil.BgLogger().Info("recovery mpp error succeed, begin next retry",
zap.Any("mppErr", mppErr), zap.Any("recoveryCnt", r.mppErrRecovery.RecoveryCnt()))

if _, err := r.setupMPPCoordinator(r.ctx, true); err != nil {
logutil.BgLogger().Error("setup resp iter when recovery mpp err failed", zap.Any("err", err))
return mppErr
}
r.mppErrRecovery.ResetHolder()

continue
}

if resp == nil {
break
}

r.mppErrRecovery.HoldResult(resp.(*mppResponse))
}

failpoint.Inject("mpp_recovery_test_hold_size", func(num failpoint.Value) {
		// Note: this failpoint only executes once.
curRows := r.mppErrRecovery.NumHoldResp()
numInt := num.(int)
if curRows != numInt {
panic(fmt.Sprintf("unexpected holding rows, cur: %d", curRows))
}
})
return nil
}

func allocMPPGatherID(ctx sessionctx.Context) uint64 {
mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo
return mppQueryInfo.AllocatedMPPGatherID.Add(1)
}

func (r *ExecutorWithRetry) buildCoordinator() kv.MppCoordinator {
_, serverAddr := mppcoordmanager.InstanceMPPCoordinatorManager.GetServerAddr()
return NewLocalMPPCoordinator(r.sctx, r.is, r.plan, r.planIDs, r.startTS, r.queryID,
r.gatherID, serverAddr, r.memTracker)
}

func (r *ExecutorWithRetry) getCoordUniqueID() mppcoordmanager.CoordinatorUniqueID {
return mppcoordmanager.CoordinatorUniqueID{
MPPQueryID: r.queryID,
GatherID: r.gatherID,
}
}
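
For readers unfamiliar with the pattern, below is a minimal, self-contained Go sketch of the hold-then-retry idea described in the comments of executor_with_retry.go above. It is illustrative only, not part of this commit: all names (response, runner, retryingReader, flakyRunner) are hypothetical simplifications. The real code buffers mppResponse packets, rebuilds a localMppCoordinator through setupMPPCoordinator, and delegates the recovery decision to RecoveryHandler.

// Illustrative sketch only (not part of this commit); all identifiers below are hypothetical.
package main

import (
	"errors"
	"fmt"
)

// response stands in for one result packet (tipb.DataPacket in the real code).
type response struct{ data string }

// runner stands in for a coordinator that can be rebuilt and re-executed.
type runner interface {
	Next() (*response, error)
}

// retryingReader holds up to holdCap responses before handing any of them to the
// caller. While it is still holding, an error can be recovered by rebuilding the
// runner and starting over; once a response has been returned, recovery is no
// longer possible because the caller has already consumed partial results.
type retryingReader struct {
	build    func() runner // rebuilds the underlying runner for a retry
	run      runner
	held     []*response
	holdCap  int
	maxRetry int
	returned bool // set once any response has been handed to the caller
}

func (r *retryingReader) Next() (*response, error) {
	// Fill the hold buffer, retrying on error while recovery is still allowed.
	for !r.returned && len(r.held) < r.holdCap {
		resp, err := r.run.Next()
		if err != nil {
			if r.maxRetry == 0 {
				return nil, err // recovery exhausted: surface the error
			}
			r.maxRetry--
			r.held = r.held[:0] // discard held results and re-execute from scratch
			r.run = r.build()
			continue
		}
		if resp == nil {
			break // the runner is drained
		}
		r.held = append(r.held, resp)
	}
	r.returned = true
	if len(r.held) > 0 {
		resp := r.held[0]
		r.held = r.held[1:]
		return resp, nil
	}
	// Hold buffer is empty: read directly; errors here are no longer recoverable.
	return r.run.Next()
}

// flakyRunner fails a fixed number of times before producing two responses.
type flakyRunner struct{ failures, emitted int }

func (f *flakyRunner) Next() (*response, error) {
	if f.failures > 0 {
		f.failures--
		return nil, errors.New("transient mpp error")
	}
	if f.emitted >= 2 {
		return nil, nil // drained
	}
	f.emitted++
	return &response{data: fmt.Sprintf("packet-%d", f.emitted)}, nil
}

func main() {
	r := &retryingReader{
		build:    func() runner { return &flakyRunner{} },
		run:      &flakyRunner{failures: 1},
		holdCap:  2,
		maxRetry: 3,
	}
	for {
		resp, err := r.Next()
		if err != nil || resp == nil {
			break
		}
		fmt.Println(resp.data) // prints packet-1 then packet-2
	}
}

The invariant is the same as in the commit: recovery is only attempted while no result has been handed to the caller, so a retry never produces duplicate rows downstream.
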
8 changes: 6 additions & 2 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
@@ -613,13 +613,17 @@ func (c *localMppCoordinator) IsClosed() bool {
// Close implements MppCoordinator interface
// TODO: Test the case that user cancels the query.
func (c *localMppCoordinator) Close() error {
+	c.closeWithoutReport()
+	c.handleAllReports()
+	return nil
+}
+
+func (c *localMppCoordinator) closeWithoutReport() {
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
close(c.finishCh)
}
c.cancelFunc()
<-c.wgDoneChan
-	c.handleAllReports()
-	return nil
}

func (c *localMppCoordinator) handleMPPStreamResponse(bo *backoff.Backoffer, response *mpp.MPPDataPacket, req *kv.MPPDispatchRequest) (err error) {
