Commit

Add proj6 (talent-plan#93)
* add proj6

Signed-off-by: you06 <[email protected]>

* add proj6

Signed-off-by: you06 <[email protected]>

* fix typo

Signed-off-by: you06 <[email protected]>

* typo

Signed-off-by: you06 <[email protected]>

* sort imports

Signed-off-by: you06 <[email protected]>

* fix test

Signed-off-by: you06 <[email protected]>

* Update courses/proj6-README-zh_CN.md

Co-authored-by: rebelice <[email protected]>

* Update courses/proj6-README-zh_CN.md

Co-authored-by: rebelice <[email protected]>

* add description about percolator

Signed-off-by: you06 <[email protected]>

Co-authored-by: rebelice <[email protected]>
you06 and rebelice authored Nov 22, 2021
1 parent 1f08cee commit 1bdb9a3
Showing 9 changed files with 805 additions and 151 deletions.
5 changes: 5 additions & 0 deletions Makefile
@@ -266,3 +266,8 @@ vectorized-bench:
-bench=BenchmarkVectorizedBuiltin$(VB_FILE)Func \
-run=BenchmarkVectorizedBuiltin$(VB_FILE)Func \
-args "$(VB_FUNC)"


proj6: failpoint-enable
go test -timeout 600s ./store/tikv -mockStore=false
@$(FAILPOINT_DISABLE)
80 changes: 80 additions & 0 deletions courses/proj6-README-zh_CN.md
@@ -0,0 +1,80 @@
# Distributed Transactions: Percolator

## Overview

In this chapter, we will implement the Percolator commit protocol.

## Percolator

[Percolator](https://research.google/pubs/pub36726/) is a distributed transaction protocol published by Google in 2010. It splits a transaction commit into two phases, Prewrite and Commit, enabling a distributed transactional database without a central coordinator. Google Spanner combines this protocol with clock synchronization based on GPS and atomic clocks to build a globally deployed distributed database. Thanks to its excellent scalability, the protocol is used by many distributed databases; the transaction implementations of TiDB and TinySQL are also based on it.

## Execution Model

In TiDB, a transaction's writes are buffered during execution; only at commit time are they written out in full, via the Percolator commit protocol, to the distributed TiKV storage engine. The entry point of this call is the `tikvTxn.Commit` function in `store/tikv/txn.go`.
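
For intuition, the buffered-write model is sketched below. This is a minimal sketch, assuming the generic `kv.Storage`/`kv.Transaction` API from TiDB's `kv` package (`Begin`, `Set`, `Commit`); it is not taken verbatim from TinySQL.

```go
// Sketch: writes stay in the transaction's in-memory buffer and are only
// flushed to TiKV when Commit runs the Percolator protocol, entering
// through tikvTxn.Commit.
func writeOneKey(store kv.Storage) error {
	txn, err := store.Begin()
	if err != nil {
		return err
	}
	// Set only updates the transaction's buffer; nothing is sent to TiKV yet.
	if err := txn.Set([]byte("k1"), []byte("v1")); err != nil {
		return err
	}
	// Commit flushes the buffered writes using Prewrite + Commit.
	return txn.Commit(context.Background())
}
```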

During execution, a transaction may run into other in-progress transactions. In that case it uses the Lock Resolver component to query the status of the transaction it encountered and takes the appropriate action based on the result.

## Two Phase Commit

The two-phase commit of the Percolator protocol consists of Prewrite and Commit: Prewrite actually writes the data, and Commit makes it visible to others. The success of the transaction is marked atomically by its Primary Key: if Prewrite fails, or committing the Primary Key fails, garbage collection is needed to roll back whatever the transaction has written.
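
At a high level the commit flow looks like the sketch below. This is an illustrative outline only, built on the `prewriteKeys`, `commitKeys` and `cleanupKeys` helpers that appear in the `store/tikv/2pc.go` diff further down; the keys slice and error handling are simplified and the function itself is hypothetical.

```go
// Illustrative sketch of the Percolator commit flow; not the actual TinySQL
// entry point. The first key in the slice is treated as the Primary Key.
func commitTransaction(c *twoPhaseCommitter, bo *Backoffer, keys [][]byte) error {
	// Phase 1: Prewrite writes the data and a lock for every key.
	if err := c.prewriteKeys(bo, keys); err != nil {
		// Prewrite failed: roll back whatever has already been written.
		_ = c.cleanupKeys(bo, keys)
		return err
	}
	// Phase 2: Commit. Once the Primary Key is committed the transaction is
	// decided; committing the secondary keys makes the rest visible.
	return c.commitKeys(bo, keys)
}
```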

The keys of a transaction may span different Regions. A write to a key must be sent to the correct Region before it can be processed. The `GroupKeysByRegion` function uses the region cache to split keys into batches by Region; the cache may be stale, however, in which case the storage node returns a Region Error, and the batch must be re-split and retried.
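
The grouping idea is sketched below, assuming a simplified signature; the real `GroupKeysByRegion` in `region_cache.go` is a method on `RegionCache` and may differ in details (for example, it may also return the Region of the first key).

```go
// A minimal sketch of grouping keys by Region; the simplified signature is an
// assumption, not the function you are asked to implement.
func groupKeysByRegion(c *RegionCache, bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, error) {
	groups := make(map[RegionVerID][][]byte)
	for _, k := range keys {
		// Ask the region cache which Region currently owns this key.
		loc, err := c.LocateKey(bo, k)
		if err != nil {
			return nil, err
		}
		groups[loc.Region] = append(groups[loc.Region], k)
	}
	return groups, nil
}
```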

### TODO

To make operations on keys work, implement the `GroupKeysByRegion` function in `region_cache.go`.

Execution involves three kinds of operations: Prewrite, Commit, and Rollback (Cleanup). They are all handled in the same flow. You need to complete the `buildPrewriteRequest` function used during Prewrite, and then, following the Prewrite `handleSingleBatch` function, complete the `handleSingleBatch` functions for Commit and Rollback.

## Lock Resolver

During the Prewrite phase, an operation on a key writes two records:

- The Default CF stores the actual KV data.
- The Lock CF stores the lock, which contains the key and timestamp information; it is cleaned up once Commit succeeds.

The Lock Resolver's job is to deal with the situation where a transaction encounters a lock during its commit.

When a transaction encounters a lock, there are several possibilities:

- The transaction that owns the lock has not committed this key yet, so the lock has not been cleaned up;
- The transaction that owns the lock hit an unrecoverable error and is rolling back, and has not yet cleaned up this key;
- The node running the transaction that owns the lock failed unexpectedly (for example, it crashed), so that node can no longer update the lock.

Under the Percolator protocol, the transaction's status is determined by querying the Primary Key that the lock belongs to. When we encounter an unfinished transaction (the lock on its Primary Key has not been cleaned up), what we want is to wait for an in-progress commit to reach its final state, and to clean up the garbage left behind by failures such as crashes. The lock's TTL is used to decide whether the transaction has expired; an expired transaction is proactively rolled back.
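
Putting this together, resolving a single lock roughly follows the outline below. This is an illustrative sketch, not the real `ResolveLocks` code: the `l.Primary` field and the exact parameter list of `getTxnStatus` are assumptions here, while `status.IsCommitted()`, `status.ttl` and the `resolveLock` signature follow the `lock_resolver.go` diff further down.

```go
// Illustrative outline only: check the primary's transaction status, then
// either clean up the lock or tell the caller to back off and retry.
func resolveOneLock(lr *LockResolver, bo *Backoffer, l *Lock, currentTS uint64) error {
	// Ask the Primary Key of the lock's transaction for its status
	// (the getTxnStatus parameters here are assumed).
	status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS)
	if err != nil {
		return err
	}
	if status.IsCommitted() || status.ttl == 0 {
		// The transaction is already decided (committed, or rolled back e.g.
		// because its TTL expired): clean up this lock accordingly.
		return lr.resolveLock(bo, l, status, map[RegionVerID]struct{}{})
	}
	// The owning transaction is still alive: the caller should back off and
	// retry later rather than rolling it back.
	return errors.New("lock not yet expired, retry later")
}
```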

### TODO

Complete the `getTxnStatus` and `resolveLock` functions in `lock_resolver.go` so that the exported `ResolveLocks` function works correctly.

Locks can be encountered not only while committing a transaction but also while it reads data, which likewise triggers `ResolveLocks`. Complete the `tikvSnapshot.get` function in `snapshot.go` so that read requests work correctly.

## Failpoint

[Failpoint](https://github.com/pingcap/failpoint) is a way of testing normal code paths by injecting errors into them. Failpoints in Go are implemented with code generation, so tests that use failpoints require failpoints to be enabled first.

Enabling failpoints temporarily stashes the original code in `{filename}__failpoint_stash__`; do not delete these files.

```sh
make failpoint-enable
```

Disabling failpoints restores the code from `{filename}__failpoint_stash__` into the original files and removes the stash files.

```sh
make failpoint-disable
```

Because the code of `.go` files that use failpoints differs depending on whether failpoints are enabled, please commit your code with failpoints disabled.
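
For reference, this is roughly how a failpoint is enabled around a test. The `mockFailAfterPK` name comes from the `store/tikv/2pc.go` diff in this commit, but the full failpoint path below (package path plus name) and the test itself are assumptions for illustration.

```go
package tikv_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

// Sketch: enable a failpoint for the duration of one test, then disable it.
// The code under test contains failpoint.Inject("mockFailAfterPK", ...),
// which only runs while the failpoint is enabled.
func TestWithFailpoint(t *testing.T) {
	fp := "github.com/pingcap/tidb/store/tikv/mockFailAfterPK" // assumed path
	if err := failpoint.Enable(fp, `return(true)`); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()
	// ... run the code path that hits the injected failure.
}
```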

## Testing

Run `make proj6` and make all test cases pass.

You can use the following command to run one specific test case, or several cases matching a pattern.

```sh
go test {package path} -check.f ^{regex}$
# example
go test -timeout 5s ./store/tikv -check.f ^TestFailAfterPrimary$
```
117 changes: 29 additions & 88 deletions store/tikv/2pc.go
@@ -20,11 +20,11 @@ import (
"sync"
"time"

"github.com/pingcap/failpoint"
pb "github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"

"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
@@ -346,18 +346,9 @@ func (c *twoPhaseCommitter) keySize(key []byte) int {
}

func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys) *tikvrpc.Request {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
tmp := c.mutations[string(k)]
mutations[i] = &tmp.Mutation
}

req := &pb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: c.lockTTL,
}
var req *pb.PrewriteRequest
// YOUR CODE HERE (proj6).
panic("YOUR CODE HERE")
return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{})
}

Expand Down Expand Up @@ -427,14 +418,12 @@ func (c *twoPhaseCommitter) getUndeterminedErr() error {
}

func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
CommitVersion: c.commitTS,
}, pb.Context{})

sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
// follow actionPrewrite.handleSingleBatch, build the commit request
var sender *RegionRequestSender
var err error
// build and send the commit request
// YOUR CODE HERE (proj6).
panic("YOUR CODE HERE")

// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
// transaction has been successfully committed.
@@ -446,49 +435,18 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
c.setUndeterminedErr(errors.Trace(sender.rpcError))
}

if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
failpoint.Inject("mockFailAfterPK", func() {
if !isPrimary {
err = errors.New("commit secondary keys error")
}
// re-split keys and commit again.
err = c.commitKeys(bo, batch.keys)
})
if err != nil {
return errors.Trace(err)
}
if resp.Resp == nil {
return errors.Trace(ErrBodyMissing)
}
commitResp := resp.Resp.(*pb.CommitResponse)
// Here we can make sure tikv has processed the commit primary key request. So
// we can clean undetermined error.
if isPrimary {
c.setUndeterminedErr(nil)
}
if keyErr := commitResp.GetError(); keyErr != nil {
c.mu.RLock()
defer c.mu.RUnlock()
err = extractKeyErr(keyErr)
if c.mu.committed {
// No secondary key could be rolled back after it's primary key is committed.
// There must be a serious bug somewhere.
logutil.BgLogger().Error("2PC failed commit key after primary key committed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
// The transaction maybe rolled back by concurrent transactions.
logutil.BgLogger().Debug("2PC failed commit primary key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return err
}

// handle the response and error refer to actionPrewrite.handleSingleBatch
// YOUR CODE HERE (proj6).
panic("YOUR CODE HERE")

c.mu.Lock()
defer c.mu.Unlock()
@@ -499,34 +457,17 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
}

func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{
Keys: batch.keys,
StartVersion: c.startTS,
}, pb.Context{})
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
err = c.cleanupKeys(bo, batch.keys)
return errors.Trace(err)
}
if keyErr := resp.Resp.(*pb.BatchRollbackResponse).GetError(); keyErr != nil {
err = errors.Errorf("conn %d 2PC cleanup failed: %s", c.connID, keyErr)
logutil.BgLogger().Debug("2PC failed cleanup key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
// follow actionPrewrite.handleSingleBatch, build the rollback request

// build and send the rollback request
// YOUR CODE HERE (proj6).
panic("YOUR CODE HERE")
// handle the response and error refer to actionPrewrite.handleSingleBatch

// YOUR CODE HERE (proj6).
panic("YOUR CODE HERE")
return nil

}

func (c *twoPhaseCommitter) prewriteKeys(bo *Backoffer, keys [][]byte) error {
41 changes: 21 additions & 20 deletions store/tikv/lock_resolver.go
@@ -292,11 +292,10 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
// 2.3 No lock -- concurrence prewrite.

var status TxnStatus
req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
PrimaryKey: primary,
LockTs: txnID,
CurrentTs: currentTS,
})
var req *tikvrpc.Request
// build the request
// YOUR CODE HERE (proj6).
panic("YOUR CODE HERE")
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
@@ -320,18 +319,19 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
if resp.Resp == nil {
return status, errors.Trace(ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
status.action = cmdResp.Action
if cmdResp.LockTtl != 0 {
status.ttl = cmdResp.LockTtl
} else {
status.commitTS = cmdResp.CommitVersion
lr.saveResolved(txnID, status)
}
_ = resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)

// Assign status with response
// YOUR CODE HERE (proj6).
panic("YOUR CODE HERE")
return status, nil
}

}

// resolveLock resolve the lock for the given transaction status which is checked from primary key.
// If status is committed, the secondary should also be committed.
// If status is not committed and the
func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cleanRegions map[RegionVerID]struct{}) error {
cleanWholeRegion := l.TxnSize >= bigTxnThreshold
for {
@@ -342,13 +342,13 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl
if _, ok := cleanRegions[loc.Region]; ok {
return nil
}
lreq := &kvrpcpb.ResolveLockRequest{
StartVersion: l.TxnID,
}
if status.IsCommitted() {
lreq.CommitVersion = status.CommitTS()
}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)

var req *tikvrpc.Request

// build the request
// YOUR CODE HERE (proj6).
panic("YOUR CODE HERE")

resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
@@ -378,4 +378,5 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl
}
return nil
}

}