Skip to content

Commit

Permalink
Improve error handling and resource management in streamPrediction (r…
Browse files Browse the repository at this point in the history
…eplicate#79)

* Move up deferred closing of response body

* Return context error instead of nil

* Send to line channel in select with context check

* Refactor final goroutine in streamPrediction
  • Loading branch information
mattt authored Sep 19, 2024
1 parent 63039fd commit 1c2ae68
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,24 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l

g.Go(func() error {
defer close(lineChan)
defer resp.Body.Close()

for {
select {
case <-ctx.Done():
return nil
return ctx.Err()
case <-done:
return nil
default:
line, err := reader.ReadBytes('\n')
if err != nil {
defer resp.Body.Close()
return err
}
lineChan <- line
select {
case lineChan <- line:
case <-ctx.Done():
return ctx.Err()
}
}
}
})
Expand Down Expand Up @@ -223,30 +227,27 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
}()

go func() {
err := g.Wait()

defer close(sseChan)
defer close(errChan)

for {
select {
case <-ctx.Done():
return
case <-done:
if err != nil {
if errors.Is(err, io.EOF) {
// Attempt to reconnect if the connection was closed before the stream was done
r.streamPrediction(ctx, prediction, lastEvent, sseChan, errChan)
return
default:
err := g.Wait()
if err != nil {
if err == io.EOF {
// Attempt to reconnect if the connection was closed before the stream was done
r.streamPrediction(ctx, prediction, lastEvent, sseChan, errChan)
continue
}
}

if errors.Is(err, context.Canceled) {
return
}
if errors.Is(err, context.Canceled) {
// Context was canceled, simply return
return
}

errChan <- err
}
select {
case errChan <- err:
default:
// errChan is full or closed
}
}
}()
Expand Down

0 comments on commit 1c2ae68

Please sign in to comment.