Skip to content

Commit

Permalink
VReplication: use stored/binlogged ENUM index value in WHERE clauses (v…
Browse files Browse the repository at this point in the history
…itessio#9868)

Add e2e test to verify replication of PKs with enums

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Mar 12, 2022
1 parent a8c9636 commit d2b32f2
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 4 deletions.
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package vreplication
// We violate the NO_ZERO_DATES and NO_ZERO_IN_DATE sql_modes that are enabled by default in
// MySQL 5.7+ and MariaDB 10.2+ to ensure that vreplication still works everywhere and the
// permissive sql_mode now used in vreplication causes no unwanted side effects.
// The customer table also tests two important things:
// 1. Composite or multi-column primary keys
// 2. PKs that contain an ENUM column
// The Lead and Lead-1 tables also allows us to test several things:
// 1. Mixed case identifiers
// 2. Column and table names with special characters in them, namely a dash
Expand All @@ -15,7 +18,7 @@ var (
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4;
create table customer(cid int, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),
ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', primary key(cid)) CHARSET=utf8mb4;
date2 datetime not null default '2021-00-01 00:00:00', primary key(cid,typ)) CHARSET=utf8mb4;
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table merchant(mname varchar(128), category varchar(128), primary key(mname)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
create table orders(oid int, cid int, pid int, mname varchar(128), price int, qty int, total int as (qty * price), total2 int as (qty * price) stored, primary key(oid)) CHARSET=utf8;
Expand Down
24 changes: 24 additions & 0 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,28 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
validateQuery(t, vtgateConn, "product", "select max(cid) from customer2", want)
}

// testReplicatingWithPKEnumCols ensures that we properly apply binlog events
// in the stream where the PK contains an ENUM column
func testReplicatingWithPKEnumCols(t *testing.T) {
// At this point we have an ongoing MoveTables operation for the customer table
// from the product to the customer keyspace. Let's delete and insert a row to
// ensure that the PK -- which is on (cid, typ) with typ being an ENUM -- is
// managed correctly in the WHERE clause for the delete. The end result is that
// we should see the proper deletes propogate and not get a duplicate key error
// when we re-insert the same row values and ultimately VDiff shows the table as
// being identical in both keyspaces.

// typ is an enum, with soho having a stored and binlogged value of 2
deleteQuery := "delete from customer where cid = 2 and typ = 'soho'"
insertQuery := "insert into customer(cid, name, typ, sport, meta) values(2, 'Paül','soho','cricket',convert(x'7b7d' using utf8mb4))"
execVtgateQuery(t, vtgateConn, "product", deleteQuery)
time.Sleep(2 * time.Second)
vdiff(t, ksWorkflow, "")
execVtgateQuery(t, vtgateConn, "product", insertQuery)
time.Sleep(2 * time.Second)
vdiff(t, ksWorkflow, "")
}

func testReshardV2Workflow(t *testing.T) {
currentWorkflowType = wrangler.ReshardWorkflow

Expand Down Expand Up @@ -413,6 +435,8 @@ func testMoveTablesV2Workflow(t *testing.T) {
// and that they were not copied to the new target keyspace
verifyNoInternalTables(t, vtgateConn, targetKs)

testReplicatingWithPKEnumCols(t)

testRestOfWorkflow(t)

listAllArgs := []string{"workflow", "customer", "listall"}
Expand Down
15 changes: 13 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,17 +318,28 @@ func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*q
return sqltypes.StringBindVariable(valString), nil
}
if enumValues, ok := tp.EnumValuesMap[field.Name]; ok && !val.IsNull() {
// The fact that this fielkd has a EnumValuesMap entry, means we must
// The fact that this field has a EnumValuesMap entry, means we must
// use the enum's text value as opposed to the enum's numerical value.
// Once known use case is with Online DDL, when a column is converted from
// ENUM to a VARCHAR/TEXT.
enumValue, enumValueOK := enumValues[val.ToString()]
if !enumValueOK {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Invalid enum value: %v for field %s", val, field.Name)
}
// get the enum text fir this val
// get the enum text for this val
return sqltypes.StringBindVariable(enumValue), nil
}
if field.Type == querypb.Type_ENUM {
// This is an ENUM w/o a values map, which means that we are most likely using
// the index value -- what is stored and binlogged vs. the list of strings
// defined in the table schema -- and we must use an int bindvar or we'll have
// invalid/incorrect predicates like WHERE enumcol='2'.
// This will be the case when applying binlog events.
enumIndexVal := sqltypes.MakeTrusted(querypb.Type_UINT64, val.Raw())
if enumIndex, err := enumIndexVal.ToUint64(); err == nil {
return sqltypes.Uint64BindVariable(enumIndex), nil
}
}
return sqltypes.ValueBindVariable(*val), nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1476,7 +1476,7 @@ func TestPlayerTypes(t *testing.T) {
},
}, {
input: "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')",
output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d\\0\\0\\0\\0','e','f','g','h','1','3')",
output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d\\0\\0\\0\\0','e','f','g','h',1,'3')",
table: "vitess_strings",
data: [][]string{
{"a", "b", "c", "d\000\000\000\000", "e", "f", "g", "h", "a", "a,b"},
Expand Down

0 comments on commit d2b32f2

Please sign in to comment.