Skip to content


WIP: Overhaul coordinator, add SchedulerActor, CoordinatorSetup
Browse files Browse the repository at this point in the history
  • Loading branch information
velvia committed Sep 14, 2015
1 parent 0e4fd7e commit a30c44a
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 226 deletions.
8 changes: 5 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ lazy val coordinator = (project in file("coordinator"))
.settings(name := "filodb-coordinator")
.settings(libraryDependencies ++= coordDeps)
.dependsOn(core % "compile->compile; test->test",
cassandra % "compile->compile; test->test")

lazy val cassandra = (project in file("cassandra"))
Expand Down Expand Up @@ -60,7 +61,7 @@ lazy val coreDeps = Seq(
"org.velvia.filo" %% "filo-scala" % "0.1.3" excludeAll(excludeShapeless),
"io.spray" %% "spray-caching" % "1.3.2",
"org.mapdb" % "mapdb" % "1.0.6",
"com.typesafe" % "config" % "1.2.0",
"net.ceedubs" %% "ficus" % "1.0.1",
"com.nativelibs4java" %% "scalaxy-loops" % "0.3.3" % "provided",
"org.scalatest" %% "scalatest" % "2.2.4" % "test"
Expand All @@ -73,7 +74,8 @@ lazy val cassDeps = Seq(
lazy val coordDeps = Seq(
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.opencsv" % "opencsv" % "3.3",
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test"
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"org.scalatest" %% "scalatest" % "2.2.4" % "test"

lazy val cliDeps = Seq(
Expand Down
15 changes: 12 additions & 3 deletions cassandra/src/test/scala/filodb.cassandra/AllTablesTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object AllTablesTest {
val CassConfig = ConfigFactory.parseString(CassConfigStr)

abstract class AllTablesTest extends FunSpec with BeforeAndAfter with SimpleCassandraTest {
trait AllTablesTest extends SimpleCassandraTest {
import filodb.cassandra.metastore._

implicit val defaultPatience =
Expand All @@ -30,9 +30,13 @@ abstract class AllTablesTest extends FunSpec with BeforeAndAfter with SimpleCass

implicit val context =

lazy val columnStore = new CassandraColumnStore(ConfigFactory.load,
val config = ConfigFactory.load

lazy val columnStore = new CassandraColumnStore(config,
x => GdeltColumns(0))

lazy val metaStore = new CassandraMetaStore(config)

def createAllTables(): Unit = {
val f = for { _ <- DatasetTable.create.ifNotExists.future()
_ <- ColumnTable.create.ifNotExists.future() }
Expand All @@ -49,12 +53,17 @@ abstract class AllTablesTest extends FunSpec with BeforeAndAfter with SimpleCass

import Column.ColumnType._

val dsName = "dataset"
val dsName = "gdelt"
val GdeltDataset = Dataset(dsName ,"id")
val GdeltColumns = Seq(Column("id", dsName, 0, LongColumn),
Column("sqlDate", dsName, 0, StringColumn),
Column("monthYear", dsName, 0, IntColumn),
Column("year", dsName, 0, IntColumn))

val GdeltColNames =

def createTable(dataset: Dataset, columns: Seq[Column]): Unit = {
metaStore.newDataset(dataset).futureValue should equal (Success)
columns.foreach { col => metaStore.newColumn(col).futureValue should equal (Success) }
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import filodb.core._
import filodb.core.metadata.{Column, Dataset, MetaStore}
import filodb.cassandra.AllTablesTest

class CassandraMetaStoreSpec extends AllTablesTest {
import org.scalatest.{FunSpec, BeforeAndAfter}

class CassandraMetaStoreSpec extends FunSpec with BeforeAndAfter with AllTablesTest {
import MetaStore._

val metastore = new CassandraMetaStore(ConfigFactory.load)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package filodb.core
package filodb.coordinator

import com.typesafe.scalalogging.slf4j.StrictLogging
Expand Down
208 changes: 88 additions & 120 deletions coordinator/src/main/scala/filodb.coordinator/CoordinatorActor.scala
Original file line number Diff line number Diff line change
@@ -1,166 +1,134 @@
package filodb.core.ingest
package filodb.coordinator

import{Actor, ActorRef, Props}
import org.velvia.filo.RowIngestSupport
import scala.collection.mutable.HashMap
import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus._
import scala.concurrent.Future
import scala.concurrent.duration._

import filodb.core.BaseActor
import filodb.core.datastore.Datastore
import filodb.core.messages._
import filodb.core.metadata.{Partition, Column}
import filodb.core._
import filodb.core.metadata.{Column, Dataset, MetaStore}
import filodb.core.columnstore.ColumnStore
import filodb.core.columnstore.RowReader
import filodb.core.reprojector.{MemTable, Scheduler}

* The CoordinatorActor is the common API entry point for all FiloDB ingestion operations.
* The CoordinatorActor is the common API entry point for all FiloDB ingestion and metadata operations.
* It is a singleton - there should be exactly one such actor per node/JVM process.
* It is responsible for:
* - spinning up an Ingester and RowIngester actor per ingestion stream
* - validating dataset, schema, etc.
* - monitoring timeouts and failure recovery from (Row)IngesterActor deaths/exceptions
* - keeping away too many ingestion requests
* - Acting as the gateway to the MemTable, Scheduler, and MetaStore for local and remote actors
* - Supervising any other actors, such as regular scheduler heartbeats
* - Maintaining MemTable backpressure on clients to prevent OOM
* It is called by local (eg HTTP) as well as remote (eg Spark ETL) processes.
* See doc/ and the ingestion flow diagram for more details about the entire ingestion flow.
object CoordinatorActor {
// //////////// Commands

* Tells the CoordinatorActor to spin up the actor pipeline for high volume ingestion of a
* single dataset. Should be sent from an actor, which will get back a RowIngestionReady
* with an ActorRef to start pushing rows to. It will get Acks back from the IngesterActor.
* Sets up ingestion for a given dataset, version, and schema of columns.
* The dataset and columns must have been previously defined.
* @return RowIngestionReady, or NotFound, CannotLockPartition, UndefinedColumns etc.
* @returns BadSchema if the partition column is unsupported, sort column invalid, etc.
case class StartRowIngestion[R](dataset: String,
partition: String,
columns: Seq[String],
initialVersion: Int,
rowIngestSupport: RowIngestSupport[R])
case class SetupIngestion(dataset: String, schema: Seq[String], version: Int)

case object IngestionReady extends Response
case object UnknownDataset extends ErrorResponse
case class UndefinedColumns(undefined: Seq[String]) extends ErrorResponse
case object AlreadySetup extends ErrorResponse
case object BadSchema extends ErrorResponse

* Explicitly stop ingestion. Always use this when possible to help keep resources low.
* Ingests a new set of rows for a given dataset and version.
* The partitioning column and sort column are set up in the dataset.
* @param seqNo the sequence number to be returned for acknowledging the entire set of rows
case class StopIngestion(streamId: Int)
case class IngestRows(dataset: String, version: Int, rows: Seq[RowReader], seqNo: Long)

// ////////// Responses
case class Ack(seqNo: Long) extends Response

case class RowIngestionReady(streamId: Int, rowIngestActor: ActorRef) extends Response
case object CannotLockPartition extends ErrorResponse
case object NoDatasetColumns extends ErrorResponse
case class UndefinedColumns(undefined: Seq[String]) extends ErrorResponse

* This may be sent back for several reasons:
* - in response to an explicit StopIngestion
* - Idle / lack of activity or new input for that dataset/partition
* Either case, it means that the entire pipeline (Ingester, RowIngester) has been shut down, and
* their resources released. Any intermediate data would have been flushed.
* Initiates a flush of the remaining MemTable rows of the given dataset and version.
* Usually used when at the end of ingesting some large blob of data.
case class IngestionStopped(streamId: Int) extends Response

// /////////// Internal messaging
case class GoodToGo(originator: ActorRef, streamId: Int, ingester: ActorRef,
rowIngester: ActorRef, partition: Partition)
case class Flush(dataset: String, version: Int)

def invalidColumns(columns: Seq[String], schema: Column.Schema): Seq[String] =
(columns.toSet -- schema.keys).toSeq

def props(datastore: Datastore): Props =
Props(classOf[CoordinatorActor], datastore)
def props(memTable: MemTable,
metaStore: MetaStore,
scheduler: Scheduler,
config: Config): Props =
Props(classOf[CoordinatorActor], memTable, metaStore, scheduler, config)

class CoordinatorActor(datastore: Datastore) extends BaseActor {
* ==Configuration==
* {{{
* {
* memtable-retry-interval = 10 s
* scheduler-interval = 10 s
* }
* }}}
class CoordinatorActor(memTable: MemTable,
metaStore: MetaStore,
scheduler: Scheduler,
config: Config) extends BaseActor {
import CoordinatorActor._
import context.dispatcher

val streamIds = new HashMap[Int, (String, String)]
val ingesterActors = new HashMap[Int, ActorRef]
val rowIngesterActors = new HashMap[Int, ActorRef]
var nextStreamId = 0
val memtablePushback =[FiniteDuration]("memtable-retry-interval")
val schedulerInterval =[FiniteDuration]("scheduler-interval")

val schedulerActor = context.actorOf(SchedulerActor.props(scheduler), "scheduler")
context.system.scheduler.schedule(schedulerInterval, schedulerInterval,
schedulerActor, SchedulerActor.RunOnce)

private def verifySchema(originator: ActorRef, dataset: String, version: Int, columns: Seq[String]):
Future[Option[Column.Schema]] = {
datastore.getSchema(dataset, version).collect {
case Datastore.TheSchema(schema) =>
val undefinedCols = invalidColumns(columns, schema)
if (schema.isEmpty) {"Either no columns defined or no dataset $dataset")
originator ! NoDatasetColumns
} else if (undefinedCols.nonEmpty) {"Undefined columns $undefinedCols for dataset $dataset with schema $schema")
originator ! UndefinedColumns(undefinedCols.toSeq)
} else {
case r: Response =>
originator ! r
}.recover {
case t: Throwable =>
originator ! MetadataException(t)

private def getPartition(originator: ActorRef, dataset: String, partitionName: String):
Future[Option[Partition]] = {
datastore.getPartition(dataset, partitionName).collect {
case Datastore.ThePartition(partObj) =>
case r: Response =>
originator ! r
}.recover {
case t: Throwable =>
originator ! MetadataException(t)
metaStore.getSchema(dataset, version).map { schema =>
val undefinedCols = invalidColumns(columns, schema)
if (undefinedCols.nonEmpty) {"Undefined columns $undefinedCols for dataset $dataset with schema $schema")
originator ! UndefinedColumns(undefinedCols.toSeq)
} else {

def receive: Receive = {
case StartRowIngestion(dataset, partition, columns, initVersion, rowIngestSupport) =>

// TODO: check that there aren't too many ingestion streams already

case SetupIngestion(dataset, columns, version) =>
val originator = sender // capture mutable sender for async response
val streamId = nextStreamId
nextStreamId += 1
for { schema <- verifySchema(originator, dataset, initVersion, columns) if schema.isDefined
partOpt <- getPartition(originator, dataset, partition) if partOpt.isDefined }
(for { datasetObj <- metaStore.getDataset(dataset)
schema <- verifySchema(originator, dataset, version, columns) if schema.isDefined }
yield {
val columnSeq =
val partObj = partOpt.get

val ingester = context.actorOf(
IngesterActor.props(partObj, columnSeq, datastore, originator),
val rowIngester = context.actorOf(
RowIngesterActor.props(ingester, columnSeq, partObj, rowIngestSupport),

// Send message to myself to modify state, don't do it in async future callback
self ! GoodToGo(originator, streamId, ingester, rowIngester, partObj)
memTable.setupIngestion(datasetObj, columnSeq, version) match {
case MemTable.SetupDone => originator ! IngestionReady
case MemTable.AlreadySetup => originator ! AlreadySetup
case MemTable.BadSchema => originator ! BadSchema
}).recover {
case NotFoundError(what) => originator ! UnknownDataset
case t: Throwable => originator ! MetadataException(t)

case GoodToGo(originator, streamId, ingester, rowIngester, partition) =>
streamIds += streamId -> (partition.dataset -> partition.partition)
ingesterActors += streamId -> ingester
rowIngesterActors += streamId -> rowIngester"Set up ingestion pipeline for $partition, streamId=$streamId")
originator ! RowIngestionReady(streamId, rowIngester)

case StopIngestion(streamId) =>
case ingestCmd @ IngestRows(dataset, version, rows, seqNo) =>
// Check if we are over limit or under memory
// Ingest rows into the memtable
memTable.ingestRows(dataset, version, rows) match {
case MemTable.NoSuchDatasetVersion => sender ! UnknownDataset
case MemTable.Ingested => sender ! Ack(seqNo)
case MemTable.PleaseWait =>
logger.debug(s"MemTable full, retrying in $memtablePushback...")
context.system.scheduler.scheduleOnce(memtablePushback, self, ingestCmd)

// TODO: implement error recovery and watch actors for termination
// Consider restarting everything as a group?
// case Terminated(actorRef) =>
case flushCmd @ Flush(dataset, version) =>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package filodb.coordinator

import com.typesafe.config.Config

import filodb.core.metadata.MetaStore
import filodb.core.columnstore.ColumnStore
import filodb.core.reprojector.{MemTable, Scheduler, FlushPolicy, DefaultReprojector}

* A trait to make setup of the CoordinatorActor stack a bit easier.
* Mixed in for tests as well as the main FiloDB app and anywhere else the stack needs to be spun up.
trait CoordinatorSetup {
val system: ActorSystem
// The global configuration object
val config: Config

// TODO: Allow for a configurable thread pool for the futures, don't just use the global one
// and strongly consider using a BlockingQueue with the ThreadPoolExecutor with limited capacity
implicit val ec =

// These should be implemented as lazy val's
val memTable: MemTable
val flushPolicy: FlushPolicy
val columnStore: ColumnStore
val metaStore: MetaStore
lazy val reprojector = new DefaultReprojector(columnStore)
lazy val scheduler = new Scheduler(memTable,

lazy val coordinatorActor =
system.actorOf(CoordinatorActor.props(memTable, metaStore, scheduler,


0 comments on commit a30c44a

Please sign in to comment.