Added examples of zio-spark usage
fancellu committed Nov 22, 2022
1 parent 0670074 commit 8921403
Showing 5 changed files with 134 additions and 6 deletions.
10 changes: 9 additions & 1 deletion README.md
@@ -1,6 +1,6 @@
# ZIO Restful webservice example

- Using zio 2.0, zio-http, zio-json, quill, H2, twirl, zio-logging, zio-cache, zio-actors
+ Using zio 2.0, zio-http, zio-json, quill, H2, twirl, zio-logging, zio-cache, zio-actors, zio-spark

Now targeting zio-http 0.0.3

@@ -82,6 +82,14 @@ Now targeting zio-http 0.0.3
- http://localhost:8080/catchAll
- http://localhost:8080/randomString

### SparkApp
(Some Spark examples with zio-spark)

(You can disable these routes by setting enableSpark=false in MainApp.scala; a sketch of serving just the Spark routes follows below)
- http://localhost:8080/spark
- http://localhost:8080/spark/person/Peter
- http://localhost:8080/spark/job
- http://localhost:8080/spark/wordcount

## To run

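A minimal sketch of serving only these Spark routes, using the zio-http and zio-spark APIs this commit wires up in MainApp.scala (SparkOnlyServer is a hypothetical name, not part of the commit):

```scala
import com.felstar.restfulzio.spark.SparkApp
import zio._
import zio.http._
import zio.spark.parameter.localAllNodes
import zio.spark.sql.SparkSession

// Hypothetical standalone server exposing just the /spark* endpoints above.
object SparkOnlyServer extends ZIOAppDefault {
  override val run =
    Server
      .serve(SparkApp()) // the Http app added in this commit
      .provide(
        Server.default, // binds to port 8080, as in MainApp
        // the same local SparkSession layer MainApp constructs
        SparkSession.builder.master(localAllNodes).appName("spark-only").asLayer
      )
}
```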
7 changes: 5 additions & 2 deletions build.sbt
@@ -5,12 +5,15 @@ name := "zio-restful-webservice"
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.3",
"dev.zio" %% "zio-json" % "0.3.0",
"dev.zio" %% "zio-http" % "0.0.3",
"dev.zio" %% "zio-http" % "0.0.3",
"io.getquill" %% "quill-zio" % "4.6.0",
"io.getquill" %% "quill-jdbc-zio" % "4.6.0",
"com.h2database" % "h2" % "2.1.214",
"dev.zio" %% "zio-cache" % "0.2.0",
"dev.zio" %% "zio-actors" % "0.1.0"
"dev.zio" %% "zio-actors" % "0.1.0",
"io.univalence" %% "zio-spark" % "0.8.1",
"org.apache.spark" %% "spark-core" % "3.3.0",
"org.apache.spark" %% "spark-sql" % "3.3.0"
)

libraryDependencies ++= Seq(
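One assumption behind the three new dependencies: Spark 3.3.0 artifacts are only published for Scala 2.12 and 2.13, so the build has to sit on a matching Scala line. A sketch of the corresponding sbt setting (the exact version this repo uses may differ):

```scala
// assumption: Scala 2.13.x, the newest series Spark 3.3.0 is published for
scalaVersion := "2.13.10"
```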
5 changes: 5 additions & 0 deletions src/main/resources/data.csv
@@ -0,0 +1,5 @@
name,age
Maria,93
John,24
Peter,19
Cassandra,46
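These two columns line up with the Person case class that the new SparkApp (below) reads them into, which is how zio-spark derives the read schema:

```scala
// from SparkApp.scala in this commit; schema[Person] derives (name: string, age: int)
// and .withHeader tells the reader to skip the name,age header row
final case class Person(name: String, age: Int)
```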
14 changes: 11 additions & 3 deletions src/main/scala/com/felstar/restfulzio/MainApp.scala
@@ -9,6 +9,7 @@ import com.felstar.restfulzio.helloworld.HelloWorldApp
import com.felstar.restfulzio.noenv.NoEnvApp
import com.felstar.restfulzio.staticserver.StaticApp
import com.felstar.restfulzio.hellotwirl.HelloTwirlApp
import com.felstar.restfulzio.spark.SparkApp
import com.felstar.restfulzio.stream.StreamApp
import com.felstar.restfulzio.videos.{InmemoryVideoRepo, PersistentVideoRepo, VideoApp}
import zio.http._
@@ -18,6 +19,8 @@ import zio.http.{Http, Middleware, Request, Response, Server}
import zio.http.middleware.HttpMiddleware
import zio.http.model.Status
import zio.logging.{LogFormat, console}
import zio.spark.parameter.localAllNodes
import zio.spark.sql.SparkSession

import java.io.IOException
import java.time.LocalDateTime
@@ -40,7 +43,7 @@ object MainApp extends ZIOAppDefault {

val middlewares = errorMiddleware // ++ Middleware.dropTrailingSlash

- private val logger =
+ val logger =
Runtime.removeDefaultLoggers >>> console(LogFormat.colored)

val requestMiddleWare=Middleware.identity[Request, Response].contramap[Request](_.addHeader("Seen", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())))
@@ -51,12 +54,16 @@ object MainApp extends ZIOAppDefault {
lookup = Lookup(ClientApp.getUser)
)

val enableSpark=true

lazy val sparkSession: ZLayer[Any, Throwable, SparkSession] =SparkSession.builder.master(localAllNodes).appName("app").asLayer

override val run = for {
_ <- ZIO.logInfo("Starting up").provide(logger)
args <- getArgs // to get command line params
_ <- ZIO.logInfo(args.toString).provide(logger)
serverFibre <- Server.serve((NoEnvApp() @@ requestMiddleWare ++ HelloWorldApp() ++ DownloadApp() ++
- CounterApp() ++ VideoApp() ++ ActorsApp() ++ HelloTwirlApp() ++
+ CounterApp() ++ VideoApp() ++ ActorsApp() ++ HelloTwirlApp() ++ (if (enableSpark) SparkApp() else Http.empty) ++
DelayApp() ++ StreamApp() ++ ClientApp() ++ StaticApp()) @@ middlewares)
.provide(
Server.default,
@@ -70,7 +77,8 @@
InmemoryVideoRepo.layer,
// PersistentVideoRepo.layer
logger,
- ZLayer.fromZIO(userCache)
+ ZLayer.fromZIO(userCache),
+ if (enableSpark) sparkSession else ZLayer.die(new Throwable("bang"))
)
.fork
_ <- Console.readLine("Press enter to stop the server\n")
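The conditional layer above is the interesting part: even when SparkApp is swapped out for Http.empty, the composed Http is still typed as needing SparkSession, so provide must be handed some layer for it, and ZLayer.die supplies one without ever building a real session. A minimal sketch of the same pattern with a hypothetical service (whether the dying placeholder is ever actually constructed depends on how the final environment is built, so treat this as the shape of the pattern rather than a guarantee):

```scala
import zio._

// Hypothetical stand-in for an expensive dependency such as SparkSession
trait Heavy
object Heavy {
  val live: ULayer[Heavy] = ZLayer.succeed(new Heavy {})
}

val enableHeavy = false

// The else-branch type-checks because ZLayer is covariant in its output:
// ZLayer[Any, Nothing, Nothing] <: ZLayer[Any, Nothing, Heavy].
val heavyLayer: ZLayer[Any, Nothing, Heavy] =
  if (enableHeavy) Heavy.live
  else ZLayer.die(new Throwable("Heavy disabled"))
```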
104 changes: 104 additions & 0 deletions src/main/scala/com/felstar/restfulzio/spark/SparkApp.scala
@@ -0,0 +1,104 @@
package com.felstar.restfulzio.spark

import zio._
import zio.http._
import zio.http.model.Method
import org.apache.spark.sql.Row
import zio.spark.experimental
import zio.spark.experimental.Pipeline
import zio.spark.parameter._
import zio.spark.rdd.RDD
import zio.spark.sql._
import zio.spark.sql.implicits._

/** An http app that:
* - Accepts a `Request` and returns a `Response`
* - Can fail with a Throwable
* - Uses a SparkSession from the environment
*/
object SparkApp {

import zio.spark.sql.TryAnalysis.syntax.throwAnalysisException

final case class Person(name: String, age: Int)

val transform: DataFrame => Dataset[Person] = _.as[Person]

val headOption: Dataset[Person] => Task[Option[Person]] = _.headOption

val csv: SIO[DataFrame] = SparkSession.read
.schema[Person]
.withHeader
.csv("src/main/resources/data.csv")

val buildsbt: SIO[Dataset[String]] = SparkSession.read.textFile("build.sbt")

def wordCount(inputDs: Dataset[String]): RDD[(String, Int)] =
  inputDs
    .flatMap(line => line.trim.split(" ")) // split each line on spaces
    .flatMap(word => word.split('.')) // break dotted tokens (e.g. sbt keys) apart
    .map(_.replaceAll("[^a-zA-Z]", "")) // keep letters only
    .filter(_.length > 1) // drop empty and one-letter tokens
    .map(word => (word, 1)) // the classic (word, 1) pairing
    .rdd
    .reduceByKey(_ + _) // sum the counts per word

val wordCountZIO: ZIO[SparkSession, Throwable, Seq[(String, Int)]] =
for {
words <- buildsbt.map(wordCount).flatMap(_.collect)
mostUsedWords = words.sortBy(_._2).reverse
} yield mostUsedWords

val jobZIO: ZIO[SparkSession, Throwable, String] =
for {
// Pipeline(load, transform, output): read the CSV, narrow to Dataset[Person], take the head
somePeople <- experimental.Pipeline(csv, transform, headOption).run
st = somePeople
.map(p => s"The first person's name is ${p.name}.")
.getOrElse("Nobody there")
} yield st

val allOutput: Dataset[Person] => Task[Iterator[Person]] = _.toLocalIterator

val allZIO: ZIO[SparkSession, Throwable, String] =
for {
people <- experimental.Pipeline(csv, transform, allOutput).run
st = people.mkString(",")
} yield st

def filterByName(name: String): DataFrame => Dataset[Person] =
_.as[Person].filter(_.name == name)

val byNameZIO: ZIO[SparkSession with String, Throwable, String] =
for {
name <- ZIO.service[String]
people <- experimental.Pipeline(csv, filterByName(name), allOutput).run
st = people.mkString(",")
} yield st

def apply(): Http[SparkSession, Throwable, Request, Response] =
Http.fromZIO(ZIO.service[SparkSession]).flatMap { ss =>
// pull the session out of the Http environment and re-wrap it as a layer,
// so each handler below can provide it to its SparkSession-typed effect
val ssLayer = ZLayer.succeed(ss)
Http.collectZIO[Request] {
case Method.GET -> !! / "spark" =>
for {
st <- allZIO.provide(ssLayer)
_ <- ZIO.logInfo(st)
} yield Response.text(st)
case Method.GET -> !! / "spark" / "wordcount" =>
for {
mostUsedWords <- wordCountZIO.provide(ssLayer)
_ <- ZIO.logInfo(mostUsedWords.toString)
} yield Response.text(mostUsedWords.mkString("\n"))
case Method.GET -> !! / "spark" / "person" / name =>
for {
st <- byNameZIO.provide(ssLayer, ZLayer.succeed(name))
_ <- ZIO.logInfo(st)
} yield Response.text(st)
case Method.GET -> !! / "spark" / "job" =>
for {
st <- jobZIO.provide(ssLayer)
_ <- ZIO.logInfo(st)
} yield Response.text(st)
}
}
}
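For reference, a minimal sketch (hypothetical, not part of the commit) that runs one of the pipelines above directly, without the HTTP layer, using the same asLayer construction as MainApp:

```scala
package com.felstar.restfulzio.spark

import zio._
import zio.spark.parameter.localAllNodes
import zio.spark.sql.SparkSession

// Hypothetical runner for the CSV -> Person pipeline defined in SparkApp.
object SparkAppDemo extends ZIOAppDefault {
  override val run =
    SparkApp.jobZIO // yields "The first person's name is ..." or "Nobody there"
      .flatMap(Console.printLine(_))
      .provide(SparkSession.builder.master(localAllNodes).appName("demo").asLayer)
}
```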
