Skip to content

Commit

Permalink
ddl: Save reorg doneHandle regularly (pingcap#5041)
Browse files Browse the repository at this point in the history
ddl: Save reorg doneHandle regularly
  • Loading branch information
zimulala authored Nov 9, 2017
1 parent 68cd5b0 commit 5b9640f
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 55 deletions.
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
return errors.Trace(err)
}

d.setReorgRowCount(count)
d.reorgCtx.setRowCountAndHandle(count, seekHandle)
batchHandleDataHistogram.WithLabelValues(batchAddCol).Observe(sub)
log.Infof("[ddl] added column for %v rows, take time %v", count, sub)
}
Expand Down
37 changes: 14 additions & 23 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,22 +199,13 @@ type ddl struct {
ddlJobCh chan struct{}
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *util.Event

// reorgDoneCh is for reorganization, if the reorganization job is done,
// we will use this channel to notify outer.
// TODO: Now we use goroutine to simulate reorganization jobs, later we may
// use a persistent job list.
reorgDoneCh chan error
// reorgRowCount is for reorganization, it uses to simulate a job's row count.
reorgRowCount int64
// notifyCancelReorgJob is for reorganization, it used to notify the backfilling goroutine if the DDL job is cancelled.
notifyCancelReorgJob chan struct{}
// reorgCtx is for reorganization.
reorgCtx *reorgCtx

quitCh chan struct{}
wait sync.WaitGroup

workerVars *variable.SessionVars

workerVars *variable.SessionVars
delRangeManager delRangeManager
}

