Skip to content
This repository has been archived by the owner on Sep 9, 2023. It is now read-only.

Commit

Permalink
Fix #48: Project updates
Browse files Browse the repository at this point in the history
* Scala 2.13
* SBT 1.3.13
* better-files instead of scala-arm
* Library updates
  • Loading branch information
Lukas Sembera committed Jul 26, 2020
1 parent ad79784 commit b6d07e8
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 147 deletions.
9 changes: 4 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ val dependencies = Seq(
"com.typesafe.akka" %% "akka-slf4j" % Versions.akka,
"com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging,
"org.yaml" % "snakeyaml" % Versions.snakeYaml,
"com.jsuereth" %% "scala-arm" % Versions.scalaArm,
"com.github.pathikrit" %% "better-files" % Versions.betterFiles,
"com.github.tototoshi" %% "scala-csv" % Versions.scalaCsv,
"org.apache.commons" % "commons-math3" % Versions.commonsMath,
"com.github.scopt" %% "scopt" % Versions.scopt,
"org.apache.commons" % "commons-lang3" % Versions.commonsLang3,
"ch.qos.logback" % "logback-classic" % Versions.logbackClassic,
"com.typesafe.akka" %% "akka-testkit" % Versions.akka % "test, it",
"org.scalatest" %% "scalatest" % Versions.scalatest % "test, it",
"org.scalatest" %% "scalatest-flatspec" % Versions.scalatest % "test, it",
"org.postgresql" % "postgresql" % Versions.postgres % "test, it"
)

Expand All @@ -21,10 +22,8 @@ lazy val root = (project in file(".")).enablePlugins(PackPlugin).settings(
name := "dbstress",
version := sys.env.getOrElse("DBSTRESS_CI_BUILD_VERSION", "0.0.0-SNAPSHOT"),
scalaVersion := Versions.scala,
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-Xfatal-warnings", "-Ywarn-unused-import")
).settings(resolvers ++= Seq(
"Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/"
)).configs(IntegrationTest).settings(Defaults.itSettings: _*)
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-Xfatal-warnings", "-Ywarn-unused:imports")
).configs(IntegrationTest).settings(Defaults.itSettings: _*)
.settings(libraryDependencies ++= dependencies: _*)
.settings(packMain := Map("dbstress" -> "eu.semberal.dbstress.Main"))
.settings(packArchiveExcludes := List("VERSION", "Makefile")
Expand Down
12 changes: 6 additions & 6 deletions project/Versions.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
object Versions {
val scala = "2.12.11"
val akka = "2.5.30"
val scala = "2.13.3"
val akka = "2.6.8"
val scalaLogging = "3.9.2"
val snakeYaml = "1.26"
val scalaArm = "2.0"
val betterFiles = "3.9.1"
val scalaCsv = "1.3.6"
val commonsMath = "3.6.1"
val scopt = "3.7.1"
val commonsLang3 = "3.10"
val commonsLang3 = "3.11"
val logbackClassic = "1.2.3"
val scalatest = "3.0.8"
val postgres = "42.2.11"
val scalatest = "3.2.0"
val postgres = "42.2.14"
}
3 changes: 2 additions & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
sbt.version=1.3.8
sbt.version=1.3.13

2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.12")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.4.2")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.1")
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package eu.semberal.dbstress.integration

import java.io.{File, FilenameFilter, InputStreamReader}
import java.io.{BufferedReader, InputStreamReader}
import java.lang.System._
import java.nio.file.Files._

import akka.actor.ActorSystem
import akka.testkit.{TestKit, TestKitBase}
import better.files._
import eu.semberal.dbstress.Orchestrator
import eu.semberal.dbstress.config.ConfigParser._
import eu.semberal.dbstress.util.{CsvResultsExport, ResultsExport}
Expand All @@ -19,16 +20,14 @@ trait AbstractDbstressIntegrationTest extends TestKitBase with BeforeAndAfterAll

override implicit lazy val system: ActorSystem = ActorSystem()

protected val csvFilter = new FilenameFilter {
override def accept(dir: File, name: String): Boolean = name.endsWith(".csv")
}

override protected def afterAll(): Unit = TestKit.shutdownActorSystem(system)
override protected def afterAll(): Unit =
TestKit.shutdownActorSystem(system)

