Skip to content

Commit

Permalink
fix connection leakage
Browse files Browse the repository at this point in the history
  • Loading branch information
gangtao committed Jan 27, 2024
1 parent bde063a commit 14939d0
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
3 changes: 3 additions & 0 deletions internal/impl/nats/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ func (n *natsReader) Read(ctx context.Context) (*service.Message, service.AckFun
}

func (n *natsReader) Close(ctx context.Context) (err error) {
go func() {
n.disconnect()
}()
n.interruptOnce.Do(func() {
close(n.interruptChan)
})
Expand Down
3 changes: 3 additions & 0 deletions internal/impl/nats/input_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ func (n *natsStreamReader) Read(ctx context.Context) (*service.Message, service.
}

func (n *natsStreamReader) Close(ctx context.Context) (err error) {
go func() {
n.disconnect()
}()
n.interruptOnce.Do(func() {
close(n.interruptChan)
})
Expand Down

0 comments on commit 14939d0

Please sign in to comment.