From c4880338bbb26ce075ad6651402623764ec2eead Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 20 Apr 2021 15:07:53 +0800 Subject: [PATCH] dmctl, master: refine source behaviour in `operate-source` (#1587) --- dm/ctl/master/operate_source.go | 6 ++++++ dm/master/server.go | 36 +++++++++++++++++++++++++++++++-- tests/dmctl_basic/run.sh | 10 ++++++++- 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/dm/ctl/master/operate_source.go b/dm/ctl/master/operate_source.go index 8110b2ca33..29e1d1c660 100644 --- a/dm/ctl/master/operate_source.go +++ b/dm/ctl/master/operate_source.go @@ -96,6 +96,12 @@ func operateSourceFunc(cmd *cobra.Command, _ []string) error { contents := make([]string, 0, len(cmd.Flags().Args())-1) sourceID := make([]string, 0, len(cmd.Flags().Args())-1) + sources, err := common.GetSourceArgs(cmd) + if err != nil { + return err + } + sourceID = append(sourceID, sources...) + for i := 1; i < len(cmd.Flags().Args()); i++ { arg := cmd.Flags().Arg(i) var content []byte diff --git a/dm/master/server.go b/dm/master/server.go index 06d2086046..fce53e6f15 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1084,6 +1084,18 @@ func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*conf return cfgs, nil } +func parseSourceConfig(contents []string) ([]*config.SourceConfig, error) { + cfgs := make([]*config.SourceConfig, len(contents)) + for i, content := range contents { + cfg := config.NewSourceConfig() + if err := cfg.ParseYaml(content); err != nil { + return cfgs, err + } + cfgs[i] = cfg + } + return cfgs, nil +} + func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { cfg := *dbConfig if len(cfg.Password) > 0 { @@ -1122,7 +1134,17 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest return resp2, err2 } - cfgs, err := parseAndAdjustSourceConfig(ctx, req.Config) + var ( + cfgs []*config.SourceConfig + err error + ) + switch req.Op { + case pb.SourceOp_StartSource, pb.SourceOp_UpdateSource: + cfgs, err = parseAndAdjustSourceConfig(ctx, req.Config) + default: + // don't check the upstream connections, because upstream may be inaccessible + cfgs, err = parseSourceConfig(req.Config) + } resp := &pb.OperateSourceResponse{ Result: false, } @@ -1198,9 +1220,19 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest } } case pb.SourceOp_ShowSource: - for _, id := range s.scheduler.GetSourceCfgIDs() { + for _, id := range req.SourceID { boundM[id] = s.scheduler.GetWorkerBySource(id) } + for _, cfg := range cfgs { + id := cfg.SourceID + boundM[id] = s.scheduler.GetWorkerBySource(id) + } + + if len(boundM) == 0 { + for _, id := range s.scheduler.GetSourceCfgIDs() { + boundM[id] = s.scheduler.GetWorkerBySource(id) + } + } default: resp.Msg = terror.ErrMasterInvalidOperateOp.Generate(req.Op.String(), "source").Error() return resp, nil diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index c74e0e4d90..60982a6d93 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -141,8 +141,11 @@ function run() { dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + # test stop source will not check connectivity to upstream + cp $cur/conf/source1.yaml $WORK_DIR/source1-wrong.yaml + sed '/password/d' $WORK_DIR/source1-wrong.yaml run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "operate-source stop $cur/conf/source1.yaml $SOURCE_ID2" \ + "operate-source stop $WORK_DIR/source1-wrong.yaml $SOURCE_ID2" \ "\"result\": true" 3 # ensure source1 is bound to worker1 @@ -167,6 +170,11 @@ function run() { '"worker": "worker1"' 1 \ '"worker": "worker2"' 1 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "operate-source show -s $SOURCE_ID1" \ + "\"result\": true" 2 \ + '"worker": "worker1"' 1 + transfer_source_valid $SOURCE_ID1 worker1 # transfer to self transfer_source_invalid $SOURCE_ID1 worker2