Skip to content

Commit

Permalink
dmctl, master: refine source behaviour in operate-source (pingcap#1587
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lance6716 authored Apr 20, 2021
1 parent 3267da4 commit c488033
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 3 deletions.
6 changes: 6 additions & 0 deletions dm/ctl/master/operate_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func operateSourceFunc(cmd *cobra.Command, _ []string) error {

contents := make([]string, 0, len(cmd.Flags().Args())-1)
sourceID := make([]string, 0, len(cmd.Flags().Args())-1)
sources, err := common.GetSourceArgs(cmd)
if err != nil {
return err
}
sourceID = append(sourceID, sources...)

for i := 1; i < len(cmd.Flags().Args()); i++ {
arg := cmd.Flags().Arg(i)
var content []byte
Expand Down
36 changes: 34 additions & 2 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,18 @@ func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*conf
return cfgs, nil
}

func parseSourceConfig(contents []string) ([]*config.SourceConfig, error) {
cfgs := make([]*config.SourceConfig, len(contents))
for i, content := range contents {
cfg := config.NewSourceConfig()
if err := cfg.ParseYaml(content); err != nil {
return cfgs, err
}
cfgs[i] = cfg
}
return cfgs, nil
}

func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error {
cfg := *dbConfig
if len(cfg.Password) > 0 {
Expand Down Expand Up @@ -1122,7 +1134,17 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
return resp2, err2
}

cfgs, err := parseAndAdjustSourceConfig(ctx, req.Config)
var (
cfgs []*config.SourceConfig
err error
)
switch req.Op {
case pb.SourceOp_StartSource, pb.SourceOp_UpdateSource:
cfgs, err = parseAndAdjustSourceConfig(ctx, req.Config)
default:
// don't check the upstream connections, because upstream may be inaccessible
cfgs, err = parseSourceConfig(req.Config)
}
resp := &pb.OperateSourceResponse{
Result: false,
}
Expand Down Expand Up @@ -1198,9 +1220,19 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
}
}
case pb.SourceOp_ShowSource:
for _, id := range s.scheduler.GetSourceCfgIDs() {
for _, id := range req.SourceID {
boundM[id] = s.scheduler.GetWorkerBySource(id)
}
for _, cfg := range cfgs {
id := cfg.SourceID
boundM[id] = s.scheduler.GetWorkerBySource(id)
}

if len(boundM) == 0 {
for _, id := range s.scheduler.GetSourceCfgIDs() {
boundM[id] = s.scheduler.GetWorkerBySource(id)
}
}
default:
resp.Msg = terror.ErrMasterInvalidOperateOp.Generate(req.Op.String(), "source").Error()
return resp, nil
Expand Down
10 changes: 9 additions & 1 deletion tests/dmctl_basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,11 @@ function run() {
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

# test stop source will not check connectivity to upstream
cp $cur/conf/source1.yaml $WORK_DIR/source1-wrong.yaml
sed '/password/d' $WORK_DIR/source1-wrong.yaml
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source stop $cur/conf/source1.yaml $SOURCE_ID2" \
"operate-source stop $WORK_DIR/source1-wrong.yaml $SOURCE_ID2" \
"\"result\": true" 3

# ensure source1 is bound to worker1
Expand All @@ -167,6 +170,11 @@ function run() {
'"worker": "worker1"' 1 \
'"worker": "worker2"' 1

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source show -s $SOURCE_ID1" \
"\"result\": true" 2 \
'"worker": "worker1"' 1

transfer_source_valid $SOURCE_ID1 worker1 # transfer to self
transfer_source_invalid $SOURCE_ID1 worker2

Expand Down

0 comments on commit c488033

Please sign in to comment.