disttask: rename scheduler to TaskExecutor (pingcap#49025)
ywqzzy authored Dec 1, 2023
1 parent 7353fbe commit b21bb3e
Showing 49 changed files with 502 additions and 510 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -385,10 +385,10 @@ mock_lightning: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > pkg/disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/dispatcher Dispatcher,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/dispatcher Extension > pkg/disttask/framework/dispatcher/mock/dispatcher_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/scheduler/execute SubtaskExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute SubtaskExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/importinto MiniTaskExecutor > pkg/disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/planner LogicalPlan,PipelineSpec > pkg/disttask/framework/mock/plan_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go
6 changes: 3 additions & 3 deletions pkg/ddl/BUILD.bazel
@@ -14,7 +14,7 @@ go_library(
"backfilling.go",
"backfilling_clean_s3.go",
"backfilling_dispatcher.go",
"backfilling_dist_scheduler.go",
"backfilling_dist_executor.go",
"backfilling_import_cloud.go",
"backfilling_merge_sort.go",
"backfilling_operators.go",
@@ -81,9 +81,9 @@ go_library(
"//pkg/disttask/framework/dispatcher",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/scheduler/execute",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/taskexecutor",
"//pkg/disttask/framework/taskexecutor/execute",
"//pkg/disttask/operator",
"//pkg/domain/infosync",
"//pkg/domain/resourcegroup",
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_dispatcher.go
@@ -182,7 +182,7 @@ func (*BackfillingDispatcherExt) OnDone(_ context.Context, _ dispatcher.TaskHand

// GetEligibleInstances implements dispatcher.Extension interface.
func (*BackfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, bool, error) {
serverInfos, err := dispatcher.GenerateSchedulerNodes(ctx)
serverInfos, err := dispatcher.GenerateTaskExecutorNodes(ctx)
if err != nil {
return nil, true, err
}
@@ -349,7 +349,7 @@ func generateGlobalSortIngestPlan(
if err != nil {
return nil, err
}
instanceIDs, err := dispatcher.GenerateSchedulerNodes(ctx)
instanceIDs, err := dispatcher.GenerateTaskExecutorNodes(ctx)
if err != nil {
return nil, err
}
pkg/ddl/{backfilling_dist_scheduler.go → backfilling_dist_executor.go}
@@ -22,8 +22,8 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/execute"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -100,28 +100,28 @@ func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
}
}

type backfillDistScheduler struct {
*scheduler.BaseScheduler
type backfillDistExecutor struct {
*taskexecutor.BaseTaskExecutor
d *ddl
task *proto.Task
taskTable scheduler.TaskTable
taskTable taskexecutor.TaskTable
backendCtx ingest.BackendCtx
jobID int64
}

func newBackfillDistScheduler(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable, d *ddl) scheduler.Scheduler {
s := &backfillDistScheduler{
BaseScheduler: scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable),
d: d,
task: task,
taskTable: taskTable,
func newBackfillDistExecutor(ctx context.Context, id string, task *proto.Task, taskTable taskexecutor.TaskTable, d *ddl) taskexecutor.TaskExecutor {
s := &backfillDistExecutor{
BaseTaskExecutor: taskexecutor.NewBaseTaskExecutor(ctx, id, task.ID, taskTable),
d: d,
task: task,
taskTable: taskTable,
}
s.BaseScheduler.Extension = s
s.BaseTaskExecutor.Extension = s
return s
}

func (s *backfillDistScheduler) Init(ctx context.Context) error {
err := s.BaseScheduler.Init(ctx)
func (s *backfillDistExecutor) Init(ctx context.Context) error {
err := s.BaseTaskExecutor.Init(ctx)
if err != nil {
return err
}
@@ -153,7 +153,7 @@ func (s *backfillDistScheduler) Init(ctx context.Context) error {
return nil
}

func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) {
func (s *backfillDistExecutor) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) {
switch task.Step {
case proto.StepOne, proto.StepTwo, proto.StepThree:
return NewBackfillSubtaskExecutor(ctx, task.Meta, s.d, s.backendCtx, task.Step, summary)
@@ -162,13 +162,13 @@ func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *pr
}
}

func (*backfillDistScheduler) IsIdempotent(*proto.Subtask) bool {
func (*backfillDistExecutor) IsIdempotent(*proto.Subtask) bool {
return true
}

func (s *backfillDistScheduler) Close() {
func (s *backfillDistExecutor) Close() {
if s.backendCtx != nil {
ingest.LitBackCtxMgr.Unregister(s.jobID)
}
s.BaseScheduler.Close()
s.BaseTaskExecutor.Close()
}
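
For orientation, here is a minimal, self-contained sketch of the executor wiring pattern under the new names. Only the `taskexecutor`/`execute` identifiers come from the hunks above; `miniExecutor`, its constructor, and the package name are hypothetical, and the sketch assumes the `Extension` hooks are exactly the two shown in this file (`GetSubtaskExecutor` and `IsIdempotent`).

```go
package example

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
)

// miniExecutor mirrors backfillDistExecutor above with the ddl/ingest
// specifics stripped out: it embeds BaseTaskExecutor and registers itself
// as the Extension that supplies the per-step hooks.
type miniExecutor struct {
	*taskexecutor.BaseTaskExecutor
	taskTable taskexecutor.TaskTable
}

func newMiniExecutor(ctx context.Context, id string, task *proto.Task,
	taskTable taskexecutor.TaskTable) taskexecutor.TaskExecutor {
	e := &miniExecutor{
		BaseTaskExecutor: taskexecutor.NewBaseTaskExecutor(ctx, id, task.ID, taskTable),
		taskTable:        taskTable,
	}
	e.BaseTaskExecutor.Extension = e
	return e
}

// IsIdempotent reports whether a subtask may safely be re-run after a restart.
func (*miniExecutor) IsIdempotent(*proto.Subtask) bool { return true }

// GetSubtaskExecutor returns the step-specific execute.SubtaskExecutor,
// analogous to the StepOne/StepTwo/StepThree switch above.
func (*miniExecutor) GetSubtaskExecutor(_ context.Context, task *proto.Task,
	_ *execute.Summary) (execute.SubtaskExecutor, error) {
	return nil, fmt.Errorf("no subtask executor for step %v", task.Step)
}
```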
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_read_index.go
@@ -26,7 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/execute"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
10 changes: 5 additions & 5 deletions pkg/ddl/ddl.go
@@ -42,7 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
@@ -677,10 +677,10 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
ddlJobCh: make(chan struct{}, 100),
}

scheduler.RegisterTaskType(proto.Backfill,
func(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable) scheduler.Scheduler {
return newBackfillDistScheduler(ctx, id, task, taskTable, d)
}, scheduler.WithSummary,
taskexecutor.RegisterTaskType(proto.Backfill,
func(ctx context.Context, id string, task *proto.Task, taskTable taskexecutor.TaskTable) taskexecutor.TaskExecutor {
return newBackfillDistExecutor(ctx, id, task, taskTable, d)
}, taskexecutor.WithSummary,
)

dispatcher.RegisterDispatcherFactory(proto.Backfill,
1 change: 0 additions & 1 deletion pkg/disttask/framework/BUILD.bazel
@@ -18,7 +18,6 @@ go_test(
"//pkg/disttask/framework/dispatcher",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/testkit",
53 changes: 26 additions & 27 deletions pkg/disttask/framework/dispatcher/dispatcher.go
@@ -60,8 +60,8 @@ var (
// TaskHandle provides the interface for operations needed by Dispatcher.
// Then we can use dispatcher's function in Dispatcher interface.
type TaskHandle interface {
// GetPreviousSchedulerIDs gets previous scheduler IDs.
GetPreviousSchedulerIDs(_ context.Context, taskID int64, step proto.Step) ([]string, error)
// GetPreviousTaskExecutorIDs gets previous task executor IDs.
GetPreviousTaskExecutorIDs(_ context.Context, taskID int64, step proto.Step) ([]string, error)
// GetPreviousSubtaskMetas gets previous subtask metas.
GetPreviousSubtaskMetas(taskID int64, step proto.Step) ([][]byte, error)
storage.SessionExecutor
@@ -98,9 +98,8 @@ type BaseDispatcher struct {
liveNodeFetchInterval int
// liveNodeFetchTick is the tick variable.
liveNodeFetchTick int
// TaskNodes stores the id of current scheduler nodes.
// TaskNodes stores the id of current task executor nodes.
TaskNodes []string

// rand is for generating random selection of nodes.
rand *rand.Rand
}
@@ -370,7 +369,7 @@ func (d *BaseDispatcher) BalanceSubtasks() error {
// 1. init TaskNodes if needed.
if len(d.TaskNodes) == 0 {
var err error
d.TaskNodes, err = d.taskMgr.GetSchedulerIDsByTaskIDAndStep(d.ctx, d.Task.ID, d.Task.Step)
d.TaskNodes, err = d.taskMgr.GetTaskExecutorIDsByTaskIDAndStep(d.ctx, d.Task.ID, d.Task.Step)
if err != nil {
return err
}
@@ -379,7 +378,7 @@
if d.liveNodeFetchTick == d.liveNodeFetchInterval {
// 2. update LiveNodes.
d.liveNodeFetchTick = 0
serverInfos, err := GenerateSchedulerNodes(d.ctx)
serverInfos, err := GenerateTaskExecutorNodes(d.ctx)
if err != nil {
return err
}
@@ -450,21 +449,21 @@ func (d *BaseDispatcher) ReDispatchSubtasks() error {
}
subtasks = append(subtasks, subtasksOnDeadNodes...)
}
// 3. group subtasks for each scheduler.
subtasksOnScheduler := make(map[string][]*proto.Subtask, len(d.LiveNodes)+len(deadNodes))
// 3. group subtasks for each task executor.
subtasksOnTaskExecutor := make(map[string][]*proto.Subtask, len(d.LiveNodes)+len(deadNodes))
for _, node := range d.LiveNodes {
execID := disttaskutil.GenerateExecID(node.IP, node.Port)
subtasksOnScheduler[execID] = make([]*proto.Subtask, 0)
subtasksOnTaskExecutor[execID] = make([]*proto.Subtask, 0)
}
for _, subtask := range subtasks {
subtasksOnScheduler[subtask.ExecID] = append(
subtasksOnScheduler[subtask.ExecID],
subtasksOnTaskExecutor[subtask.ExecID] = append(
subtasksOnTaskExecutor[subtask.ExecID],
subtask)
}
// 4. prepare subtasks that need to rebalance to other nodes.
averageSubtaskCnt := len(subtasks) / len(d.LiveNodes)
rebalanceSubtasks := make([]*proto.Subtask, 0)
for k, v := range subtasksOnScheduler {
for k, v := range subtasksOnTaskExecutor {
if ok := deadNodesMap[k]; ok {
rebalanceSubtasks = append(rebalanceSubtasks, v...)
continue
@@ -488,7 +487,7 @@
}
// 6.rebalance subtasks to other nodes.
rebalanceIdx := 0
for k, v := range subtasksOnScheduler {
for k, v := range subtasksOnTaskExecutor {
if ok := deadNodesMap[k]; !ok {
if len(v) < averageSubtaskCnt {
for i := 0; i < averageSubtaskCnt-len(v) && rebalanceIdx < len(rebalanceSubtasks); i++ {
@@ -508,7 +507,7 @@
}

// 8. update subtasks and do clean up logic.
if err = d.taskMgr.UpdateSubtasksSchedulerIDs(d.ctx, d.Task.ID, subtasks); err != nil {
if err = d.taskMgr.UpdateSubtasksExecIDs(d.ctx, d.Task.ID, subtasks); err != nil {
return err
}
logutil.Logger(d.logCtx).Info("rebalance subtasks",
@@ -562,7 +561,7 @@ func (d *BaseDispatcher) onErrHandlingStage(receiveErrs []error) error {
var subTasks []*proto.Subtask
// when step of task is `StepInit`, no need to do revert
if d.Task.Step != proto.StepInit {
instanceIDs, err := d.GetAllSchedulerIDs(d.ctx, d.Task)
instanceIDs, err := d.GetAllTaskExecutorIDs(d.ctx, d.Task)
if err != nil {
logutil.Logger(d.logCtx).Warn("get task's all instances failed", zap.Error(err))
return err
@@ -724,9 +723,9 @@ func (d *BaseDispatcher) handlePlanErr(err error) error {
// MockServerInfo exported for dispatcher_test.go
var MockServerInfo []*infosync.ServerInfo

// GenerateSchedulerNodes generate a eligible TiDB nodes.
func GenerateSchedulerNodes(ctx context.Context) (serverNodes []*infosync.ServerInfo, err error) {
failpoint.Inject("mockSchedulerNodes", func() {
// GenerateTaskExecutorNodes generate a eligible TiDB nodes.
func GenerateTaskExecutorNodes(ctx context.Context) (serverNodes []*infosync.ServerInfo, err error) {
failpoint.Inject("mockTaskExecutorNodes", func() {
failpoint.Return(MockServerInfo, nil)
})
var serverInfos map[string]*infosync.ServerInfo
@@ -774,24 +773,24 @@ func (d *BaseDispatcher) filterByRole(infos []*infosync.ServerInfo) ([]*infosync
return res, nil
}

// GetAllSchedulerIDs gets all the scheduler IDs.
func (d *BaseDispatcher) GetAllSchedulerIDs(ctx context.Context, task *proto.Task) ([]string, error) {
// GetAllTaskExecutorIDs gets all the task executor IDs.
func (d *BaseDispatcher) GetAllTaskExecutorIDs(ctx context.Context, task *proto.Task) ([]string, error) {
// We get all servers instead of eligible servers here
// because eligible servers may change during the task execution.
serverInfos, err := GenerateSchedulerNodes(ctx)
serverInfos, err := GenerateTaskExecutorNodes(ctx)
if err != nil {
return nil, err
}
if len(serverInfos) == 0 {
return nil, nil
}

schedulerIDs, err := d.taskMgr.GetSchedulerIDsByTaskID(d.ctx, task.ID)
executorIDs, err := d.taskMgr.GetTaskExecutorIDsByTaskID(d.ctx, task.ID)
if err != nil {
return nil, err
}
ids := make([]string, 0, len(schedulerIDs))
for _, id := range schedulerIDs {
ids := make([]string, 0, len(executorIDs))
for _, id := range executorIDs {
if ok := disttaskutil.MatchServerInfo(serverInfos, id); ok {
ids = append(ids, id)
}
@@ -813,9 +812,9 @@ func (d *BaseDispatcher) GetPreviousSubtaskMetas(taskID int64, step proto.Step)
return previousSubtaskMetas, nil
}

// GetPreviousSchedulerIDs gets scheduler IDs that run previous step.
func (d *BaseDispatcher) GetPreviousSchedulerIDs(_ context.Context, taskID int64, step proto.Step) ([]string, error) {
return d.taskMgr.GetSchedulerIDsByTaskIDAndStep(d.ctx, taskID, step)
// GetPreviousTaskExecutorIDs gets task executor IDs that run previous step.
func (d *BaseDispatcher) GetPreviousTaskExecutorIDs(_ context.Context, taskID int64, step proto.Step) ([]string, error) {
return d.taskMgr.GetTaskExecutorIDsByTaskIDAndStep(d.ctx, taskID, step)
}

// WithNewSession executes the function with a new session.
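
As a quick reference for the renamed dispatcher-side helpers in this file, a small hedged sketch follows. `listExecutorTargets` and its wiring are hypothetical; `GenerateTaskExecutorNodes`, `GetAllTaskExecutorIDs`, and their signatures are taken directly from the hunks above.

```go
package example

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
)

// listExecutorTargets shows the two renamed entry points: the package-level
// GenerateTaskExecutorNodes lists eligible TiDB nodes, and the BaseDispatcher
// method GetAllTaskExecutorIDs narrows that to the executors that currently
// own subtasks of the given task.
func listExecutorTargets(ctx context.Context, d *dispatcher.BaseDispatcher, task *proto.Task) error {
	nodes, err := dispatcher.GenerateTaskExecutorNodes(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("eligible task executor nodes: %d\n", len(nodes))

	ids, err := d.GetAllTaskExecutorIDs(ctx, task)
	if err != nil {
		return err
	}
	fmt.Printf("task executors holding subtasks: %v\n", ids)
	return nil
}
```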
4 changes: 2 additions & 2 deletions pkg/disttask/framework/dispatcher/dispatcher_manager.go
@@ -346,9 +346,9 @@ func (dm *Manager) doCleanUpRoutine() {
// CleanUpMeta clean up old node info in dist_framework_meta table.
func (dm *Manager) CleanUpMeta() int {
// Safe to discard errors since this function can be called at regular intervals.
serverInfos, err := GenerateSchedulerNodes(dm.ctx)
serverInfos, err := GenerateTaskExecutorNodes(dm.ctx)
if err != nil {
logutil.BgLogger().Warn("generate scheduler nodes met error")
logutil.BgLogger().Warn("generate task executor nodes met error")
return 0
}

18 changes: 9 additions & 9 deletions pkg/disttask/framework/dispatcher/dispatcher_test.go
@@ -73,7 +73,7 @@ func getNumberExampleDispatcherExt(ctrl *gomock.Controller) dispatcher.Extension
mockDispatcher.EXPECT().OnTick(gomock.Any(), gomock.Any()).Return().AnyTimes()
mockDispatcher.EXPECT().GetEligibleInstances(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, bool, error) {
serverInfo, err := dispatcher.GenerateSchedulerNodes(ctx)
serverInfo, err := dispatcher.GenerateTaskExecutorNodes(ctx)
return serverInfo, true, err
},
).AnyTimes()
@@ -159,14 +159,14 @@ func TestGetInstance(t *testing.T) {
defer pool.Close()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockSchedulerNodes", "return()"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockTaskExecutorNodes", "return()"))
dspManager, mgr := MockDispatcherManager(t, ctrl, pool, getTestDispatcherExt(ctrl), nil)
// test no server
task := &proto.Task{ID: 1, Type: proto.TaskTypeExample}
dsp := dspManager.MockDispatcher(task)
dsp.Extension = getTestDispatcherExt(ctrl)
instanceIDs, err := dsp.GetAllSchedulerIDs(ctx, task)
require.Lenf(t, instanceIDs, 0, "GetAllSchedulerIDs when there's no subtask")
instanceIDs, err := dsp.GetAllTaskExecutorIDs(ctx, task)
require.Lenf(t, instanceIDs, 0, "GetAllTaskExecutorIDs when there's no subtask")
require.NoError(t, err)

// test 2 servers
@@ -187,8 +187,8 @@
Port: 65535,
},
}
instanceIDs, err = dsp.GetAllSchedulerIDs(ctx, task)
require.Lenf(t, instanceIDs, 0, "GetAllSchedulerIDs")
instanceIDs, err = dsp.GetAllTaskExecutorIDs(ctx, task)
require.Lenf(t, instanceIDs, 0, "GetAllTaskExecutorIDs")
require.NoError(t, err)

// server ids: uuid0, uuid1
@@ -200,7 +200,7 @@
}
err = mgr.CreateSubTask(ctx, task.ID, proto.StepInit, subtask.ExecID, nil, subtask.Type, true)
require.NoError(t, err)
instanceIDs, err = dsp.GetAllSchedulerIDs(ctx, task)
instanceIDs, err = dsp.GetAllTaskExecutorIDs(ctx, task)
require.NoError(t, err)
require.Equal(t, []string{serverIDs[1]}, instanceIDs)
// server ids: uuid0, uuid1
@@ -212,11 +212,11 @@
}
err = mgr.CreateSubTask(ctx, task.ID, proto.StepInit, subtask.ExecID, nil, subtask.Type, true)
require.NoError(t, err)
instanceIDs, err = dsp.GetAllSchedulerIDs(ctx, task)
instanceIDs, err = dsp.GetAllTaskExecutorIDs(ctx, task)
require.NoError(t, err)
require.Len(t, instanceIDs, len(serverIDs))
require.ElementsMatch(t, instanceIDs, serverIDs)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockSchedulerNodes"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockTaskExecutorNodes"))
}

func TestTaskFailInManager(t *testing.T) {
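
The test changes above only swap the failpoint path. A minimal hedged sketch of enabling the renamed failpoint in isolation is shown below; the test name and surrounding setup are hypothetical, while the failpoint path and the `return()` argument come from the hunk.

```go
package dispatcher_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// TestMockTaskExecutorNodesFailpoint only demonstrates the renamed failpoint;
// real tests pair it with MockDispatcherManager as TestGetInstance does above.
func TestMockTaskExecutorNodesFailpoint(t *testing.T) {
	fp := "github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockTaskExecutorNodes"
	require.NoError(t, failpoint.Enable(fp, "return()"))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	// With the failpoint enabled, GenerateTaskExecutorNodes returns
	// dispatcher.MockServerInfo instead of querying live server info.
}
```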
(Diffs for the remaining changed files in this commit are not shown.)
