Skip to content

Commit

Permalink
MB-32846 - more aggressively removeOldData() in scorch persister
Browse files Browse the repository at this point in the history
The scorch persister loop has an optimization to not block/wait if it
sees that the latest root epoch has changed during the current
persistence round, so that the persister can continue immediately to
the top of the persister loop.  But, this "continue OUTER"
optimization would skip a removeOldData() invocation.

This meant that removeOldData() wouldn't be invoked for potentially a
long time in this kind of indexing-heavy scenario.

While indexing 200K wiki docs (indexing only using bleve-blast),
before this change, # of files could grow up to >100 zap segment
files.  After this change, the # of files stayed nearer the
neighborhood of 10~20 files.

See also: https://issues.couchbase.com/browse/MB-32846

Also, cleaned up some whitespace lines seen while trying to diagnose
this issue.
  • Loading branch information
steveyen committed Jan 29, 2019
1 parent d725210 commit c8e737a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
4 changes: 3 additions & 1 deletion index/scorch/introducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,6 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
fileSegments++
}
}

}

// before the newMerge introduction, need to clean the newly
Expand All @@ -393,6 +392,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}
}
}

// In case where all the docs in the newly merged segment getting
// deleted by the time we reach here, can skip the introduction.
if nextMerge.new != nil &&
Expand Down Expand Up @@ -424,6 +424,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response

newSnapshot.updateSize()

s.rootLock.Lock()
// swap in new index snapshot
newSnapshot.epoch = s.nextSnapshotEpoch
Expand Down Expand Up @@ -501,6 +502,7 @@ func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
}

newSnapshot.updateSize()

// swap in new snapshot
rootPrev := s.root
s.root = newSnapshot
Expand Down
5 changes: 4 additions & 1 deletion index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ OUTER:
if ew != nil && ew.epoch > lastMergedEpoch {
lastMergedEpoch = ew.epoch
}

lastMergedEpoch, persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
lastMergedEpoch, persistWatchers, po)

Expand Down Expand Up @@ -178,6 +179,7 @@ OUTER:
s.fireEvent(EventKindPersisterProgress, time.Since(startTime))

if changed {
s.removeOldData()
atomic.AddUint64(&s.stats.TotPersistLoopProgress, 1)
continue OUTER
}
Expand Down Expand Up @@ -659,13 +661,13 @@ func (s *Scorch) LoadSnapshot(epoch uint64) (rv *IndexSnapshot, err error) {
}

func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {

rv := &IndexSnapshot{
parent: s,
internal: make(map[string][]byte),
refs: 1,
creator: "loadSnapshot",
}

var running uint64
c := snapshot.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
Expand Down Expand Up @@ -701,6 +703,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
running += segmentSnapshot.segment.Count()
}
}

return rv, nil
}

Expand Down

0 comments on commit c8e737a

Please sign in to comment.