diff --git a/Makefile b/Makefile
index 2e0e7ad7c..c46794a1d 100644
--- a/Makefile
+++ b/Makefile
@@ -266,3 +266,8 @@ vectorized-bench:
 	-bench=BenchmarkVectorizedBuiltin$(VB_FILE)Func \
 	-run=BenchmarkVectorizedBuiltin$(VB_FILE)Func \
 	-args "$(VB_FUNC)"
+
+
+proj6: failpoint-enable
+	go test -timeout 600s ./store/tikv -mockStore=false
+	@$(FAILPOINT_DISABLE)
diff --git a/courses/proj6-README-zh_CN.md b/courses/proj6-README-zh_CN.md
new file mode 100644
index 000000000..ed1ffa81c
--- /dev/null
+++ b/courses/proj6-README-zh_CN.md
@@ -0,0 +1,80 @@
+# Distributed Transactions: Percolator
+
+## Overview
+
+In this section we will implement the Percolator commit protocol.
+
+## Percolator
+
+[Percolator](https://research.google/pubs/pub36726/) is a distributed transaction protocol published by Google in 2010. It splits a transaction commit into two phases, Prewrite and Commit, and implements a distributed transactional database without a central coordinator. Google Spanner combines this protocol with GPS- and atomic-clock-based clock synchronization to build a globally deployed distributed database. Thanks to its excellent scalability, the protocol is used by many distributed databases; the transaction implementations of TiDB and TinySQL are also based on it.
+
+## Execution Model
+
+In TiDB, a transaction's writes are buffered while it executes; only at commit time are they written out in full to the distributed TiKV storage engine via the Percolator commit protocol. The entry point of this path is the `tikvTxn.Commit` function in `store/tikv/txn.go`.
+
+During execution, a transaction may run into other in-flight transactions. In that case the Lock Resolver component is used to query the status of the transaction it encountered and to act according to the result.
+
+## Two Phase Commit
+
+The two phases of the Percolator commit protocol are Prewrite and Commit: Prewrite actually writes the data, and Commit makes the data visible to readers. The success of a transaction is marked atomically by its Primary Key. When Prewrite fails, or the Commit of the Primary Key fails, garbage cleanup is needed to roll back what has already been written.
+
+The keys of a transaction may span several Regions. A write to a key must be sent to the Region that owns it; the `GroupKeysByRegion` function uses the region cache to split the keys into one batch per Region. Because the cache may be stale, the storage node may return a Region Error, in which case the batch must be re-split and retried.
+
+### TODO
+
+To make operations on keys executable, implement the `GroupKeysByRegion` function in `region_cache.go` (a caller-side sketch of how its result is consumed is shown after the `region_cache.go` diff below).
+
+Three kinds of operations are involved: Prewrite, Commit, and Rollback (Cleanup), all handled by the same flow. You need to complete the `buildPrewriteRequest` function used by Prewrite, and then, following Prewrite's `handleSingleBatch` function, complete the `handleSingleBatch` functions for Commit and Rollback.
+
+## Lock Resolver
+
+During the Prewrite phase, an operation on a key writes two records.
+
+- The Default CF stores the actual KV data.
+- The Lock CF stores the lock, including the key and timestamp information; it is cleared once Commit succeeds.
+
+The Lock Resolver's job is to handle the case where a transaction encounters a Lock while committing.
+
+When a transaction encounters a Lock, there are several possible situations.
+
+- The transaction that owns the Lock has not committed this key yet, so the Lock has not been cleared;
+- The transaction that owns the Lock hit an unrecoverable error and is rolling back, and has not cleaned up the key yet;
+- The node running the owning transaction failed unexpectedly (for example it crashed), so that node can no longer update the Lock.
+
+Under the Percolator protocol, the status of a transaction is determined by querying the Primary Key that the Lock belongs to. When we read an unfinished transaction (the Lock on its Primary Key has not been cleared), what we want is to wait for an in-progress commit to reach its final state, and to clean up the garbage left behind by failures such as crashes. The Lock's TTL is used to decide whether the transaction has expired; an expired transaction is proactively rolled back.
+
+### TODO
+
+Complete the `getTxnStatus` and `resolveLock` functions in `lock_resolver.go` so that the exported `ResolveLocks` function works correctly.
+
+Besides the commit path, a transaction may also encounter Locks while reading data, which triggers `ResolveLocks` as well. Complete the `tikvSnapshot.get` function in `snapshot.go` so that read requests work correctly.
+
+## Failpoint
+
+[Failpoint](https://github.com/pingcap/failpoint) is a way to test regular code paths by injecting errors into them. Failpoints in Golang are implemented with code generation, so they must be enabled before running tests that use them (a minimal, hypothetical sketch of the failpoint pattern is appended at the end of this patch).
+
+When failpoints are enabled, the original code is temporarily stashed in `{filename}__failpoint_stash__`; do not delete these files.
+
+```sh
+make failpoint-enable
+```
+
+Disabling failpoints restores the code from `{filename}__failpoint_stash__` into the original files and removes the stash files.
+
+```sh
+make failpoint-disable
+```
+
+Because a `.go` file that uses failpoints differs depending on whether failpoints are enabled, please commit your code with failpoints disabled.
+
+## Tests
+
+Run `make proj6` and make all the test cases pass.
+
+You can run one or more specific test cases with the commands below.
+
+```sh
+go test {package path} -check.f ^{regex}$
+# example
+go test -timeout 5s ./store/tikv -check.f ^TestFailAfterPrimary$
+```
diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index b140f1cc6..4cf91653d 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -20,11 +20,11 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	pb "github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror" - "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/logutil" @@ -346,18 +346,9 @@ func (c *twoPhaseCommitter) keySize(key []byte) int { } func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys) *tikvrpc.Request { - mutations := make([]*pb.Mutation, len(batch.keys)) - for i, k := range batch.keys { - tmp := c.mutations[string(k)] - mutations[i] = &tmp.Mutation - } - - req := &pb.PrewriteRequest{ - Mutations: mutations, - PrimaryLock: c.primary(), - StartVersion: c.startTS, - LockTtl: c.lockTTL, - } + var req *pb.PrewriteRequest + // YOUR CODE HERE (proj6). + panic("YOUR CODE HERE") return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{}) } @@ -427,14 +418,12 @@ func (c *twoPhaseCommitter) getUndeterminedErr() error { } func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { - req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &pb.CommitRequest{ - StartVersion: c.startTS, - Keys: batch.keys, - CommitVersion: c.commitTS, - }, pb.Context{}) - - sender := NewRegionRequestSender(c.store.regionCache, c.store.client) - resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort) + // follow actionPrewrite.handleSingleBatch, build the commit request + var sender *RegionRequestSender + var err error + // build and send the commit request + // YOUR CODE HERE (proj6). + panic("YOUR CODE HERE") // If we fail to receive response for the request that commits primary key, it will be undetermined whether this // transaction has been successfully committed. @@ -446,49 +435,18 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch c.setUndeterminedErr(errors.Trace(sender.rpcError)) } - if err != nil { - return errors.Trace(err) - } - regionErr, err := resp.GetRegionError() - if err != nil { - return errors.Trace(err) - } - if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return errors.Trace(err) + failpoint.Inject("mockFailAfterPK", func() { + if !isPrimary { + err = errors.New("commit secondary keys error") } - // re-split keys and commit again. - err = c.commitKeys(bo, batch.keys) + }) + if err != nil { return errors.Trace(err) } - if resp.Resp == nil { - return errors.Trace(ErrBodyMissing) - } - commitResp := resp.Resp.(*pb.CommitResponse) - // Here we can make sure tikv has processed the commit primary key request. So - // we can clean undetermined error. - if isPrimary { - c.setUndeterminedErr(nil) - } - if keyErr := commitResp.GetError(); keyErr != nil { - c.mu.RLock() - defer c.mu.RUnlock() - err = extractKeyErr(keyErr) - if c.mu.committed { - // No secondary key could be rolled back after it's primary key is committed. - // There must be a serious bug somewhere. - logutil.BgLogger().Error("2PC failed commit key after primary key committed", - zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(err) - } - // The transaction maybe rolled back by concurrent transactions. - logutil.BgLogger().Debug("2PC failed commit primary key", - zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) - return err - } + + // handle the response and error refer to actionPrewrite.handleSingleBatch + // YOUR CODE HERE (proj6). 
+ panic("YOUR CODE HERE") c.mu.Lock() defer c.mu.Unlock() @@ -499,34 +457,17 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch } func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error { - req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{ - Keys: batch.keys, - StartVersion: c.startTS, - }, pb.Context{}) - resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) - if err != nil { - return errors.Trace(err) - } - regionErr, err := resp.GetRegionError() - if err != nil { - return errors.Trace(err) - } - if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return errors.Trace(err) - } - err = c.cleanupKeys(bo, batch.keys) - return errors.Trace(err) - } - if keyErr := resp.Resp.(*pb.BatchRollbackResponse).GetError(); keyErr != nil { - err = errors.Errorf("conn %d 2PC cleanup failed: %s", c.connID, keyErr) - logutil.BgLogger().Debug("2PC failed cleanup key", - zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(err) - } + // follow actionPrewrite.handleSingleBatch, build the rollback request + + // build and send the rollback request + // YOUR CODE HERE (proj6). + panic("YOUR CODE HERE") + // handle the response and error refer to actionPrewrite.handleSingleBatch + + // YOUR CODE HERE (proj6). + panic("YOUR CODE HERE") return nil + } func (c *twoPhaseCommitter) prewriteKeys(bo *Backoffer, keys [][]byte) error { diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 13e0eea92..28159630e 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -292,11 +292,10 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte // 2.3 No lock -- concurrence prewrite. var status TxnStatus - req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{ - PrimaryKey: primary, - LockTs: txnID, - CurrentTs: currentTS, - }) + var req *tikvrpc.Request + // build the request + // YOUR CODE HERE (proj6). + panic("YOUR CODE HERE") for { loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) if err != nil { @@ -320,18 +319,19 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte if resp.Resp == nil { return status, errors.Trace(ErrBodyMissing) } - cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse) - status.action = cmdResp.Action - if cmdResp.LockTtl != 0 { - status.ttl = cmdResp.LockTtl - } else { - status.commitTS = cmdResp.CommitVersion - lr.saveResolved(txnID, status) - } + _ = resp.Resp.(*kvrpcpb.CheckTxnStatusResponse) + + // Assign status with response + // YOUR CODE HERE (proj6). + panic("YOUR CODE HERE") return status, nil } + } +// resolveLock resolve the lock for the given transaction status which is checked from primary key. +// If status is committed, the secondary should also be committed. 
+// If status is not committed and the func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cleanRegions map[RegionVerID]struct{}) error { cleanWholeRegion := l.TxnSize >= bigTxnThreshold for { @@ -342,13 +342,13 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl if _, ok := cleanRegions[loc.Region]; ok { return nil } - lreq := &kvrpcpb.ResolveLockRequest{ - StartVersion: l.TxnID, - } - if status.IsCommitted() { - lreq.CommitVersion = status.CommitTS() - } - req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq) + + var req *tikvrpc.Request + + // build the request + // YOUR CODE HERE (proj6). + panic("YOUR CODE HERE") + resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort) if err != nil { return errors.Trace(err) @@ -378,4 +378,5 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl } return nil } + } diff --git a/store/tikv/proj6_test.go b/store/tikv/proj6_test.go new file mode 100644 index 000000000..bcec8d995 --- /dev/null +++ b/store/tikv/proj6_test.go @@ -0,0 +1,511 @@ +package tikv + +import ( + "context" + "math/rand" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb" + . "github.com/pingcap/check" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/mockstore/mocktikv" + "github.com/pingcap/tidb/store/tikv/tikvrpc" +) + +type testProj6Suite struct { + OneByOneSuite + cluster *mocktikv.Cluster + store *tikvStore +} + +var _ = Suite(&testProj6Suite{}) + +func (s *testProj6Suite) SetUpSuite(c *C) { + ManagedLockTTL = 3000 // 3s + s.OneByOneSuite.SetUpSuite(c) +} + +func (s *testProj6Suite) SetUpTest(c *C) { + s.cluster = mocktikv.NewCluster() + mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c")) + mvccStore, err := mocktikv.NewMVCCLevelDB("") + c.Assert(err, IsNil) + client := mocktikv.NewRPCClient(s.cluster, mvccStore) + pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)} + spkv := NewMockSafePointKV() + store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false) + c.Assert(err, IsNil) + s.store = store + CommitMaxBackoff = 2000 +} + +func (s *testProj6Suite) TearDownSuite(c *C) { + CommitMaxBackoff = 20000 + s.store.Close() + s.OneByOneSuite.TearDownSuite(c) +} + +func (s *testProj6Suite) checkValues(c *C, m map[string]string) { + txn := s.begin(c) + for k, v := range m { + val, err := txn.Get(context.TODO(), []byte(k)) + c.Assert(err, IsNil) + c.Assert(string(val), Equals, v) + } +} + +func (s *testProj6Suite) mustNotExist(c *C, keys ...[]byte) { + txn := s.begin(c) + for _, k := range keys { + _, err := txn.Get(context.TODO(), k) + c.Assert(err, NotNil) + c.Check(terror.ErrorEqual(err, kv.ErrNotExist), IsTrue) + } +} + +func (s *testProj6Suite) mustGetLock(c *C, key []byte) *Lock { + ver, err := s.store.CurrentVersion() + c.Assert(err, IsNil) + bo := NewBackoffer(context.Background(), getMaxBackoff) + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ + Key: key, + Version: ver.Ver, + }) + loc, err := s.store.regionCache.LocateKey(bo, key) + c.Assert(err, IsNil) + resp, err := s.store.SendReq(bo, req, loc.Region, readTimeoutShort) + c.Assert(err, IsNil) + c.Assert(resp.Resp, NotNil) + keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError() + c.Assert(keyErr, NotNil) + lock, err := extractLockFromKeyErr(keyErr) + c.Assert(err, IsNil) + return lock +} + +func (s *testProj6Suite) mustUnLock(c *C, key []byte) { + ver, err := 
s.store.CurrentVersion() + c.Assert(err, IsNil) + bo := NewBackoffer(context.Background(), getMaxBackoff) + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ + Key: key, + Version: ver.Ver, + }) + loc, err := s.store.regionCache.LocateKey(bo, key) + c.Assert(err, IsNil) + resp, err := s.store.SendReq(bo, req, loc.Region, readTimeoutShort) + c.Assert(err, IsNil) + c.Assert(resp.Resp, NotNil) + keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError() + c.Assert(keyErr, IsNil) + //lock, err := extractLockFromKeyErr(keyErr) + //c.Assert(err, IsNil) + //c.Assert(lock, IsNil) +} + +func (s *testProj6Suite) begin(c *C) *tikvTxn { + txn, err := s.store.Begin() + c.Assert(err, IsNil) + return txn.(*tikvTxn) +} + +func (s *testProj6Suite) beginWithStartTs(c *C, ts uint64) *tikvTxn { + txn, err := s.store.BeginWithStartTS(ts) + c.Assert(err, IsNil) + return txn.(*tikvTxn) +} + +func (s *testProj6Suite) TestInitKeysAndMutations(c *C) { + txn := s.begin(c) + err := txn.Set([]byte("a"), []byte("a1")) + c.Assert(err, IsNil) + err = txn.Set([]byte("b"), []byte("b1")) + c.Assert(err, IsNil) + err = txn.Delete([]byte("c")) + c.Assert(err, IsNil) + err = txn.LockKeys(context.Background(), new(kv.LockCtx), kv.Key("a"), kv.Key("d")) + c.Assert(err, IsNil) + committer, err := newTwoPhaseCommitterWithInit(txn, 0) + c.Assert(err, IsNil) + err = committer.initKeysAndMutations() + c.Assert(err, IsNil) + + key2mut := map[string]kvrpcpb.Mutation{ + "a": { + Op: kvrpcpb.Op_Put, + Key: []byte("a"), + Value: []byte("a1"), + }, + "b": { + Op: kvrpcpb.Op_Put, + Key: []byte("b"), + Value: []byte("b1"), + }, + "c": { + Op: kvrpcpb.Op_Del, + Key: []byte("c"), + }, + "d": { + Op: kvrpcpb.Op_Lock, + Key: []byte("d"), + }, + } + c.Assert(len(committer.mutations), Equals, len(key2mut)) + c.Assert(len(committer.keys), Equals, len(key2mut)) + txnSize := 0 + for k, m := range committer.mutations { + c.Assert(m.Op, Equals, key2mut[k].Op) + c.Assert(m.Key, BytesEquals, key2mut[k].Key) + c.Assert(m.Value, BytesEquals, key2mut[k].Value) + delete(key2mut, k) + txnSize += len(m.Key) + len(m.Value) + } + c.Assert(committer.txnSize, Equals, txnSize) +} + +func (s *testProj6Suite) TestGroupKeysByRegion(c *C) { + var ( + key1 = []byte("Z") + key2 = []byte("a1") + key3 = []byte("a") + key4 = []byte("b") + key5 = []byte("c") + ) + keys := [][]byte{key2, key3, key4, key5} + rand.Shuffle(len(keys), func(i, j int) { keys[i], keys[j] = keys[j], keys[i] }) + keys = append([][]byte{key1}, keys...) 
+ groups, first, err := s.store.GetRegionCache().GroupKeysByRegion(NewBackoffer(context.Background(), 1000), keys, nil) + c.Assert(err, IsNil) + c.Assert(first.GetID(), Equals, uint64(3)) + + keys2id := map[string]uint64{ + string(key1): 3, + string(key2): 4, + string(key3): 4, + string(key4): 5, + string(key5): 6, + } + + keyCnt := 0 + for region, keys := range groups { + for _, key := range keys { + keyCnt++ + c.Assert(region.GetID(), Equals, keys2id[string(key)]) + delete(keys2id, string(key)) + } + } + c.Assert(len(keys), Equals, keyCnt) +} + +func (s *testProj6Suite) TestBuildPrewriteRequest(c *C) { + txn := s.begin(c) + + k1 := []byte("k1") + v1 := []byte("v1") + k2 := []byte("k2") + v2 := []byte("v2") + k3 := []byte("k3") + v3 := []byte("v3") + batch := batchKeys{ + keys: [][]byte{k2, k3}, + } + + txn.Set(k1, v1) + txn.Set(k2, v2) + txn.Set(k3, v3) + committer, err := newTwoPhaseCommitterWithInit(txn, 0) + c.Assert(err, IsNil) + committer.primaryKey = k1 + c.Assert(committer.primary(), BytesEquals, k1) + r := committer.buildPrewriteRequest(batch) + c.Assert(r.Type, Equals, tikvrpc.CmdPrewrite) + req := r.Prewrite() + + c.Assert(req.PrimaryLock, BytesEquals, k1) + c.Assert(req.StartVersion, Equals, committer.startTS) + c.Assert(req.LockTtl, Equals, committer.lockTTL) + c.Assert(len(req.Mutations), Equals, len(batch.keys)) +} + +func (s *testProj6Suite) preparePrewritedTxn(c *C, pk string, kvs map[string]string) (*tikvTxn, *twoPhaseCommitter) { + txn := s.begin(c) + hasPk := false + keys := make([][]byte, 0, len(kvs)) + for k, v := range kvs { + keys = append(keys, []byte(k)) + err := txn.Set([]byte(k), []byte(v)) + c.Assert(err, IsNil) + if k == pk { + hasPk = true + } + } + c.Assert(hasPk, IsTrue) + committer, err := newTwoPhaseCommitterWithInit(txn, 0) + committer.primaryKey = []byte(pk) + c.Assert(err, IsNil) + bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) + err = committer.prewriteKeys(bo, keys) + c.Assert(err, IsNil) + return txn, committer +} + +func (s *testProj6Suite) TestCommitSingleBatch(c *C) { + k1 := []byte("a1") + k2 := []byte("b1") + k3 := []byte("b2") + + _, committer := s.preparePrewritedTxn(c, "a1", map[string]string{ + "a1": "a1", + "b1": "b1", + "b2": "b2", + }) + action := actionCommit{} + bo := NewBackoffer(context.Background(), CommitMaxBackoff) + loc, err := s.store.regionCache.LocateKey(NewBackoffer(context.Background(), getMaxBackoff), k1) + c.Assert(err, IsNil) + action.handleSingleBatch(committer, bo, batchKeys{ + region: loc.Region, + keys: [][]byte{k1}, + }) + s.mustUnLock(c, k1) + s.mustGetLock(c, k2) + s.mustGetLock(c, k3) + + loc, err = s.store.regionCache.LocateKey(NewBackoffer(context.Background(), getMaxBackoff), k2) + c.Assert(err, IsNil) + action.handleSingleBatch(committer, bo, batchKeys{ + region: loc.Region, + keys: [][]byte{k2, k3}, + }) + s.mustUnLock(c, k1) + s.mustUnLock(c, k2) + s.mustUnLock(c, k3) + + s.checkValues(c, map[string]string{ + "a1": "a1", + "b1": "b1", + "b2": "b2", + }) +} + +func (s *testProj6Suite) TestUnCommit(c *C) { + k1 := []byte("k1") + k2 := []byte("k2") + + _, _ = s.preparePrewritedTxn(c, "k1", map[string]string{ + "k1": "v1", + "k2": "v2", + }) + + s.mustNotExist(c, k1, k2) +} + +func (s *testProj6Suite) TestCommit(c *C) { + var err error + k1 := []byte("k1") + k2 := []byte("k2") + + _, committer := s.preparePrewritedTxn(c, "k1", map[string]string{ + "k1": "v1", + "k2": "v2", + }) + // check lock + s.mustGetLock(c, k1) + s.mustGetLock(c, k2) + // commit keys + committer.commitTS, err = 
s.store.oracle.GetTimestamp(context.Background()) + c.Assert(err, IsNil) + err = committer.commitKeys(NewBackoffer(context.Background(), CommitMaxBackoff), [][]byte{k1, k2}) + c.Assert(err, IsNil) + s.checkValues(c, map[string]string{ + "k1": "v1", + "k2": "v2", + }) +} + +func (s *testProj6Suite) TestRollbackSingleBatch(c *C) { + k1 := []byte("a1") + k2 := []byte("b1") + k3 := []byte("b2") + + _, committer := s.preparePrewritedTxn(c, "a1", map[string]string{ + "a1": "a1", + "b1": "b1", + "b2": "b2", + }) + + action := actionCleanup{} + bo := NewBackoffer(context.Background(), cleanupMaxBackoff) + loc, err := s.store.regionCache.LocateKey(NewBackoffer(context.Background(), getMaxBackoff), k1) + c.Assert(err, IsNil) + action.handleSingleBatch(committer, bo, batchKeys{ + region: loc.Region, + keys: [][]byte{k1}, + }) + s.mustUnLock(c, k1) + s.mustGetLock(c, k2) + s.mustGetLock(c, k3) + + loc, err = s.store.regionCache.LocateKey(NewBackoffer(context.Background(), getMaxBackoff), k2) + c.Assert(err, IsNil) + action.handleSingleBatch(committer, bo, batchKeys{ + region: loc.Region, + keys: [][]byte{k2, k3}, + }) + s.mustUnLock(c, k1) + s.mustUnLock(c, k2) + s.mustUnLock(c, k3) + + s.mustNotExist(c, k1, k2, k3) +} + +func (s *testProj6Suite) TestRollback(c *C) { + var err error + k1 := []byte("k1") + k2 := []byte("k2") + + _, committer := s.preparePrewritedTxn(c, "k1", map[string]string{ + "k1": "v1", + "k2": "v2", + }) + bo := NewBackoffer(context.Background(), cleanupMaxBackoff) + err = committer.cleanupKeys(bo, [][]byte{k1, k2}) + c.Assert(err, IsNil) + + s.mustNotExist(c, k1, k2) +} + +func (s *testProj6Suite) TestResolveLock(c *C) { + k1 := []byte("a") + k2 := []byte("b") + k3 := []byte("c") + + txn, committer := s.preparePrewritedTxn(c, "a", map[string]string{ + "a": "a", + "b": "b", + "c": "c", + }) + var err error + committer.commitTS, err = s.store.oracle.GetTimestamp(context.Background()) + c.Assert(err, IsNil) + err = committer.commitKeys(NewBackoffer(context.Background(), CommitMaxBackoff), [][]byte{k1}) + c.Assert(err, IsNil) + + lr := newLockResolver(s.store) + bo := NewBackoffer(context.Background(), getMaxBackoff) + status, err := lr.GetTxnStatus(txn.StartTS(), committer.commitTS+1, k1) + c.Assert(err, IsNil) + // the transaction status from primary key is committed + c.Assert(status.IsCommitted(), IsTrue) + // resolve the lock for committed transactions + for _, k := range [][]byte{k2, k3} { + lock := s.mustGetLock(c, k) + c.Assert(lock, NotNil) + cleanRegions := make(map[RegionVerID]struct{}) + lr := newLockResolver(s.store) + status, err := lr.getTxnStatusFromLock(NewBackoffer(context.Background(), 1000), lock, committer.commitTS+1) + c.Assert(err, IsNil) + err = lr.resolveLock(bo, lock, status, cleanRegions) + c.Assert(err, IsNil) + // after resolve, check the lock is cleared + s.mustUnLock(c, k) + } +} + +func (s *testProj6Suite) TestFailAfterPrimary(c *C) { + point := "github.com/pingcap/tidb/store/tikv/mockFailAfterPK" + c.Assert(failpoint.Enable(point, `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable(point), IsNil) + }() + + txn := s.begin(c) + + k1 := []byte("a") + v1 := []byte("a") + k2 := []byte("b") + v2 := []byte("b") + k3 := []byte("c") + v3 := []byte("c") + + txn.Set(k1, v1) + txn.Set(k2, v2) + txn.Set(k3, v3) + + err := txn.Commit(context.Background()) + c.Assert(err, IsNil) + + s.checkValues(c, map[string]string{ + "a": "a", + "b": "b", + "c": "c", + }) +} + +func (s *testProj6Suite) TestGetResolveLockCommit(c *C) { + txn, committer := 
s.preparePrewritedTxn(c, "a", map[string]string{ + "a": "a", + "b": "b", + }) + k1 := []byte("a") + k2 := []byte("b") + var err error + committer.commitTS, err = s.store.oracle.GetTimestamp(context.Background()) + c.Assert(err, IsNil) + err = committer.commitKeys(NewBackoffer(context.Background(), CommitMaxBackoff), [][]byte{k1}) + c.Assert(err, IsNil) + + // there is a lock on k2 since it hasn't been committed + s.mustGetLock(c, k2) + + // transaction with smaller startTS will not see the lock + txn1 := s.beginWithStartTs(c, txn.startTS-1) + _, err = txn1.Get(context.Background(), k2) + c.Check(terror.ErrorEqual(err, kv.ErrNotExist), IsTrue) + + // check the lock is still exist + s.mustGetLock(c, k2) + + // tinysql will always use the latest ts to try resolving the lock + txn2 := s.beginWithStartTs(c, committer.commitTS+1) + val, err := txn2.Get(context.Background(), k2) + c.Assert(err, IsNil) + c.Assert(val, BytesEquals, []byte("b")) + + s.mustUnLock(c, k2) +} + + +func (s *testProj6Suite) TestGetResolveLockRollback(c *C) { + txn, committer := s.preparePrewritedTxn(c, "a", map[string]string{ + "a": "a", + "b": "b", + }) + k1 := []byte("a") + k2 := []byte("b") + + // there is a lock on k1 and k2 since they haven't been committed + s.mustGetLock(c, k1) + s.mustGetLock(c, k2) + + // transaction with smaller startTS will not see the lock + txn1 := s.beginWithStartTs(c, txn.startTS-1) + _, err := txn1.Get(context.Background(), k1) + c.Check(terror.ErrorEqual(err, kv.ErrNotExist), IsTrue) + _, err = txn1.Get(context.Background(), k2) + c.Check(terror.ErrorEqual(err, kv.ErrNotExist), IsTrue) + + // check the lock is still exist + s.mustGetLock(c, k1) + s.mustGetLock(c, k2) + + // tinysql will always use the latest ts to try resolving the lock + txn2 := s.beginWithStartTs(c, committer.startTS+1) + _, err = txn2.Get(context.Background(), k2) + c.Assert(err, NotNil) + c.Check(terror.ErrorEqual(err, kv.ErrNotExist), IsTrue) + + s.mustUnLock(c, k1) + s.mustUnLock(c, k2) +} diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index fe02ffbcf..1990450e8 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -453,27 +453,9 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca // 'PrimaryLockKey' and should be committed ahead of others. // filter is used to filter some unwanted keys. func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte, filter func(key, regionStartKey []byte) bool) (map[RegionVerID][][]byte, RegionVerID, error) { - groups := make(map[RegionVerID][][]byte) - var first RegionVerID - var lastLoc *KeyLocation - for i, k := range keys { - if lastLoc == nil || !lastLoc.Contains(k) { - var err error - lastLoc, err = c.LocateKey(bo, k) - if err != nil { - return nil, first, errors.Trace(err) - } - if filter != nil && filter(k, lastLoc.StartKey) { - continue - } - } - id := lastLoc.Region - if i == 0 { - first = id - } - groups[id] = append(groups[id], k) - } - return groups, first, nil + // YOUR CODE HERE (proj6). + panic("YOUR CODE HERE") + return nil, RegionVerID{}, nil } // ListRegionIDsInKeyRange lists ids of regions in [start_key,end_key]. 
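The README's Two Phase Commit section refers to a caller-side sketch of `GroupKeysByRegion`. The helper below is hypothetical and not part of this patch (`groupIntoBatches` is an illustrative name); it only shows how the function's result is typically consumed, based on its documented contract that the first Region returned holds the first key (the primary lock key), which should be handled ahead of the others.

```go
package tikv

import "github.com/pingcap/errors"

// groupIntoBatches is an illustrative helper: it groups keys by Region and
// puts the batch that contains the first key (normally the primary key) at
// the front, so it can be sent before the other batches.
func groupIntoBatches(cache *RegionCache, bo *Backoffer, keys [][]byte) ([]batchKeys, error) {
	groups, firstRegion, err := cache.GroupKeysByRegion(bo, keys, nil)
	if err != nil {
		return nil, errors.Trace(err)
	}
	batches := make([]batchKeys, 0, len(groups))
	if ks, ok := groups[firstRegion]; ok {
		batches = append(batches, batchKeys{region: firstRegion, keys: ks})
		delete(groups, firstRegion)
	}
	for region, ks := range groups {
		batches = append(batches, batchKeys{region: region, keys: ks})
	}
	return batches, nil
}
```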
diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 45b99d882..d9706ceb6 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -142,21 +142,14 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { cmdGetResp := resp.Resp.(*pb.GetResponse) val := cmdGetResp.GetValue() if keyErr := cmdGetResp.GetError(); keyErr != nil { - lock, err := extractLockFromKeyErr(keyErr) - if err != nil { - return nil, errors.Trace(err) - } - msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, []*Lock{lock}) - if err != nil { - return nil, errors.Trace(err) - } - if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(boTxnLockFast, int(msBeforeExpired), errors.New(keyErr.String())) - if err != nil { - return nil, errors.Trace(err) - } - } + // You need to handle the key error here + // If the key error is a lock, there are 2 possible cases: + // 1. The transaction is during commit, wait for a while and retry. + // 2. The transaction is dead with some locks left, resolve it. + // YOUR CODE HERE (proj6). + panic("YOUR CODE HERE") continue + } return val, nil } diff --git a/store/tikv/test_util.go b/store/tikv/test_util.go index bf1b999f9..896faf03d 100644 --- a/store/tikv/test_util.go +++ b/store/tikv/test_util.go @@ -14,12 +14,21 @@ package tikv import ( + "context" + "flag" + "github.com/google/uuid" + pb "github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb" "github.com/pingcap-incubator/tinykv/scheduler/client" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/store/tikv/tikvrpc" ) +var mockStore = flag.Bool("mockStore", true, "use mock store when commit protocol is not implemented") + // NewTestTiKVStore creates a test store with Option func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Client) Client, pdClientHijack func(pd.Client) pd.Client) (kv.Storage, error) { if clientHijack != nil { @@ -37,5 +46,119 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false) tikvStore.mock = true + if *mockStore { + return &mockTikvStore{tikvStore}, errors.Trace(err) + } return tikvStore, errors.Trace(err) } + +type mockTikvStore struct { + *tikvStore +} + +type mockTransaction struct { + *tikvTxn +} + +type mockTwoPhaseCommitter struct { + *twoPhaseCommitter +} + +func (m *mockTikvStore) Begin() (kv.Transaction, error){ + txn, err := m.tikvStore.Begin() + if err != nil { + return nil, errors.Trace(err) + } + return &mockTransaction{txn.(*tikvTxn)}, nil +} + +func (m *mockTikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error){ + txn, err := m.tikvStore.BeginWithStartTS(startTS) + if err != nil { + return nil, errors.Trace(err) + } + return &mockTransaction{txn.(*tikvTxn)}, nil +} + +func (txn *mockTransaction) Commit(ctx context.Context) error { + if !txn.valid { + return kv.ErrInvalidTxn + } + defer txn.close() + + failpoint.Inject("mockCommitError", func(val failpoint.Value) { + if val.(bool) && kv.IsMockCommitErrorEnable() { + kv.MockCommitErrorDisable() + failpoint.Return(errors.New("mock commit error")) + } + }) + + // connID is used for log. 
+ var connID uint64 + val := ctx.Value(sessionctx.ConnID) + if val != nil { + connID = val.(uint64) + } + + var err error + committer := txn.committer + if committer == nil { + committer, err = newTwoPhaseCommitter(txn.tikvTxn, connID) + if err != nil { + return errors.Trace(err) + } + } + if err := committer.initKeysAndMutations(); err != nil { + return errors.Trace(err) + } + if len(committer.keys) == 0 { + return nil + } + mockCommitter := &mockTwoPhaseCommitter{committer} + err = mockCommitter.execute(ctx) + return errors.Trace(err) +} + +// execute executes the two-phase commit protocol. +func (c *mockTwoPhaseCommitter) execute(_ context.Context) (err error) { + if len(c.mutations) == 0 { + return nil + } + for k := range c.mutations { + c.primaryKey = []byte(k) + break + } + // prewrite + for k, m := range c.mutations { + req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &pb.PrewriteRequest{ + Mutations: []*pb.Mutation{&m.Mutation}, + PrimaryLock: c.primary(), + StartVersion: c.startTS, + LockTtl: c.lockTTL, + }, pb.Context{}) + bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) + sender := NewRegionRequestSender(c.store.regionCache, c.store.client) + loc, err := c.store.GetRegionCache().LocateKey(bo, []byte(k)) + if err != nil { + return err + } + sender.SendReq(bo, req, loc.Region,readTimeoutShort ) + } + + // commit + for k := range c.mutations { + req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &pb.CommitRequest{ + StartVersion: c.startTS, + Keys: [][]byte{[]byte(k)}, + CommitVersion: c.commitTS, + }, pb.Context{}) + bo := NewBackoffer(context.Background(), CommitMaxBackoff) + sender := NewRegionRequestSender(c.store.regionCache, c.store.client) + loc, err := c.store.GetRegionCache().LocateKey(bo, []byte(k)) + if err != nil { + return err + } + sender.SendReq(bo, req, loc.Region,readTimeoutShort ) + } + return nil +} diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 6f2ee20a3..d32e05e42 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -102,26 +102,44 @@ func DecodeIndexKeyPrefix(key kv.Key) (tableID int64, indexID int64, indexValues func DecodeIndexKey(key kv.Key) (tableID int64, indexID int64, indexValues []string, err error) { k := key - tableID, indexID, key, err = DecodeIndexKeyPrefix(key) + tableID, indexID, isRecord, err := DecodeKeyHead(key) if err != nil { return 0, 0, nil, errors.Trace(err) } + if isRecord { + err = errInvalidIndexKey.GenWithStack("invalid index key - %q", k) + return 0, 0, nil, err + } + indexKey := key[prefixLen+idLen:] + indexValues, err = DecodeValuesBytesToStrings(indexKey) + if err != nil { + err = errInvalidIndexKey.GenWithStack("invalid index key - %q %v", k, err) + return 0, 0, nil, err + } + return tableID, indexID, indexValues, nil +} - for len(key) > 0 { - remain, d, e := codec.DecodeOne(key) +// DecodeValuesBytesToStrings decode the raw bytes to strings for each columns. +// FIXME: Without the schema information, we can only decode the raw kind of +// the column. For instance, MysqlTime is internally saved as uint64. 
+func DecodeValuesBytesToStrings(b []byte) ([]string, error) {
+	var datumValues []string
+	for len(b) > 0 {
+		remain, d, e := codec.DecodeOne(b)
 		if e != nil {
-			return 0, 0, nil, errInvalidIndexKey.GenWithStack("invalid index key - %q %v", k, e)
+			return nil, e
 		}
 		str, e1 := d.ToString()
 		if e1 != nil {
-			return 0, 0, nil, errInvalidIndexKey.GenWithStack("invalid index key - %q %v", k, e1)
+			return nil, e1
 		}
-		indexValues = append(indexValues, str)
-		key = remain
+		datumValues = append(datumValues, str)
+		b = remain
 	}
-	return
+	return datumValues, nil
 }
+
 // EncodeRow encode row data and column ids into a slice of byte.
 // Row layout: colID1, value1, colID2, value2, .....
 // valBuf and values pass by caller, for reducing EncodeRow allocates temporary bufs. If you pass valBuf and values as nil,
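As mentioned in the README's Failpoint section, the snippet below is a minimal, self-contained sketch of the failpoint pattern used in this patch (compare `mockFailAfterPK` in `2pc.go` and `TestFailAfterPrimary` in `proj6_test.go`). It is not part of the patch: the package name, the failpoint name `examplePoint`, and its full path are made up, and the test only passes after failpoints have been enabled with `make failpoint-enable`.

```go
package example

import (
	"errors"
	"testing"

	"github.com/pingcap/failpoint"
)

// doWork contains a failpoint marker. With failpoints disabled the marker is a
// no-op; after `make failpoint-enable` it is rewritten by code generation so
// the closure runs whenever the failpoint has been activated.
func doWork() error {
	var err error
	failpoint.Inject("examplePoint", func(val failpoint.Value) {
		if on, ok := val.(bool); ok && on {
			err = errors.New("injected error")
		}
	})
	return err
}

// TestDoWork activates the failpoint around the call, mirroring the pattern of
// TestFailAfterPrimary above. The full failpoint path is the package import
// path followed by "/" and the failpoint name.
func TestDoWork(t *testing.T) {
	point := "path/to/example/examplePoint" // hypothetical package path
	if err := failpoint.Enable(point, `return(true)`); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(point); err != nil {
			t.Fatal(err)
		}
	}()
	if err := doWork(); err == nil {
		t.Fatal("expected the injected error")
	}
}
```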