Skip to content

Commit

Permalink
ZIO 2 migration (zio-archive#154)
Browse files Browse the repository at this point in the history
Co-authored-by: Peter Kotula <[email protected]>
  • Loading branch information
zalbia and justcoon authored Oct 4, 2022
1 parent b8d65f7 commit 837aaa1
Show file tree
Hide file tree
Showing 30 changed files with 1,526 additions and 1,439 deletions.
27 changes: 12 additions & 15 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ inThisBuild(
addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")

val zioVersion = "1.0.16"
val zioHttpVersion = "1.0.0.0-RC17"
val zioJson = "0.1.5"
val zioMagicVersion = "0.3.12"
val zioPreludeVersion = "1.0.0-RC8-1"
val zioVersion = "2.0.2"
val zioHttpVersion = "2.0.0-RC11"
val zioJson = "0.3.0"
val zioPreludeVersion = "1.0.0-RC15"
val sttpVersion = "3.8.0"

lazy val `zio-webhooks` =
Expand All @@ -55,7 +54,7 @@ lazy val zioWebhooksCore = module("zio-webhooks-core", "webhooks")
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-test" % zioVersion,
"com.softwaremill.sttp.client3" %% "core" % sttpVersion,
"com.softwaremill.sttp.client3" %% "zio1" % sttpVersion
"com.softwaremill.sttp.client3" %% "zio" % sttpVersion
)
)
.settings(
Expand All @@ -68,11 +67,10 @@ lazy val zioWebhooksTest = module("zio-webhooks-test", "webhooks-test")
Defaults.itSettings,
publish / skip := true,
libraryDependencies ++= Seq(
"dev.zio" %% "zio-test" % zioVersion % "it,test",
"dev.zio" %% "zio-test-sbt" % zioVersion % "it,test",
"dev.zio" %% "zio-json" % zioJson % "it",
"io.github.kitlangton" %% "zio-magic" % zioMagicVersion % "it,test",
"io.d11" %% "zhttp" % zioHttpVersion % "it"
"dev.zio" %% "zio-test" % zioVersion % "it,test",
"dev.zio" %% "zio-test-sbt" % zioVersion % "it,test",
"dev.zio" %% "zio-json" % zioJson % "it",
"io.d11" %% "zhttp" % zioHttpVersion % "it"
),
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
Expand All @@ -93,10 +91,9 @@ lazy val examples = module("zio-webhooks-examples", "examples")
publish / skip := true,
fork := true,
libraryDependencies ++= Seq(
"dev.zio" %% "zio-test" % zioVersion % "test",
"dev.zio" %% "zio-test-sbt" % zioVersion % "test",
"io.d11" %% "zhttp" % zioHttpVersion,
"io.github.kitlangton" %% "zio-magic" % zioMagicVersion
"dev.zio" %% "zio-test" % zioVersion % "test",
"dev.zio" %% "zio-test-sbt" % zioVersion % "test",
"io.d11" %% "zhttp" % zioHttpVersion
),
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
Expand Down
32 changes: 16 additions & 16 deletions examples/src/main/scala/zio/webhooks/example/BasicExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package zio.webhooks.example
import zhttp.http._
import zhttp.service.Server
import zio._
import zio.console._
import zio.duration._
import zio.magic._
import zio.stream.UStream
import zio.stream.ZStream
import zio.webhooks.backends.{ InMemoryWebhookStateRepo, JsonPayloadSerialization }
import zio.webhooks.backends.sttp.WebhookSttpClient
import zio.webhooks.testkit._
import zio.webhooks.{ WebhooksProxy, _ }
import zio.{ Random, ZIOAppDefault }
import zio.Console.{ printLine, printLineError }

/**
* Runs a webhook server and a zio-http server to which webhook events are delivered. The webhook
Expand All @@ -20,10 +19,10 @@ import zio.webhooks.{ WebhooksProxy, _ }
* and the events are delivered to an endpoint one-by-one. The zio-http endpoint prints out the
* contents of each payload as it receives them.
*/
object BasicExample extends App {
object BasicExample extends ZIOAppDefault {

// JSON webhook event stream
private lazy val events = UStream
private lazy val events = ZStream
.iterate(0L)(_ + 1)
.map { i =>
WebhookEvent(
Expand All @@ -34,15 +33,16 @@ object BasicExample extends App {
None
)
}
.schedule(Schedule.spaced(100.milli))

// reliable endpoint
private val httpApp = HttpApp.collectM {
case request @ Method.POST -> Root / "endpoint" =>
private val httpApp = Http.collectZIO[Request] {
case request @ Method.POST -> !! / "endpoint" =>
for {
randomDelay <- random.nextIntBounded(300).map(_.millis)
response <- ZIO
.foreach(request.getBodyAsString)(str => putStrLn(s"""SERVER RECEIVED PAYLOAD: "$str""""))
.as(Response.status(Status.OK))
randomDelay <- Random.nextIntBounded(300).map(_.millis)
response <- request.body.asString
.flatMap(str => printLine(s"""SERVER RECEIVED PAYLOAD: "$str""""))
.as(Response.status(Status.Ok))
.delay(randomDelay) // random delay to simulate latency
} yield response
}
Expand All @@ -54,18 +54,18 @@ object BasicExample extends App {

private def program =
for {
_ <- httpEndpointServer.start(port, httpApp).fork
_ <- WebhookServer.getErrors.use(UStream.fromQueue(_).map(_.toString).foreach(putStrLnErr(_))).fork
_ <- httpEndpointServer.start(port, httpApp).forkScoped
_ <- WebhookServer.getErrors.flatMap(ZStream.fromQueue(_).map(_.toString).foreach(printLineError(_))).forkScoped
_ <- TestWebhookRepo.setWebhook(webhook)
_ <- events.schedule(Schedule.spaced(50.micros).jittered).foreach(TestWebhookEventRepo.createEvent)
} yield ()

/**
* The webhook server is started as part of the layer construction. See `WebhookServer.live`.
*/
def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
override def run =
program
.injectCustom(
.provideSome[Scope](
InMemoryWebhookStateRepo.live,
JsonPayloadSerialization.live,
TestWebhookEventRepo.test,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@ package zio.webhooks.example
import zhttp.http._
import zhttp.service.Server
import zio._
import zio.console._
import zio.duration._
import zio.magic._
import zio.stream.UStream
import zio.stream.ZStream
import zio.webhooks.backends.{ InMemoryWebhookStateRepo, JsonPayloadSerialization }
import zio.webhooks.{ WebhooksProxy, _ }
import zio.webhooks.backends.sttp.WebhookSttpClient
import zio.webhooks.testkit._
import zio.{ Random, ZIOAppDefault }
import zio.Console.{ printLine, printLineError }

/**
* Differs from the [[BasicExample]] in that events are batched with the default batching setting
* of 128 elements per batch. The server dispatches all events queued up for each webhook since the
* last delivery and sends them in a batch.
*/
object BasicExampleWithBatching extends App {
object BasicExampleWithBatching extends ZIOAppDefault {

private lazy val events = UStream
private lazy val events = ZStream
.iterate(0L)(_ + 1)
.map { i =>
WebhookEvent(
Expand All @@ -31,15 +30,14 @@ object BasicExampleWithBatching extends App {
)
}

private val httpApp = HttpApp.collectM {
case request @ Method.POST -> Root / "endpoint" =>
private val httpApp = Http.collectZIO[Request] {
case request @ Method.POST -> !! / "endpoint" =>
for {
randomDelay <- random.nextIntBetween(10, 20).map(_.millis)
response <- ZIO
.foreach(request.getBodyAsString) { str =>
putStrLn(s"""SERVER RECEIVED PAYLOAD: "$str"""")
}
.as(Response.status(Status.OK))
randomDelay <- Random.nextIntBetween(10, 20).map(_.millis)
response <- request.body.asString.flatMap { str =>
printLine(s"""SERVER RECEIVED PAYLOAD: "$str"""")
}
.as(Response.status(Status.Ok))
.delay(randomDelay)
} yield response
}
Expand All @@ -52,14 +50,14 @@ object BasicExampleWithBatching extends App {
private def program =
for {
_ <- httpEndpointServer.start(port, httpApp).fork
_ <- WebhookServer.getErrors.use(UStream.fromQueue(_).map(_.toString).foreach(putStrLnErr(_))).fork
_ <- WebhookServer.getErrors.flatMap(ZStream.fromQueue(_).map(_.toString).foreach(printLineError(_))).fork
_ <- TestWebhookRepo.setWebhook(webhook)
_ <- events.schedule(Schedule.spaced(50.micros).jittered).foreach(TestWebhookEventRepo.createEvent)
} yield ()

def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
override def run =
program
.injectCustom(
.provideSome[Scope](
InMemoryWebhookStateRepo.live,
JsonPayloadSerialization.live,
TestWebhookRepo.test,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,23 @@ package zio.webhooks.example
import zhttp.http._
import zhttp.service.Server
import zio._
import zio.console._
import zio.duration._
import zio.magic._
import zio.stream.UStream
import zio.stream.ZStream
import zio.webhooks.backends.{ InMemoryWebhookStateRepo, JsonPayloadSerialization }
import zio.webhooks.{ WebhooksProxy, _ }
import zio.webhooks.backends.sttp.WebhookSttpClient
import zio.webhooks.testkit._
import zio.{ Clock, Random, ZIOAppDefault }
import zio.Console.{ printLine, printLineError }

/**
* Differs from the [[BasicExample]] in that the zio-http server responds with a non-200 status some
* of the time. This behavior prompts the webhook server to retry delivery. The server will keep on
* retrying events for a webhook with at-least-once delivery semantics one-by-one until the server
* successfully marks all `n` events delivered.
*/
object BasicExampleWithRetrying extends App {
object BasicExampleWithRetrying extends ZIOAppDefault {

private lazy val events = UStream
private lazy val events = ZStream
.iterate(0L)(_ + 1)
.map { i =>
WebhookEvent(
Expand All @@ -34,23 +33,20 @@ object BasicExampleWithRetrying extends App {
.take(n)

// a flaky server answers with 200 60% of the time, 404 the other
private lazy val httpApp = HttpApp.collectM {
case request @ Method.POST -> Root / "endpoint" =>
val payload = request.getBodyAsString
private lazy val httpApp = Http.collectZIO[Request] {
case request @ Method.POST -> !! / "endpoint" =>
for {
n <- random.nextIntBounded(100)
tsString <- clock.instant.map(_.toString).map(ts => s"[$ts]: ")
response <- ZIO
.foreach(payload) { payload =>
if (n < 60)
putStrLn(tsString + payload + " Response: OK") *>
UIO(Response.status(Status.OK))
else
putStrLn(tsString + payload + " Response: NOT_FOUND") *>
UIO(Response.status(Status.NOT_FOUND))
}
.orDie
} yield response.getOrElse(Response.fromHttpError(HttpError.BadRequest("empty body")))
n <- Random.nextIntBounded(100)
tsString <- Clock.instant.map(_.toString).map(ts => s"[$ts]: ")
response <- request.body.asString.flatMap { payload =>
if (n < 60)
printLine(tsString + payload + " Response: Ok") *>
ZIO.succeed(Response.status(Status.Ok))
else
printLine(tsString + payload + " Response: NotFound") *>
ZIO.succeed(Response.status(Status.NotFound))
}.orDie
} yield response
}

// just an alias for a zio-http server to disambiguate it with the webhook server
Expand All @@ -62,16 +58,19 @@ object BasicExampleWithRetrying extends App {

private def program =
for {
_ <- httpEndpointServer.start(port, httpApp).fork
_ <- WebhookServer.getErrors.use(UStream.fromQueue(_).map(_.toString).foreach(putStrLnErr(_))).fork
_ <- printLine("starting http")
_ <- httpEndpointServer.start(port, httpApp).forkScoped
_ <- WebhookServer.getErrors.flatMap(ZStream.fromQueue(_).map(_.toString).foreach(printLineError(_))).forkScoped
_ <- printLine("set webhook")
_ <- TestWebhookRepo.setWebhook(webhook)
_ <- printLine("scheduling events")
_ <- events.schedule(Schedule.spaced(50.micros).jittered).foreach(TestWebhookEventRepo.createEvent)
_ <- clock.sleep(Duration.Infinity)
_ <- Clock.sleep(Duration.Infinity)
} yield ()

def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
override def run =
program
.injectCustom(
.provideSome[Scope](
InMemoryWebhookStateRepo.live,
JsonPayloadSerialization.live,
TestWebhookRepo.test,
Expand Down
Loading

0 comments on commit 837aaa1

Please sign in to comment.