From e43516dddd81a9a6ef46ea4f88325403a2f762d5 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 13 Dec 2020 14:02:22 +0100 Subject: [PATCH] SwitchWrites bug: reverse replication workflows can have wrong start positions Signed-off-by: Rohit Nayak --- go.mod | 1 + go.sum | 2 + .../vreplication/vreplication_test.go | 29 ++++++++------ go/vt/wrangler/traffic_switcher.go | 40 +++++++++---------- 4 files changed, 38 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index c044a0fbaf6..d5e6020f8da 100644 --- a/go.mod +++ b/go.mod @@ -58,6 +58,7 @@ require ( github.com/martini-contrib/render v0.0.0-20150707142108-ec18f8345a11 github.com/mattn/go-sqlite3 v1.14.0 github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 + github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect github.com/montanaflynn/stats v0.6.3 diff --git a/go.sum b/go.sum index 9b71e7010e0..6b1485cb833 100644 --- a/go.sum +++ b/go.sum @@ -476,6 +476,8 @@ github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXx github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= +github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI= github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 5cfc865f7bd..43aaecc0b85 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -232,13 +232,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl insertQuery2 = "insert into customer(name, cid) values('tempCustomer4', 102)" //ID 102, hence due to reverse_bits in shard -80 require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2)) - time.Sleep(1 * time.Second) // wait for vreplication to catchup - reverseKsWorkflow := "product.p2c_reverse" if testReverse { //Reverse Replicate switchReads(t, allCellNames, reverseKsWorkflow) - printShardPositions(vc, ksShards) switchWrites(t, reverseKsWorkflow) insertQuery1 = "insert into customer(cid, name) values(1002, 'tempCustomer5')" @@ -249,11 +246,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl insertQuery1 = "insert into customer(cid, name) values(1004, 'tempCustomer7')" require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery1, matchInsertQuery1)) - time.Sleep(1 * time.Second) // wait for vreplication to catchup - //Go forward again switchReads(t, allCellNames, ksWorkflow) switchWrites(t, ksWorkflow) + dropSourcesDryRun(t, ksWorkflow, false, dryRunResultsDropSourcesDropCustomerShard) dropSourcesDryRun(t, ksWorkflow, true, dryRunResultsDropSourcesRenameCustomerShard) @@ -653,15 +649,11 @@ func switchWritesDryRun(t *testing.T, ksWorkflow string, dryRunResults []string) validateDryRunResults(t, output, dryRunResults) } -func switchWrites(t *testing.T, ksWorkflow string) { - const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1 - output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", - "-filtered_replication_wait_time="+SwitchWritesTimeout, ksWorkflow) - +func printSwitchWritesExtraDebug(t *testing.T, ksWorkflow, msg string) { // Temporary code: print lots of info for debugging occasional flaky failures in customer reshard in CI for multicell test debug := true - if debug && strings.Contains(ksWorkflow, ".p2c") { - fmt.Printf("------------------- START Extra debug info for a p2c SwitchWrites\n") + if debug { + fmt.Printf("------------------- START Extra debug info %s SwitchWrites %s\n", msg, ksWorkflow) ksShards := []string{"product/0", "customer/-80", "customer/80-"} printShardPositions(vc, ksShards) custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"] @@ -683,8 +675,19 @@ func switchWrites(t *testing.T, ksWorkflow string) { tab.Cell, tab.Keyspace, tab.Shard, tab.TabletUID, query, qr.Rows) } } - fmt.Printf("------------------- END Extra debug info for a p2c SwitchWrites\n") + fmt.Printf("------------------- END Extra debug info %s SwitchWrites %s\n", msg, ksWorkflow) + } +} + +func switchWrites(t *testing.T, ksWorkflow string) { + const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1 + output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", + "-filtered_replication_wait_time="+SwitchWritesTimeout, ksWorkflow) + if output != "" { + fmt.Printf("Output of SwitchWrites for %s:\n++++++\n%s\n--------\n", ksWorkflow, output) } + //printSwitchWritesExtraDebug is useful when debugging failures in SwitchWrites due to corner cases/races + _ = printSwitchWritesExtraDebug if err != nil { require.FailNow(t, fmt.Sprintf("SwitchWrites Error: %s: %s", err, output)) } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index d800d44a06f..a1ed6bc447e 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -751,7 +751,8 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { return ts.forAllSources(func(source *tsSource) error { var err error source.position, err = ts.wr.tmc.MasterPosition(ctx, source.master.Tablet) - ts.wr.Logger().Infof("Position for source %v:%v: %v", ts.sourceKeyspace, source.si.ShardName(), source.position) + ts.wr.Logger().Infof("Stopped Source Writes. Position for source %v:%v: %v", + ts.sourceKeyspace, source.si.ShardName(), source.position) if err != nil { log.Warningf("Error: %s", err) } @@ -773,39 +774,34 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error { ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) defer cancel() - - var mu sync.Mutex - return ts.forAllUids(func(target *tsTarget, uid uint32) error { - ts.wr.Logger().Infof("uid: %d, target master %s, target position %s, shard %s", uid, - target.master.AliasString(), target.position, target.si.String()) - log.Infof("uid: %d, target master %s, target position %s, shard %s", uid, + // source writes have been stopped, wait for all streams on targets to catch up + if err := ts.forAllUids(func(target *tsTarget, uid uint32) error { + ts.wr.Logger().Infof("Before Catchup: uid: %d, target master %s, target position %s, shard %s", uid, target.master.AliasString(), target.position, target.si.String()) bls := target.sources[uid] source := ts.sources[bls.Shard] - ts.wr.Logger().Infof("waiting for keyspace:shard: %v:%v, source position %v, uid %d", + ts.wr.Logger().Infof("Before Catchup: waiting for keyspace:shard: %v:%v to reach source position %v, uid %d", ts.targetKeyspace, target.si.ShardName(), source.position, uid) if err := ts.wr.tmc.VReplicationWaitForPos(ctx, target.master.Tablet, int(uid), source.position); err != nil { return err } - log.Infof("waiting for keyspace:shard: %v:%v, source position %v, uid %d", + log.Infof("After catchup: target keyspace:shard: %v:%v, source position %v, uid %d", ts.targetKeyspace, target.si.ShardName(), source.position, uid) - ts.wr.Logger().Infof("position for keyspace:shard: %v:%v reached, uid %d", ts.targetKeyspace, target.si.ShardName(), uid) - log.Infof("position for keyspace:shard: %v:%v reached, uid %d", ts.targetKeyspace, target.si.ShardName(), uid) + ts.wr.Logger().Infof("After catchup: position for keyspace:shard: %v:%v reached, uid %d", + ts.targetKeyspace, target.si.ShardName(), uid) if _, err := ts.wr.tmc.VReplicationExec(ctx, target.master.Tablet, binlogplayer.StopVReplication(uid, "stopped for cutover")); err != nil { log.Infof("error marking stopped for cutover on %s, uid %d", target.master.AliasString(), uid) return err } - - // Need lock because a target can have multiple uids. - mu.Lock() - defer mu.Unlock() - if target.position != "" { - return nil - } + return nil + }); err != nil { + return err + } + // all targets have caught up, record their positions for setting up reverse workflows + return ts.forAllTargets(func(target *tsTarget) error { var err error target.position, err = ts.wr.tmc.MasterPosition(ctx, target.master.Tablet) - ts.wr.Logger().Infof("Position for target master %s, uid %v: %v", target.master.AliasString(), uid, target.position) - log.Infof("Position for target master %s, uid %v: %v", target.master.AliasString(), uid, target.position) + ts.wr.Logger().Infof("After catchup, position for target master %s, %v", target.master.AliasString(), target.position) return err }) } @@ -898,7 +894,8 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error Filter: filter, }) } - + log.Infof("Creating reverse workflow vreplication stream on tablet %s: workflow %s, startPos %s", + source.master.Alias, ts.reverseWorkflow, target.position) _, err := ts.wr.VReplicationExec(ctx, source.master.Alias, binlogplayer.CreateVReplicationState(ts.reverseWorkflow, reverseBls, target.position, binlogplayer.BlpStopped, source.master.DbName())) if err != nil { return err @@ -907,6 +904,7 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error // if user has defined the cell/tablet_types parameters in the forward workflow, update the reverse workflow as well updateQuery := ts.getReverseVReplicationUpdateQuery(target.master.Alias.Cell, source.master.Alias.Cell, source.master.DbName()) if updateQuery != "" { + log.Infof("Updating vreplication stream entry on %s with: %s", source.master.Alias, updateQuery) _, err = ts.wr.VReplicationExec(ctx, source.master.Alias, updateQuery) return err }