Skip to content

Commit

Permalink
store/tikv: tiny update mocktikv (pingcap#4555)
Browse files Browse the repository at this point in the history
* store/tikv: tiny update mocktikv

1. for mocktikv, close storage should close the leveldb engine
2. set tikvStore's mock flag in NewMockTikvStore function
  • Loading branch information
tiancaiamao authored and coocood committed Sep 19, 2017
1 parent 8794374 commit f1022c8
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
8 changes: 4 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan) Executor {
if b.err != nil {
return nil
}
table, _ := b.is.TableByID(v.Table.ID)
tbl, _ := b.is.TableByID(v.Table.ID)
client := b.ctx.GetClient()
supportDesc := client.IsRequestTypeSupported(kv.ReqTypeSelect, kv.ReqSubTypeDesc)
var handleCol *expression.Column
Expand All @@ -661,7 +661,7 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan) Executor {
ctx: b.ctx,
startTS: startTS,
supportDesc: supportDesc,
table: table,
table: tbl,
schema: v.Schema(),
Columns: v.Columns,
ranges: v.Ranges,
Expand All @@ -684,7 +684,7 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
if b.err != nil {
return nil
}
table, _ := b.is.TableByID(v.Table.ID)
tbl, _ := b.is.TableByID(v.Table.ID)
client := b.ctx.GetClient()
supportDesc := client.IsRequestTypeSupported(kv.ReqTypeIndex, kv.ReqSubTypeDesc)
var handleCol *expression.Column
Expand All @@ -695,7 +695,7 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
tableInfo: v.Table,
ctx: b.ctx,
supportDesc: supportDesc,
table: table,
table: tbl,
singleReadMode: !v.DoubleRead,
startTS: startTS,
where: v.TableConditionPBExpr,
Expand Down
7 changes: 3 additions & 4 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,13 @@ func newTikvStore(uuid string, pdClient pd.Client, client Client, enableGC bool)
if err != nil {
return nil, errors.Trace(err)
}
_, mock := client.(*mocktikv.RPCClient)
store := &tikvStore{
clusterID: pdClient.GetClusterID(goctx.TODO()),
uuid: uuid,
oracle: o,
client: client,
pdClient: pdClient,
regionCache: NewRegionCache(pdClient),
mock: mock,
}
store.lockResolver = newLockResolver(store)
store.enableGC = enableGC
Expand Down Expand Up @@ -213,7 +211,6 @@ func NewMockTikvStore(options ...MockTiKVStoreOption) (kv.Storage, error) {

mvccStore := opt.mvccStore
if mvccStore == nil {
// mvccStore = mocktikv.NewMvccStore()
var err error
mvccStore, err = mocktikv.NewMVCCLevelDB(opt.path)
if err != nil {
Expand All @@ -234,7 +231,9 @@ func NewMockTikvStore(options ...MockTiKVStoreOption) (kv.Storage, error) {
pdCli = opt.pdClientHijack(pdCli)
}

return newTikvStore(uuid, pdCli, client, false)
tikvStore, err := newTikvStore(uuid, pdCli, client, false)
tikvStore.mock = true
return tikvStore, errors.Trace(err)
}

func (s *tikvStore) Begin() (kv.Transaction, error) {
Expand Down
5 changes: 5 additions & 0 deletions store/tikv/mock-tikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,3 +819,8 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
}
return mvcc.db.Write(batch, nil)
}

// Close calls leveldb's Close to free resources.
func (mvcc *MVCCLevelDB) Close() error {
return mvcc.db.Close()
}
6 changes: 6 additions & 0 deletions store/tikv/mock-tikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package mocktikv

import (
"io"

"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
Expand Down Expand Up @@ -401,6 +403,7 @@ type RPCClient struct {
}

// NewRPCClient creates an RPCClient.
// Note that close the RPCClient may close the underlying MvccStore.
func NewRPCClient(cluster *Cluster, mvccStore MVCCStore) *RPCClient {
return &RPCClient{
Cluster: cluster,
Expand Down Expand Up @@ -601,5 +604,8 @@ func (c *RPCClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request

// Close closes the client.
func (c *RPCClient) Close() error {
if raw, ok := c.MvccStore.(io.Closer); ok {
return raw.Close()
}
return nil
}

0 comments on commit f1022c8

Please sign in to comment.