Skip to content

Commit

Permalink
*: support set relation job (pingcap#6206)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and zhexuany committed Apr 13, 2018
1 parent b51b0de commit 998f696
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 1 deletion.
23 changes: 23 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,29 @@ func (d *ddl) isOwner() bool {
return isOwner
}

// buildJobDependence sets the curjob's dependency-ID.
// The dependency-job's ID must less than the current job's ID, and we need the largest one in the list.
func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
jobs, err := t.GetAllDDLJobs()
if err != nil {
return errors.Trace(err)
}
for _, job := range jobs {
if curJob.ID < job.ID {
continue
}
isDependent, err := curJob.IsDependentOn(job)
if err != nil {
return errors.Trace(err)
}
if isDependent {
curJob.DependencyID = job.ID
break
}
}
return nil
}

// addDDLJob gets a global job ID and puts the DDL job in the DDL queue.
func (d *ddl) addDDLJob(ctx sessionctx.Context, job *model.Job) error {
startTime := time.Now()
Expand Down
50 changes: 50 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,3 +590,53 @@ func (s *testDDLSuite) TestIgnorableSpec(c *C) {
c.Assert(isIgnorableSpec(spec), IsTrue)
}
}

func (s *testDDLSuite) TestBuildJobDependence(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_set_job_relation")
defer store.Close()

job1 := &model.Job{ID: 1, TableID: 1}
job2 := &model.Job{ID: 2, TableID: 1}
job3 := &model.Job{ID: 3, TableID: 2}
job6 := &model.Job{ID: 6, TableID: 1}
job7 := &model.Job{ID: 7, TableID: 2}
kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := t.EnQueueDDLJob(job1)
c.Assert(err, IsNil)
err = t.EnQueueDDLJob(job2)
c.Assert(err, IsNil)
err = t.EnQueueDDLJob(job3)
c.Assert(err, IsNil)
err = t.EnQueueDDLJob(job6)
c.Assert(err, IsNil)
err = t.EnQueueDDLJob(job7)
c.Assert(err, IsNil)
return nil
})
job4 := &model.Job{ID: 4, TableID: 1}
kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := buildJobDependence(t, job4)
c.Assert(err, IsNil)
c.Assert(job4.DependencyID, Equals, int64(2))
return nil
})
job5 := &model.Job{ID: 5, TableID: 2}
kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := buildJobDependence(t, job5)
c.Assert(err, IsNil)
c.Assert(job5.DependencyID, Equals, int64(3))
return nil
})
job8 := &model.Job{ID: 8, TableID: 3}
kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := buildJobDependence(t, job8)
c.Assert(err, IsNil)
c.Assert(job8.DependencyID, Equals, int64(0))
return nil
})
}
20 changes: 20 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,26 @@ func (m *Meta) DDLJobQueueLen() (int64, error) {
return m.txn.LLen(m.jobListKey)
}

// GetAllDDLJobs gets all DDL Jobs.
func (m *Meta) GetAllDDLJobs() ([]*model.Job, error) {
values, err := m.txn.LGetAll(mDDLJobListKey)
if err != nil || values == nil {
return nil, errors.Trace(err)
}

jobs := make([]*model.Job, 0, len(values))
for _, val := range values {
job := &model.Job{}
err = job.Decode(val)
if err != nil {
return nil, errors.Trace(err)
}
jobs = append(jobs, job)
}

return jobs, nil
}

func (m *Meta) jobIDKey(id int64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(id))
Expand Down
10 changes: 10 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,16 @@ func (s *testSuite) TestDDL(c *C) {
lastID = job.ID
}

// Test GetAllDDLJobs.
err = t.EnQueueDDLJob(job)
job1 := &model.Job{ID: 2}
err = t.EnQueueDDLJob(job1)
c.Assert(err, IsNil)
jobs, err := t.GetAllDDLJobs()
c.Assert(err, IsNil)
expectJobs := []*model.Job{job, job1}
c.Assert(jobs, DeepEquals, expectJobs)

err = txn.Commit(context.Background())
c.Assert(err, IsNil)
}
40 changes: 40 additions & 0 deletions model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ type Job struct {
// StartTS uses timestamp allocated by TSO.
// Now it's the TS when we put the job to TiKV queue.
StartTS uint64 `json:"start_ts"`
// DependencyID is the job's ID that the current job depends on.
DependencyID int64 `json:"dependency_id"`
// Query string of the ddl job.
Query string `json:"query"`
BinlogInfo *HistoryInfo `json:"binlog"`
Expand Down Expand Up @@ -213,6 +215,44 @@ func (job *Job) String() string {
job.ID, job.Type, job.State, job.SchemaState, job.SchemaID, job.TableID, rowCount, len(job.Args), tsConvert2Time(job.StartTS), job.Error, job.ErrorCount, job.SnapshotVer)
}

