Skip to content

Commit

Permalink
*: rename job to ddlJob, rename task to bgJob
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Jan 20, 2016
1 parent 071eff6 commit 79aaefb
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 208 deletions.
65 changes: 36 additions & 29 deletions ddl/executor.go → ddl/bg_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,43 +24,44 @@ import (
"github.com/pingcap/tidb/terror"
)

func (d *ddl) handleTaskQueue() error {
// handleBgJobQueue handles background job queue.
func (d *ddl) handleBgJobQueue() error {
if d.isClosed() {
return nil
}

task := &model.Job{}
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, ddlTaskFlag)
owner, err := d.checkOwner(t, bgJobFlag)
if terror.ErrorEqual(err, ErrNotOwner) {
return nil
}
if err != nil {
return errors.Trace(err)
}

// get the first task and run
task, err = d.getFirstTask(t)
// get the first background job and run
task, err = d.getFirstBgJob(t)
if err != nil {
return errors.Trace(err)
}
if task == nil {
return nil
}

d.runTask(t, task)
d.runBgJob(t, task)
if task.IsFinished() {
err = d.finishTask(t, task)
err = d.finishBgJob(t, task)
} else {
err = d.updateTask(t, task)
err = d.updateBgJob(t, task)
}
if err != nil {
return errors.Trace(err)
}

owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetDDLTaskOwner(owner)
err = t.SetBgJobOwner(owner)

return errors.Trace(err)
})
Expand All @@ -72,7 +73,8 @@ func (d *ddl) handleTaskQueue() error {
return nil
}

func (d *ddl) runTask(t *meta.Meta, task *model.Job) {
// runBgJob runs background job.
func (d *ddl) runBgJob(t *meta.Meta, task *model.Job) {
task.State = model.JobRunning

var err error
Expand All @@ -83,20 +85,21 @@ func (d *ddl) runTask(t *meta.Meta, task *model.Job) {
err = d.delReorgTable(t, task)
default:
task.State = model.JobCancelled
err = errors.Errorf("invalid task %v", task)
err = errors.Errorf("invalid background job %v", task)
}

if err != nil {
if task.State != model.JobCancelled {
log.Errorf("run task err %v", errors.ErrorStack(err))
log.Errorf("run background job err %v", errors.ErrorStack(err))
}

task.Error = err.Error()
task.ErrorCount++
}
}

func (d *ddl) prepareTask(job *model.Job) error {
// prepareBgJob prepares background job.
func (d *ddl) prepareBgJob(job *model.Job) error {
task := &model.Job{
ID: job.ID,
SchemaID: job.SchemaID,
Expand All @@ -107,46 +110,50 @@ func (d *ddl) prepareTask(job *model.Job) error {

err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err1 := t.EnQueueDDLTask(task)
err1 := t.EnQueueBgJob(task)

return errors.Trace(err1)
})

return errors.Trace(err)
}

func (d *ddl) startTask(tp model.ActionType) {
// startBgJob starts background job.
func (d *ddl) startBgJob(tp model.ActionType) {
switch tp {
case model.ActionDropSchema, model.ActionDropTable:
asyncNotify(d.dropTaskCh)
asyncNotify(d.bgJobCh)
}
}

func (d *ddl) getFirstTask(t *meta.Meta) (*model.Job, error) {
task, err := t.GetDDLTask(0)
// getFirstBgJob gets the first background job.
func (d *ddl) getFirstBgJob(t *meta.Meta) (*model.Job, error) {
task, err := t.GetBgJob(0)
return task, errors.Trace(err)
}

func (d *ddl) updateTask(t *meta.Meta, task *model.Job) error {
err := t.UpdateDDLTask(0, task)
// updateBgJob updates background job.
func (d *ddl) updateBgJob(t *meta.Meta, task *model.Job) error {
err := t.UpdateBgJob(0, task)
return errors.Trace(err)
}

func (d *ddl) finishTask(t *meta.Meta, task *model.Job) error {
log.Warnf("[ddl] finish DDL task %v", task)
if _, err := t.DeQueueDDLTask(); err != nil {
// finishBgJob finishs background job.
func (d *ddl) finishBgJob(t *meta.Meta, task *model.Job) error {
log.Warnf("[ddl] finish background job %v", task)
if _, err := t.DeQueueBgJob(); err != nil {
return errors.Trace(err)
}

err := t.AddHistoryDDLTask(task)
err := t.AddHistoryBgJob(task)

return errors.Trace(err)
}

func (d *ddl) onExecute() {
func (d *ddl) onBackgroundWorker() {
defer d.wait.Done()

// ensure that have ddl job convert to ddl task
// ensure that have ddl job convert to background job.
checkTime := chooseLeaseTime(8*d.lease, 10*time.Second)

ticker := time.NewTicker(checkTime)
Expand All @@ -155,15 +162,15 @@ func (d *ddl) onExecute() {
for {
select {
case <-ticker.C:
log.Debugf("[ddl] wait %s to check DDL task status again", checkTime)
case <-d.dropTaskCh:
log.Debugf("[ddl] wait %s to check background job status again", checkTime)
case <-d.bgJobCh:
case <-d.quitCh:
return
}

err := d.handleTaskQueue()
err := d.handleBgJobQueue()
if err != nil {
log.Errorf("[ddl] handle task err %v", errors.ErrorStack(err))
log.Errorf("[ddl] handle background job err %v", errors.ErrorStack(err))
}
}
}
20 changes: 10 additions & 10 deletions ddl/executor_test.go → ddl/bg_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,25 @@ func (s *testDDLSuite) TestDropSchemaError(c *C) {
Name: model.CIStr{O: "test"},
}},
}
d.prepareTask(task)
d.startTask(task.Type)
d.prepareBgJob(task)
d.startBgJob(task.Type)

time.Sleep(lease)
testCheckTaskCancelled(c, d, task)
testCheckBgJobCancelled(c, d, task)
}

func testCheckTaskCancelled(c *C, d *ddl, task *model.Job) {
func testCheckBgJobCancelled(c *C, d *ddl, task *model.Job) {
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
historyTask, err := t.GetHistoryDDLTask(task.ID)
historyBgJob, err := t.GetHistoryBgJob(task.ID)
c.Assert(err, IsNil)
c.Assert(historyTask.State, Equals, model.JobCancelled)
c.Assert(historyBgJob.State, Equals, model.JobCancelled)

return nil
})
}

func (s *testDDLSuite) TestInvalidTaskType(c *C) {
func (s *testDDLSuite) TestInvalidBgJobType(c *C) {
store := testCreateStore(c, "test_invalid_task_type")
defer store.Close()

Expand All @@ -68,9 +68,9 @@ func (s *testDDLSuite) TestInvalidTaskType(c *C) {
TableID: 1,
Type: model.ActionCreateTable,
}
d.prepareTask(task)
d.startTask(model.ActionDropTable)
d.prepareBgJob(task)
d.startBgJob(model.ActionDropTable)

time.Sleep(lease)
testCheckTaskCancelled(c, d, task)
testCheckBgJobCancelled(c, d, task)
}
4 changes: 2 additions & 2 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func testCreateColumn(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, t
Args: []interface{}{col, pos, 0},
}

err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, IsNil)
return job
}
Expand All @@ -92,7 +92,7 @@ func testDropColumn(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tbl
Args: []interface{}{model.NewCIStr(colName)},
}

err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
if isError {
c.Assert(err, NotNil)
return nil
Expand Down
Loading

0 comments on commit 79aaefb

Please sign in to comment.