Skip to content

Commit

Permalink
workflow: Implement canary feature. (vitessio#2613)
Browse files Browse the repository at this point in the history
User can control through UI for running a canary task,
then running the remaining tasks. Create unit tests
and manually test the UI in e2e test environment.
  • Loading branch information
wangyipei01 authored Mar 15, 2017
1 parent 0e4edf3 commit 925addf
Show file tree
Hide file tree
Showing 10 changed files with 1,042 additions and 274 deletions.
11 changes: 11 additions & 0 deletions go/vt/workflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,17 @@ func (m *Manager) WorkflowForTesting(uuid string) (Workflow, error) {
return rw.workflow, nil
}

// WorkflowInfoForTesting returns the WorkflowInfo object of the running
// workflow identified by uuid. The method is used in unit tests to manipulate
// checkpoint.
func (m *Manager) WorkflowInfoForTesting(uuid string) (*topo.WorkflowInfo, error) {
rw, err := m.runningWorkflow(uuid)
if err != nil {
return nil, err
}
return rw.wi, nil
}

// runningWorkflow returns a runningWorkflow by uuid.
func (m *Manager) runningWorkflow(uuid string) (*runningWorkflow, error) {
m.mu.Lock()
Expand Down
38 changes: 25 additions & 13 deletions go/vt/workflow/resharding/horizontal_resharding_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package resharding
import (
"flag"
"fmt"
"strconv"
"strings"

log "github.com/golang/glog"
Expand Down Expand Up @@ -59,6 +60,7 @@ func (*HorizontalReshardingWorkflowFactory) Init(m *workflow.Manager, w *workflo
subFlags := flag.NewFlagSet(horizontalReshardingFactoryName, flag.ContinueOnError)
keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding")
vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses")
enableApprovals := subFlags.Bool("enable_approvals", true, "If true, executions of tasks require user's approvals on the UI.")

if err := subFlags.Parse(args); err != nil {
return err
Expand All @@ -75,6 +77,8 @@ func (*HorizontalReshardingWorkflowFactory) Init(m *workflow.Manager, w *workflo
return err
}

checkpoint.Settings["enable_approvals"] = fmt.Sprintf("%v", *enableApprovals)

w.Data, err = proto.Marshal(checkpoint)
if err != nil {
return err
Expand All @@ -91,13 +95,19 @@ func (*HorizontalReshardingWorkflowFactory) Instantiate(m *workflow.Manager, w *
return nil, err
}

enableApprovals, err := strconv.ParseBool(checkpoint.Settings["enable_approvals"])
if err != nil {
return nil, err
}

hw := &HorizontalReshardingWorkflow{
checkpoint: checkpoint,
rootUINode: rootNode,
logger: logutil.NewMemoryLogger(),
wr: wrangler.New(logutil.NewConsoleLogger(), m.TopoServer(), tmclient.NewTabletManagerClient()),
topoServer: m.TopoServer(),
manager: m,
checkpoint: checkpoint,
rootUINode: rootNode,
logger: logutil.NewMemoryLogger(),
wr: wrangler.New(logutil.NewConsoleLogger(), m.TopoServer(), tmclient.NewTabletManagerClient()),
topoServer: m.TopoServer(),
manager: m,
enableApprovals: enableApprovals,
}
copySchemaUINode := &workflow.Node{
Name: "CopySchemaShard",
Expand Down Expand Up @@ -310,6 +320,8 @@ type HorizontalReshardingWorkflow struct {

checkpoint *workflowpb.WorkflowCheckpoint
checkpointWriter *CheckpointWriter

enableApprovals bool
}

// Run executes the horizontal resharding process.
Expand All @@ -330,43 +342,43 @@ func (hw *HorizontalReshardingWorkflow) Run(ctx context.Context, manager *workfl

func (hw *HorizontalReshardingWorkflow) runWorkflow() error {
copySchemaTasks := hw.GetTasks(phaseCopySchema)
copySchemaRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, copySchemaTasks, hw.runCopySchema, Parallel)
copySchemaRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, copySchemaTasks, hw.runCopySchema, Parallel, hw.enableApprovals)
if err := copySchemaRunner.Run(); err != nil {
return err
}

cloneTasks := hw.GetTasks(phaseClone)
cloneRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, cloneTasks, hw.runSplitClone, Parallel)
cloneRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, cloneTasks, hw.runSplitClone, Parallel, hw.enableApprovals)
if err := cloneRunner.Run(); err != nil {
return err
}

waitForFilteredReplicationTasks := hw.GetTasks(phaseWaitForFilteredReplication)
waitForFilteredReplicationRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, waitForFilteredReplicationTasks, hw.runWaitForFilteredReplication, Parallel)
waitForFilteredReplicationRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, waitForFilteredReplicationTasks, hw.runWaitForFilteredReplication, Parallel, hw.enableApprovals)
if err := waitForFilteredReplicationRunner.Run(); err != nil {
return err
}

diffTasks := hw.GetTasks(phaseDiff)
diffRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, diffTasks, hw.runSplitDiff, Sequential)
diffRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, diffTasks, hw.runSplitDiff, Sequential, hw.enableApprovals)
if err := diffRunner.Run(); err != nil {
return err
}

migrateRdonlyTasks := hw.GetTasks(phaseMigrateRdonly)
migrateRdonlyRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, migrateRdonlyTasks, hw.runMigrate, Sequential)
migrateRdonlyRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, migrateRdonlyTasks, hw.runMigrate, Sequential, hw.enableApprovals)
if err := migrateRdonlyRunner.Run(); err != nil {
return err
}

migrateReplicaTasks := hw.GetTasks(phaseMigrateReplica)
migrateReplicaRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, migrateReplicaTasks, hw.runMigrate, Sequential)
migrateReplicaRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, migrateReplicaTasks, hw.runMigrate, Sequential, hw.enableApprovals)
if err := migrateReplicaRunner.Run(); err != nil {
return err
}

migrateMasterTasks := hw.GetTasks(phaseMigrateMaster)
migrateMasterRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, migrateMasterTasks, hw.runMigrate, Sequential)
migrateMasterRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, migrateMasterTasks, hw.runMigrate, Sequential, hw.enableApprovals)
if err := migrateMasterRunner.Run(); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestHorizontalResharding(t *testing.T) {
// Run the manager in the background.
wg, _, cancel := startManager(m)
// Create the workflow.
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + testVtworkers})
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + testVtworkers, "-enable_approvals=false"})
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
}
Expand Down
Loading

0 comments on commit 925addf

Please sign in to comment.