Expand Down Expand Up @@ -273,17 +264,17 @@ func newDDL(ctx goctx.Context, etcdCli *clientv3.Client, store kv.Storage,
syncer = NewSchemaSyncer(etcdCli, id)
}
d := &ddl{
infoHandle: infoHandle,
hook: hook,
store: store,
uuid: id,
lease: lease,
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
notifyCancelReorgJob: make(chan struct{}, 1),
ownerManager: manager,
schemaSyncer: syncer,
workerVars: variable.NewSessionVars(),
infoHandle: infoHandle,
hook: hook,
store: store,
uuid: id,
lease: lease,
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
reorgCtx: &reorgCtx{notifyCancelReorgJob: make(chan struct{}, 1)},
ownerManager: manager,
schemaSyncer: syncer,
workerVars: variable.NewSessionVars(),
}
d.workerVars.BinlogClient = binloginfo.GetPumpClient()

Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64) {
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
log.Infof("[ddl] run the cancelling DDL job %s", job)
asyncNotify(d.notifyCancelReorgJob)
asyncNotify(d.reorgCtx.notifyCancelReorgJob)
} else {
job.State = model.JobStateCancelled
job.Error = errCancelledDDLJob
Expand Down
8 changes: 4 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

err = d.runReorgJob(job, func() error {
err = d.runReorgJob(t, job, func() error {
return d.addTableIndex(tbl, indexInfo, reorgInfo, job)
})
if err != nil {
Expand All @@ -289,11 +289,11 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
ver, err = d.convert2RollbackJob(t, job, tblInfo, indexInfo, err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
cleanNotify(d.notifyCancelReorgJob)
cleanNotify(d.reorgCtx.notifyCancelReorgJob)
return ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
cleanNotify(d.notifyCancelReorgJob)
cleanNotify(d.reorgCtx.notifyCancelReorgJob)

indexInfo.State = model.StatePublic
// Set column index flag.
Expand Down Expand Up @@ -588,7 +588,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
addedCount, baseHandle, nextHandle, taskAddedCount, err, sub, err1)
return errors.Trace(err)
}
d.setReorgRowCount(addedCount)
d.reorgCtx.setRowCountAndHandle(addedCount, nextHandle)
batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, take time %v",
addedCount, baseHandle, nextHandle, taskAddedCount, sub)
Expand Down
58 changes: 41 additions & 17 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ import (
"github.com/pingcap/tidb/util/mock"
)

// reorgCtx holds the in-memory state of a running reorganization job.
type reorgCtx struct {
	// rowCount and doneHandle are read and written with sync/atomic.
	// They are deliberately the first fields of the struct: on 32-bit
	// platforms (386, ARM, 32-bit MIPS) only the first word of an allocated
	// struct is guaranteed to be 64-bit aligned, and 64-bit atomic operations
	// on a misaligned word panic. Keep them here, ahead of any
	// pointer-sized fields.

	// rowCount is used to simulate a job's row count.
	rowCount int64
	// doneHandle is used to simulate the handle that has been processed.
	doneHandle int64

	// doneCh is used to notify the outer caller that the reorganization job
	// is done; the job's resulting error is sent on it.
	// TODO: Now we use goroutine to simulate reorganization jobs, later we may
	// use a persistent job list.
	doneCh chan error
	// notifyCancelReorgJob is used to notify the backfilling goroutine if the
	// DDL job is cancelled.
	notifyCancelReorgJob chan struct{}
}

// newContext gets a context. It is only used for adding column in reorganization state.
func (d *ddl) newContext() context.Context {
c := mock.NewContext()
Expand All @@ -37,22 +52,30 @@ func (d *ddl) newContext() context.Context {

const waitReorgTimeout = 10 * time.Second

func (d *ddl) setReorgRowCount(count int64) {
atomic.StoreInt64(&d.reorgRowCount, count)
// setRowCountAndHandle records the reorganization progress: count is stored
// into rc.rowCount and doneHandle into rc.doneHandle.
// Each value is written with its own atomic store, row count first, so the
// pair is not updated as a single unit; a concurrent reader may observe the
// new row count together with the previous handle.
func (rc *reorgCtx) setRowCountAndHandle(count, doneHandle int64) {
atomic.StoreInt64(&rc.rowCount, count)
atomic.StoreInt64(&rc.doneHandle, doneHandle)
}

// getRowCountAndHandle returns the recorded progress as (rowCount, doneHandle).
// Each value is fetched with its own atomic load; the calls are evaluated left
// to right, so the row count is loaded before the handle.
func (rc *reorgCtx) getRowCountAndHandle() (int64, int64) {
	return atomic.LoadInt64(&rc.rowCount), atomic.LoadInt64(&rc.doneHandle)
}

func (d *ddl) getReorgRowCount() int64 {
return atomic.LoadInt64(&d.reorgRowCount)
// clean resets the context after a reorganization job has finished: the done
// channel is dropped and both the row count and the done handle go back to
// zero.
func (rc *reorgCtx) clean() {
	rc.doneCh = nil
	rc.setRowCountAndHandle(0, 0)
}

func (d *ddl) runReorgJob(job *model.Job, f func() error) error {
if d.reorgDoneCh == nil {
func (d *ddl) runReorgJob(t *meta.Meta, job *model.Job, f func() error) error {
if d.reorgCtx.doneCh == nil {
// start a reorganization job
d.wait.Add(1)
d.reorgDoneCh = make(chan error, 1)
d.reorgCtx.doneCh = make(chan error, 1)
go func() {
defer d.wait.Done()
d.reorgDoneCh <- f()
d.reorgCtx.doneCh <- f()
}()
}

Expand All @@ -68,24 +91,25 @@ func (d *ddl) runReorgJob(job *model.Job, f func() error) error {

// wait reorganization job done or timeout
select {
case err := <-d.reorgDoneCh:
rowCount := d.getReorgRowCount()
log.Info("[ddl] run reorg job done, handled %d rows", rowCount)
d.reorgDoneCh = nil
case err := <-d.reorgCtx.doneCh:
rowCount, _ := d.reorgCtx.getRowCountAndHandle()
log.Infof("[ddl] run reorg job done, handled %d rows", rowCount)
// Update a job's RowCount.
job.SetRowCount(rowCount)
d.setReorgRowCount(0)
d.reorgCtx.clean()
return errors.Trace(err)
case <-d.quitCh:
log.Info("[ddl] run reorg job ddl quit")
d.setReorgRowCount(0)
d.reorgCtx.setRowCountAndHandle(0, 0)
// We return errWaitReorgTimeout here too, so that outer loop will break.
return errWaitReorgTimeout
case <-time.After(waitTimeout):
rowCount := d.getReorgRowCount()
log.Infof("[ddl] run reorg job wait timeout %v, handled %d rows", waitTimeout, rowCount)
rowCount, doneHandle := d.reorgCtx.getRowCountAndHandle()
// Update a job's RowCount.
job.SetRowCount(rowCount)
// Update a reorgInfo's handle.
err := t.UpdateDDLReorgHandle(job, doneHandle)
log.Infof("[ddl] run reorg job wait timeout %v, handled %d rows, current done handle %d, err %v", waitTimeout, rowCount, doneHandle, err)
// If timeout, we will return, check the owner and retry to wait job done again.
return errWaitReorgTimeout
}
Expand All @@ -98,7 +122,7 @@ func (d *ddl) isReorgRunnable() error {
}

select {
case <-d.notifyCancelReorgJob:
case <-d.reorgCtx.notifyCancelReorgJob:
// Job is cancelled. So it can't be done.
return errCancelledDDLJob
default:
Expand Down
37 changes: 28 additions & 9 deletions ddl/reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,37 +62,57 @@ func (s *testDDLSuite) TestReorg(c *C) {
c.Assert(err, IsNil)

rowCount := int64(10)
handle := int64(100)
f := func() error {
d.setReorgRowCount(rowCount)
d.reorgCtx.setRowCountAndHandle(rowCount, handle)
time.Sleep(20 * testLease)
return nil
}
job := &model.Job{}
err = d.runReorgJob(job, f)
job := &model.Job{
ID: 1,
SnapshotVer: 1, // Make sure it is not zero, so that the reorgInfo's first is false.
}
err = ctx.NewTxn()
c.Assert(err, IsNil)
m := meta.NewMeta(ctx.Txn())
err = d.runReorgJob(m, job, f)
c.Assert(err, NotNil)

// Wait for at most 5 seconds to make sure that the function f has returned.
for i := 0; i < 1000; i++ {
time.Sleep(5 * time.Millisecond)
err = d.runReorgJob(job, f)
err = d.runReorgJob(m, job, f)
if err == nil {
c.Assert(job.RowCount, Equals, rowCount)
c.Assert(d.reorgRowCount, Equals, int64(0))
c.Assert(d.reorgCtx.rowCount, Equals, int64(0))

// Test whether reorgInfo's Handle is updated.
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
err = ctx.NewTxn()
c.Assert(err, IsNil)
m = meta.NewMeta(ctx.Txn())
info, err1 := d.getReorgInfo(m, job)
c.Assert(err1, IsNil)
c.Assert(info.Handle, Equals, handle)
c.Assert(d.reorgCtx.doneHandle, Equals, int64(0))
break
}
}
c.Assert(err, IsNil)

d.Stop()
err = d.runReorgJob(job, func() error {
err = d.runReorgJob(m, job, func() error {
time.Sleep(4 * testLease)
return nil
})
c.Assert(err, NotNil)
d.start(goctx.Background())
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)

d.start(goctx.Background())
job = &model.Job{
ID: 1,
ID: 2,
SchemaID: 1,
Type: model.ActionCreateSchema,
Args: []interface{}{model.NewCIStr("test")},
Expand All @@ -106,7 +126,6 @@ func (s *testDDLSuite) TestReorg(c *C) {
c.Assert(err1, IsNil)
err1 = info.UpdateHandle(txn, 1)
c.Assert(err1, IsNil)

return nil
})
c.Assert(err, IsNil)
Expand Down

0 comments on commit 5b9640f

Please sign in to comment.