Skip to content

Commit

Permalink
fix: handle cancels mid read in the fallback (muesli#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
caarlos0 authored Jun 21, 2022
1 parent 17304ab commit d8fcbea
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 9 deletions.
21 changes: 15 additions & 6 deletions cancelreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type File interface {
// false. However, after calling Cancel(), new Read() calls immediately return
// errCanceled and don't consume any data anymore.
type fallbackCancelReader struct {
r io.Reader
canceled bool
r io.Reader
cancelMixin
}

// newFallbackCancelReader is a fallback for NewReader that cannot actually
Expand All @@ -46,16 +46,25 @@ func newFallbackCancelReader(reader io.Reader) (CancelReader, error) {
}

func (r *fallbackCancelReader) Read(data []byte) (int, error) {
if r.canceled {
if r.isCanceled() {
return 0, ErrCanceled
}

return r.r.Read(data)
n, err := r.r.Read(data)
/*
If the underlying reader is a blocking reader (e.g. an open connection),
it might happen that 1 goroutine cancels the reader while its stuck in
the read call waiting for something.
If that happens, we should still cancel the read.
*/
if r.isCanceled() {
return 0, ErrCanceled
}
return n, err // nolint: wrapcheck
}

func (r *fallbackCancelReader) Cancel() bool {
r.canceled = true

r.setCanceled()
return false
}

Expand Down
4 changes: 1 addition & 3 deletions cancelreader_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

package cancelreader

import (
"io"
)
import "io"

// NewReader returns a fallbackCancelReader that satisfies the CancelReader but
// does not actually support cancellation.
Expand Down
88 changes: 88 additions & 0 deletions cancelreader_fallback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cancelreader

import (
"bytes"
"fmt"
"io/ioutil"
"sync"
"testing"
)

type blockingReader struct {
sync.Mutex
read bool
unblockCh chan bool
startedCh chan bool
}

func (r *blockingReader) Read([]byte) (int, error) {
defer func() {
r.Lock()
defer r.Unlock()
r.read = true
}()
r.startedCh <- true
<-r.unblockCh
return 0, fmt.Errorf("this error should be ignored")
}

func TestFallbackReaderConcurrentCancel(t *testing.T) {
doneCh := make(chan bool, 1)
startedCh := make(chan bool, 1)
unblockCh := make(chan bool, 1)
r := blockingReader{
startedCh: startedCh,
unblockCh: unblockCh,
}
cr, err := newFallbackCancelReader(&r)
if err != nil {
t.Errorf("expected no error, but got %s", err)
}

go func() {
defer func() { doneCh <- true }()
if _, err := ioutil.ReadAll(cr); err != ErrCanceled {
t.Errorf("expected canceled error, got %v", err)
}
}()

// make sure the read started before canceling the reader
<-startedCh
cr.Cancel()
unblockCh <- true

// wait for the read to end to ensure its assertions were made
<-doneCh

// make sure that it waited for the reader
if !r.read {
t.Error("seems like the reader was canceled before the read, this shouldn't happen")
}
}

func TestFallbackReader(t *testing.T) {
var r bytes.Buffer
cr, err := newFallbackCancelReader(&r)
if err != nil {
t.Errorf("expected no error, but got %s", err)
}

txt := "first"
_, _ = r.WriteString(txt)
first, err := ioutil.ReadAll(cr)
if err != nil {
t.Errorf("expected no error, but got %s", err)
}
if string(first) != txt {
t.Errorf("expected output to be %q, got %q", txt, string(first))
}

cr.Cancel()
second, err := ioutil.ReadAll(cr)
if err != ErrCanceled {
t.Errorf("expected ErrCanceled, got %v", err)
}
if len(second) > 0 {
t.Errorf("expected an empty read, got %q", string(second))
}
}

0 comments on commit d8fcbea

Please sign in to comment.