Skip to content

Commit

Permalink
Merge pull request lucperkins#2 from BranislavLazic/master
Browse files Browse the repository at this point in the history
Remove blocking calls from PostgresActor
  • Loading branch information
lucperkins authored May 29, 2018
2 parents e940525 + 92b30e1 commit cd1c1dd
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 127 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
target
project/project
project/target
project/target
.idea
10 changes: 7 additions & 3 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ object Dependencies {
val sprayHttpx = "io.spray" % "spray-httpx" % sprayVersion
val sprayRouting = "io.spray" % "spray-routing" % sprayVersion
val sprayJson = "io.spray" %% "spray-json" % "1.2.5"
val slick = "com.typesafe.slick" %% "slick" % "1.0.1"
val jodaDateTime = "joda-time" % "joda-time" % "2.1"
val jodaConvert = "org.joda" % "joda-convert" % "1.8.1"
val slick = "com.typesafe.slick" %% "slick" % "3.0.0"
val postgres = "postgresql" % "postgresql" % "9.1-901-1.jdbc4"
val slickJoda = "com.github.tototoshi" %% "slick-joda-mapper" % "0.3.0"
val scalaCsv = "com.github.tototoshi" %% "scala-csv" % "1.0.0-SNAPSHOT"
val slickJoda = "com.github.tototoshi" %% "slick-joda-mapper" % "2.0.0"
val scalaCsv = "com.github.tototoshi" %% "scala-csv" % "1.3.4"
val logback = "ch.qos.logback" % "logback-classic" % "1.0.0"
}

Expand All @@ -67,6 +69,8 @@ object AppBuild extends Build {
postgres,
slickJoda,
scalaCsv,
jodaDateTime,
jodaConvert,
logback
)

Expand Down
15 changes: 9 additions & 6 deletions src/main/scala/app/Application.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
package app

import akka.actor._

import app.{ Configs => C }
import app.{Configs => C}
import app.actors.Starter
import app.models.TaskDAO
import app.utils.PostgresSupport
import scala.concurrent.duration._

import scala.concurrent.Await

object Application extends App
with PostgresSupport
{
val system = ActorSystem("main-system")
C.log.info("Actor system $system is up and running")

startPostgres()
C.log.info("Postgres is up and running")

val starter = system.actorOf(Props[Starter], name = "main")
private implicit val context = system.dispatcher
private val taskDAO = new TaskDAO()
Await.result(startPostgres(taskDAO), 1.second)
val starter = system.actorOf(Starter.apply(taskDAO), name = "main")

starter ! Starter.Start
}
55 changes: 29 additions & 26 deletions src/main/scala/app/actors/PostgresActor.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package app.actors

import akka.actor.Actor

import akka.actor.{Actor, Props}
import akka.pattern.pipe
import app.models.TaskDAO

