Skip to content

Commit

Permalink
Fix forwarding async empty body (zio#2244)
Browse files Browse the repository at this point in the history
* Fix forwarding async empty body

* Format
  • Loading branch information
vigoo authored Jun 9, 2023
1 parent bcfe30d commit babadce
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object NettyBodyWriter {
override def apply(message: Chunk[Byte], isLast: Boolean): Unit = {
val nettyMsg = message match {
case b: ByteArray => Unpooled.wrappedBuffer(b.array)
case other => throw new IllegalStateException(s"Unsupported async msg type: ${other.getClass}")
case other => Unpooled.wrappedBuffer(other.toArray)
}
ctx.writeAndFlush(nettyMsg)
if (isLast) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
Expand Down
44 changes: 42 additions & 2 deletions zio-http/src/test/scala/zio/http/RequestStreamingServerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package zio.http

import zio._
import zio.test.Assertion.equalTo
import zio.test.TestAspect.{diagnose, sequential, shrinks, timeout, withLiveClock}
import zio.test.assertZIO
import zio.{Scope, ZIO, ZLayer, durationInt}
import zio.test.{assertCompletes, assertTrue, assertZIO}

import zio.http.ServerSpec.requestBodySpec
import zio.http.internal.{DynamicServer, HttpRunnableSpec}
Expand Down Expand Up @@ -66,6 +66,46 @@ object RequestStreamingServerSpec extends HttpRunnableSpec {
val res = app.deploy.status.run()
assertZIO(res)(equalTo(Status.InternalServerError))
},
suite("streaming request passed to client")({
val app = Http
.collectHandler[Request] {
case req @ Method.POST -> Root / "1" =>
Handler.fromZIO {
val host = req.headers.get(Header.Host).get
val newRequest =
req.copy(url = req.url.withPath("/2").withHost(host.hostAddress).withPort(host.port.getOrElse(80)))
ZIO.debug(s"#1: got response, forwarding") *>
ZIO.serviceWithZIO[Client] { client =>
client.request(newRequest)
}
}
case req @ Method.POST -> Root / "2" =>
Handler.fromZIO {
ZIO.debug("#2: got response, collecting") *>
req.body.asChunk.map { body =>
Response.text(body.length.toString)
}
}
}
.catchAllCauseZIO(cause =>
ZIO
.debug(s"got error: $cause")
.as(Response.fromHttpError(HttpError.InternalServerError(cause = Some(FiberFailure(cause))))),
)
val sizes = Chunk(0, 8192, 1024 * 1024)
sizes.map { size =>
test(s"with body length $size") {
for {
testBytes <- Random.nextBytes(size)
res <- app.deploy.run(method = Method.POST, path = Root / "1", body = Body.fromChunk(testBytes))
str <- res.body.asString
} yield assertTrue(
res.status.isSuccess,
str == testBytes.length.toString,
)
}
}
}: _*),
) @@ timeout(10 seconds)

override def spec =
Expand Down

0 comments on commit babadce

Please sign in to comment.