Skip to content

Commit

Permalink
Fix: Request Streaming back pressure (zio#1188)
Browse files Browse the repository at this point in the history
* using a bounded queue of 1 element instead of ZIO.effectAsync to be able to control reading from channel.

* integraiton test added

* separated the streaming execution in its own test object

* added unit test

* fixed test for scala 2.12

* refactored tests after review

* removed redundant test

* refactor: move logic to ServerResponseWriter

* refactor: remove UnsafeReadableChannel

* added another example for File Streaming upload

* fix tests

* fixeed code after merge with main

* removed unusefull example

* formatting

* added a request streaming integration test with larger file

* refactor: cleanup UnsafeAsync

* refactor: cleaning up the test

* revert: serverspec

* refactor: simplify the test

Co-authored-by: Tushar Mathur <[email protected]>
  • Loading branch information
gciuloaica and tusharmath authored May 14, 2022
1 parent 3f1aee0 commit c78ddfc
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 14 deletions.
34 changes: 23 additions & 11 deletions zio-http/src/main/scala/zhttp/http/HttpData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.{HttpContent, LastHttpContent}
import io.netty.util.AsciiString
import zhttp.http.HttpData.ByteBufConfig
import zio._
import zio.stream.ZStream
import zio.{Chunk, Task, UIO, ZIO}

import java.io.FileInputStream
import java.nio.charset.Charset
Expand Down Expand Up @@ -126,11 +126,26 @@ object HttpData {
}
}

private[zhttp] final case class UnsafeAsync(unsafeRun: (ChannelHandlerContext => HttpContent => Unit) => Unit)
private[zhttp] final case class UnsafeAsync(unsafeRun: (ChannelHandlerContext => HttpContent => Any) => Unit)
extends HttpData {

private def isLast(msg: HttpContent): Boolean = msg.isInstanceOf[LastHttpContent]

private def toQueue: ZIO[Any, Nothing, Queue[HttpContent]] = {
for {
queue <- ZQueue.bounded[HttpContent](1)
ctxPromise <- Promise.make[Nothing, ChannelHandlerContext]
runtime <- ZIO.runtime[Any]
_ <- UIO(
unsafeRun { ch =>
runtime.unsafeRun(ctxPromise.succeed(ch))
msg => runtime.unsafeRun(queue.offer(msg))
},
)
ch <- ctxPromise.await
} yield queue.mapM(msg => UIO(ch.read()).unless(isLast(msg)).as(msg))
}

/**
* Encodes the HttpData into a ByteBuf.
*/
Expand All @@ -150,15 +165,12 @@ object HttpData {
* Encodes the HttpData into a Stream of ByteBufs
*/
override def toByteBufStream(config: ByteBufConfig): ZStream[Any, Throwable, ByteBuf] =
ZStream
.effectAsync[Any, Nothing, ByteBuf](cb =>
unsafeRun(ch =>
msg => {
cb(ZIO.succeed(Chunk(msg.content)))
if (isLast(msg)) cb(ZIO.fail(None)) else ch.read(): Unit
},
),
)
ZStream.unwrap {
for {
queue <- toQueue
stream = ZStream.fromQueueWithShutdown(queue).takeUntil(isLast(_)).map(_.content())
} yield stream
}

override def toHttp(config: ByteBufConfig): Http[Any, Throwable, Any, ByteBuf] =
Http.fromZIO(toByteBuf(config))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zhttp.service
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
import io.netty.handler.codec.http.{HttpContent, LastHttpContent}

final class RequestBodyHandler(val callback: HttpContent => Unit)
final class RequestBodyHandler(val callback: HttpContent => Any)
extends SimpleChannelInboundHandler[HttpContent](false) { self =>

override def channelRead0(ctx: ChannelHandlerContext, msg: HttpContent): Unit = {
Expand Down
Binary file added zio-http/src/test/resources/1M.img
Binary file not shown.
6 changes: 4 additions & 2 deletions zio-http/src/test/scala/zhttp/http/HttpDataSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import zio.duration.durationInt
import zio.stream.ZStream
import zio.test.Assertion.{anything, equalTo, isLeft, isSubtype}
import zio.test.TestAspect.timeout
import zio.test.{DefaultRunnableSpec, Gen, assertM, checkAllM}
import zio.test._

import java.io.File

object HttpDataSpec extends DefaultRunnableSpec {

override def spec =
suite("HttpDataSpec") {

val testFile = new File(getClass.getResource("/TestFile.txt").getPath)
suite("outgoing") {
suite("encode")(
Expand Down Expand Up @@ -43,5 +45,5 @@ object HttpDataSpec extends DefaultRunnableSpec {
),
)
}
} @@ timeout(5 seconds)
} @@ timeout(10 seconds)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package zhttp.service
import zhttp.http._
import zhttp.internal.{DynamicServer, HttpRunnableSpec}
import zhttp.service.ServerSpec.requestBodySpec
import zio.duration.durationInt
import zio.test.Assertion.equalTo
import zio.test.TestAspect.{sequential, timeout}
import zio.test._

object RequestStreamingServerSpec extends HttpRunnableSpec {
private val env =
EventLoopGroup.nio() ++ ChannelFactory.nio ++ zhttp.service.server.ServerChannelFactory.nio ++ DynamicServer.live

private val appWithReqStreaming = serve(DynamicServer.app, Some(Server.enableObjectAggregator(-1)))

/**
* Generates a string of the provided length and char.
*/
private def genString(size: Int, char: Char): String = {
val buffer = new Array[Char](size)
for (i <- 0 until size) buffer(i) = char
new String(buffer)
}

def largeContentSpec = suite("ServerStreamingSpec") {
testM("test unsafe large content") {
val size = 1024 * 1024
val content = genString(size, '?')

val app = Http.fromFunctionZIO[Request] {
_.bodyAsStream.runCount.map(bytesCount => {
Response.text(bytesCount.toString)
})
}

val res = app.deploy.bodyAsString.run(content = HttpData.fromString(content))

assertM(res)(equalTo(size.toString))

}
}

override def spec =
suite("RequestStreamingServerSpec") {
val spec = requestBodySpec + largeContentSpec
suiteM("app with request streaming") { appWithReqStreaming.as(List(spec)).useNow }
}.provideCustomLayerShared(env) @@ timeout(10 seconds) @@ sequential

}

0 comments on commit c78ddfc

Please sign in to comment.