Skip to content

Commit

Permalink
Implement latching for MVCC transactions (talent-plan#70)
Browse files Browse the repository at this point in the history
* Implement latching for MVCC transactions

Signed-off-by: Nick Cameron <[email protected]>

* Refactor commands

Signed-off-by: Nick Cameron <[email protected]>

* Refactoring: make executing commands more succinct

Signed-off-by: Nick Cameron <[email protected]>

* Test latching

Signed-off-by: Nick Cameron <[email protected]>

* Address review comments and do some refactoring

Signed-off-by: Nick Cameron <[email protected]>
  • Loading branch information
nrc authored Feb 24, 2020
1 parent 0fbd08b commit bdf7c46
Show file tree
Hide file tree
Showing 26 changed files with 583 additions and 774 deletions.
20 changes: 20 additions & 0 deletions kv/tikv/inner_server/modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,23 @@ type Modify struct {
Type ModifyType
Data interface{}
}

func (m *Modify) Key() []byte {
switch m.Type {
case ModifyTypePut:
return m.Data.(Put).Key
case ModifyTypeDelete:
return m.Data.(Delete).Key
}
return nil
}

func (m *Modify) Cf() string {
switch m.Type {
case ModifyTypePut:
return m.Data.(Put).Cf
case ModifyTypeDelete:
return m.Data.(Delete).Cf
}
return ""
}
168 changes: 11 additions & 157 deletions kv/tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"sync/atomic"
"time"

"github.com/juju/errors"

"github.com/pingcap-incubator/tinykv/kv/tikv/storage/commands"
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/interfaces"
"github.com/pingcap-incubator/tinykv/proto/pkg/coprocessor"
Expand Down Expand Up @@ -48,68 +46,68 @@ func (svr *Server) Stop() error {
func (svr *Server) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {
cmd := commands.NewGet(req)
resp := <-svr.scheduler.Run(&cmd)
return getResponse(&resp)
return resp.Response.(*kvrpcpb.GetResponse), resp.Err
}

func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) {
cmd := commands.NewScan(req)
resp := <-svr.scheduler.Run(&cmd)
return scanResponse(&resp)
return resp.Response.(*kvrpcpb.ScanResponse), resp.Err
}

func (svr *Server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
cmd := commands.NewPrewrite(req)
resp := <-svr.scheduler.Run(&cmd)
return prewriteResponse(&resp)
return resp.Response.(*kvrpcpb.PrewriteResponse), resp.Err
}

func (svr *Server) KvCommit(ctx context.Context, req *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) {
cmd := commands.NewCommit(req)
resp := <-svr.scheduler.Run(&cmd)
return commitResponse(&resp)
return resp.Response.(*kvrpcpb.CommitResponse), resp.Err
}

func (svr *Server) KvCheckTxnStatus(ctx context.Context, req *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) {
cmd := commands.NewCheckTxnStatus(req)
resp := <-svr.scheduler.Run(&cmd)
return checkTxnStatusResponse(&resp)
return resp.Response.(*kvrpcpb.CheckTxnStatusResponse), resp.Err
}

func (svr *Server) KvBatchRollback(ctx context.Context, req *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) {
cmd := commands.NewRollback(req)
resp := <-svr.scheduler.Run(&cmd)
return rollbackResponse(&resp)
return resp.Response.(*kvrpcpb.BatchRollbackResponse), resp.Err
}

func (svr *Server) KvResolveLock(ctx context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) {
cmd := commands.NewResolveLock(req)
resp := <-svr.scheduler.Run(&cmd)
return resolveLockResponse(&resp)
return resp.Response.(*kvrpcpb.ResolveLockResponse), resp.Err
}

// Raw API.
func (svr *Server) RawGet(ctx context.Context, req *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) {
cmd := commands.NewRawGet(req)
resp := <-svr.scheduler.Run(&cmd)
return rawGetResponse(&resp)
return resp.Response.(*kvrpcpb.RawGetResponse), resp.Err
}

func (svr *Server) RawPut(ctx context.Context, req *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) {
cmd := commands.NewRawPut(req)
resp := <-svr.scheduler.Run(&cmd)
return rawPutResponse(&resp)
return resp.Response.(*kvrpcpb.RawPutResponse), resp.Err
}

func (svr *Server) RawDelete(ctx context.Context, req *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) {
cmd := commands.NewRawDelete(req)
resp := <-svr.scheduler.Run(&cmd)
return rawDeleteResponse(&resp)
return resp.Response.(*kvrpcpb.RawDeleteResponse), resp.Err
}

func (svr *Server) RawScan(ctx context.Context, req *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) {
cmd := commands.NewRawScan(req)
resp := <-svr.scheduler.Run(&cmd)
return rawScanResponse(&resp)
return resp.Response.(*kvrpcpb.RawScanResponse), resp.Err
}

// Raft commands (tikv <-> tikv); these are trivially forwarded to innerServer.
Expand All @@ -125,147 +123,3 @@ func (svr *Server) Snapshot(stream tikvpb.Tikv_SnapshotServer) error {
func (svr *Server) Coprocessor(ctx context.Context, req *coprocessor.Request) (*coprocessor.Response, error) {
return &coprocessor.Response{}, nil
}

func getResponse(sr *interfaces.SchedResult) (*kvrpcpb.GetResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.GetResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func prewriteResponse(sr *interfaces.SchedResult) (*kvrpcpb.PrewriteResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.PrewriteResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func scanResponse(sr *interfaces.SchedResult) (*kvrpcpb.ScanResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.ScanResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func checkTxnStatusResponse(sr *interfaces.SchedResult) (*kvrpcpb.CheckTxnStatusResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.CheckTxnStatusResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func commitResponse(sr *interfaces.SchedResult) (*kvrpcpb.CommitResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.CommitResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func rollbackResponse(sr *interfaces.SchedResult) (*kvrpcpb.BatchRollbackResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.BatchRollbackResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func resolveLockResponse(sr *interfaces.SchedResult) (*kvrpcpb.ResolveLockResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.ResolveLockResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func rawGetResponse(sr *interfaces.SchedResult) (*kvrpcpb.RawGetResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.RawGetResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func rawPutResponse(sr *interfaces.SchedResult) (*kvrpcpb.RawPutResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.RawPutResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func rawDeleteResponse(sr *interfaces.SchedResult) (*kvrpcpb.RawDeleteResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.RawDeleteResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
func rawScanResponse(sr *interfaces.SchedResult) (*kvrpcpb.RawScanResponse, error) {
if sr.Err != nil {
return nil, sr.Err
}
if sr.Response == nil {
return nil, nil
}
resp, ok := sr.Response.(*kvrpcpb.RawScanResponse)
if ok {
return resp, nil
}
return nil, errors.New("Unexpected type in response")
}
48 changes: 48 additions & 0 deletions kv/tikv/storage/commands/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package commands

import (
"github.com/pingcap-incubator/tinykv/kv/tikv/inner_server"
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
"reflect"
)

// This file contains some base types for commands to reduce boilerplate.

type CommandBase struct {
context *kvrpcpb.Context
}

func (base CommandBase) Context() *kvrpcpb.Context {
return base.context
}

func (base CommandBase) Read(txn *kvstore.RoTxn) (interface{}, [][]byte, error) {
return nil, nil, nil
}

type ReadOnly struct{}

func (ro ReadOnly) WillWrite() [][]byte {
return nil
}

func (ro ReadOnly) PrepareWrites(txn *kvstore.MvccTxn) (interface{}, error) {
return nil, nil
}

func regionError(err error, resp interface{}) (interface{}, error) {
if regionErr, ok := err.(*inner_server.RegionError); ok {
respValue := reflect.ValueOf(resp)
respValue.FieldByName("RegionError").Set(reflect.ValueOf(regionErr.RequestErr))
return resp, nil
}

return nil, err
}

// regionErrorRo is a convenience version of regionError to match the return type of Read.
func regionErrorRo(err error, resp interface{}) (interface{}, [][]byte, error) {
resp, err = regionError(err, resp)
return resp, nil, err
}
Loading

0 comments on commit bdf7c46

Please sign in to comment.