Skip to content

Commit

Permalink
SwitchWrites bug: reverse replication workflows can have wrong start positions
Browse files Browse the repository at this point in the history

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Dec 13, 2020
1 parent d603a44 commit e43516d
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 34 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/martini-contrib/render v0.0.0-20150707142108-ec18f8345a11
github.com/mattn/go-sqlite3 v1.14.0
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mitchellh/go-ps v1.0.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.0 // indirect
github.com/mitchellh/mapstructure v1.2.3 // indirect
github.com/montanaflynn/stats v0.6.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXx
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI=
github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
Expand Down
29 changes: 16 additions & 13 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl

insertQuery2 = "insert into customer(name, cid) values('tempCustomer4', 102)" //ID 102, hence due to reverse_bits in shard -80
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2))
time.Sleep(1 * time.Second) // wait for vreplication to catchup

reverseKsWorkflow := "product.p2c_reverse"
if testReverse {
//Reverse Replicate
switchReads(t, allCellNames, reverseKsWorkflow)
printShardPositions(vc, ksShards)
switchWrites(t, reverseKsWorkflow)

insertQuery1 = "insert into customer(cid, name) values(1002, 'tempCustomer5')"
Expand All @@ -249,11 +246,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
insertQuery1 = "insert into customer(cid, name) values(1004, 'tempCustomer7')"
require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery1, matchInsertQuery1))

time.Sleep(1 * time.Second) // wait for vreplication to catchup

//Go forward again
switchReads(t, allCellNames, ksWorkflow)
switchWrites(t, ksWorkflow)

dropSourcesDryRun(t, ksWorkflow, false, dryRunResultsDropSourcesDropCustomerShard)
dropSourcesDryRun(t, ksWorkflow, true, dryRunResultsDropSourcesRenameCustomerShard)

Expand Down Expand Up @@ -653,15 +649,11 @@ func switchWritesDryRun(t *testing.T, ksWorkflow string, dryRunResults []string)
validateDryRunResults(t, output, dryRunResults)
}

func switchWrites(t *testing.T, ksWorkflow string) {
const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1
output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites",
"-filtered_replication_wait_time="+SwitchWritesTimeout, ksWorkflow)

func printSwitchWritesExtraDebug(t *testing.T, ksWorkflow, msg string) {
// Temporary code: print lots of info for debugging occasional flaky failures in customer reshard in CI for multicell test
debug := true
if debug && strings.Contains(ksWorkflow, ".p2c") {
fmt.Printf("------------------- START Extra debug info for a p2c SwitchWrites\n")
if debug {
fmt.Printf("------------------- START Extra debug info %s SwitchWrites %s\n", msg, ksWorkflow)
ksShards := []string{"product/0", "customer/-80", "customer/80-"}
printShardPositions(vc, ksShards)
custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"]
Expand All @@ -683,8 +675,19 @@ func switchWrites(t *testing.T, ksWorkflow string) {
tab.Cell, tab.Keyspace, tab.Shard, tab.TabletUID, query, qr.Rows)
}
}
fmt.Printf("------------------- END Extra debug info for a p2c SwitchWrites\n")
fmt.Printf("------------------- END Extra debug info %s SwitchWrites %s\n", msg, ksWorkflow)
}
}