object PostgresActor {
Expand All @@ -16,43 +16,46 @@ object PostgresActor {
case object GetIds
case object CreateTable
case object DropTable

def apply(taskDAO: TaskDAO) = Props(new PostgresActor(taskDAO))
}

class PostgresActor extends Actor {
final class PostgresActor(taskDAO: TaskDAO) extends Actor {
import PostgresActor._
import context.dispatcher

def receive: Receive = {
case FetchAll =>
sender ! TaskDAO.listAllTasks
case FetchAll =>
taskDAO.listAllTasks.pipeTo(sender)

case CreateTask(content: String, assignee: String) =>
sender ! TaskDAO.addTask(content, assignee)
taskDAO.addTask(content, assignee).pipeTo(sender)

case FetchTask(id: Int) =>
sender ! TaskDAO.fetchTaskById(id)
taskDAO.fetchTaskById(id).pipeTo(sender)

case ModifyTask(id: Int, content: String) =>
sender ! TaskDAO.updateTaskById(id, content)
taskDAO.updateTaskById(id, content).pipeTo(sender)

case DeleteTask(id: Int) =>
sender ! TaskDAO.deleteTaskById(id)
taskDAO.deleteTaskById(id).pipeTo(sender)

case DeleteAll =>
sender ! TaskDAO.deleteAll
taskDAO.deleteAll.pipeTo(sender)

case GetCount =>
sender ! TaskDAO.numberOfTasks
case Populate(file: String) =>
sender ! TaskDAO.populateTable(file)
taskDAO.numberOfTasks.pipeTo(sender)

case Populate(file: String) =>
taskDAO.populateTable(file).pipeTo(sender)

case GetIds =>
sender ! TaskDAO.listAllIds
taskDAO.listAllIds.pipeTo(sender)

case CreateTable =>
sender ! TaskDAO.createTable
taskDAO.createTable.pipeTo(sender)

case DropTable =>
sender ! TaskDAO.dropTable
taskDAO.dropTable.pipeTo(sender)
}
}
14 changes: 8 additions & 6 deletions src/main/scala/app/actors/Starter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,27 @@ package app.actors
import akka.actor._
import akka.io.IO
import akka.routing.RoundRobinRouter
import app.models.TaskDAO
import spray.can.Http

import app.{ Configs => C }
import app.{Configs => C}
import app.server.ServerSupervisor

object Starter {
case object Start
case object Stop

def apply(taskDAO: TaskDAO) = Props(new Starter(taskDAO))
}

class Starter extends Actor {
import Starter.{ Start, Stop }
final class Starter(taskDAO: TaskDAO) extends Actor {
import Starter.Start

implicit val system = context.system
private implicit val system = context.system

def receive: Receive = {
case Start =>
val mainHandler: ActorRef =
context.actorOf(Props[ServerSupervisor].withRouter(RoundRobinRouter(nrOfInstances = 10)))
context.actorOf(ServerSupervisor.apply(taskDAO).withRouter(RoundRobinRouter(nrOfInstances = 10)))
IO(Http) ! Http.Bind(mainHandler, interface = C.interface, port = C.appPort)
}
}
4 changes: 2 additions & 2 deletions src/main/scala/app/data/TaskJsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object TaskJsonProtocol extends DefaultJsonProtocol {

implicit object TaskJsonFormat extends RootJsonFormat[Task] {
def write(t: Task) = JsObject(
"taskId" -> JsNumber(t.taskId.toInt),
"taskId" -> JsNumber(t.taskId.getOrElse(0)),
"content" -> JsString(t.content.toString()),
"created" -> JsString(t.created.toString()),
"finished" -> JsBoolean(t.finished),
Expand All @@ -22,7 +22,7 @@ object TaskJsonProtocol extends DefaultJsonProtocol {
def read(j: JsValue) = {
j.asJsObject.getFields("taskId", "content", "created", "finished", "assignee") match {
case Seq(JsNumber(taskId), JsString(content), JsString(created), JsBoolean(finished), JsString(assignee)) =>
new Task(taskId.toInt, content, dateTimeParse(created), finished, assignee)
Task(Some(taskId.toInt), content, dateTimeParse(created), finished, assignee)
case _ => throw new DeserializationException("Improperly formed Task object")
}
}
Expand Down
99 changes: 40 additions & 59 deletions src/main/scala/app/models/TaskDAO.scala
Original file line number Diff line number Diff line change
@@ -1,109 +1,90 @@
package app.models

import scala.slick.driver.PostgresDriver
import scala.slick.driver.PostgresDriver.simple._
import com.github.tototoshi.slick.JodaSupport._
import slick.driver.PostgresDriver.api._
import com.github.tototoshi.slick.PostgresJodaSupport._
import org.joda.time.DateTime
import spray.json._
import DefaultJsonProtocol._
import com.github.tototoshi.csv._

import app.utils.PostgresSupport

import scala.concurrent.{ExecutionContext, Future}

case class Task(
taskId: Int,
taskId: Option[Int] = None,
content: String,
created: DateTime,
finished: Boolean,
assignee: String
)

object TaskDAO extends PostgresSupport {
class TaskDAO(implicit val executionContext: ExecutionContext) extends PostgresSupport {
import CSVConverter._
import app.data.TaskJsonProtocol._

object TaskTable extends Table[Task]("tasks") {
def taskId = column[Int] ("taskId", O.AutoInc, O.PrimaryKey, O.DBType("BIGINT"))
def content = column[String] ("content", O.DBType("VARCHAR(50)"), O.NotNull)
def created = column[DateTime]("created", O.DBType("TIMESTAMP"), O.NotNull)
def finished = column[Boolean] ("finished", O.DBType("BOOLEAN"), O.NotNull)
def assignee = column[String] ("assignee", O.DBType("VARCHAR(20)"), O.NotNull)
class TaskTable(tag: Tag) extends Table[Task](tag, "tasks") {
def taskId = column[Int] ("taskId", O.AutoInc, O.PrimaryKey, O.SqlType("BIGINT"))
def content = column[String] ("content", O.SqlType("VARCHAR(50)"), O.NotNull)
def created = column[DateTime]("created", O.SqlType("TIMESTAMP"), O.NotNull)
def finished = column[Boolean] ("finished", O.SqlType("BOOLEAN"), O.NotNull)
def assignee = column[String] ("assignee", O.SqlType("VARCHAR(20)"), O.NotNull)

def * = (taskId ~ content ~ created ~ finished ~ assignee) <> (Task, Task.unapply _)

def forInsert = (content ~ created ~ finished ~ assignee) returning taskId
def * = (taskId.?, content, created, finished, assignee) <> (Task.tupled, Task.unapply)
}

private val tasks = TableQuery[TaskTable]

case class Count(numberOfTasks: Int)
case class Ids(ids: List[Int])
case class Result(result: String)
implicit val countJsonFormat = jsonFormat1(Count)
implicit val idsJsonFormat = jsonFormat1(Ids)
implicit val resultFormat = jsonFormat1(Result)
def pgResult(result: String) = new Result(result).toJson.compactPrint

def numberOfTasks: String = {
val count: Int = Query(TaskTable).list.length
new Count(count).toJson.compactPrint
}
def pgResult(result: String): String = Result(result).toJson.compactPrint

def listAllIds: String = {
val ids = Query(TaskTable).map(_.taskId).list
new Ids(ids).toJson.compactPrint
}
def numberOfTasks: Future[String] = db.run(tasks.length.result).map(r => Count(r).toJson.compactPrint)

def listAllTasks: String =
Query(TaskTable).list.toJson.compactPrint
def listAllIds: Future[String] = db.run(tasks.map(_.taskId).result).map(ids => Ids(ids.toList).toJson.compactPrint)

def createTable =
TaskTable.ddl.create
def listAllTasks: Future[String] = db.run(tasks.result).map(_.toJson.compactPrint)

def dropTable =
TaskTable.ddl.drop
def createTable: Future[Unit] = db.run(tasks.schema.create)

def addTask(content: String, assignee: String): String = {
TaskTable.forInsert.insert(content, new DateTime(), false, assignee) match {
case 0 => pgResult("Something went wrong")
case n => pgResult(s"Task $n added successfully")
}
}
def dropTable: Future[Unit] = db.run(tasks.schema.drop)

def fetchTaskById(id: Int): String =
Query(TaskTable).where(_.taskId is id).list.toJson.compactPrint
def addTask(content: String, assignee: String): Future[Int] =
db.run(tasks += Task(content = content, created = new DateTime(), finished = false, assignee = assignee))

def deleteTaskById(id: Int): String = {
TaskTable.filter(_.taskId === id).delete match {
case 0 => pgResult(s"Task $id was not found")
case 1 => pgResult(s"Task $id successfully deleted")
case _ => pgResult("Something went wrong")
}
}
def fetchTaskById(id: Int): Future[Option[Task]] = db.run(tasks.filter(_.taskId === id).result.headOption)

def updateTaskById(id: Int, newContent: String): String = {
TaskTable.where(_.taskId is id).
map(t => t.content).
update(newContent) match {
case 1 => pgResult(s"Task $id successfully modified")
case _ => pgResult(s"Task $id was not found")
}
def deleteTaskById(id: Int): Future[String] = db.run(tasks.filter(_.taskId === id).delete).collect {
case 0 => pgResult("0 tasks deleted")
case 1 => pgResult("1 task deleted")
case n => pgResult(s"$n tasks deleted")
}

def addMultipleTasks(args: List[(String, String)]) = {
args.map(arg => addTask(arg._1, arg._2)).map(result => println(result))
}
def updateTaskById(id: Int, newContent: String): Future[String] =
db.run(tasks.filter(_.taskId === id).map(t => t.content).update(newContent)).collect {
case 1 => pgResult(s"Task $id successfully modified")
case _ => pgResult(s"Task $id was not found")
}

def populateTable(filename: String) = {
def addMultipleTasks(args: List[(String, String)]): Future[List[Int]] =
Future.sequence(args.map(value => addTask(value._1, value._2)))

def populateTable(filename: String): Future[List[Int]] = {
val csvInfo = CSVConverter.convert(filename)
addMultipleTasks(csvInfo)
}

def deleteAll = {
Query(TaskTable).delete match {
def deleteAll: Future[String] =
db.run(tasks.delete).collect {
case 0 => pgResult("0 tasks deleted")
case 1 => pgResult("1 task deleted")
case n => pgResult(s"$n tasks deleted")
}
}

}

object CSVConverter {
Expand Down
13 changes: 9 additions & 4 deletions src/main/scala/app/server/ServerSupervisor.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package app.server

import akka.actor._
import app.models.TaskDAO

class ServerSupervisor extends Actor
with TaskService
{
object ServerSupervisor {
def apply(taskDAO: TaskDAO) = Props(new ServerSupervisor(taskDAO))
}

final class ServerSupervisor(taskDAO: TaskDAO) extends Actor
with TaskService {
def actorRefFactory = context

def receive = runRoute(
pathPrefix("api" / "v1") {
taskServiceRoutes
taskServiceRoutes(taskDAO)
}
)
}
Loading

0 comments on commit cd1c1dd

Please sign in to comment.