Skip to content

Commit

Permalink
Merge pull request Jeffail#31 from jarri-abidi/patch-1
Browse files Browse the repository at this point in the history
Add ProcessCtx method to Pool
  • Loading branch information
Jeffail authored Jan 26, 2021
2 parents f13eb66 + d77f5f2 commit 6adc18d
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions tunny.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,46 @@ func (p *Pool) ProcessTimed(
return payload, nil
}

// ProcessCtx will use the Pool to process a payload and synchronously return
// the result. If the context cancels before the job has finished the worker will
// be interrupted and ErrJobTimedOut will be returned. ProcessCtx can be
// called safely by any goroutines.
func (p *Pool) ProcessCtx(ctx context.Context, i interface{}) (interface{}, error) {
atomic.AddInt64(&p.queuedJobs, 1)
defer atomic.AddInt64(&p.queuedJobs, -1)

var request workRequest
var open bool

select {
case request, open = <-p.reqChan:
if !open {
return nil, ErrPoolNotRunning
}
case <-ctx.Done():
return nil, ErrJobTimedOut
}

select {
case request.jobChan <- payload:
case <-ctx.Done():
request.interruptFunc()
return nil, ErrJobTimedOut
}

select {
case payload, open = <-request.retChan:
if !open {
return nil, ErrWorkerClosed
}
case <-ctx.Done():
request.interruptFunc()
return nil, ErrJobTimedOut
}

return payload, nil
}

// QueueLength returns the current count of pending queued jobs.
func (p *Pool) QueueLength() int64 {
return atomic.LoadInt64(&p.queuedJobs)
Expand Down

0 comments on commit 6adc18d

Please sign in to comment.