Skip to content

Commit

Permalink
ddl: Fix rename table bug (pingcap#4862)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and hanfei1991 committed Oct 26, 2017
1 parent 1cec7c7 commit 4550e91
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 22 deletions.
6 changes: 3 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
return ver, errors.Trace(err)
}
if err = t.DropTable(job.SchemaID, job.TableID, true); err != nil {
if err = t.DropTable(job.SchemaID, tblInfo, true); err != nil {
break
}
// Finish this job.
Expand Down Expand Up @@ -206,7 +206,7 @@ func (d *ddl) onTruncateTable(t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

err = t.DropTable(schemaID, tableID, true)
err = t.DropTable(schemaID, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -253,7 +253,7 @@ func (d *ddl) onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
}

err = t.DropTable(oldSchemaID, tblInfo.ID, false)
err = t.DropTable(oldSchemaID, tblInfo, false)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down
27 changes: 25 additions & 2 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,41 @@ func (s *testSuite) TestRenameTable(c *C) {
tk.MustExec("create database rename1")
tk.MustExec("create database rename2")
tk.MustExec("create database rename3")

tk.MustExec("create table rename1.t (a int primary key auto_increment)")
tk.MustExec("insert rename1.t values ()")
tk.MustExec("rename table rename1.t to rename2.t")
tk.MustExec("insert rename2.t values ()")
tk.MustExec("rename table rename2.t to rename3.t")
tk.MustExec("insert rename3.t values ()")
tk.MustQuery("select * from rename3.t").Check(testkit.Rows("1", "2", "3"))

tk.MustExec("drop database rename1")
tk.MustExec("drop database rename2")
tk.MustExec("drop database rename3")

tk.MustExec("create database rename1")
tk.MustExec("create database rename2")
tk.MustExec("create table rename1.t (a int primary key auto_increment)")
tk.MustExec("rename table rename1.t to rename2.t1")
tk.MustExec("insert rename2.t1 values ()")
result := tk.MustQuery("select * from rename2.t1")
result.Check(testkit.Rows("1"))
tk.MustExec("drop database rename1")
tk.MustExec("drop database rename2")

tk.MustExec("create database rename1")
tk.MustExec("create database rename2")
tk.MustExec("create table rename1.t (a int primary key auto_increment)")
tk.MustExec("insert rename1.t values ()")
tk.MustExec("rename table rename1.t to rename2.t1")
// Make sure the value is greater than autoid.step.
tk.MustExec("insert rename2.t1 values (100000)")
tk.MustExec("insert rename2.t1 values ()")
result = tk.MustQuery("select * from rename2.t1")
result.Check(testkit.Rows("1", "100000", "100001"))
_, err := tk.Exec("insert rename1.t values ()")
c.Assert(err, NotNil)
tk.MustExec("drop database rename1")
tk.MustExec("drop database rename2")
}

func (s *testSuite) TestUnsupportedCharset(c *C) {
Expand Down
35 changes: 21 additions & 14 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (m *Meta) parseDatabaseID(key string) (int64, error) {
return n, errors.Trace(err)
}

func (m *Meta) autoTalbeIDKey(tableID int64) []byte {
func (m *Meta) autoTableIDKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mTableIDPrefix, tableID))
}

Expand All @@ -140,26 +140,24 @@ func (m *Meta) parseTableID(key string) (int64, error) {
return n, errors.Trace(err)
}

// GenAutoTableID adds step to the auto id of the table and returns the sum.
// GenAutoTableID adds step to the auto ID of the table and returns the sum.
func (m *Meta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, error) {
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
id, err := m.txn.HGet(dbKey, m.autoTableIDKey(tableID))
if err != nil {
return 0, errors.Trace(err)
}

// Check if table exists.
tableKey := m.tableKey(tableID)
if err := m.checkTableExists(dbKey, tableKey); err != nil {
return 0, errors.Trace(err)
isNotExisting := len(id) == 0
if isNotExisting {
return 0, kv.ErrNotExist
}

return m.txn.HInc(dbKey, m.autoTalbeIDKey(tableID), step)
return m.txn.HInc(dbKey, m.autoTableIDKey(tableID), step)
}

// GetAutoTableID gets current auto id with table id.
func (m *Meta) GetAutoTableID(dbID int64, tableID int64) (int64, error) {
return m.txn.HGetInt64(m.dbKey(dbID), m.autoTalbeIDKey(tableID))
return m.txn.HGetInt64(m.dbKey(dbID), m.autoTableIDKey(tableID))
}

// GetSchemaVersion gets current global schema version.
Expand Down Expand Up @@ -273,6 +271,12 @@ func (m *Meta) CreateTable(dbID int64, tableInfo *model.TableInfo) error {
return errors.Trace(err)
}

// Initial the auto ID key.
base := []byte(strconv.FormatInt(0, 10))
err = m.txn.HSet(dbKey, m.autoTableIDKey(tableInfo.ID), base)
if err != nil {
return errors.Trace(err)
}
return m.txn.HSet(dbKey, tableKey, data)
}

Expand All @@ -294,15 +298,15 @@ func (m *Meta) DropDatabase(dbID int64) error {
// DropTable drops table in database.
// If delAutoID is true, it will delete the auto_increment id key-value of the table.
// For rename table, we do not need to rename auto_increment id key-value.
func (m *Meta) DropTable(dbID int64, tableID int64, delAutoID bool) error {
func (m *Meta) DropTable(dbID int64, tblInfo *model.TableInfo, delAutoID bool) error {
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return errors.Trace(err)
}

// Check if table exists.
tableKey := m.tableKey(tableID)
tableKey := m.tableKey(tblInfo.ID)
if err := m.checkTableExists(dbKey, tableKey); err != nil {
return errors.Trace(err)
}
Expand All @@ -312,7 +316,10 @@ func (m *Meta) DropTable(dbID int64, tableID int64, delAutoID bool) error {
}

if delAutoID {
if err := m.txn.HDel(dbKey, m.autoTalbeIDKey(tableID)); err != nil {
if tblInfo.OldSchemaID != 0 {
dbKey = m.dbKey(tblInfo.OldSchemaID)
}
if err := m.txn.HDel(dbKey, m.autoTableIDKey(tblInfo.ID)); err != nil {
return errors.Trace(err)
}
}
Expand Down
20 changes: 18 additions & 2 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *testSuite) TestMeta(c *C) {
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))

err = t.DropTable(1, 2, true)
err = t.DropTable(1, tbInfo2, true)
c.Assert(err, IsNil)
// Make sure auto id key-value entry is gone.
n, err = t.GetAutoTableID(1, 2)
Expand All @@ -168,16 +168,32 @@ func (s *testSuite) TestMeta(c *C) {
n, err = t.GenAutoTableID(1, tid, 10)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
// Fail to update auto id.
nonExistentID := int64(1234)
_, err = t.GenAutoTableID(1, nonExistentID, 10)
c.Assert(err, NotNil)
n, err = t.GetAutoTableID(1, tid)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
// Drop table without touch auto id key-value entry.
err = t.DropTable(1, 100, false)
err = t.DropTable(1, tbInfo100, false)
c.Assert(err, IsNil)
// Make sure that auto id key-value entry is still there.
n, err = t.GetAutoTableID(1, tid)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
// Drop table with old schema ID.
err = t.CreateTable(1, tbInfo100)
c.Assert(err, IsNil)
n, err = t.GenAutoTableID(1, tid, 10)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
tbInfo100.OldSchemaID = 1
err = t.DropTable(1, tbInfo100, true)
c.Assert(err, IsNil)
n, err = t.GetAutoTableID(1, tid)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(0))

err = t.DropDatabase(1)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion util/admin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *testSuite) TearDownSuite(c *C) {
c.Assert(err, IsNil)
t := meta.NewMeta(txn)

err = t.DropTable(s.dbInfo.ID, s.tbInfo.ID, true)
err = t.DropTable(s.dbInfo.ID, s.tbInfo, true)
c.Assert(err, IsNil)
err = t.DropDatabase(s.dbInfo.ID)
c.Assert(err, IsNil)
Expand Down

0 comments on commit 4550e91

Please sign in to comment.