func switchWrites(t *testing.T, ksWorkflow string) {
const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1
output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites",
"-filtered_replication_wait_time="+SwitchWritesTimeout, ksWorkflow)
if output != "" {
fmt.Printf("Output of SwitchWrites for %s:\n++++++\n%s\n--------\n", ksWorkflow, output)
}
//printSwitchWritesExtraDebug is useful when debugging failures in SwitchWrites due to corner cases/races
_ = printSwitchWritesExtraDebug
if err != nil {
require.FailNow(t, fmt.Sprintf("SwitchWrites Error: %s: %s", err, output))
}
Expand Down
40 changes: 19 additions & 21 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,8 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
return ts.forAllSources(func(source *tsSource) error {
var err error
source.position, err = ts.wr.tmc.MasterPosition(ctx, source.master.Tablet)
ts.wr.Logger().Infof("Position for source %v:%v: %v", ts.sourceKeyspace, source.si.ShardName(), source.position)
ts.wr.Logger().Infof("Stopped Source Writes. Position for source %v:%v: %v",
ts.sourceKeyspace, source.si.ShardName(), source.position)
if err != nil {
log.Warningf("Error: %s", err)
}
Expand All @@ -773,39 +774,34 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
defer cancel()

var mu sync.Mutex
return ts.forAllUids(func(target *tsTarget, uid uint32) error {
ts.wr.Logger().Infof("uid: %d, target master %s, target position %s, shard %s", uid,
target.master.AliasString(), target.position, target.si.String())
log.Infof("uid: %d, target master %s, target position %s, shard %s", uid,
// source writes have been stopped, wait for all streams on targets to catch up
if err := ts.forAllUids(func(target *tsTarget, uid uint32) error {
ts.wr.Logger().Infof("Before Catchup: uid: %d, target master %s, target position %s, shard %s", uid,
target.master.AliasString(), target.position, target.si.String())
bls := target.sources[uid]
source := ts.sources[bls.Shard]
ts.wr.Logger().Infof("waiting for keyspace:shard: %v:%v, source position %v, uid %d",
ts.wr.Logger().Infof("Before Catchup: waiting for keyspace:shard: %v:%v to reach source position %v, uid %d",
ts.targetKeyspace, target.si.ShardName(), source.position, uid)
if err := ts.wr.tmc.VReplicationWaitForPos(ctx, target.master.Tablet, int(uid), source.position); err != nil {
return err
}
log.Infof("waiting for keyspace:shard: %v:%v, source position %v, uid %d",
log.Infof("After catchup: target keyspace:shard: %v:%v, source position %v, uid %d",
ts.targetKeyspace, target.si.ShardName(), source.position, uid)
ts.wr.Logger().Infof("position for keyspace:shard: %v:%v reached, uid %d", ts.targetKeyspace, target.si.ShardName(), uid)
log.Infof("position for keyspace:shard: %v:%v reached, uid %d", ts.targetKeyspace, target.si.ShardName(), uid)
ts.wr.Logger().Infof("After catchup: position for keyspace:shard: %v:%v reached, uid %d",
ts.targetKeyspace, target.si.ShardName(), uid)
if _, err := ts.wr.tmc.VReplicationExec(ctx, target.master.Tablet, binlogplayer.StopVReplication(uid, "stopped for cutover")); err != nil {
log.Infof("error marking stopped for cutover on %s, uid %d", target.master.AliasString(), uid)
return err
}

// Need lock because a target can have multiple uids.
mu.Lock()
defer mu.Unlock()
if target.position != "" {
return nil
}
return nil
}); err != nil {
return err
}
// all targets have caught up, record their positions for setting up reverse workflows
return ts.forAllTargets(func(target *tsTarget) error {
var err error
target.position, err = ts.wr.tmc.MasterPosition(ctx, target.master.Tablet)
ts.wr.Logger().Infof("Position for target master %s, uid %v: %v", target.master.AliasString(), uid, target.position)
log.Infof("Position for target master %s, uid %v: %v", target.master.AliasString(), uid, target.position)
ts.wr.Logger().Infof("After catchup, position for target master %s, %v", target.master.AliasString(), target.position)
return err
})
}
Expand Down Expand Up @@ -898,7 +894,8 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error
Filter: filter,
})
}

log.Infof("Creating reverse workflow vreplication stream on tablet %s: workflow %s, startPos %s",
source.master.Alias, ts.reverseWorkflow, target.position)
_, err := ts.wr.VReplicationExec(ctx, source.master.Alias, binlogplayer.CreateVReplicationState(ts.reverseWorkflow, reverseBls, target.position, binlogplayer.BlpStopped, source.master.DbName()))
if err != nil {
return err
Expand All @@ -907,6 +904,7 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error
// if user has defined the cell/tablet_types parameters in the forward workflow, update the reverse workflow as well
updateQuery := ts.getReverseVReplicationUpdateQuery(target.master.Alias.Cell, source.master.Alias.Cell, source.master.DbName())
if updateQuery != "" {
log.Infof("Updating vreplication stream entry on %s with: %s", source.master.Alias, updateQuery)
_, err = ts.wr.VReplicationExec(ctx, source.master.Alias, updateQuery)
return err
}
Expand Down

0 comments on commit e43516d

Please sign in to comment.