Skip to content

Commit

Permalink
Retry failed writes once (lyft#120)
Browse files Browse the repository at this point in the history
Introduce a retry channel, giving buffers one more chance to be successfully flushed before they're dropped. This is intended to improve handling the case where the underlying connection is intermittently closed, but the destination sink is otherwise generally healthy.
  • Loading branch information
danielmmetz authored Feb 14, 2022
1 parent 55f0788 commit 7495b50
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,10 @@ func NewNetSink(opts ...SinkOption) FlushableSink {
bufSize = defaultBufferSizeTCP
}

outc := make(chan *bytes.Buffer, approxMaxMemBytes/bufSize)
writer := &sinkWriter{
outc: outc,
}
s.outc = outc
s.outc = make(chan *bytes.Buffer, approxMaxMemBytes/bufSize)
s.retryc = make(chan *bytes.Buffer, 1) // It should be okay to limit this given we preferentially process from this over outc.

writer := &sinkWriter{outc: s.outc}
s.bufWriter = bufio.NewWriterSize(writer, bufSize)

go s.run()
Expand All @@ -124,6 +122,7 @@ func NewNetSink(opts ...SinkOption) FlushableSink {
type netSink struct {
conn net.Conn
outc chan *bytes.Buffer
retryc chan *bytes.Buffer
mu sync.Mutex
bufWriter *bufio.Writer
doFlush chan chan struct{}
Expand Down Expand Up @@ -292,6 +291,20 @@ func (s *netSink) run() {
reconnectFailed = false
}

// Handle buffers that need to be retried first, if they exist.
select {
case buf := <-s.retryc:
if err := s.writeToConn(buf); err != nil {
s.mu.Lock()
s.handleFlushErrorSize(err, buf.Len())
s.mu.Unlock()
}
putBuffer(buf)
continue
default:
// Drop through in case retryc has nothing.
}

select {
case <-t.C:
s.flush()
Expand All @@ -305,34 +318,36 @@ func (s *netSink) run() {
n := len(s.outc)
for i := 0; i < n && s.conn != nil; i++ {
buf := <-s.outc
s.writeToConn(buf)
if err := s.writeToConn(buf); err != nil {
s.retryc <- buf
continue
}
putBuffer(buf)
}
close(done)
case buf := <-s.outc:
s.writeToConn(buf)
if err := s.writeToConn(buf); err != nil {
s.retryc <- buf
continue
}
putBuffer(buf)
}
}
}

// writeToConn writes the buffer to the underlying conn. May only be called
// from run().
func (s *netSink) writeToConn(buf *bytes.Buffer) {
len := buf.Len()

func (s *netSink) writeToConn(buf *bytes.Buffer) error {
// TODO (CEV): parameterize timeout
s.conn.SetWriteDeadline(time.Now().Add(defaultWriteTimeout))
_, err := buf.WriteTo(s.conn)
s.conn.SetWriteDeadline(time.Time{}) // clear

if err != nil {
s.mu.Lock()
s.handleFlushErrorSize(err, len)
s.mu.Unlock()
_ = s.conn.Close()
s.conn = nil // this will break the loop
}
return err
}

func (s *netSink) connect(address string) error {
Expand Down

0 comments on commit 7495b50

Please sign in to comment.