
Commit

Fix queue bug
gopherclass committed Feb 27, 2020
1 parent 1d1ba40 commit 600d8a5
Showing 1 changed file with 90 additions and 43 deletions.
133 changes: 90 additions & 43 deletions queue/queue.go
@@ -3,7 +3,6 @@ package queue
 import (
 	"net/url"
 	"sync"
-	"sync/atomic"
 
 	"github.com/gocolly/colly/v2"
 )
@@ -27,10 +26,10 @@ type Storage interface {
 // requests in multiple threads
 type Queue struct {
 	// Threads defines the number of consumer threads
-	Threads           int
-	storage           Storage
-	activeThreadCount int32
-	requestsOut       chan *colly.Request
+	Threads int
+	storage Storage
+	wake    chan struct{}
+	mut     sync.Mutex // guards wake
 }
 
 // InMemoryQueueStorage is the default implementation of the Storage interface.
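Editor's note, not part of the commit: the new wake field is a signal channel that AddRequest uses to tell a running consumer loop that work has arrived, and mut only guards whether that channel exists yet. A minimal, hypothetical sketch of that guarded-wake pattern outside colly follows; the names box, Add and Drain are made up for illustration, not taken from the library.

package main

import (
	"fmt"
	"sync"
)

// box mimics the new Queue fields: wake is nil until a consumer loop starts,
// and mu guards that fact (hypothetical names, not colly's).
type box struct {
	mu    sync.Mutex // guards wake
	wake  chan struct{}
	items []string
}

// Add stores an item; it only signals wake if a loop is actually running,
// mirroring the waken check AddRequest gains in this commit.
func (b *box) Add(item string) {
	b.mu.Lock()
	b.items = append(b.items, item)
	wake := b.wake
	b.mu.Unlock()
	if wake != nil {
		wake <- struct{}{}
	}
}

// Drain installs the wake channel, then processes items until stop is closed.
func (b *box) Drain(stop <-chan struct{}) {
	b.mu.Lock()
	b.wake = make(chan struct{})
	wake := b.wake
	b.mu.Unlock()
	for {
		b.mu.Lock()
		for len(b.items) > 0 {
			fmt.Println("got", b.items[0])
			b.items = b.items[1:]
		}
		b.mu.Unlock()
		select {
		case <-wake: // Add signalled new work
		case <-stop:
			return
		}
	}
}

func main() {
	b := &box{}
	b.Add("queued before any loop runs") // wake is still nil: just stored

	stop, done := make(chan struct{}), make(chan struct{})
	go func() { b.Drain(stop); close(done) }()

	b.Add("queued while the loop may be running") // signals wake if Drain has installed it
	close(stop)
	<-done
}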
@@ -60,9 +59,8 @@ func New(threads int, s Storage) (*Queue, error) {
 		return nil, err
 	}
 	return &Queue{
-		Threads:     threads,
-		storage:     s,
-		requestsOut: make(chan *colly.Request),
+		Threads: threads,
+		storage: s,
 	}, nil
 }
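For orientation, a hedged usage sketch (not part of this commit): the constructor keeps its public shape, so typical use of the package is unchanged. New, InMemoryQueueStorage, AddURL and Run are the package's existing API; the URLs below are placeholders.

package main

import (
	"fmt"

	"github.com/gocolly/colly/v2"
	"github.com/gocolly/colly/v2/queue"
)

func main() {
	c := colly.NewCollector()
	c.OnRequest(func(r *colly.Request) {
		fmt.Println("visiting", r.URL)
	})

	// Two consumer threads, default in-memory storage capped at 10000 requests.
	q, err := queue.New(2, &queue.InMemoryQueueStorage{MaxSize: 10000})
	if err != nil {
		panic(err)
	}
	q.AddURL("https://example.com/") // placeholder URL
	q.AddURL("https://example.org/") // placeholder URL

	// Run blocks until the queue is drained and no request is still in flight.
	if err := q.Run(c); err != nil {
		panic(err)
	}
}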

Expand Down Expand Up @@ -91,6 +89,21 @@ func (q *Queue) AddURL(URL string) error {

// AddRequest adds a new Request to the queue
func (q *Queue) AddRequest(r *colly.Request) error {
q.mut.Lock()
waken := q.wake != nil
q.mut.Unlock()
if !waken {
return q.storeRequest(r)
}
err := q.storeRequest(r)
if err != nil {
return err
}
q.wake <- struct{}{}
return nil
}

func (q *Queue) storeRequest(r *colly.Request) error {
d, err := r.Marshal()
if err != nil {
return err
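The waken branch above matters because requests are routinely added from inside collector callbacks while Run is already blocking; the send on q.wake is what lets the consumer loop notice them. A small sketch of that situation, assuming a collector and queue built as in the earlier example — enqueueLinks, the package name and the CSS selector are hypothetical:

package crawl

import (
	"github.com/gocolly/colly/v2"
	"github.com/gocolly/colly/v2/queue"
)

// enqueueLinks registers a callback that feeds discovered links back into the
// queue while q.Run(c) is blocking elsewhere, which is exactly the case where
// AddRequest's send on q.wake is needed.
func enqueueLinks(c *colly.Collector, q *queue.Queue) {
	c.OnHTML("a[href]", func(e *colly.HTMLElement) {
		if link := e.Request.AbsoluteURL(e.Attr("href")); link != "" {
			q.AddURL(link) // takes the waken == true branch shown above
		}
	})
}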
@@ -105,52 +118,86 @@ func (q *Queue) Size() (int, error) {
 
 // Run starts consumer threads and calls the Collector
 // to perform requests. Run blocks while the queue has active requests
+// The given Storage must not be used directly while Run blocks.
 func (q *Queue) Run(c *colly.Collector) error {
-	wg := &sync.WaitGroup{}
+	q.mut.Lock()
+	if q.wake != nil {
+		q.mut.Unlock()
+		panic("cannot call duplicate Queue.Run")
+	}
+	q.wake = make(chan struct{})
+	q.mut.Unlock()
+
+	requestc := make(chan *colly.Request)
+	complete, errc := make(chan struct{}), make(chan error, 1)
 	for i := 0; i < q.Threads; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			atomic.AddInt32(&q.activeThreadCount, 1)
-			for r := range q.requestsOut {
-				r.Do()
-			}
-			atomic.AddInt32(&q.activeThreadCount, -1)
-		}()
+		go independentRunner(requestc, complete)
 	}
+	go q.loop(c, requestc, complete, errc)
+	defer close(requestc)
+	return <-errc
+}
 
-	wg.Add(1)
-	go func(c *colly.Collector, s Storage) {
-		defer wg.Done()
+func (q *Queue) loop(c *colly.Collector, requestc chan<- *colly.Request, complete <-chan struct{}, errc chan<- error) {
+	var active int
+	for {
+		size, err := q.storage.QueueSize()
+		if err != nil {
+			errc <- err
+			break
+		}
+		if size == 0 && active == 0 {
+			// Terminate when
+			//   1. No active requests
+			//   2. Empty queue
+			errc <- nil
+			break
+		}
+		sent := requestc
+		var req *colly.Request
+		if size > 0 {
+			req, err = q.loadRequest(c)
+		}
+		if size == 0 || err != nil {
+			// ignore an error returned by GetRequest() or
+			// UnmarshalRequest()
+			sent = nil
+		}
+	Sent:
 		for {
-			if q.IsEmpty() {
-				if q.activeThreadCount == 0 {
-					q.finish()
-					break
+			select {
+			case sent <- req:
+				active++
+				break Sent
+			case <-q.wake:
+				if sent == nil {
+					break Sent
+				}
+			case <-complete:
+				active--
+				if sent == nil && active == 0 {
+					break Sent
 				}
-				continue
 			}
-			rb, err := s.GetRequest()
-			if err != nil || rb == nil {
-				//q.finish()
-				break
-			}
-			t := make([]byte, len(rb))
-			copy(t, rb)
-			r, err := c.UnmarshalRequest(t[:])
-			if err != nil || r == nil {
-				continue
-			}
-			q.requestsOut <- r
 		}
-	}(c, q.storage)
+	}
+}
 
-	wg.Wait()
-	return nil
+func independentRunner(requestc <-chan *colly.Request, complete chan<- struct{}) {
+	for req := range requestc {
+		req.Do()
+		complete <- struct{}{}
+	}
 }
 
-func (q *Queue) finish() {
-	close(q.requestsOut)
+func (q *Queue) loadRequest(c *colly.Collector) (*colly.Request, error) {
+	buf, err := q.storage.GetRequest()
+	if err != nil {
+		return nil, err
+	}
+	copied := make([]byte, len(buf))
+	copy(copied, buf)
+	return c.UnmarshalRequest(copied)
 }
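An aside on loadRequest above (my reading; the commit itself does not explain it): the byte slice returned by Storage.GetRequest is copied before UnmarshalRequest, presumably to protect against a backend that reuses its internal buffer — the same defensive copy existed before this commit. A tiny, hypothetical illustration with a buffer-reusing store:

package main

import "fmt"

// reusingStore mimics a storage backend that hands out slices backed by one
// internal buffer which it overwrites on the next call (hypothetical).
type reusingStore struct{ buf []byte }

func newReusingStore() *reusingStore {
	return &reusingStore{buf: make([]byte, 0, 64)}
}

// GetRequest returns the next payload, reusing the internal buffer.
func (s *reusingStore) GetRequest(payload string) []byte {
	s.buf = append(s.buf[:0], payload...)
	return s.buf
}

func main() {
	s := newReusingStore()

	aliased := s.GetRequest("first")          // still points into s.buf
	copied := append([]byte(nil), aliased...) // defensive copy, like loadRequest

	s.GetRequest("second") // overwrites the shared buffer in place

	fmt.Println(string(aliased)) // corrupted: prints "secon"
	fmt.Println(string(copied))  // safe: prints "first"
}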

 // Init implements Storage.Init() function
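Stepping back from the diff (an editor's sketch, not part of the commit): the fix replaces the old polling over IsEmpty and activeThreadCount with a single dispatcher goroutine, loop, which selects between handing a loaded request to an independentRunner worker, a wake from AddRequest, and a worker completion, and terminates only when the storage is empty and nothing is in flight. The same shape with generic work items instead of colly requests — every name below is made up:

package main

import "fmt"

// runner mirrors independentRunner: it performs work items and reports each
// completion back to the dispatcher.
func runner(workc <-chan string, complete chan<- struct{}) {
	for item := range workc {
		fmt.Println("done:", item)
		complete <- struct{}{}
	}
}

// dispatch mirrors Queue.loop: hand queued items to runners, listen for a wake
// signal when new items are added, and stop once the queue is empty and no
// item is in flight. A real producer would also need a lock around the slice.
func dispatch(queue []string, workc chan<- string, wake, complete <-chan struct{}) {
	active := 0
	for {
		if len(queue) == 0 && active == 0 {
			return // empty queue and no active work: terminate
		}
		sendc := workc
		var item string
		if len(queue) > 0 {
			item = queue[0]
		} else {
			sendc = nil // nil channel: the send case below can never fire
		}
	Sent:
		for {
			select {
			case sendc <- item:
				queue = queue[1:]
				active++
				break Sent
			case <-wake:
				if sendc == nil {
					break Sent // new work arrived; go reload the queue
				}
			case <-complete:
				active--
				if sendc == nil && active == 0 {
					break Sent
				}
			}
		}
	}
}

func main() {
	workc := make(chan string)
	wake := make(chan struct{}) // unused here; a producer like AddRequest would signal it
	complete := make(chan struct{})
	for i := 0; i < 2; i++ {
		go runner(workc, complete)
	}
	dispatch([]string{"a", "b", "c"}, workc, wake, complete)
	close(workc)
}

The nil-channel trick (sendc = nil disables the send case) is what lets one select cover both the "have a request to hand out" and "waiting for wake or completions" states.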