Skip to content

Commit

Permalink
Minor cleanup, some comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Dec 26, 2020
1 parent 2157e91 commit a40c31f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 39 deletions.
37 changes: 25 additions & 12 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,19 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
return wr.MoveTables(ctx, *workflow, source, target, tableSpecs, *cells, *tabletTypes, *allTables, *excludes)
}

// VReplicationWorkflowAction defines subcommands passed to vtctl for movetables or reshard
type VReplicationWorkflowAction string

const (
vReplicationWorkflowActionStart = "start"
vReplicationWorkflowActionSwitchTraffic = "switchtraffic"
vReplicationWorkflowActionReverseTraffic = "reversetraffic"
vReplicationWorkflowActionComplete = "complete"
vReplicationWorkflowActionAbort = "abort"
vReplicationWorkflowActionShow = "show"
vReplicationWorkflowActionProgress = "progress"
)

func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string,
workflowType wrangler.VReplicationWorkflowType) error {

Expand Down Expand Up @@ -2039,7 +2052,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
originalAction := action
action = strings.ToLower(action) // allow users to input action in a case-insensitive manner
switch action {
case wrangler.VReplicationWorkflowActionStart:
case vReplicationWorkflowActionStart:
switch workflowType {
case wrangler.MoveTablesWorkflow:
if *sourceKeyspace == "" {
Expand Down Expand Up @@ -2068,14 +2081,14 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
vrwp.Cells = *cells
vrwp.TabletTypes = *tabletTypes
case wrangler.VReplicationWorkflowActionSwitchTraffic, wrangler.VReplicationWorkflowActionReverseTraffic:
case vReplicationWorkflowActionSwitchTraffic, vReplicationWorkflowActionReverseTraffic:
vrwp.Cells = *cells
vrwp.TabletTypes = *tabletTypes
vrwp.Timeout = *timeout
vrwp.EnableReverseReplication = *reverseReplication
case wrangler.VReplicationWorkflowActionAbort:
case vReplicationWorkflowActionAbort:
vrwp.KeepData = *keepData
case wrangler.VReplicationWorkflowActionComplete:
case vReplicationWorkflowActionComplete:
switch workflowType {
case wrangler.MoveTablesWorkflow:
vrwp.RenameTables = *renameTables
Expand All @@ -2091,7 +2104,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
log.Warningf("NewVReplicationWorkflow returned error %+v", wf)
return err
}
if !wf.Exists() && action != wrangler.VReplicationWorkflowActionStart {
if !wf.Exists() && action != vReplicationWorkflowActionStart {
return fmt.Errorf("workflow %s does not exist", ksWorkflow)
}

Expand Down Expand Up @@ -2127,19 +2140,19 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
startState := wf.CachedState()
wr.Logger().Printf("\nCachedState: %s\n", startState)
switch action {
case wrangler.VReplicationWorkflowActionShow:
case vReplicationWorkflowActionShow:
return printDetails()
case wrangler.VReplicationWorkflowActionProgress:
case vReplicationWorkflowActionProgress:
return printCopyProgress()
case wrangler.VReplicationWorkflowActionStart:
case vReplicationWorkflowActionStart:
err = wf.Start()
case wrangler.VReplicationWorkflowActionSwitchTraffic:
case vReplicationWorkflowActionSwitchTraffic:
err = wf.SwitchTraffic(wrangler.DirectionForward)
case wrangler.VReplicationWorkflowActionReverseTraffic:
case vReplicationWorkflowActionReverseTraffic:
err = wf.ReverseTraffic()
case wrangler.VReplicationWorkflowActionComplete:
case vReplicationWorkflowActionComplete:
err = wf.Complete()
case wrangler.VReplicationWorkflowActionAbort:
case vReplicationWorkflowActionAbort:
err = wf.Abort()
default:
return fmt.Errorf("found unsupported action %s", originalAction)
Expand Down
15 changes: 14 additions & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ type workflowState struct {
WritesSwitched bool
}

// For a Reshard, to check whether we have switched reads for a tablet type, we check if any one of the source shards has
// the query service disabled in its tablet control record
func (wr *Wrangler) getCellsWithShardReadsSwitched(ctx context.Context, targetKeyspace string, si *topo.ShardInfo, tabletType string) (
cellsSwitched, cellsNotSwitched []string, err error) {

Expand All @@ -168,12 +170,17 @@ func (wr *Wrangler) getCellsWithShardReadsSwitched(ctx context.Context, targetKe
if !strings.EqualFold(partition.GetServedType().String(), tabletType) {
continue
}

// If reads and writes are both switched it is possible that the shard is not in the partition table
for _, shardReference := range partition.GetShardReferences() {
if key.KeyRangeEqual(shardReference.GetKeyRange(), si.GetKeyRange()) {
found = true
break
}
}

// It is possible that there are no tablet controls if the target shards are not yet serving
// or once reads and writes are both switched,
if len(partition.GetShardTabletControls()) == 0 {
noControls = true
break
Expand All @@ -196,6 +203,8 @@ func (wr *Wrangler) getCellsWithShardReadsSwitched(ctx context.Context, targetKe
return cellsSwitched, cellsNotSwitched, nil
}

// For MoveTables, to check whether we have switched reads for a tablet type, we check whether the routing rule
// for the tablet_type is pointing to the target keyspace
func (wr *Wrangler) getCellsWithTableReadsSwitched(ctx context.Context, targetKeyspace, table, tabletType string) (
cellsSwitched, cellsNotSwitched []string, err error) {

Expand Down Expand Up @@ -257,6 +266,10 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
var cellsSwitched, cellsNotSwitched []string
var keyspace string
var reverse bool

// we reverse writes by using the source_keyspace.workflowname_reverse workflow spec, so we need to use the
// source of the reverse workflow, which is the target of the workflow initiated by the user for checking routing rules
// Similarly we use a target shard of the reverse workflow as the original source to check if writes have been switched
if strings.HasSuffix(workflow, "_reverse") {
reverse = true
keyspace = ws.SourceKeyspace
Expand Down Expand Up @@ -352,7 +365,7 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st
}
}

//If journals exist notify user and fail
// If journals exist notify user and fail
journalsExist, _, err := ts.checkJournals(ctx)
if err != nil {
wr.Logger().Errorf("checkJournals failed: %v", err)
Expand Down
34 changes: 8 additions & 26 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ import (
(CurrentState())
* dry run
* implement/test Reshard same as MoveTables!
VReplicationWorkflow as common to both MoveTables/Reshard
*/

// VReplicationWorkflowType specifies the switching direction.
// VReplicationWorkflowType specifies whether workflow is MoveTables or Reshard
type VReplicationWorkflowType int

const (
Expand All @@ -37,19 +35,6 @@ const (
ReshardWorkflow
)

// VReplicationWorkflowAction defines subcommands passed to vtctl for movetables or reshard
type VReplicationWorkflowAction string

const (
VReplicationWorkflowActionStart = "start"
VReplicationWorkflowActionSwitchTraffic = "switchtraffic"
VReplicationWorkflowActionReverseTraffic = "reversetraffic"
VReplicationWorkflowActionComplete = "complete"
VReplicationWorkflowActionAbort = "abort"
VReplicationWorkflowActionShow = "show"
VReplicationWorkflowActionProgress = "progress"
)

// region Move Tables Public API

// VReplicationWorkflow stores various internal objects for a workflow
Expand Down Expand Up @@ -121,15 +106,13 @@ func (vrw *VReplicationWorkflow) CurrentState() string {
return vrw.stateAsString(ws)
}

// CachedState returns a human readable workflow state
// CachedState returns a human readable workflow state at the time the workflow was created
func (vrw *VReplicationWorkflow) CachedState() string {
return vrw.stateAsString(vrw.ws)
}

// Exists checks if the workflow has already been initiated
func (vrw *VReplicationWorkflow) Exists() bool {
log.Infof("vrw %+v", *vrw)

return vrw.ws != nil
}

Expand Down Expand Up @@ -229,7 +212,8 @@ func (vrw *VReplicationWorkflow) Complete() error {
} else {
renameTable = DropTable
}
if _, err := vrw.wr.DropSources(vrw.ctx, vrw.ws.TargetKeyspace, vrw.ws.Workflow, renameTable, vrw.params.KeepData, false, false); err != nil {
if _, err := vrw.wr.DropSources(vrw.ctx, vrw.ws.TargetKeyspace, vrw.ws.Workflow, renameTable, vrw.params.KeepData,
false, false); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -291,14 +275,14 @@ func (vrw *VReplicationWorkflow) parseTabletTypes() (hasReplica, hasRdonly, hasM

func (vrw *VReplicationWorkflow) initMoveTables() error {
log.Infof("In VReplicationWorkflow.initMoveTables() for %+v", vrw)
return vrw.wr.MoveTables(vrw.ctx, vrw.params.Workflow, vrw.params.SourceKeyspace, vrw.params.TargetKeyspace, vrw.params.Tables,
vrw.params.Cells, vrw.params.TabletTypes, vrw.params.AllTables, vrw.params.ExcludeTables)
return vrw.wr.MoveTables(vrw.ctx, vrw.params.Workflow, vrw.params.SourceKeyspace, vrw.params.TargetKeyspace,
vrw.params.Tables, vrw.params.Cells, vrw.params.TabletTypes, vrw.params.AllTables, vrw.params.ExcludeTables)
}

func (vrw *VReplicationWorkflow) initReshard() error {
log.Infof("In VReplicationWorkflow.initReshard() for %+v", vrw)
return vrw.wr.Reshard(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, vrw.params.SourceShards, vrw.params.TargetShards,
vrw.params.SkipSchemaCopy, vrw.params.Cells, vrw.params.TabletTypes)
return vrw.wr.Reshard(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, vrw.params.SourceShards,
vrw.params.TargetShards, vrw.params.SkipSchemaCopy, vrw.params.Cells, vrw.params.TabletTypes)
}

func (vrw *VReplicationWorkflow) switchReads() error {
Expand Down Expand Up @@ -437,7 +421,6 @@ func (vrw *VReplicationWorkflow) GetCopyProgress() (*CopyProgress, error) {
}

query := fmt.Sprintf(getRowCountQuery, encodeString(targetDbName), tableList)
log.Infof("query is %s", query)
for _, target := range vrw.ts.targets {
tablet := target.master.Tablet
if err := getTableMetrics(tablet, query, &targetRowCounts, &targetTableSizes); err != nil {
Expand All @@ -446,7 +429,6 @@ func (vrw *VReplicationWorkflow) GetCopyProgress() (*CopyProgress, error) {
}

query = fmt.Sprintf(getRowCountQuery, encodeString(sourceDbName), tableList)
log.Infof("query is %s", query)
for source := range sourceMasters {
ti, err := vrw.wr.ts.GetTablet(ctx, source)
tablet := ti.Tablet
Expand Down

0 comments on commit a40c31f

Please sign in to comment.