protected def executeTest(configFile: String): File = {
val tmpDir = createTempDirectory(s"dbstress_OrchestratorTest_${currentTimeMillis()}_").toFile
val reader = new InputStreamReader(getClass.getClassLoader.getResourceAsStream(configFile))
val config = parseConfigurationYaml(reader, Some("")).right.get
val tmpDir = createTempDirectory(s"dbstress_OrchestratorTest_${currentTimeMillis()}_").toFile.toScala

val reader = new BufferedReader(new InputStreamReader(getClass.getClassLoader.getResourceAsStream(configFile))).autoClosed
val config = parseConfigurationYaml(reader, Some(""))getOrElse(throw new IllegalStateException("Cannot parse configuration"))
val exports: List[ResultsExport] = new CsvResultsExport(tmpDir) :: Nil
whenReady(new Orchestrator(system).run(config, exports), Timeout(Span(10, Seconds)))(_ => tmpDir)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
package eu.semberal.dbstress.integration

import akka.testkit.ImplicitSender
import org.scalatest.{FlatSpecLike, Matchers}
import resource.managed
import org.scalatest.flatspec.AnyFlatSpecLike

import scala.io.Source

class OrchestratorIntegrationTest extends FlatSpecLike with Matchers with ImplicitSender with AbstractDbstressIntegrationTest {
class OrchestratorIntegrationTest extends AnyFlatSpecLike with ImplicitSender with AbstractDbstressIntegrationTest {

"Orchestrator" should "successfully launch the test of PostgreSQL and verify results" in {
val tmpDir = executeTest("config.postgres.yaml")

/* Test generated CSV file*/
val csvFiles = tmpDir.listFiles(csvFilter)
csvFiles should have size 1
managed(Source.fromFile(csvFiles.head)) foreach { source =>
val lines = source.getLines().toList
lines should have size 3
lines.tail.foreach { line =>
line should startWith regex "unit[1-2]{1}".r
}
val csvFiles = tmpDir.glob("*.csv").toList
assert(csvFiles.length === 1)

val lines = csvFiles.head.lines
assert(lines.size === 3)

lines.tail.foreach { line =>
assert(line.matches("^unit[1-2].*$"))
}
}
}
25 changes: 14 additions & 11 deletions src/main/scala/eu/semberal/dbstress/Main.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package eu.semberal.dbstress

import java.io.File
import java.util.concurrent.TimeoutException

import akka.actor.ActorSystem
Expand All @@ -12,6 +11,8 @@ import eu.semberal.dbstress.model.Results.{ConnectionInitException, UnitRunExcep
import eu.semberal.dbstress.util.{CsvResultsExport, ResultsExport}
import org.slf4j.LoggerFactory
import scopt.OptionParser
import better.files._
import java.io.{File => JFile}

import scala.concurrent.{Await, Future}

Expand All @@ -26,17 +27,19 @@ object Main extends LazyLogging {

head("dbstress", version, "Database performance and stress testing tool")

opt[File]('c', "config").valueName("CONFIG_FILE").text("Path to the configuration YAML file").required().action { (x, c) =>
c.copy(configFile = x)
}.validate {
case x if !x.exists() => failure(s"File '$x' does not exist")
case x if !x.isFile => failure(s"'$x' is not a file")
case x if !x.canRead => failure(s"File '$x' is not readable")
case _ => success
}
opt[JFile]('c', "config").valueName("CONFIG_FILE").text("Path to the configuration YAML file").required()
.action { (x, c) =>
c.copy(configFile = x.toScala)
}
.validate {
case x if !x.exists() => failure(s"File '$x' does not exist")
case x if !x.isFile => failure(s"'$x' is not a file")
case x if !x.canRead => failure(s"File '$x' is not readable")
case _ => success
}

opt[File]('o', "output").valueName("OUTPUT_DIR").text("Output directory").required().action { (x, c) =>
c.copy(outputDir = x)
opt[JFile]('o', "output").valueName("OUTPUT_DIR").text("Output directory").required().action { (x, c) =>
c.copy(outputDir = x.toScala)
}.validate {
case x if !x.exists() => failure(s"Directory '$x' does not exist")
case x if !x.isDirectory => failure(s"'$x' is not a directory")
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/eu/semberal/dbstress/actor/ControllerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,22 @@ class ControllerActor(sc: ScenarioConfig) extends Actor with LazyLogging {
client ! Status.Failure(new ConnectionInitException(e))
}

private def waitForFinish(client: ActorRef, urResults: List[(String, UnitRunResult)]): Receive = LoggingReceive {
private def waitForFinish(client: ActorRef, unitRunResults: List[(String, UnitRunResult)]): Receive = LoggingReceive {
case UnitRunFinished(unitName, result) =>
logger.info(s"Finished unit runs: ${urResults.length + 1}/$totalConnections")
context.become(waitForFinish(client, (unitName -> result) :: urResults))
if (totalConnections - urResults.length == 1) self ! Done
logger.info(s"Finished unit runs: ${unitRunResults.length + 1}/$totalConnections")
context.become(waitForFinish(client, (unitName -> result) :: unitRunResults))
if (totalConnections - unitRunResults.length == 1) self ! Done

case UnitRunError(e) =>
context.stop(self)
client ! Status.Failure(e)

case Done =>
val allCalls = urResults.flatMap(_._2.callResults)
val allCalls = unitRunResults.flatMap(_._2.callResults)
val failedCalls = allCalls.count(_.isInstanceOf[DbCallFailure])
val msg = if (failedCalls > 0) s"($failedCalls/${allCalls.length} calls failed)" else ""
logger.info("All database operations finished {}", msg)
val unitResultMap = urResults.groupBy(_._1).mapValues(_.map(_._2))
val unitResultMap = unitRunResults.groupBy(_._1).view.mapValues(_.map(_._2))
client ! ScenarioResult(sc.units.map(conf => UnitResult(conf, unitResultMap(conf.name))).toList)
}
}
Expand Down
28 changes: 15 additions & 13 deletions src/main/scala/eu/semberal/dbstress/actor/DatabaseActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import eu.semberal.dbstress.actor.DatabaseActor.{ConnectionTimeoutException, Ini
import eu.semberal.dbstress.model.Configuration.UnitRunConfig
import eu.semberal.dbstress.model.Results._
import eu.semberal.dbstress.util.IdGen
import resource._
import better.files._

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}

class DatabaseActor(scenarioId: String, unitName: String, urConfig: UnitRunConfig) extends Actor with LazyLogging {

Expand Down Expand Up @@ -72,17 +72,19 @@ class DatabaseActor(scenarioId: String, unitName: String, urConfig: UnitRunConfi
val dbCallId = DbCallId(scenarioId, connectionId, IdGen.genStatementId())
val start = System.nanoTime()
connection.map(c =>
managed(c.createStatement()).map { statement =>
if (statement.execute(urConfig.dbConfig.query.replace(IdGen.IdPlaceholder, dbCallId.toString)))
FetchedRows(Iterator.continually(statement.getResultSet.next()).takeWhile(identity).length)
else
UpdateCount(statement.getUpdateCount)
}.tried match {
case Success(result) =>
DbCallSuccess(Utils.toMillis(start, System.nanoTime()), dbCallId, result) :: l
case Failure(e) =>
logger.warn(s"Query execution failed: ${e.getMessage}")
DbCallFailure(Utils.toMillis(start, System.nanoTime()), dbCallId, e) :: l
c.createStatement().autoClosed.apply { statement =>
Try {
if (statement.execute(urConfig.dbConfig.query.replace(IdGen.IdPlaceholder, dbCallId.toString)))
FetchedRows(Iterator.continually(statement.getResultSet.next()).takeWhile(identity).length)
else
UpdateCount(statement.getUpdateCount)
} match {
case Success(result) =>
DbCallSuccess(Utils.toMillis(start, System.nanoTime()), dbCallId, result) :: l
case Failure(e) =>
logger.warn(s"Query execution failed: ${e.getMessage}")
DbCallFailure(Utils.toMillis(start, System.nanoTime()), dbCallId, e) :: l
}
}
).getOrElse {
val e = new IllegalStateException("Connection not initialized")
Expand Down
66 changes: 36 additions & 30 deletions src/main/scala/eu/semberal/dbstress/config/ConfigParser.scala
Original file line number Diff line number Diff line change
@@ -1,51 +1,55 @@
package eu.semberal.dbstress.config

import java.io.{BufferedReader, File, FileReader, Reader}
import java.io.BufferedReader
import java.util.{Map => JMap}

import better.files._
import eu.semberal.dbstress.model.Configuration._
import org.yaml.snakeyaml.Yaml
import resource._

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}

object ConfigParser {

private def sequence[A, B](s: Seq[Either[A, B]]): Either[A, Seq[B]] =
s.foldRight(Right(Nil): Either[A, List[B]]) {
(e, acc) => for (xs <- acc.right; x <- e.right) yield x :: xs
(e, acc) => for (xs <- acc; x <- e) yield x :: xs
}

def parseConfigurationYaml(f: File, defaultPassword: Option[String]): Either[String, ScenarioConfig] =
parseConfigurationYaml(new BufferedReader(new FileReader(f)), defaultPassword)
parseConfigurationYaml(f.bufferedReader, defaultPassword)

def parseConfigurationYaml(reader: Reader, defaultPassword: Option[String]): Either[String, ScenarioConfig] = {
def parseConfigurationYaml(reader: Dispose[BufferedReader], defaultPassword: Option[String]): Either[String, ScenarioConfig] = {

def isStringNonEmpty(s: String): Boolean = Option(s).getOrElse("").length > 0

val yaml = new Yaml

managed(reader).map { reader =>
yaml.loadAll(reader).asScala.map(x => Map(x.asInstanceOf[JMap[String, Object]].asScala.toList: _*))
}.tried match {
case Failure(e) => Left(e.getMessage)
reader.map { reader =>
Try {
yaml.loadAll(reader).asScala.map(x => Map(x.asInstanceOf[JMap[String, Object]].asScala.toList: _*))
}
} apply {
case Failure(e) =>
Left(e.getMessage)

case Success(foo) =>
val units = foo.toList.map { map =>
for {
uri <- loadFromMap[String, String](map, "uri")(isStringNonEmpty).right
driverClass <- loadFromMapOptional[String, String](map, "driver_class")(isStringNonEmpty).right
username <- loadFromMap[String, String](map, "username")(isStringNonEmpty).right
password <- loadFromMap[String, String](map, "password", defaultPassword)().right
query <- loadFromMap[String, String](map, "query")(isStringNonEmpty).right
connectionTimeout <- loadFromMapOptional[Int, java.lang.Integer](map, "connection_timeout")(_ > 0).right

repeats <- loadFromMap[Int, java.lang.Integer](map, "repeats")(_ > 0).right

unitName <- loadFromMap[String, String](map, "unit_name")(x => isStringNonEmpty(x) && x.matches("[a-zA-Z0-9]+")).right
description <- loadFromMapOptional[String, String](map, "description")().right
parallelConnections <- loadFromMap[Int, java.lang.Integer](map, "parallel_connections")(_ > 0).right
uri <- loadFromMap[String, String](map, "uri")(isStringNonEmpty)
driverClass <- loadFromMapOptional[String, String](map, "driver_class")(isStringNonEmpty)
username <- loadFromMap[String, String](map, "username")(isStringNonEmpty)
password <- loadFromMap[String, String](map, "password", defaultPassword)()
query <- loadFromMap[String, String](map, "query")(isStringNonEmpty)
connectionTimeout <- loadFromMapOptional[Int, java.lang.Integer](map, "connection_timeout")(_ > 0)

repeats <- loadFromMap[Int, java.lang.Integer](map, "repeats")(_ > 0)

unitName <- loadFromMap[String, String](map, "unit_name")(x => isStringNonEmpty(x) && x.matches("[a-zA-Z0-9]+"))
description <- loadFromMapOptional[String, String](map, "description")()
parallelConnections <- loadFromMap[Int, java.lang.Integer](map, "parallel_connections")(_ > 0)
} yield {
val dbConfig = DbCommunicationConfig(uri, driverClass, username, password, query, connectionTimeout)

Expand All @@ -55,7 +59,7 @@ object ConfigParser {
}
}

sequence(units).right.flatMap(x => {
sequence(units).flatMap(x => {
val unitNames = x.map(_.name)
if (unitNames.distinct.length != unitNames.length) {
Left("Unit names must be distinct, scenario configuration contains duplicate unit names")
Expand All @@ -68,22 +72,24 @@ object ConfigParser {
}
}

private[this] def loadFromMap[T, U <% T : ClassTag](map: Map[String, Any], key: String, default: Option[T] = None)
(validation: T => Boolean = (_: T) => true): Either[String, T] = {
private[this] def loadFromMap[T, U: ClassTag](map: Map[String, Any], key: String, default: Option[T] = None)
(validation: T => Boolean = (_: T) => true)
(implicit ev: U => T): Either[String, T] = {
loadFromMapOptional[T, U](map, key, default)(validation) match {
case Right(None) => Left(s"Configuration property $key is missing")
case Left(x) => Left(x)
case Right(Some(x)) => Right(x)
}
}

private[this] def loadFromMapOptional[T, U <% T : ClassTag](map: Map[String, Any], key: String, default: Option[T] = None)
(validationIfPresent: T => Boolean = (_: T) => true): Either[String, Option[T]] = {
private[this] def loadFromMapOptional[T, U: ClassTag](map: Map[String, Any], key: String, default: Option[T] = None)
(validationIfPresent: T => Boolean = (_: T) => true)
(implicit ev: U => T): Either[String, Option[T]] = {
val rtc = implicitly[ClassTag[U]].runtimeClass
map.get(key) match {
case None => Right(default)
case Some(x) if !rtc.isInstance(x) => Left( s"""Value "$x" does conform to the expected type: "${rtc.getSimpleName}"""")
case Some(x: U) if !validationIfPresent(x) => Left( s"""Invalid value "$x" for configuration entry: "$key"""")
case Some(x) if !rtc.isInstance(x) => Left(s"""Value "$x" does conform to the expected type: "${rtc.getSimpleName}"""")
case Some(x: U) if !validationIfPresent(x) => Left(s"""Invalid value "$x" for configuration entry: "$key"""")
case Some(x: U) => Right(Some(x))
}
}
Expand Down
Loading

0 comments on commit b6d07e8

Please sign in to comment.