Skip to content

Commit

Permalink
*: add a priority option to transaction, use high priority for meta (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Jun 28, 2017
1 parent b5e6c02 commit 6b6b03d
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 7 deletions.
9 changes: 9 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ const (
SchemaLeaseChecker
// IsolationLevel sets isolation level for current transaction. The default level is SI.
IsolationLevel
// Priority marks the priority of this transaction.
Priority
)

// Priority value for transaction priority.
const (
PriorityNormal int = iota
PriorityLow
PriorityHigh
)

// IsoLevel is the transaction's isolation level.
Expand Down
1 change: 1 addition & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Meta struct {

// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
txn.SetOption(kv.Priority, kv.PriorityHigh)
t := structure.NewStructure(txn, txn, mMetaPrefix)
return &Meta{txn: t}
}
Expand Down
26 changes: 23 additions & 3 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type twoPhaseCommitter struct {
committed bool
undetermined bool
}
priority pb.CommandPri
}

// newTwoPhaseCommitter creates a twoPhaseCommitter.
Expand Down Expand Up @@ -147,6 +148,7 @@ func newTwoPhaseCommitter(txn *tikvTxn) (*twoPhaseCommitter, error) {
keys: keys,
mutations: mutations,
lockTTL: txnLockTTL(txn.startTime, size),
priority: getTxnPriority(txn),
}, nil
}

Expand Down Expand Up @@ -333,7 +335,8 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
skipCheck = true
}
req := &tikvrpc.Request{
Type: tikvrpc.CmdPrewrite,
Type: tikvrpc.CmdPrewrite,
Priority: c.priority,
Prewrite: &pb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
Expand All @@ -342,7 +345,6 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
SkipConstraintCheck: skipCheck,
},
}

for {
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
Expand Down Expand Up @@ -402,9 +404,27 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
}
}

func getTxnPriority(txn *tikvTxn) pb.CommandPri {
if pri := txn.us.GetOption(kv.Priority); pri != nil {
return kvPriorityToCommandPri(pri.(int))
}
return pb.CommandPri_Normal
}

func kvPriorityToCommandPri(pri int) pb.CommandPri {
switch pri {
case kv.PriorityLow:
return pb.CommandPri_Low
case kv.PriorityHigh:
return pb.CommandPri_High
}
return pb.CommandPri_Normal
}

func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
req := &tikvrpc.Request{
Type: tikvrpc.CmdCommit,
Type: tikvrpc.CmdCommit,
Priority: c.priority,
Commit: &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func (s *Scanner) getData(bo *Backoffer) error {
return errors.Trace(err)
}
req := &tikvrpc.Request{
Type: tikvrpc.CmdScan,
Type: tikvrpc.CmdScan,
Priority: s.snapshot.priority,
Scan: &pb.ScanRequest{
StartKey: []byte(s.nextStartKey),
Limit: uint32(s.batchSize),
Expand Down
8 changes: 6 additions & 2 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type tikvSnapshot struct {
store *tikvStore
version kv.Version
isolationLevel kv.IsoLevel
priority pb.CommandPri
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
Expand All @@ -48,6 +49,7 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
store: store,
version: ver,
isolationLevel: kv.SI,
priority: pb.CommandPri_Normal,
}
}

Expand Down Expand Up @@ -121,7 +123,8 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
pending := batch.keys
for {
req := &tikvrpc.Request{
Type: tikvrpc.CmdBatchGet,
Type: tikvrpc.CmdBatchGet,
Priority: s.priority,
BatchGet: &pb.BatchGetRequest{
Keys: pending,
Version: s.version.Ver,
Expand Down Expand Up @@ -198,7 +201,8 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
sender := NewRegionRequestSender(s.store.regionCache, s.store.client, pbIsolationLevel(s.isolationLevel))

req := &tikvrpc.Request{
Type: tikvrpc.CmdGet,
Type: tikvrpc.CmdGet,
Priority: s.priority,
Get: &pb.GetRequest{
Key: k,
Version: s.version.Ver,
Expand Down
57 changes: 57 additions & 0 deletions store/tikv/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/juju/errors"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/errorpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/pingcap/pd/pd-client"
"github.com/pingcap/tidb"
"github.com/pingcap/tidb/kv"
mocktikv "github.com/pingcap/tidb/store/tikv/mock-tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -332,3 +334,58 @@ func (c *mockPDClient) GetStore(ctx goctx.Context, storeID uint64) (*metapb.Stor
}

func (c *mockPDClient) Close() {}

type checkRequestClient struct {
Client
priority pb.CommandPri
}

func (c *checkRequestClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
if c.priority != req.Priority {
return nil, errors.New("request check error")
}

return c.Client.SendReq(ctx, addr, req)
}

func (s *testStoreSuite) TestRequestPriority(c *C) {
client := &checkRequestClient{
Client: s.store.client,
}
s.store.client = client

// Cover 2PC commit.
txn, err := s.store.Begin()
c.Assert(err, IsNil)
client.priority = pb.CommandPri_High
txn.SetOption(kv.Priority, kv.PriorityHigh)
err = txn.Set([]byte("key"), []byte("value"))
c.Assert(err, IsNil)
err = txn.Commit()
c.Assert(err, IsNil)

// Cover the basic Get request.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
client.priority = pb.CommandPri_Low
txn.SetOption(kv.Priority, kv.PriorityLow)
_, err = txn.Get([]byte("key"))
c.Assert(err, IsNil)

// A counter example.
client.priority = pb.CommandPri_Low
txn.SetOption(kv.Priority, kv.PriorityNormal)
_, err = txn.Get([]byte("key"))
// err is translated to "try again later" by backoffer, so doesn't check error value here.
c.Assert(err, NotNil)

// Cover Seek request.
client.priority = pb.CommandPri_High
txn.SetOption(kv.Priority, kv.PriorityHigh)
iter, err := txn.Seek([]byte("key"))
c.Assert(err, IsNil)
for iter.Valid() {
c.Assert(iter.Next(), IsNil)
}
iter.Close()
}
2 changes: 2 additions & 0 deletions store/tikv/tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
// Request wraps all kv/coprocessor requests.
type Request struct {
Type CmdType
Priority kvrpcpb.CommandPri
Get *kvrpcpb.GetRequest
Scan *kvrpcpb.ScanRequest
Prewrite *kvrpcpb.PrewriteRequest
Expand Down Expand Up @@ -122,6 +123,7 @@ type Response struct {

// SetContext set the Context field for the given req to the specifed ctx.
func SetContext(req *Request, ctx *kvrpcpb.Context) error {
ctx.Priority = req.Priority
switch req.Type {
case CmdGet:
req.Get.Context = ctx
Expand Down
5 changes: 4 additions & 1 deletion store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ func (txn *tikvTxn) Delete(k kv.Key) error {

func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) {
txn.us.SetOption(opt, val)
if opt == kv.IsolationLevel {
switch opt {
case kv.IsolationLevel:
txn.snapshot.isolationLevel = val.(kv.IsoLevel)
case kv.Priority:
txn.snapshot.priority = kvPriorityToCommandPri(val.(int))
}
}

Expand Down

0 comments on commit 6b6b03d

Please sign in to comment.