Skip to content

Commit

Permalink
Increase Buffer Size In Converting Async Body To Stream (zio#2478)
Browse files Browse the repository at this point in the history
* enable test

* enable test

* revert

* fix test

* try to enable some more tests

* revert
  • Loading branch information
adamgfraser authored Oct 13, 2023
1 parent f962297 commit e195aa4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
30 changes: 16 additions & 14 deletions zio-http/src/main/scala/zio/http/netty/NettyBody.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,23 @@ object NettyBody extends BodyEncoding {

override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] =
ZStream
.async[Any, Throwable, Byte](emit =>
try {
unsafeAsync(new UnsafeAsync {
override def apply(message: Chunk[Byte], isLast: Boolean): Unit = {
emit(ZIO.succeed(message))
if (isLast) {
emit(ZIO.fail(None))
.async[Any, Throwable, Byte](
emit =>
try {
unsafeAsync(new UnsafeAsync {
override def apply(message: Chunk[Byte], isLast: Boolean): Unit = {
emit(ZIO.succeed(message))
if (isLast) {
emit(ZIO.fail(None))
}
}
}
override def fail(cause: Throwable): Unit =
emit(ZIO.fail(Some(cause)))
})
} catch {
case e: Throwable => emit(ZIO.fail(Option(e)))
},
override def fail(cause: Throwable): Unit =
emit(ZIO.fail(Some(cause)))
})
} catch {
case e: Throwable => emit(ZIO.fail(Option(e)))
},
4096,
)

override def isComplete: Boolean = false
Expand Down
4 changes: 2 additions & 2 deletions zio-http/src/test/scala/zio/http/endpoint/RoundtripSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ object RoundtripSpec extends ZIOHttpSpec {
("name", 10, Post(1, "title", "body", 111)),
"name: name, value: 10, post: Post(1,title,body,111)",
)
} @@ ifEnvNotSet("CI"),
},
test("endpoint error returned") {
val api = Endpoint(POST / "test")
.outError[String](Status.Custom(999))
Expand Down Expand Up @@ -460,7 +460,7 @@ object RoundtripSpec extends ZIOHttpSpec {
s"name: xyz, value: 100, count: ${1024 * 1024}",
)
}
} @@ ifEnvNotSet("CI"),
},
).provide(
Server.live,
ZLayer.succeed(Server.Config.default.onAnyOpenPort.enableRequestStreaming),
Expand Down

0 comments on commit e195aa4

Please sign in to comment.