Skip to content

Commit

Permalink
Throw exception when reading from AsyncBody after the channel closed (z…
Browse files Browse the repository at this point in the history
…io#2399) (zio#2454)

* Throw exception when reading from AsyncBody after the channel closed (zio#2399)

Currently, when we call `Response.body.asStream`, we don't check to see if the
corresponding response channel is open or not. If the channel has been closed
and the user attempts to read the response body, it's possible for the body stream
to hang (i.e never finish or throw an error), leading to the code hanging as
illustrated in zio#2399. This behavior is undesirable since the user isn't aware of the
channel closing (because we take care of that behind the scene), leading to the
issue being hard to debug.

This commit adds a check to `AsyncBodyReader.connect` so that we will throw
an exception when the user attempts to read an unfinished response body from
a closed channel.

* fix: AsyncBodyReader emit failure when channel closes

This is a bug which happened to me in CI: when the connect to the server closes,
we must fail the response body stream so that the client doesn't hang forever
trying to read from the stream.
  • Loading branch information
lackhoa authored Oct 10, 2023
1 parent f20cab3 commit 9674ac2
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
32 changes: 28 additions & 4 deletions zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package zio.http.netty

import java.io.IOException

import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.{Chunk, ChunkBuilder, Trace, Unsafe}

Expand All @@ -38,11 +40,22 @@ abstract class AsyncBodyReader(implicit trace: Trace) extends SimpleChannelInbou
this.synchronized {
state match {
case State.Buffering =>
state = State.Direct(callback)
buffer.result().foreach { case (chunk, isLast) =>
callback(chunk, isLast)
val result: Chunk[(Chunk[Byte], Boolean)] = buffer.result()
val readingDone: Boolean = result.lastOption match {
case None => false
case Some((_, isLast)) => isLast
}
ctx.read()

if (ctx.channel.isOpen || readingDone) {
state = State.Direct(callback)
result.foreach { case (chunk, isLast) =>
callback(chunk, isLast)
}
ctx.read()
} else {
throw new IllegalStateException("Attempting to read from a closed channel, which will never finish")
}

case State.Direct(_) =>
throw new IllegalStateException("Cannot connect twice")
}
Expand Down Expand Up @@ -91,6 +104,17 @@ abstract class AsyncBodyReader(implicit trace: Trace) extends SimpleChannelInbou
}
super.exceptionCaught(ctx, cause)
}

override def channelInactive(ctx: ChannelHandlerContext): Unit = {
this.synchronized {
state match {
case State.Buffering =>
case State.Direct(callback) =>
callback.fail(new IOException("Channel closed unexpectedly"))
}
}
ctx.fireChannelInactive()
}
}

object AsyncBodyReader {
Expand Down
12 changes: 12 additions & 0 deletions zio-http/src/test/scala/zio/http/ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ object ClientSpec extends HttpRunnableSpec {
loggedUrl <- ZTestLogger.logOutput.map(_.collectFirst { case m => m.annotations("url") }.mkString)
} yield assertTrue(loggedUrl == s"$baseURL/")
},
test("reading of unfinished body must fail") {
val app = Handler.fromStream(ZStream.never).sandbox.toHttpApp
val requestCode = (client: Client) =>
(for {
response <- ZIO.scoped(client(Request()))
_ <- response.body.asStream.runForeach { _ => ZIO.succeed(0) }
.timeout(60.second) // timeout just in case it hangs
} yield ()).fold(success = _ => false, failure = _ => true)

val effect = app.deployAndRequest(requestCode).runZIO(())
assertZIO(effect)(isTrue)
},
)

override def spec = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ abstract class HttpRunnableSpec extends ZIOHttpSpec { self =>
}
} yield response

def deployAndRequest(
call: Client => ZIO[Scope, Throwable, Response],
): Handler[Client with DynamicServer with R with Scope, Throwable, Any, Response] =
def deployAndRequest[Output](
call: Client => ZIO[Scope, Throwable, Output],
): Handler[Client with DynamicServer with R with Scope, Throwable, Any, Output] =
for {
port <- Handler.fromZIO(DynamicServer.port)
id <- Handler.fromZIO(DynamicServer.deploy[R](app))
Expand Down

0 comments on commit 9674ac2

Please sign in to comment.