Skip to content

Commit

Permalink
Allow queue additions to be seen immediately
Browse files Browse the repository at this point in the history
Remove locks and replace with channel.
  • Loading branch information
felix committed Feb 10, 2020
1 parent 64bec98 commit 45ed182
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 52 deletions.
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/gocolly/colly v1.2.0 h1:qRz9YAn8FIH0qzgNUw+HT9UN7wm1oF9OBAilwEWpyrI=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/jawher/mow.cli v1.1.0 h1:NdtHXRc0CwZQ507wMvQ/IS+Q3W3x2fycn973/b8Zuk8=
Expand Down
87 changes: 36 additions & 51 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ type Queue struct {
Threads int
storage Storage
activeThreadCount int32
threadChans []chan bool
lock *sync.Mutex
requestsOut chan *colly.Request
}

// InMemoryQueueStorage is the default implementation of the Storage interface.
Expand Down Expand Up @@ -63,8 +62,7 @@ func New(threads int, s Storage) (*Queue, error) {
return &Queue{
Threads: threads,
storage: s,
lock: &sync.Mutex{},
threadChans: make([]chan bool, 0, threads),
requestsOut: make(chan *colly.Request),
}, nil
}

Expand Down Expand Up @@ -97,16 +95,7 @@ func (q *Queue) AddRequest(r *colly.Request) error {
if err != nil {
return err
}
if err := q.storage.AddRequest(d); err != nil {
return err
}
q.lock.Lock()
for _, c := range q.threadChans {
c <- !stop
}
q.threadChans = make([]chan bool, 0, q.Threads)
q.lock.Unlock()
return nil
return q.storage.AddRequest(d)
}

// Size returns the size of the queue
Expand All @@ -120,52 +109,48 @@ func (q *Queue) Run(c *colly.Collector) error {
wg := &sync.WaitGroup{}
for i := 0; i < q.Threads; i++ {
wg.Add(1)
go func(c *colly.Collector, wg *sync.WaitGroup) {
go func() {
defer wg.Done()
for {
if q.IsEmpty() {
if q.activeThreadCount == 0 {
break
}
ch := make(chan bool)
q.lock.Lock()
q.threadChans = append(q.threadChans, ch)
q.lock.Unlock()
action := <-ch
if action == stop && q.IsEmpty() {
break
}
}
q.lock.Lock()
atomic.AddInt32(&q.activeThreadCount, 1)
q.lock.Unlock()
rb, err := q.storage.GetRequest()
if err != nil || rb == nil {
q.finish()
continue
}
r, err := c.UnmarshalRequest(rb)
if err != nil || r == nil {
q.finish()
continue
}
atomic.AddInt32(&q.activeThreadCount, 1)
for r := range q.requestsOut {
r.Do()
q.finish()
}
}(c, wg)
atomic.AddInt32(&q.activeThreadCount, -1)
}()
}

wg.Add(1)
go func(c *colly.Collector, s Storage) {
defer wg.Done()
for {
if q.IsEmpty() {
if q.activeThreadCount == 0 {
q.finish()
break
}
continue
}
rb, err := s.GetRequest()
if err != nil || rb == nil {
//q.finish()
break
}
t := make([]byte, len(rb))
copy(t, rb)
r, err := c.UnmarshalRequest(t[:])
if err != nil || r == nil {
continue
}
q.requestsOut <- r
}
}(c, q.storage)

wg.Wait()
return nil
}

func (q *Queue) finish() {
q.lock.Lock()
q.activeThreadCount--
for _, c := range q.threadChans {
c <- stop
}
q.threadChans = make([]chan bool, 0, q.Threads)
q.lock.Unlock()
close(q.requestsOut)
}

// Init implements Storage.Init() function
Expand Down

0 comments on commit 45ed182

Please sign in to comment.