feat(http): Add HTTP /api/v1/cluster cluster API; some memory logging/multi-JVM fixes (filodb#103)

Adds routes for getting cluster / shard health status and setting up a new streaming config, and docs to go along with it.
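
For a quick smoke test of the new cluster route, something like the following works from Scala with scalaj-http (already a project dependency); the port and the exact response body shape are assumptions here, not taken from this commit:

```scala
import scalaj.http.Http

object ClusterStatusCheck extends App {
  // Assumption: a FiloDB standalone node is serving the HTTP API on localhost:8080.
  val response = Http("http://localhost:8080/api/v1/cluster").timeout(5000, 5000).asString
  if (response.is2xx) println(s"Cluster status response: ${response.body}")
  else println(s"Request failed with HTTP ${response.code}: ${response.body}")
}
```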

* fix(bootstrapper): switch bootstrapper to use Circe and fix multi-JVM test compilation
* misc(core): Add logging for shard memory allocation, lower default standalone memory per shard
* fix(core): Release memory during orderly shutdown (except blockStore.shutdown not working)
* Increase default timeouts; help multi-JVM tests to pass more often
* Remove CleanShutdown() and rely on explicit memory free and shutdown logic
* Even if MAYBE_MULTI_JVM is not enabled, multi-JVM sources should always compile
* Remove Kafka multi-JVM tests. They have been superseded by the standalone multi-JVM tests
Evan Chan authored and committed Jan 4, 2018
1 parent b40c1d3 commit c454aaf
Showing 39 changed files with 576 additions and 340 deletions.
7 changes: 7 additions & 0 deletions README.md
@@ -36,6 +36,7 @@ See [architecture](doc/architecture.md) and [datasets and reading](doc/datasets_
- [Data Modelling and Performance Considerations](#data-modelling-and-performance-considerations)
- [Predicate Pushdowns](#predicate-pushdowns)
- [Sharding](#sharding)
- [Using the FiloDB HTTP API](#using-the-filodb-http-api)
- [Using FiloDB Data Source with Spark](#using-filodb-data-source-with-spark)
- [Configuring FiloDB](#configuring-filodb)
- [Passing Cassandra Authentication Settings](#passing-cassandra-authentication-settings)
@@ -260,6 +261,8 @@ If this was for events, you might have multiple columns, and some of them can be
### Data Modelling and Performance Considerations
NOTE: this section is really old and needs to be rewritten.
**Choosing Partition Keys**.
- A good start for a partition key is a hash of an ID or columns that distribute well, plus a time bucket. This spreads data around but also prevents too much data from piling up in a single partition.
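
As a hedged illustration of that guideline (the names and hashing scheme below are invented for the example, not taken from FiloDB):

```scala
// Illustrative sketch only: derive a partition key from a hashed ID plus a time bucket,
// so writes spread across partitions while each partition stays bounded in size.
object PartitionKeyExample extends App {
  def partitionKey(id: String, timestampMs: Long, bucketMs: Long = 86400000L): String = {
    val hashBucket = math.abs(id.hashCode % 1024) // spread IDs across 1024 hash buckets
    val timeBucket = timestampMs / bucketMs       // one bucket per day by default
    s"$hashBucket-$timeBucket"
  }

  println(partitionKey("sensor-42", System.currentTimeMillis()))
}
```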
@@ -315,6 +318,10 @@ Note: You can see predicate pushdown filters in application logs by setting logg
TODO: add details about FiloDB sharding, the shard-key vs partition key mechanism, etc.
## Using the FiloDB HTTP API
Please see the [HTTP API](doc/http_api.md) doc.
## Using FiloDB Data Source with Spark
FiloDB has a Spark data-source module - `filodb.spark` - so you can use the Spark DataFrames `read` and `write` APIs with FiloDB. To use it, follow the steps below
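
As a rough sketch of what that looks like (the master URL, the `gdelt` dataset name, and having the FiloDB Spark assembly on the classpath are assumptions for illustration; the steps referenced above remain authoritative):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: read a FiloDB dataset through the filodb.spark data source.
object FiloDbReadExample extends App {
  val spark = SparkSession.builder
    .appName("filodb-read-example")
    .master("local[2]")
    .getOrCreate()

  val df = spark.read
    .format("filodb.spark")
    .option("dataset", "gdelt")
    .load()

  df.show(10)
  spark.stop()
}
```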
@@ -6,42 +6,31 @@ import scala.language.postfixOps

import akka.actor.{ActorRef, Address, Props}
import akka.cluster.Cluster
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.scalalogging.StrictLogging
import spray.json._
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport

object AkkaBootstrapperMessages {

final case object ClusterMembershipRequest

final case class ClusterMembershipResponse(seeds: Seq[Address])

}

final case class DiscoveryTimeoutException(message: String, cause: Throwable = None.orNull)
extends RuntimeException(message, cause)

final case class ClusterMembershipHttpResponse(members: List[String])

trait ClusterMembershipJsonSuppport {

import DefaultJsonProtocol._

implicit val printer = PrettyPrinter
implicit val membershipJsonFormat = jsonFormat1(ClusterMembershipHttpResponse)

}

/**
* This is the API facade for the Akka Bootstrapper Library
*
* @param cluster local cluster object to join with seeds
*/
class AkkaBootstrapper(protected val cluster: Cluster) extends StrictLogging with ClusterMembershipJsonSuppport {
class AkkaBootstrapper(protected val cluster: Cluster) extends StrictLogging {
import FailFastCirceSupport._
import io.circe.generic.auto._

private implicit val system = cluster.system
private[filodb] val settings = new AkkaBootstrapperSettings(cluster.system.settings.config)
@@ -8,12 +8,12 @@ import akka.actor.{Address, AddressFromURIString}
import akka.cluster.Cluster
import com.typesafe.scalalogging.StrictLogging
import scalaj.http.Http
import spray.json._

/** Seed node strategy. Some implementations discover, some simply read from immutable config. */
abstract class ClusterSeedDiscovery(val cluster: Cluster,
val settings: AkkaBootstrapperSettings)
extends StrictLogging with ClusterMembershipJsonSuppport {
val settings: AkkaBootstrapperSettings) extends StrictLogging {
import io.circe.parser.decode
import io.circe.generic.auto._

@throws(classOf[DiscoveryTimeoutException])
def discoverClusterSeeds: Seq[Address] = {
@@ -40,9 +40,14 @@ abstract class ClusterSeedDiscovery(val cluster: Cluster,
response.code.toString, response.body)
Seq.empty[Address]
} else {
val membersResponse = response.body.parseJson.convertTo[ClusterMembershipHttpResponse]
logger.info("Cluster exists. Response: {}", membersResponse)
membersResponse.members.sorted.map(a => AddressFromURIString.parse(a))
decode[ClusterMembershipHttpResponse](response.body) match {
case Right(membersResponse) =>
logger.info("Cluster exists. Response: {}", membersResponse)
membersResponse.members.sorted.map(a => AddressFromURIString.parse(a))
case Left(ex) =>
logger.error(s"Exception parsing JSON response ${response.body}, returning empty seeds", ex)
Seq.empty[Address]
}
}
} catch {
case NonFatal(e) =>
@@ -54,7 +59,6 @@ abstract class ClusterSeedDiscovery(val cluster: Cluster,


object ClusterSeedDiscovery {

/** Seed node strategy. Some implementations discover, some simply read them. */
def apply(cluster: Cluster, settings: AkkaBootstrapperSettings): ClusterSeedDiscovery = {
import settings.{seedDiscoveryClass => fqcn}
@@ -67,5 +71,4 @@ object ClusterSeedDiscovery {
case Success(clazz) => clazz
}
}

}
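
The spray-json to Circe switch above hinges on circe-generic's automatic derivation plus `io.circe.parser.decode`; here is a minimal, standalone sketch of that decode pattern (illustrative only, not part of this commit):

```scala
import io.circe.generic.auto._
import io.circe.parser.decode

final case class ClusterMembershipHttpResponse(members: List[String])

object DecodeExample extends App {
  val json = """{"members":["akka.tcp://filo-standalone@127.0.0.1:2552"]}"""
  // decode returns Either[io.circe.Error, A]; pattern match rather than calling .right.get
  decode[ClusterMembershipHttpResponse](json) match {
    case Right(resp) => println(s"Parsed members: ${resp.members}")
    case Left(err)   => println(s"Failed to parse cluster membership JSON: $err")
  }
}
```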
@@ -6,11 +6,13 @@ import scala.language.postfixOps
import akka.actor.AddressFromURIString
import akka.cluster.Cluster
import akka.http.scaladsl.Http
import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpecCallbacks, MultiNodeSpec}
import akka.stream.ActorMaterializer
import com.typesafe.config.{Config, ConfigFactory}
import spray.json._
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures

import filodb.akkabootstrapper.{AkkaBootstrapper, ClusterMembershipHttpResponse, ClusterMembershipJsonSuppport}
import filodb.akkabootstrapper.{AkkaBootstrapper, ClusterMembershipHttpResponse}

trait AkkaBootstrapperMultiNodeConfig extends MultiNodeConfig {

@@ -47,9 +49,10 @@ trait AkkaBootstrapperMultiNodeConfig extends MultiNodeConfig {
*/
trait BaseAkkaBootstrapperSpec extends MultiNodeSpecCallbacks
with WordSpecLike with Matchers with ScalaFutures
with BeforeAndAfterAll with ClusterMembershipJsonSuppport {
with BeforeAndAfterAll { multiNodeSpecWithConfig: MultiNodeSpec =>

multiNodeSpecWithConfig: MultiNodeSpec =>
import io.circe.parser.decode
import io.circe.generic.auto._

override def beforeAll(): Unit = multiNodeSpecBeforeAll()
override def afterAll(): Unit = multiNodeSpecAfterAll()
@@ -84,19 +87,16 @@ trait BaseAkkaBootstrapperSpec extends MultiNodeSpecCallbacks
enterBarrier("thirdMemberAdded")
awaitCond(cluster.state.members.size == 3, max = 10 seconds, interval = 1 second)
validateSeedsFromHttpEndpoint(seedsEndpoint, 3)

}

}

protected def validateSeedsFromHttpEndpoint(seedsEndpoint: String, numSeeds: Int): Unit = {
Thread.sleep(4000) // sleep for a bit for ClusterMembershipTracker actor to get the message
val response = scalaj.http.Http(seedsEndpoint).timeout(500, 500).asString
response.is2xx shouldEqual true
val addresses = response.body.parseJson.convertTo[ClusterMembershipHttpResponse]
.members.map(a => AddressFromURIString.parse(a))
val addresses = decode[ClusterMembershipHttpResponse](response.body).right.get
.members.map(a => AddressFromURIString.parse(a))
addresses.size shouldEqual numSeeds
}

}

@@ -1,13 +1,13 @@
package filodb.akkabootstrapper.multijvm

import akka.remote.testkit.MultiNodeSpec
import com.typesafe.config.{Config, ConfigFactory}


object ConsulBootstrapperMultiNodeConfig extends AkkaBootstrapperMultiNodeConfig {
override def baseConfig : Config = ConfigFactory.parseString(
s"""
|akka-bootstrapper {
| seed-discovery.class = "filodb.akkabootstrapper.ConsulAkkaClusterSeedDiscovery"
| seed-discovery.class = "filodb.akkabootstrapper.ConsulClusterSeedDiscovery"
| dns-srv.resolver-host = "127.0.0.1" #consul by default
| dns-srv.resolver-port = 8600 # consul by default
| dns-srv.seed-node-count = 2
@@ -1,5 +1,6 @@
package filodb.akkabootstrapper.multijvm

import akka.remote.testkit.MultiNodeSpec
import com.typesafe.config.{Config, ConfigFactory}


@@ -8,7 +9,7 @@ object WhitelistBootstrapperMultiNodeConfig extends AkkaBootstrapperMultiNodeCon
override def baseConfig: Config = ConfigFactory.parseString(
s"""
|akka-bootstrapper {
| seed-discovery.class = "filodb.akkabootstrapper.WhitelistAkkaClusterSeedDiscovery"
| seed-discovery.class = "filodb.akkabootstrapper.WhitelistClusterSeedDiscovery"
| whitelist.seeds = [
| "akka.tcp://[email protected]:2552"
| "akka.tcp://[email protected]:2562"
28 changes: 20 additions & 8 deletions build.sbt
@@ -45,7 +45,6 @@ lazy val kafka = project
.in(file("kafka"))
.settings(name := "filodb-kafka")
.settings(commonSettings: _*)
.settings(multiJvmMaybeSettings: _*)
.settings(kafkaSettings: _*)
.settings(itSettings: _*)
.settings(assemblySettings: _*)
@@ -58,6 +57,7 @@ lazy val kafka = project
lazy val bootstrapper = project
.in(file("akka-bootstrapper"))
.settings(commonSettings: _*)
.settings(multiJvmMaybeSettings: _*)
.settings(name := "akka-bootstrapper")
.settings(libraryDependencies ++= bootstrapperDeps)
.configs(MultiJvm)
@@ -67,7 +67,7 @@ lazy val http = project
.settings(commonSettings: _*)
.settings(name := "http")
.settings(libraryDependencies ++= httpDeps)
.dependsOn(core)
.dependsOn(core, coordinator % "compile->compile; test->test")

lazy val standalone = project
.in(file("standalone"))
@@ -128,6 +128,7 @@ val excludeJersey = ExclusionRule(organization = "com.sun.jersey")

/* Versions in various modules versus one area of build */
val akkaVersion = "2.4.19" // akka-http/akka-stream compat. TODO when kamon-akka-remote is akka 2.5.4 compat
val akkaHttpVersion = "10.0.10"
val cassDriverVersion = "3.0.2"
val ficusVersion = "1.1.2"
val kamonVersion = "0.6.3"
@@ -140,8 +141,11 @@ val log4jDep = "log4j" % "log4j" %
val scalaLoggingDep = "com.typesafe.scala-logging" %% "scala-logging" % "3.7.2"
val scalaTest = "org.scalatest" %% "scalatest" % "2.2.6" // TODO upgrade to 3.0.4
val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.11.0"
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.0.10"
val akkaHttpSprayJson = "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10"
val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
val akkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion
val akkaHttpCirce = "de.heikoseeberger" %% "akka-http-circe" % "1.18.1"
val circeGeneric = "io.circe" %% "circe-generic" % "0.8.0"
val circeParser = "io.circe" %% "circe-parser" % "0.8.0"

lazy val commonDeps = Seq(
"io.kamon" %% "kamon-core" % kamonVersion,
@@ -215,7 +219,10 @@ lazy val tsgeneratorDeps = Seq(
lazy val httpDeps = Seq(
logbackDep,
akkaHttp,
akkaHttpSprayJson
akkaHttpCirce,
circeGeneric,
circeParser,
akkaHttpTestkit % Test
)

lazy val standaloneDeps = Seq(
@@ -230,7 +237,9 @@ lazy val bootstrapperDeps = Seq(
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
// akka http should be a compile time dependency only. Users of this library may want to use a different http server
akkaHttp % "test; provided",
akkaHttpSprayJson % "test; provided",
akkaHttpCirce % "test; provided",
circeGeneric % "test; provided",
circeParser % "test; provided",
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"dnsjava" % "dnsjava" % "2.1.8",
"org.scalaj" %% "scalaj-http" % "2.3.0",
@@ -379,7 +388,9 @@ lazy val itSettings = Defaults.itSettings ++ Seq(
internalDependencyClasspath in IntegrationTest, exportedProducts in Test)).value)

lazy val multiJvmSettings = SbtMultiJvm.multiJvmSettings ++ Seq(
compile in MultiJvm := ((compile in MultiJvm) triggeredBy (compile in Test)).value,
compile in MultiJvm := ((compile in MultiJvm) triggeredBy (compile in Test)).value)

lazy val testMultiJvmToo = Seq(
// make sure that MultiJvm tests are executed by the default test target,
// and combine the results from ordinary test and multi-jvm tests
executeTests in Test := {
@@ -396,7 +407,8 @@ lazy val multiJvmSettings = SbtMultiJvm.multiJvmSettings ++ Seq(
}
)

lazy val multiJvmMaybeSettings = if (sys.env.contains("MAYBE_MULTI_JVM")) multiJvmSettings else Nil
lazy val multiJvmMaybeSettings = multiJvmSettings ++ {
if (sys.env.contains("MAYBE_MULTI_JVM")) testMultiJvmToo else Nil }

// Fork a separate JVM for each test, instead of one for all tests in a module.
// This is necessary for Spark tests due to initialization, for example
@@ -2,6 +2,7 @@ package filodb.cassandra.columnstore

import com.typesafe.config.ConfigFactory
import monix.reactive.Observable
import org.scalatest.BeforeAndAfterAll

import filodb.core.TestData
import filodb.core.memstore.{TimeSeriesPartition, TimeSeriesPartitionSpec, TimeSeriesShardStats}
@@ -10,12 +11,16 @@ import filodb.core.binaryrecord.BinaryRecord
import filodb.core.store.{AllChunkScan, ChunkSet, RowKeyChunkScan}
import filodb.memory.format.TupleRowReader

class CassandraBackedTimeSeriesPartitionSpec extends TimeSeriesPartitionSpec {
class CassandraBackedTimeSeriesPartitionSpec extends TimeSeriesPartitionSpec with BeforeAndAfterAll {

val config = ConfigFactory.load("application_test.conf").getConfig("filodb")
import monix.execution.Scheduler.Implicits.global
override val colStore = new CassandraColumnStore(config, global)
colStore.initialize(dataset1.ref)

override def beforeAll(): Unit = {
super.beforeAll()
colStore.initialize(dataset1.ref)
}

it("should be able to load from persistent store to answer queries") {

20 changes: 7 additions & 13 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -10,7 +10,6 @@ import scala.util.{Failure, Success => SSuccess, Try}
import com.opencsv.CSVWriter
import com.quantifind.sumac.{ArgMain, FieldArgs}
import com.typesafe.config.{Config, ConfigFactory}
import net.ceedubs.ficus.Ficus._
import org.parboiled2.ParseError

import filodb.coordinator._
@@ -142,9 +141,9 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
val remote = Client.standaloneClient(system, args.host.get, args.port)
(args.filename, args.configPath) match {
case (Some(configFile), _) =>
setupDataset(remote, ConfigFactory.parseFile(new java.io.File(configFile)))
setupDataset(remote, ConfigFactory.parseFile(new java.io.File(configFile)), timeout)
case (None, Some(configPath)) =>
setupDataset(remote, systemConfig.getConfig(configPath))
setupDataset(remote, systemConfig.getConfig(configPath), timeout)
case (None, None) =>
println("Either --filename or --configPath must be specified for setup")
exitCode = 1
@@ -220,7 +219,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
* setup command configuration example:
* {{{
* dataset = "gdelt"
* numshards = 32 # for Kafka this should match the number of partitions
* num-shards = 32 # for Kafka this should match the number of partitions
* min-num-nodes = 10 # This many nodes needed to ingest all shards
* sourcefactory = "filodb.kafka.KafkaSourceFactory"
* sourceconfig {
Expand All @@ -231,16 +230,11 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
* sourcefactory and sourceconfig is optional. If omitted, a NoOpFactory will be used, which means
* no automatic pull ingestion will be started. New data can always be pushed into any Filo node.
*/
def setupDataset(client: LocalClient, config: Config): Unit = {
val dataset = DatasetRef(config.getString("dataset"))
val resourceSpec = DatasetResourceSpec(config.getInt("numshards"),
config.getInt("min-num-nodes"))
val sourceSpec = config.as[Option[String]]("sourcefactory").map { factory =>
IngestionSource(factory, config.getConfig("sourceconfig"))
}.getOrElse(noOpSource)
client.setupDataset(dataset, resourceSpec, sourceSpec).foreach {
def setupDataset(client: LocalClient, config: Config, timeout: FiniteDuration): Unit = {
val ingestConfig = IngestionConfig(config, backupSourceFactory=noOpSource.streamFactoryClass)
client.setupDataset(ingestConfig, timeout).foreach {
case e: ErrorResponse =>
println(s"Errors setting up dataset $dataset: $e")
println(s"Errors setting up dataset ${ingestConfig.ref}: $e")
exitCode = 2
}
}
2 changes: 1 addition & 1 deletion conf/timeseries-dev-source.conf
@@ -1,5 +1,5 @@
dataset = "timeseries"
numshards = 4
num-shards = 4
min-num-nodes = 2
# Length of chunks to be written, roughly
sourcefactory = "filodb.kafka.KafkaIngestionStreamFactory"
1 change: 1 addition & 0 deletions conf/timeseries-filodb-server.conf
@@ -4,6 +4,7 @@ filodb {
max-chunks-size = 360
max-num-partitions = 5000
groups-per-shard = 10
shard-memory-mb = 1500
}

metrics-logger.enabled=true