Skip to content

Commit

Permalink
binlog: add sequence to binlog.TableMutation (pingcap#2060)
Browse files Browse the repository at this point in the history
Add sequence in binlog.TableMutation to preserve the original mutation order.
  • Loading branch information
coocood authored Nov 22, 2016
1 parent 8b070e0 commit 3612ba0
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 39 deletions.
159 changes: 124 additions & 35 deletions _vendor/src/github.com/pingcap/tipb/go-binlog/binlog.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ import:
- package: github.com/pingcap/pd
version: 447b500dc50e6a5f5cf46abf3bacaf67abb718c1
- package: github.com/pingcap/tipb
version: 38084aeaf922fb5d41284865d64b2113f3ae5f1c
version: 17699c2d5e3b549d85eec896afd9d4f87879b0eb
subpackages:
- go-binlog
- go-tipb
Expand Down
17 changes: 17 additions & 0 deletions sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (s *testBinlogSuite) TestBinlog(c *C) {
tk.MustExec("insert local_binlog2 values ('abc', 16), ('def', 18)")
tk.MustExec("delete from local_binlog2 where name = 'def'")
prewriteVal = getLatestBinlogPrewriteValue(c, pump)
c.Assert(prewriteVal.Mutations[0].Sequence[0], Equals, binlog.MutationType_DeletePK)
_, deletedPK, _ := codec.DecodeOne(prewriteVal.Mutations[0].DeletedPks[0])
c.Assert(deletedPK.GetString(), Equals, "def")

Expand All @@ -161,12 +162,28 @@ func (s *testBinlogSuite) TestBinlog(c *C) {

tk.MustExec("delete from local_binlog3 where c1 = 3 and c2 = 3")
prewriteVal = getLatestBinlogPrewriteValue(c, pump)
c.Assert(prewriteVal.Mutations[0].Sequence[0], Equals, binlog.MutationType_DeleteRow)
gotRows = mutationRowsToRows(c, prewriteVal.Mutations[0].DeletedRows, 1, 3)
expected = [][]types.Datum{
{types.NewIntDatum(3), types.NewIntDatum(3)},
}
c.Assert(gotRows, DeepEquals, expected)

// Test Mutation Sequence.
tk.MustExec("create table local_binlog4 (c1 int primary key, c2 int)")
tk.MustExec("insert local_binlog4 values (1, 1), (2, 2), (3, 2)")
tk.MustExec("begin")
tk.MustExec("delete from local_binlog4 where c1 = 1")
tk.MustExec("insert local_binlog4 values (1, 1)")
tk.MustExec("update local_binlog4 set c2 = 3 where c1 = 3")
tk.MustExec("commit")
prewriteVal = getLatestBinlogPrewriteValue(c, pump)
c.Assert(prewriteVal.Mutations[0].Sequence, DeepEquals, []binlog.MutationType{
binlog.MutationType_DeleteID,
binlog.MutationType_Insert,
binlog.MutationType_Update,
})

checkBinlogCount(c, pump)

pump.mu.Lock()
Expand Down
5 changes: 5 additions & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ func (t *Table) AddRecord(ctx context.Context, r []types.Datum) (recordID int64,
handleVal, _ := codec.EncodeValue(nil, types.NewIntDatum(recordID))
bin := append(handleVal, value...)
mutation.InsertedRows = append(mutation.InsertedRows, bin)
mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Insert)
}
variable.GetSessionVars(ctx).AddAffectedRows(1)
return recordID, nil
Expand Down Expand Up @@ -547,13 +548,15 @@ func (t *Table) addUpdateBinlog(ctx context.Context, h int64, old []types.Datum,
bin = append(oldData, newValue...)
}
mutation.UpdatedRows = append(mutation.UpdatedRows, bin)
mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Update)
return nil
}

func (t *Table) addDeleteBinlog(ctx context.Context, h int64, r []types.Datum) error {
mutation := t.getMutation(ctx)
if t.meta.PKIsHandle {
mutation.DeletedIds = append(mutation.DeletedIds, h)
mutation.Sequence = append(mutation.Sequence, binlog.MutationType_DeleteID)
return nil
}

Expand All @@ -576,6 +579,7 @@ func (t *Table) addDeleteBinlog(ctx context.Context, h int64, r []types.Datum) e
return errors.Trace(err)
}
mutation.DeletedPks = append(mutation.DeletedPks, data)
mutation.Sequence = append(mutation.Sequence, binlog.MutationType_DeletePK)
return nil
}
colIDs := make([]int64, len(t.Cols()))
Expand All @@ -587,6 +591,7 @@ func (t *Table) addDeleteBinlog(ctx context.Context, h int64, r []types.Datum) e
return errors.Trace(err)
}
mutation.DeletedRows = append(mutation.DeletedRows, data)
mutation.Sequence = append(mutation.Sequence, binlog.MutationType_DeleteRow)
return nil
}

Expand Down

0 comments on commit 3612ba0

Please sign in to comment.