Skip to content

Commit

Permalink
Merge pull request blevesearch#1419 from sreekanth-cb/atomic_eventing
Browse files Browse the repository at this point in the history
Switching to atomic ops instead of locks for events
  • Loading branch information
sreekanth-cb authored Jun 22, 2020
2 parents dcafaff + 1314e37 commit 105e2b4
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 25 deletions.
4 changes: 2 additions & 2 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64,
// for sufficient in-memory segments to pile up for the next
// memory merge cum persist loop.
if numFilesOnDisk < uint64(po.PersisterNapUnderNumFiles) &&
po.PersisterNapTimeMSec > 0 && s.paused() == 0 {
po.PersisterNapTimeMSec > 0 && s.NumEventsBlocking() == 0 {
select {
case <-s.closeCh:
case <-time.After(time.Millisecond * time.Duration(po.PersisterNapTimeMSec)):
Expand Down Expand Up @@ -333,7 +333,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot,
// Perform in-memory segment merging only when the memory pressure is
// below the configured threshold, else the persister performs the
// direct persistence of segments.
if s.paused() < po.MemoryPressurePauseThreshold {
if s.NumEventsBlocking() < po.MemoryPressurePauseThreshold {
persisted, err := s.persistSnapshotMaybeMerge(snapshot)
if err != nil {
return err
Expand Down
29 changes: 6 additions & 23 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ type Scorch struct {
onEvent func(event Event)
onAsyncError func(err error)

pauseLock sync.RWMutex

pauseCount uint64

forceMergeRequestCh chan *mergerCtrl

segPlugin segment.Plugin
Expand Down Expand Up @@ -156,30 +152,17 @@ func configForceSegmentTypeVersion(config map[string]interface{}) (string, uint3
return forcedSegmentType, uint32(forcedSegmentVersion), nil
}

func (s *Scorch) paused() uint64 {
s.pauseLock.Lock()
pc := s.pauseCount
s.pauseLock.Unlock()
return pc
}

func (s *Scorch) incrPause() {
s.pauseLock.Lock()
s.pauseCount++
s.pauseLock.Unlock()
}

func (s *Scorch) decrPause() {
s.pauseLock.Lock()
s.pauseCount--
s.pauseLock.Unlock()
func (s *Scorch) NumEventsBlocking() uint64 {
eventsCompleted := atomic.LoadUint64(&s.stats.TotEventTriggerCompleted)
eventsStarted := atomic.LoadUint64(&s.stats.TotEventTriggerStarted)
return eventsStarted - eventsCompleted
}

func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
if s.onEvent != nil {
s.incrPause()
atomic.AddUint64(&s.stats.TotEventTriggerStarted, 1)
s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
s.decrPause()
atomic.AddUint64(&s.stats.TotEventTriggerCompleted, 1)
}
}

Expand Down
3 changes: 3 additions & 0 deletions index/scorch/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Stats struct {
TotTermSearchersStarted uint64
TotTermSearchersFinished uint64

TotEventTriggerStarted uint64
TotEventTriggerCompleted uint64

TotIntroduceLoop uint64
TotIntroduceSegmentBeg uint64
TotIntroduceSegmentEnd uint64
Expand Down

0 comments on commit 105e2b4

Please sign in to comment.