Skip to content

Commit

Permalink
Return ErrPipelineOverflow from PipelineClient.Do if the pending requ…
Browse files Browse the repository at this point in the history
…ests' queue is overflown. This should prevent from caller's goroutines leak on stalled pipeline client
  • Loading branch information
valyala committed Apr 1, 2016
1 parent 22c9594 commit ca21b21
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
19 changes: 17 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1547,7 +1547,15 @@ func (c *PipelineClient) Do(req *Request, resp *Response) error {
w.resp = &w.respCopy
}

c.chW <- w
// Put the request to outgoing queue
select {
case c.chW <- w:
default:
releasePipelineWork(&c.workPool, w)
return ErrPipelineOverflow
}

// Wait for the response
<-w.done
err := w.err

Expand All @@ -1556,6 +1564,10 @@ func (c *PipelineClient) Do(req *Request, resp *Response) error {
return err
}

// ErrPipelineOverflow may be returned from PipelineClient.Do
// if the requests' queue is overflown.
var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxPendingRequests")

// DefaultMaxPendingRequests is the default value
// for PipelineClient.MaxPendingRequests.
const DefaultMaxPendingRequests = 1024
Expand Down Expand Up @@ -1756,7 +1768,10 @@ func (c *PipelineClient) logger() Logger {
func (c *PipelineClient) PendingRequests() int {
c.init()

return len(c.chR)
c.chLock.Lock()
n := len(c.chR)
c.chLock.Unlock()
return n
}

var errPipelineClientStopped = errors.New("pipeline client has been stopped")
Expand Down
8 changes: 8 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func testPipelineClientDoConcurrent(t *testing.T, concurrency int) {
}
}

if c.PendingRequests() != 0 {
t.Fatalf("unexpected number of pending requests: %d. Expecting zero", c.PendingRequests())
}

if err := ln.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
Expand All @@ -86,6 +90,10 @@ func testPipelineClientDo(t *testing.T, c *PipelineClient) {
err = c.Do(req, resp)
}
if err != nil {
if err == ErrPipelineOverflow {
time.Sleep(10 * time.Millisecond)
continue
}
t.Fatalf("unexpected error on iteration %d: %s", i, err)
}
if resp.StatusCode() != StatusOK {
Expand Down

0 comments on commit ca21b21

Please sign in to comment.