
Commit 97cd03c
Merge pull request influxdata#985 from influxdb/fix-985
client.WriteSeries returns: Server returned (400):  IO error: /opt/influxdb/shared/data/db/shard_db_v2/00190/MANIFEST-000006: No such file or directory
jvshahid committed Oct 24, 2014
2 parents: 3f551b2 + c265f1f
Showing 4 changed files with 65 additions and 14 deletions.
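
For context, the failure in the commit title comes from an ordinary series write racing with a shard drop. Below is a minimal sketch of the kind of client call involved, assuming a v0.8-era Go client API; the import path, the ClientConfig fields, and the NewClient/Series/WriteSeries names are assumptions for illustration, not something defined by this commit:

```go
package main

import (
    "log"

    influxdb "github.com/influxdb/influxdb/client" // assumed v0.8-era client package
)

func main() {
    // Assumed client configuration; field names may differ in your client version.
    c, err := influxdb.NewClient(&influxdb.ClientConfig{
        Host:     "localhost:8086",
        Database: "mydb",
    })
    if err != nil {
        log.Fatal(err)
    }

    series := &influxdb.Series{
        Name:    "test_using_deleted_shard",
        Columns: []string{"value", "time"},
        Points:  [][]interface{}{{1, 0}},
    }

    // Before this fix, a write that raced with a shard drop could fail with:
    // "Server returned (400): IO error: .../MANIFEST-000006: No such file or directory"
    if err := c.WriteSeries([]*influxdb.Series{series}); err != nil {
        log.Println("write failed:", err)
    }
}
```

With the change in datastore/shard_datastore.go below, a drop that arrives while such a write still holds the shard is deferred until the write returns its reference, so the storage files are no longer removed out from under it.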
cluster/cluster_configuration.go (2 additions & 1 deletion)
@@ -1137,7 +1137,8 @@ func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32)
     // now actually remove it from disk if it lives here
     for _, serverId := range serverIds {
         if serverId == self.LocalServer.Id {
-            return self.shardStore.DeleteShard(shardId)
+            self.shardStore.DeleteShard(shardId)
+            return nil
         }
     }
     return nil
cluster/shard.go (1 addition & 1 deletion)
@@ -100,7 +100,7 @@ type LocalShardStore interface {
     BufferWrite(request *p.Request)
     GetOrCreateShard(id uint32) (LocalShardDb, error)
     ReturnShard(id uint32)
-    DeleteShard(shardId uint32) error
+    DeleteShard(shardId uint32)
 }
 
 func (self *ShardData) Id() uint32 {
datastore/shard_datastore.go (35 additions & 12 deletions)
@@ -26,6 +26,7 @@ type ShardDatastore struct {
     lastAccess     map[uint32]time.Time
     shardRefCounts map[uint32]int
     shardsToClose  map[uint32]bool
+    shardsToDelete map[uint32]struct{}
     shardsLock     sync.RWMutex
     writeBuffer    *cluster.WriteBuffer
     maxOpenShards  int

@@ -74,6 +75,7 @@ func NewShardDatastore(config *configuration.Configuration, metaStore *metastore
        lastAccess:     make(map[uint32]time.Time),
        shardRefCounts: make(map[uint32]int),
        shardsToClose:  make(map[uint32]bool),
+       shardsToDelete: make(map[uint32]struct{}),
        pointBatchSize: config.StoragePointBatchSize,
        writeBatchSize: config.StorageWriteBatchSize,
        metaStore:      metaStore,

@@ -211,7 +213,16 @@ func (self *ShardDatastore) ReturnShard(id uint32) {
     self.shardsLock.Lock()
     defer self.shardsLock.Unlock()
     self.shardRefCounts[id] -= 1
-    if self.shardsToClose[id] && self.shardRefCounts[id] == 0 {
+    if self.shardRefCounts[id] != 0 {
+        return
+    }
+
+    if _, ok := self.shardsToDelete[id]; ok {
+        self.deleteShard(id)
+        return
+    }
+
+    if self.shardsToClose[id] {
         self.closeShard(id)
     }
 }

@@ -233,20 +244,20 @@ func (self *ShardDatastore) SetWriteBuffer(writeBuffer *cluster.WriteBuffer) {
     self.writeBuffer = writeBuffer
 }
 
-func (self *ShardDatastore) DeleteShard(shardId uint32) error {
+func (self *ShardDatastore) DeleteShard(shardId uint32) {
     self.shardsLock.Lock()
-    shardDb := self.shards[shardId]
-    delete(self.shards, shardId)
-    delete(self.lastAccess, shardId)
-    self.shardsLock.Unlock()
-
-    if shardDb != nil {
-        shardDb.close()
+    defer self.shardsLock.Unlock()
+    // If someone has a reference to the shard we can't delete it
+    // now. We have to wait until it's returned and delete
+    // it. ReturnShard will take care of that as soon as the reference
+    // count becomes 0.
+    if self.shardRefCounts[shardId] > 0 {
+        self.shardsToDelete[shardId] = struct{}{}
+        return
     }
 
-    dir := self.shardDir(shardId)
-    log.Info("DATASTORE: dropping shard %s", dir)
-    return os.RemoveAll(dir)
+    // otherwise, close the shard and delete it now
+    self.deleteShard(shardId)
 }
 
 func (self *ShardDatastore) shardDir(id uint32) string {

@@ -269,6 +280,17 @@ func (self *ShardDatastore) closeOldestShard() {
     }
 }
 
+func (self *ShardDatastore) deleteShard(id uint32) {
+    self.closeShard(id)
+    dir := self.shardDir(id)
+    log.Info("DATASTORE: dropping shard %s", dir)
+    if err := os.RemoveAll(dir); err != nil {
+        // TODO: we should do some cleanup to make sure any shards left
+        // behind are deleted properly
+        log.Error("Cannot delete %s: %s", dir, err)
+    }
+}
+
 func (self *ShardDatastore) closeShard(id uint32) {
     shard := self.shards[id]
     if shard != nil {

@@ -278,6 +300,7 @@ func (self *ShardDatastore) closeShard(id uint32) {
     delete(self.shards, id)
     delete(self.lastAccess, id)
     delete(self.shardsToClose, id)
+    delete(self.shardsToDelete, id)
     log.Debug("DATASTORE: closing shard %s", self.shardDir(id))
 }
 
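Taken together, the hunks above implement a deferred delete: DeleteShard no longer tears a shard down while callers may still hold it. A shard with a non-zero reference count is only recorded in shardsToDelete, and the last ReturnShard performs the actual close-and-remove. The following is a small, self-contained sketch of that reference-counting pattern, using invented names (refStore, Acquire, Release, Delete); it illustrates the idea and is not the InfluxDB code:

```go
package main

import (
    "fmt"
    "sync"
)

// refStore tracks how many callers hold each shard and defers deletion
// until the last reference is returned.
type refStore struct {
    mu       sync.Mutex
    refs     map[uint32]int
    toDelete map[uint32]bool
}

func newRefStore() *refStore {
    return &refStore{refs: map[uint32]int{}, toDelete: map[uint32]bool{}}
}

// Acquire records that a caller is using the shard.
func (s *refStore) Acquire(id uint32) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.refs[id]++
}

// Release drops a reference; if the shard was marked for deletion and this
// was the last reference, the removal happens now.
func (s *refStore) Release(id uint32) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.refs[id]--
    if s.refs[id] == 0 && s.toDelete[id] {
        s.remove(id)
    }
}

// Delete removes the shard immediately if nobody holds it; otherwise it only
// marks the shard so the last Release removes it.
func (s *refStore) Delete(id uint32) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.refs[id] > 0 {
        s.toDelete[id] = true
        return
    }
    s.remove(id)
}

// remove must be called with the lock held.
func (s *refStore) remove(id uint32) {
    delete(s.refs, id)
    delete(s.toDelete, id)
    fmt.Printf("shard %d removed from disk\n", id)
}

func main() {
    s := newRefStore()
    s.Acquire(190) // a writer is using shard 190
    s.Delete(190)  // a drop arrives mid-write: the shard is only marked
    s.Release(190) // the last reference is returned: removal happens here
}
```

This mirrors the ordering in the new ReturnShard: once the reference count reaches zero, the delete path is checked before the plain close path, so a pending drop always wins.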
integration/data_test.go (27 additions & 0 deletions)
@@ -72,6 +72,33 @@ func (self *DataTestSuite) TestInfiniteValues(c *C) {
     c.Assert(maps[0]["derivative"], IsNil)
 }
 
+// This test tries to write a large batch of points to a shard that is
+// supposed to be dropped. This will demonstrate issue #985: while the
+// data is being written, InfluxDB will close the underlying storage
+// engine which will cause random errors to be thrown and could
+// possibly corrupt the db.
+func (self *DataTestSuite) TestWritingToExpiredShards(c *C) {
+    client := self.server.GetClient(self.dbname, c)
+    err := client.CreateShardSpace(self.dbname, &influxdb.ShardSpace{
+        Name:            "default",
+        Regex:           ".*",
+        RetentionPolicy: "7d",
+        ShardDuration:   "1y",
+    })
+    c.Assert(err, IsNil)
+
+    data := CreatePoints("test_using_deleted_shard", 1, 1000000)
+    data[0].Columns = append(data[0].Columns, "time")
+    for i, _ := range data[0].Points {
+        data[0].Points[i] = append(data[0].Points[i], 0)
+    }
+    // This test will fail randomly without the fix submitted in the
+    // same commit. 10 times is sufficient to trigger the bug.
+    for i := 0; i < 10; i++ {
+        self.client.WriteData(data, c, influxdb.Second)
+    }
+}
+
 // test large integer values
 func (self *DataTestSuite) TestLargeIntegerValues(c *C) {
     // make sure we exceed the pointBatchSize, so we force a yield to
