Skip to content

Commit

Permalink
Persist last heartbeat time to better estimate lag. Fix some tests fo…
Browse files Browse the repository at this point in the history
…r the updated query to set/get this value. Compute workflow transaction lag and expose in Workflow Show and update related tests

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Feb 1, 2022
1 parent c074437 commit 0ef85a3
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 65 deletions.
16 changes: 16 additions & 0 deletions go/sqltypes/named_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,22 @@ func (r RowNamedValues) AsBool(fieldName string, def bool) bool {
return def
}

// ToBytes returns the named field as a byte array
func (r RowNamedValues) ToBytes(fieldName string) ([]byte, error) {
if v, ok := r[fieldName]; ok {
return v.ToBytes()
}
return nil, ErrNoSuchField
}

// AsBytes returns the named field as a byte array, or default value if nonexistent/error
func (r RowNamedValues) AsBytes(fieldName string, def []byte) []byte {
if v, err := r.ToBytes(fieldName); err == nil {
return v
}
return def
}

// NamedResult represents a query result with named values as opposed to ordinal values.
type NamedResult struct {
Fields []*querypb.Field `json:"fields"`
Expand Down
12 changes: 7 additions & 5 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,9 @@ var AlterVReplicationTable = []string{
"ALTER TABLE _vt.vreplication ADD KEY workflow_idx (workflow(64))",
"ALTER TABLE _vt.vreplication ADD COLUMN rows_copied BIGINT(20) NOT NULL DEFAULT 0",
"ALTER TABLE _vt.vreplication ADD COLUMN tags VARBINARY(1024) NOT NULL DEFAULT ''",

// records the time of the last heartbeat. Heartbeats are only received if the source has no recent events
"ALTER TABLE _vt.vreplication ADD COLUMN time_heartbeat BIGINT(20) NOT NULL DEFAULT 0",
}

// WithDDLInitialQueries contains the queries to be expected by the mock db client during tests
Expand Down Expand Up @@ -638,8 +641,7 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource,
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state, encodeString(dbName))
}

