Skip to content

Commit

Permalink
*: support history read. (pingcap#1734)
Browse files Browse the repository at this point in the history
* *: support history read.

This commit only handles the case when scheme does not change.
Use history schema will be supported in the following PR.
  • Loading branch information
coocood authored Sep 19, 2016
1 parent bff8f56 commit ade11fe
Show file tree
Hide file tree
Showing 19 changed files with 163 additions and 52 deletions.
3 changes: 3 additions & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type Context interface {
// GetTxn gets a transaction for further execution.
GetTxn(forceNew bool) (kv.Transaction, error)

// GetClient gets a kv.Client.
GetClient() kv.Client

// RollbackTxn rolls back the current transaction.
RollbackTxn() error

Expand Down
4 changes: 4 additions & 0 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (c *reorgContext) CommitTxn() error {
return c.finishTxn(false)
}

func (c *reorgContext) GetClient() kv.Client {
return c.store.GetClient()
}

func (c *reorgContext) SetValue(key fmt.Stringer, value interface{}) {
c.m[key] = value
}
Expand Down
10 changes: 10 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx/variable"
)

// recordSet wraps an executor, implements ast.RecordSet interface
Expand Down Expand Up @@ -96,6 +97,15 @@ func (a *statement) Exec(ctx context.Context) (ast.RecordSet, error) {
}

if len(e.Fields()) == 0 && len(e.Schema()) == 0 {
// Write statements do not have record set, check if snapshot ts is set.
switch e.(type) {
case *DeleteExec, *InsertExec, *UpdateExec, *ReplaceExec, *LoadData, *DDLExec:
snapshotTS := variable.GetSnapshotTS(ctx)
if snapshotTS != 0 {
return nil, errors.New("Can not execute write statement when 'tidb_snapshot' is set.")
}
}

// No result fields means no Recordset.
defer e.Close()
for {
Expand Down
38 changes: 22 additions & 16 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,12 +483,7 @@ func (b *executorBuilder) buildAggregation(v *plan.Aggregation) Executor {
if !ok {
return e
}
txn, err := b.ctx.GetTxn(false)
if err != nil {
b.err = err
return nil
}
client := txn.GetClient()
client := b.ctx.GetClient()
if len(v.GroupByItems) > 0 && !client.SupportRequestType(kv.ReqTypeSelect, kv.ReqSubTypeGroupBy) {
return e
}
Expand Down Expand Up @@ -605,14 +600,26 @@ func (b *executorBuilder) buildTableDual(v *plan.TableDual) Executor {
return &TableDualExec{schema: v.GetSchema()}
}

func (b *executorBuilder) getStartTS() uint64 {
startTS := variable.GetSnapshotTS(b.ctx)
if startTS == 0 {
txn, err := b.ctx.GetTxn(false)
if err != nil {
b.err = errors.Trace(err)
return 0
}
startTS = txn.StartTS()
}
return startTS
}

func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan, s *plan.Selection) Executor {
txn, err := b.ctx.GetTxn(false)
if err != nil {
b.err = errors.Trace(err)
startTS := b.getStartTS()
if b.err != nil {
return nil
}
table, _ := b.is.TableByID(v.Table.ID)
client := txn.GetClient()
client := b.ctx.GetClient()
var memDB bool
switch v.DBName.L {
case "information_schema", "performance_schema":
Expand All @@ -623,7 +630,7 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan, s *plan.Sele
st := &XSelectTableExec{
tableInfo: v.Table,
ctx: b.ctx,
txn: txn,
startTS: startTS,
supportDesc: supportDesc,
asName: v.TableAsName,
table: table,
Expand Down Expand Up @@ -654,13 +661,12 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan, s *plan.Sele
}

func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan, s *plan.Selection) Executor {
txn, err := b.ctx.GetTxn(false)
if err != nil {
b.err = errors.Trace(err)
startTS := b.getStartTS()
if b.err != nil {
return nil
}
table, _ := b.is.TableByID(v.Table.ID)
client := txn.GetClient()
client := b.ctx.GetClient()
var memDB bool
switch v.DBName.L {
case "information_schema", "performance_schema":
Expand All @@ -675,7 +681,7 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan, s *plan.Sele
asName: v.TableAsName,
table: table,
indexPlan: v,
txn: txn,
startTS: startTS,
where: v.ConditionPBExpr,
}
return st
Expand Down
18 changes: 9 additions & 9 deletions executor/executor_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ type XSelectIndexExec struct {
result distsql.SelectResult
partialResult distsql.PartialResult
where *tipb.Expr
txn kv.Transaction
startTS uint64

tasks chan *lookupTableTask
tasksErr error // not nil if tasks closed due to error.
Expand Down Expand Up @@ -664,7 +664,7 @@ func (e *XSelectIndexExec) AddAggregate(funcs []*tipb.Expr, byItems []*tipb.ByIt
e.byItems = byItems
e.aggFields = fields
e.aggregate = true
client := e.txn.GetClient()
client := e.ctx.GetClient()
if !client.SupportRequestType(kv.ReqTypeIndex, kv.ReqSubTypeGroupBy) {
e.indexPlan.DoubleRead = true
}
Expand Down Expand Up @@ -867,7 +867,7 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan<

func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) {
selIdxReq := new(tipb.SelectRequest)
selIdxReq.StartTs = e.txn.StartTS()
selIdxReq.StartTs = e.startTS
selIdxReq.IndexInfo = distsql.IndexToProto(e.table.Meta(), e.indexPlan.Index)
if e.indexPlan.Desc {
selIdxReq.OrderBy = append(selIdxReq.OrderBy, &tipb.ByItem{Desc: e.indexPlan.Desc})
Expand All @@ -893,7 +893,7 @@ func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) {
if err != nil {
return nil, errors.Trace(err)
}
return distsql.Select(e.txn.GetClient(), selIdxReq, keyRanges, concurrency, !e.indexPlan.OutOfOrder)
return distsql.Select(e.ctx.GetClient(), selIdxReq, keyRanges, concurrency, !e.indexPlan.OutOfOrder)
}

func (e *XSelectIndexExec) buildTableTasks(handles []int64) []*lookupTableTask {
Expand Down Expand Up @@ -1008,7 +1008,7 @@ func (e *XSelectIndexExec) doTableRequest(handles []int64) (distsql.SelectResult
if e.indexPlan.OutOfOrder {
selTableReq.Limit = e.indexPlan.LimitCount
}
selTableReq.StartTs = e.txn.StartTS()
selTableReq.StartTs = e.startTS
selTableReq.TableInfo = &tipb.TableInfo{
TableId: e.table.Meta().ID,
}
Expand All @@ -1018,7 +1018,7 @@ func (e *XSelectIndexExec) doTableRequest(handles []int64) (distsql.SelectResult
selTableReq.Aggregates = e.aggFuncs
selTableReq.GroupBy = e.byItems
keyRanges := tableHandlesToKVRanges(e.table.Meta().ID, handles)
resp, err := distsql.Select(e.txn.GetClient(), selTableReq, keyRanges, defaultConcurrency, false)
resp, err := distsql.Select(e.ctx.GetClient(), selTableReq, keyRanges, defaultConcurrency, false)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1048,7 +1048,7 @@ type XSelectTableExec struct {
limitCount *int64
returnedRows uint64 // returned rowCount
keepOrder bool
txn kv.Transaction
startTS uint64

/*
The following attributes are used for aggregation push down.
Expand Down Expand Up @@ -1082,7 +1082,7 @@ func (e *XSelectTableExec) Schema() expression.Schema {
func (e *XSelectTableExec) doRequest() error {
var err error
selReq := new(tipb.SelectRequest)
selReq.StartTs = e.txn.StartTS()
selReq.StartTs = e.startTS
selReq.Where = e.where
columns := e.Columns
selReq.TableInfo = &tipb.TableInfo{
Expand All @@ -1098,7 +1098,7 @@ func (e *XSelectTableExec) doRequest() error {
selReq.GroupBy = e.byItems

kvRanges := tableRangesToKVRanges(e.table.Meta().ID, e.ranges)
e.result, err = distsql.Select(e.txn.GetClient(), selReq, kvRanges, defaultConcurrency, e.keepOrder)
e.result, err = distsql.Select(e.ctx.GetClient(), selReq, kvRanges, defaultConcurrency, e.keepOrder)
if err != nil {
return errors.Trace(err)
}
Expand Down
33 changes: 33 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2281,3 +2281,36 @@ func (s *testSuite) TestSelectVar(c *C) {
result.Check(testkit.Rows("2 2", "2 3", "3 2"))

}

func (s *testSuite) TestHistoryRead(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists history_read")
tk.MustExec("create table history_read (a int)")
tk.MustExec("insert history_read values (1)")
curVer1, _ := s.store.CurrentVersion()
time.Sleep(time.Millisecond)
snapshotTime := time.Now()
time.Sleep(time.Millisecond)
curVer2, _ := s.store.CurrentVersion()
tk.MustExec("insert history_read values (2)")
tk.MustQuery("select * from history_read").Check(testkit.Rows("1", "2"))
tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'")
ctx := tk.Se.(context.Context)
snapshotTS := variable.GetSnapshotTS(ctx)
c.Assert(snapshotTS, Greater, curVer1.Ver)
c.Assert(snapshotTS, Less, curVer2.Ver)
tk.MustQuery("select * from history_read").Check(testkit.Rows("1"))
_, err := tk.Exec("insert history_read values (2)")
c.Assert(err, NotNil)
_, err = tk.Exec("update history_read set a = 3 where a = 1")
c.Assert(err, NotNil)
_, err = tk.Exec("delete from history_read where a = 1")
c.Assert(err, NotNil)
tk.MustExec("set @@tidb_snapshot = ''")
tk.MustQuery("select * from history_read").Check(testkit.Rows("1", "2"))
tk.MustExec("insert history_read values (3)")
tk.MustExec("update history_read set a = 4 where a = 3")
tk.MustExec("delete from history_read where a = 1")
}
1 change: 1 addition & 0 deletions executor/executor_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
_ Executor = &UpdateExec{}
_ Executor = &DeleteExec{}
_ Executor = &InsertExec{}
_ Executor = &ReplaceExec{}
_ Executor = &LoadData{}
)

Expand Down
4 changes: 2 additions & 2 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ type Transaction interface {
DelOption(opt Option)
// IsReadOnly checks if the transaction has only performed read operations.
IsReadOnly() bool
// GetClient gets a client instance.
GetClient() Client
// StartTS returns the transaction start timestamp.
StartTS() uint64
}
Expand Down Expand Up @@ -169,6 +167,8 @@ type Storage interface {
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
GetSnapshot(ver Version) (Snapshot, error)
// GetClient gets a client instance.
GetClient() Client
// Close store
Close() error
// Storage's unique ID
Expand Down
8 changes: 4 additions & 4 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ func (t *mockTxn) IsReadOnly() bool {
return true
}

func (t *mockTxn) GetClient() Client {
return nil
}

func (t *mockTxn) StartTS() uint64 {
return uint64(0)
}
Expand Down Expand Up @@ -106,6 +102,10 @@ func (s *mockStorage) CurrentVersion() (Version, error) {
return Version{uint64(1)}, nil
}

func (s *mockStorage) GetClient() Client {
return nil
}

// MockTxn is used for test cases that need more interfaces than Transaction.
type MockTxn interface {
Transaction
Expand Down
8 changes: 4 additions & 4 deletions plan/physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ func (p *DataSource) handleTableScan(prop requiredProperty) (*physicalPlanInfo,
}
ts.AccessCondition, newSel.Conditions = detachTableScanConditions(conds, table)
if txn != nil {
client := txn.GetClient()
client := p.ctx.GetClient()
var memDB bool
switch p.DBName.L {
case "information_schema", "performance_schema":
memDB = true
}
if !memDB && client.SupportRequestType(kv.ReqTypeSelect, 0) {
ts.ConditionPBExpr, newSel.Conditions, err = expressionsToPB(newSel.Conditions, txn.GetClient())
ts.ConditionPBExpr, newSel.Conditions, err = expressionsToPB(newSel.Conditions, client)
}
if err != nil {
return nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -200,14 +200,14 @@ func (p *DataSource) handleIndexScan(prop requiredProperty, index *model.IndexIn
}
is.AccessCondition, newSel.Conditions = detachIndexScanConditions(conds, is)
if txn != nil {
client := txn.GetClient()
client := p.ctx.GetClient()
var memDB bool
switch p.DBName.L {
case "information_schema", "performance_schema":
memDB = true
}
if !memDB && client.SupportRequestType(kv.ReqTypeSelect, 0) {
is.ConditionPBExpr, newSel.Conditions, err = expressionsToPB(newSel.Conditions, txn.GetClient())
is.ConditionPBExpr, newSel.Conditions, err = expressionsToPB(newSel.Conditions, client)
}
if err != nil {
return nil, nil, errors.Trace(err)
Expand Down
4 changes: 4 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ func (s *session) RollbackTxn() error {
return s.finishTxn(true)
}

func (s *session) GetClient() kv.Client {
return s.store.GetClient()
}

func (s *session) String() string {
// TODO: how to print binded context in values appropriately?
data := map[string]interface{}{
Expand Down
Loading

0 comments on commit ade11fe

Please sign in to comment.