Skip to content

Commit

Permalink
Delete routing rules on workflow Complete
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Jan 3, 2021
1 parent 337e40b commit 0426aed
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 5 deletions.
8 changes: 4 additions & 4 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias str
func reshardMerchant2to3SplitMerge(t *testing.T) {
ksName := "merchant"
counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0}
reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryrunresultsswitchwritesM2m3, nil, "")
reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryRunResultsSwitchWritesM2m3, nil, "")
validateCount(t, vtgateConn, ksName, "merchant", 2)
query := "insert into merchant (mname, category) values('amazon', 'electronics')"
execVtgateQuery(t, vtgateConn, ksName, query)
Expand Down Expand Up @@ -381,7 +381,7 @@ func reshardCustomer3to1Merge(t *testing.T) { //to unsharded
reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", 1500, counts, nil, nil, "")
}

func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultswitchWrites []string, cells []*Cell, sourceCellOrAlias string) {
func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultSwitchWrites []string, cells []*Cell, sourceCellOrAlias string) {
if cells == nil {
cells = []*Cell{defaultCell}
}
Expand Down Expand Up @@ -414,8 +414,8 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
}
vdiff(t, ksWorkflow)
switchReads(t, allCellNames, ksWorkflow)
if dryRunResultswitchWrites != nil {
switchWritesDryRun(t, ksWorkflow, dryRunResultswitchWrites)
if dryRunResultSwitchWrites != nil {
switchWritesDryRun(t, ksWorkflow, dryRunResultSwitchWrites)
}
switchWrites(t, ksWorkflow, false)
dropSources(t, ksWorkflow)
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var dryRunResultsReadCustomerShard = []string{
"Unlock keyspace product",
}

var dryrunresultsswitchwritesM2m3 = []string{
var dryRunResultsSwitchWritesM2m3 = []string{
"Lock keyspace merchant",
"Stop streams on keyspace merchant",
"/ Id 2 Keyspace customer Shard -80 Rules rules:<match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '-80')\" > at Position ",
Expand Down Expand Up @@ -106,6 +106,7 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{
"Delete vreplication streams on target:",
" Keyspace customer Shard -80 Workflow p2c DbName vt_customer Tablet 200",
" Keyspace customer Shard 80- Workflow p2c DbName vt_customer Tablet 300",
"Routing rules for participating tables will be deleted",
"Unlock keyspace customer",
"Unlock keyspace product",
}
Expand All @@ -122,6 +123,7 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{
"Delete vreplication streams on target:",
" Keyspace customer Shard -80 Workflow p2c DbName vt_customer Tablet 200",
" Keyspace customer Shard 80- Workflow p2c DbName vt_customer Tablet 300",
"Routing rules for participating tables will be deleted",
"Unlock keyspace customer",
"Unlock keyspace product",
}
1 change: 1 addition & 0 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/wrangler/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type switcher struct {
wr *Wrangler
}

func (r *switcher) deleteRoutingRules(ctx context.Context) error {
return r.ts.deleteRoutingRules(ctx)
}

func (r *switcher) dropSourceBlacklistedTables(ctx context.Context) error {
return r.ts.dropSourceBlacklistedTables(ctx)
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/wrangler/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type switcherDryRun struct {
ts *trafficSwitcher
}

func (dr *switcherDryRun) deleteRoutingRules(ctx context.Context) error {
dr.drLog.Log("Routing rules for participating tables will be deleted")
return nil
}

func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error {
sourceShards := make([]string, 0)
targetShards := make([]string, 0)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,7 @@ type iswitcher interface {
dropTargetVReplicationStreams(ctx context.Context) error
removeTargetTables(ctx context.Context) error
dropTargetShards(ctx context.Context) error
deleteRoutingRules(ctx context.Context) error

logs() *[]string
}
27 changes: 27 additions & 0 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,10 @@ func (wr *Wrangler) dropArtifacts(ctx context.Context, sw iswitcher) error {
if err := sw.dropTargetVReplicationStreams(ctx); err != nil {
return err
}
if err := sw.deleteRoutingRules(ctx); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -676,6 +680,7 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflow st
if err := sw.dropSourceBlacklistedTables(ctx); err != nil {
return nil, err
}

case binlogdatapb.MigrationType_SHARDS:
log.Infof("Removing shards")
if err := sw.dropSourceShards(ctx); err != nil {
Expand Down Expand Up @@ -1575,6 +1580,28 @@ func (ts *trafficSwitcher) dropTargetShards(ctx context.Context) error {
})
}

func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error {
rules, err := ts.wr.getRoutingRules(ctx)
if err != nil {
return err
}
for _, table := range ts.tables {
delete(rules, table)
delete(rules, table+"@replica")
delete(rules, table+"@rdonly")
delete(rules, ts.targetKeyspace+"."+table)
delete(rules, ts.targetKeyspace+"."+table+"@replica")
delete(rules, ts.targetKeyspace+"."+table+"@rdonly")
delete(rules, ts.sourceKeyspace+"."+table)
delete(rules, ts.sourceKeyspace+"."+table+"@replica")
delete(rules, ts.sourceKeyspace+"."+table+"@rdonly")
}
if err := ts.wr.saveRoutingRules(ctx, rules); err != nil {
return err
}
return nil
}

func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) {
rrs, err := wr.ts.GetRoutingRules(ctx)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
"Delete vreplication streams on target:",
" Keyspace ks2 Shard -80 Workflow test DbName vt_ks2 Tablet 20",
" Keyspace ks2 Shard 80- Workflow test DbName vt_ks2 Tablet 30",
"Routing rules for participating tables will be deleted",
"Unlock keyspace ks2",
"Unlock keyspace ks1",
}
Expand All @@ -894,6 +895,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
"Delete vreplication streams on target:",
" Keyspace ks2 Shard -80 Workflow test DbName vt_ks2 Tablet 20",
" Keyspace ks2 Shard 80- Workflow test DbName vt_ks2 Tablet 30",
"Routing rules for participating tables will be deleted",
"Unlock keyspace ks2",
"Unlock keyspace ks1",
}
Expand Down
43 changes: 43 additions & 0 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package wrangler

import (
"fmt"
"testing"

"vitess.io/vitess/go/vt/topo"

"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -170,6 +173,46 @@ func TestMoveTablesV2(t *testing.T) {
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
}

func validateRoutingRuleCount(ctx context.Context, t *testing.T, ts *topo.Server, cnt int) {
rr, err := ts.GetRoutingRules(ctx)
fmt.Printf("Rules %+v\n", rr.Rules)
require.NoError(t, err)
require.NotNil(t, rr)
rules := rr.Rules
require.Equal(t, cnt, len(rules))
}

func TestMoveTablesV2Complete(t *testing.T) {
ctx := context.Background()
p := &VReplicationWorkflowParams{
Workflow: "test",
SourceKeyspace: "ks1",
TargetKeyspace: "ks2",
Tables: "t1,t2",
Cells: "cell1,cell2",
TabletTypes: "replica,rdonly,master",
Timeout: DefaultActionTimeout,
}
tme := newTestTableMigrater(ctx, t)
defer tme.stopTablets(t)
wf, err := tme.wr.NewVReplicationWorkflow(ctx, MoveTablesWorkflow, p)
require.NoError(t, err)
require.NotNil(t, wf)
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
tme.expectNoPreviousJournals()
expectMoveTablesQueries(t, tme)
tme.expectNoPreviousJournals()
require.NoError(t, wf.SwitchTraffic(DirectionForward))
require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState())

//16 rules, 8 per table t1,t2 eg: t1,t1@replica,t1@rdonly,ks1.t1,ks1.t1@replica,ks1.t1@rdonly,ks2.t1@replica,ks2.t1@rdonly
validateRoutingRuleCount(ctx, t, wf.wr.ts, 16)

require.NoError(t, wf.Complete())

validateRoutingRuleCount(ctx, t, wf.wr.ts, 0)
}

func TestMoveTablesV2Partial(t *testing.T) {
ctx := context.Background()
p := &VReplicationWorkflowParams{
Expand Down

0 comments on commit 0426aed

Please sign in to comment.