// GenerateUpdatePos returns a statement to update a value in the
// _vt.vreplication table.
// GenerateUpdatePos returns a statement to record the latest processed gtid in the _vt.vreplication table.
func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64, rowsCopied int64, compress bool) string {
strGTID := encodeString(mysql.EncodePosition(pos))
if compress {
Expand All @@ -659,12 +661,12 @@ func GenerateUpdateRowsCopied(uid uint32, rowsCopied int64) string {
return fmt.Sprintf("update _vt.vreplication set rows_copied=%v where id=%v", rowsCopied, uid)
}

// GenerateUpdateTime returns a statement to update time_updated in the _vt.vreplication table.
func GenerateUpdateTime(uid uint32, timeUpdated int64) (string, error) {
// GenerateUpdateHeartbeat returns a statement to record the latest heartbeat in the _vt.vreplication table.
func GenerateUpdateHeartbeat(uid uint32, timeUpdated int64) (string, error) {
if timeUpdated == 0 {
return "", fmt.Errorf("timeUpdated cannot be zero")
}
return fmt.Sprintf("update _vt.vreplication set time_updated=%v where id=%v", timeUpdated, uid), nil
return fmt.Sprintf("update _vt.vreplication set time_updated=%v, time_heartbeat=%v where id=%v", timeUpdated, timeUpdated, uid), nil
}

// StartVReplication returns a statement to start the replication.
Expand Down
11 changes: 8 additions & 3 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2282,16 +2282,21 @@ func getSourceKeyspace(clusterKeyspace string) (clusterName string, sourceKeyspa
func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string,
workflowType wrangler.VReplicationWorkflowType) error {

const defaultWaitTime = time.Duration(30 * time.Second)
// for backward compatibility we default the lag to match the timeout for switching primary traffic
// this should probably be much smaller so that target and source are almost in sync before switching traffic
const defaultMaxTransactionLagAllowed = defaultWaitTime

cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
tabletTypes := subFlags.String("tablet_types", "primary,replica,rdonly", "Source tablet types to replicate from (e.g. primary, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.")
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of SwitchReads and only reports the actions to be taken. -dry_run is only supported for SwitchTraffic, ReverseTraffic and Complete.")
timeout := subFlags.Duration("timeout", 30*time.Second, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on primary migrations. The migration will be cancelled on a timeout. -timeout is only supported for SwitchTraffic and ReverseTraffic.")
timeout := subFlags.Duration("timeout", defaultWaitTime, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on primary migrations. The migration will be cancelled on a timeout. -timeout is only supported for SwitchTraffic and ReverseTraffic.")
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication (default true). -reverse_replication is only supported for SwitchTraffic.")
keepData := subFlags.Bool("keep_data", false, "Do not drop tables or shards (if true, only vreplication artifacts are cleaned up). -keep_data is only supported for Complete and Cancel.")
keepRoutingRules := subFlags.Bool("keep_routing_rules", false, "Do not remove the routing rules for the source keyspace. -keep_routing_rules is only supported for Complete and Cancel.")
autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")
maxLagAllowed := subFlags.Duration("max_lag_allowed", 5*time.Second, "Allow traffic to be switched only if vreplication lag is below this")
maxTransactionLagAllowed := subFlags.Duration("max_transaction_lag_allowed", defaultMaxTransactionLagAllowed, "Allow traffic to be switched only if vreplication lag is below this")

// MoveTables and Migrate params
tables := subFlags.String("tables", "", "MoveTables only. A table spec or a list of tables. Either table_specs or -all needs to be specified.")
Expand Down Expand Up @@ -2443,7 +2448,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
vrwp.Timeout = *timeout
vrwp.EnableReverseReplication = *reverseReplication
vrwp.MaxAllowedLagSeconds = int64(math.Ceil(maxLagAllowed.Seconds()))
vrwp.MaxAllowedTransactionLagSeconds = int64(math.Ceil(maxTransactionLagAllowed.Seconds()))
case vReplicationWorkflowActionCancel:
vrwp.KeepData = *keepData
case vReplicationWorkflowActionComplete:
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
return posReached, nil
}

func (vp *vplayer) updateCurrentTime(tm int64) error {
update, err := binlogplayer.GenerateUpdateTime(vp.vr.id, tm)
func (vp *vplayer) updateHeartbeat(tm int64) error {
update, err := binlogplayer.GenerateUpdateHeartbeat(vp.vr.id, tm)
if err != nil {
return err
}
Expand All @@ -262,19 +262,19 @@ func (vp *vplayer) updateCurrentTime(tm int64) error {
return nil
}

func (vp *vplayer) mustUpdateCurrentTime() bool {
func (vp *vplayer) mustUpdateHeartbeat() bool {
return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval ||
vp.numAccumulatedHeartbeats >= vreplicationMinimumHeartbeatUpdateInterval
}

func (vp *vplayer) recordHeartbeat() error {
tm := time.Now().Unix()
vp.vr.stats.RecordHeartbeat(tm)
if !vp.mustUpdateCurrentTime() {
if !vp.mustUpdateHeartbeat() {
return nil
}
vp.numAccumulatedHeartbeats = 0
return vp.updateCurrentTime(tm)
return vp.updateHeartbeat(tm)
}

// applyEvents is the main thread that applies the events. It has the following use
Expand Down Expand Up @@ -402,6 +402,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
}
}

if sbm >= 0 {
vp.vr.stats.ReplicationLagSeconds.Set(sbm)
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(sbm)*time.Second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestHeartbeatFrequencyFlag(t *testing.T) {
*vreplicationHeartbeatUpdateInterval = tcase.interval
for _, tcount := range tcase.counts {
vp.numAccumulatedHeartbeats = tcount.count
require.Equal(t, tcount.mustUpdate, vp.mustUpdateCurrentTime())
require.Equal(t, tcount.mustUpdate, vp.mustUpdateHeartbeat())
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var (

const (
getSQLModeQuery = `SELECT @@session.sql_mode AS sql_mode`
// Use this whenever performing a schema change as part of a vreplication
// SQLMode should be used whenever performing a schema change as part of a vreplication
// workflow to ensure that you set a permissive SQL mode as defined by
// VReplication. We follow MySQL's model for recreating database objects
// on a target -- using SQL statements generated from a source -- which
Expand Down
23 changes: 15 additions & 8 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,16 @@ import (
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

const vreplQueryks = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test' and db_name='vt_ks'"
const vreplQueryks2 = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test' and db_name='vt_ks2'"
const vreplQueryks1 = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test_reverse' and db_name='vt_ks1'"
const (
streamInfoQuery = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test' and db_name='vt_%s'"
streamExtInfoQuery = "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, message, tags from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'"
)

var (
streamInfoKs = fmt.Sprintf(streamInfoQuery, "ks")
streamInfoKs1 = fmt.Sprintf(streamInfoQuery, "ks1")
streamInfoKs2 = fmt.Sprintf(streamInfoQuery, "ks2")
)

type testMigraterEnv struct {
ts *topo.Server
Expand Down Expand Up @@ -188,7 +195,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
}
rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls))
}
tme.dbTargetClients[i].addInvariant(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rows...),
Expand All @@ -213,7 +220,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
}
rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls))
}
tme.dbSourceClients[i].addInvariant(vreplQueryks1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
tme.dbSourceClients[i].addInvariant(streamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rows...),
Expand Down Expand Up @@ -333,12 +340,12 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls))
rowsRdOnly = append(rows, fmt.Sprintf("%d|%v|||RDONLY", j+1, bls))
}
tme.dbTargetClients[i].addInvariant(vreplQueryks, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
tme.dbTargetClients[i].addInvariant(streamInfoKs, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rows...),
)
tme.dbTargetClients[i].addInvariant(vreplQueryks+"-rdonly", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
tme.dbTargetClients[i].addInvariant(streamInfoKs+"-rdonly", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rowsRdOnly...),
Expand All @@ -347,7 +354,7 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe

tme.targetKeyspace = "ks"
for _, dbclient := range tme.dbSourceClients {
dbclient.addInvariant(vreplQueryks, &sqltypes.Result{})
dbclient.addInvariant(streamInfoKs, &sqltypes.Result{})
}
return tme
}
Expand Down
24 changes: 12 additions & 12 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1566,12 +1566,12 @@ func TestMigrateFrozen(t *testing.T) {
}},
},
}
tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
tme.dbTargetClients[0].addQuery(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|FROZEN||", bls1),
), nil)
tme.dbTargetClients[1].addQuery(vreplQueryks2, &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery(streamInfoKs2, &sqltypes.Result{}, nil)

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, false, false, true, false)
Expand All @@ -1586,8 +1586,8 @@ func TestMigrateNoStreamsFound(t *testing.T) {
tme := newTestTableMigrater(ctx, t)
defer tme.stopTablets(t)

tme.dbTargetClients[0].addQuery(vreplQueryks2, &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery(vreplQueryks2, &sqltypes.Result{}, nil)
tme.dbTargetClients[0].addQuery(streamInfoKs2, &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery(streamInfoKs2, &sqltypes.Result{}, nil)

tme.expectNoPreviousJournals()
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false)
Expand Down Expand Up @@ -1615,7 +1615,7 @@ func TestMigrateDistinctSources(t *testing.T) {
}},
},
}
tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
tme.dbTargetClients[0].addQuery(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|||", bls),
Expand Down Expand Up @@ -1644,7 +1644,7 @@ func TestMigrateMismatchedTables(t *testing.T) {
}},
},
}
tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
tme.dbTargetClients[0].addQuery(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|||", bls)),
Expand All @@ -1664,7 +1664,7 @@ func TestTableMigrateAllShardsNotPresent(t *testing.T) {
tme := newTestTableMigrater(ctx, t)
defer tme.stopTablets(t)

tme.dbTargetClients[0].addQuery(vreplQueryks2, &sqltypes.Result{}, nil)
tme.dbTargetClients[0].addQuery(streamInfoKs2, &sqltypes.Result{}, nil)

tme.expectNoPreviousJournals()
_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false)
Expand Down Expand Up @@ -1703,7 +1703,7 @@ func TestMigrateNoTableWildcards(t *testing.T) {
}},
},
}
tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
tme.dbTargetClients[0].addQuery(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|||", bls1),
Expand All @@ -1719,7 +1719,7 @@ func TestMigrateNoTableWildcards(t *testing.T) {
}},
},
}
tme.dbTargetClients[1].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
tme.dbTargetClients[1].addQuery(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|||", bls3),
Expand Down Expand Up @@ -2039,8 +2039,8 @@ func TestShardMigrateNoAvailableTabletsForReverseReplication(t *testing.T) {
// Temporarily set tablet types to RDONLY to test that SwitchWrites fails if no tablets of rdonly are available
invariants := make(map[string]*sqltypes.Result)
for i := range tme.targetShards {
invariants[fmt.Sprintf("%s-%d", vreplQueryks, i)] = tme.dbTargetClients[i].getInvariant(vreplQueryks)
tme.dbTargetClients[i].addInvariant(vreplQueryks, tme.dbTargetClients[i].getInvariant(vreplQueryks+"-rdonly"))
invariants[fmt.Sprintf("%s-%d", streamInfoKs, i)] = tme.dbTargetClients[i].getInvariant(streamInfoKs)
tme.dbTargetClients[i].addInvariant(streamInfoKs, tme.dbTargetClients[i].getInvariant(streamInfoKs+"-rdonly"))
}
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false)
require.Error(t, err)
Expand All @@ -2049,7 +2049,7 @@ func TestShardMigrateNoAvailableTabletsForReverseReplication(t *testing.T) {
require.True(t, strings.Contains(err.Error(), "80-"))
require.False(t, strings.Contains(err.Error(), "40"))
for i := range tme.targetShards {
tme.dbTargetClients[i].addInvariant(vreplQueryks, invariants[fmt.Sprintf("%s-%d", vreplQueryks, i)])
tme.dbTargetClients[i].addInvariant(streamInfoKs, invariants[fmt.Sprintf("%s-%d", streamInfoKs, i)])
}

journalID, _, err := tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false)
Expand Down
Loading

0 comments on commit 0ef85a3

Please sign in to comment.