func (job *Job) hasDependentSchema(other *Job) (bool, error) {
if other.Type == ActionDropSchema || other.Type == ActionCreateSchema {
if other.SchemaID == job.SchemaID {
return true, nil
}
if job.Type == ActionRenameTable {
var oldSchemaID int64
if err := job.DecodeArgs(&oldSchemaID); err != nil {
return false, errors.Trace(err)
}
if other.SchemaID == oldSchemaID {
return true, nil
}
}
}
return false, nil
}

// IsDependentOn returns whether the job depends on "other".
// How to check the job depends on "other"?
// 1. The two jobs handle the same database when one of the two jobs is an ActionDropSchema or ActionCreateSchema type.
// 2. Or the two jobs handle the same table.
func (job *Job) IsDependentOn(other *Job) (bool, error) {
isDependent, err := job.hasDependentSchema(other)
if err != nil || isDependent {
return isDependent, errors.Trace(err)
}
isDependent, err = other.hasDependentSchema(job)
if err != nil || isDependent {
return isDependent, errors.Trace(err)
}

if other.TableID == job.TableID {
return true, nil
}
return false, nil
}

// IsFinished returns whether job is finished or not.
// If the job state is Done or Cancelled, it is finished.
func (job *Job) IsFinished() bool {
Expand Down
33 changes: 33 additions & 0 deletions model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package model

import (
"encoding/json"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -142,12 +143,44 @@ func (*testModelSuite) TestJobCodec(c *C) {
}
job := &Job{
ID: 1,
TableID: 2,
SchemaID: 1,
BinlogInfo: &HistoryInfo{},
Args: []interface{}{NewCIStr("a"), A{Name: "abc"}},
}
job.BinlogInfo.AddDBInfo(123, &DBInfo{ID: 1, Name: NewCIStr("test_history_db")})
job.BinlogInfo.AddTableInfo(123, &TableInfo{ID: 1, Name: NewCIStr("test_history_tbl")})

// Test IsDependentOn.
// job: table ID is 2
// job1: table ID is 2
var err error
job1 := &Job{
ID: 2,
TableID: 2,
SchemaID: 1,
Type: ActionRenameTable,
BinlogInfo: &HistoryInfo{},
Args: []interface{}{int64(3), NewCIStr("new_table_name")},
}
job1.RawArgs, err = json.Marshal(job1.Args)
c.Assert(err, IsNil)
isDependent, err := job.IsDependentOn(job1)
c.Assert(err, IsNil)
c.Assert(isDependent, IsTrue)
// job1: rename table, old schema ID is 3
// job2: create schema, schema ID is 3
job2 := &Job{
ID: 3,
TableID: 3,
SchemaID: 3,
Type: ActionCreateSchema,
BinlogInfo: &HistoryInfo{},
}
isDependent, err = job2.IsDependentOn(job1)
c.Assert(err, IsNil)
c.Assert(isDependent, IsTrue)

c.Assert(job.IsCancelled(), Equals, false)
b, err := job.Encode(false)
c.Assert(err, IsNil)
Expand Down
20 changes: 20 additions & 0 deletions structure/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,26 @@ func (t *TxStructure) LLen(key []byte) (int64, error) {
return meta.RIndex - meta.LIndex, errors.Trace(err)
}

// LGetAll gets all elements of this list in order from right to left.
func (t *TxStructure) LGetAll(key []byte) ([][]byte, error) {
metaKey := t.encodeListMetaKey(key)
meta, err := t.loadListMeta(metaKey)
if err != nil || meta.IsEmpty() {
return nil, errors.Trace(err)
}

length := int(meta.RIndex - meta.LIndex)
elements := make([][]byte, 0, length)
for index := meta.RIndex - 1; index >= meta.LIndex; index-- {
e, err := t.reader.Get(t.encodeListDataKey(key, index))
if err != nil {
return nil, errors.Trace(err)
}
elements = append(elements, e)
}
return elements, nil
}

// LIndex gets an element from a list by its index.
func (t *TxStructure) LIndex(key []byte, index int64) ([]byte, error) {
metaKey := t.encodeListMetaKey(key)
Expand Down
12 changes: 11 additions & 1 deletion structure/structure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,21 @@ func (s *testTxStructureSuite) TestList(c *C) {
err = tx.LPush(key, []byte("3"), []byte("2"), []byte("1"))
c.Assert(err, IsNil)

// Test LGetAll.
err = tx.LPush(key, []byte("11"))
c.Assert(err, IsNil)
values, err := tx.LGetAll(key)
c.Assert(err, IsNil)
c.Assert(values, DeepEquals, [][]byte{[]byte("3"), []byte("2"), []byte("1"), []byte("11")})
value, err := tx.LPop(key)
c.Assert(err, IsNil)
c.Assert(value, DeepEquals, []byte("11"))

l, err := tx.LLen(key)
c.Assert(err, IsNil)
c.Assert(l, Equals, int64(3))

value, err := tx.LIndex(key, 1)
value, err = tx.LIndex(key, 1)
c.Assert(err, IsNil)
c.Assert(value, DeepEquals, []byte("2"))

Expand Down

0 comments on commit 998f696

Please sign in to comment.