Skip to content

Commit

Permalink
Revamp RowSource and Actor-based ingestion client logic; have CLI use…
Browse files Browse the repository at this point in the history
… actor-based ingestion
  • Loading branch information
velvia committed Oct 15, 2015
1 parent b36fdad commit 4b609d1
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 62 deletions.
51 changes: 7 additions & 44 deletions cli/src/main/scala/filodb.cli/CsvImportExport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import scala.concurrent.duration._
import scala.language.postfixOps

import filodb.core.metadata.MetaStore
import filodb.core.reprojector.{MemTable, Scheduler}
import filodb.coordinator.NodeCoordinatorActor
import filodb.core.reprojector.MemTable
import filodb.coordinator.{NodeCoordinatorActor, RowSource}
import filodb.coordinator.sources.CsvSourceActor
import filodb.core._

Expand All @@ -21,7 +21,6 @@ trait CsvImportExport {
val system: ActorSystem
val metaStore: MetaStore
val memTable: MemTable
val scheduler: Scheduler
val coordinatorActor: ActorRef
var exitCode = 0

Expand Down Expand Up @@ -57,51 +56,15 @@ trait CsvImportExport {
def ingestCSV(dataset: String, version: Int, csvPath: String, delimiter: Char) {
val fileReader = new java.io.FileReader(csvPath)

val reader = new CSVReader(fileReader, delimiter)
val columns = reader.readNext.toSeq
println(s"Ingesting CSV at $csvPath with columns $columns...")

val ingestCmd = NodeCoordinatorActor.SetupIngestion(dataset, columns, version: Int)
actorAsk(coordinatorActor, ingestCmd, 10 seconds) {
case NodeCoordinatorActor.IngestionReady =>
case NodeCoordinatorActor.UnknownDataset =>
println(s"Dataset $dataset is not known, you need to --create it first!")
exitCode = 2
return
case NodeCoordinatorActor.UndefinedColumns(undefCols) =>
println(s"Some columns $undefCols are not defined, please define them with --create first!")
exitCode = 2
return
case NodeCoordinatorActor.BadSchema(msg) =>
println(s"BadSchema - $msg")
val csvActor = system.actorOf(CsvSourceActor.props(fileReader, dataset, version, coordinatorActor))
actorAsk(csvActor, RowSource.Start, 99 minutes) {
case RowSource.SetupError(e) =>
println(s"Error $e setting up CSV ingestion of $dataset/$version at $csvPath")
exitCode = 2
return
case RowSource.AllDone =>
}

var linesIngested = 0
reader.iterator.grouped(100).foreach { lines =>
val mappedLines = lines.map(ArrayStringRowReader)
var resp: MemTable.IngestionResponse = MemTable.PleaseWait
do {
resp = memTable.ingestRows(dataset, version, mappedLines)
if (resp == MemTable.PleaseWait) {
do {
println("Waiting for MemTable to be able to ingest again...")
Thread sleep 10000
} while (!memTable.canIngest(dataset, version))
}
} while (resp != MemTable.Ingested)
linesIngested += mappedLines.length
if (linesIngested % 10000 == 0) println(s"Ingested $linesIngested lines!")
}

coordinatorActor ! NodeCoordinatorActor.Flush(dataset, version)
println("Waiting for scheduler/memTable to finish flushing everything")
Thread sleep 5000
while (memTable.flushingDatasets.nonEmpty) {
print(".")
Thread sleep 1000
}
println("ingestCSV finished!")
exitCode = 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ object DatasetCoordinatorActor {
*/
case class StartFlush(replyTo: Option[ActorRef] = None)

// Checks if memtable is ready to accept more rows. Common reason why not is due to lack of memory
// or the row limit has been reached
case object CanIngest

/**
* Clears all data from the projection. Waits for existing flush to finish first.
*/
Expand Down Expand Up @@ -201,6 +205,9 @@ class DatasetCoordinatorActor(datasetObj: Dataset,
case ClearProjection(replyTo, projection) =>
clearProjection(replyTo, projection)

case CanIngest =>
sender ! NodeCoordinatorActor.CanIngest(memTable.canIngest(datasetObj.name, version))

case GetStats =>
sender ! Stats(flushesStarted, flushesSucceeded, flushesFailed,
activeRows.getOrElse(-1L), flushingRows.getOrElse(-1L))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ object NodeCoordinatorActor {
case class Flush(dataset: String, version: Int)
case object Flushed

/**
* Checks to see if the DatasetCoordActor is ready to take in more rows. Usually sent when an actor
* is in a wait state.
*/
case class CheckCanIngest(dataset: String, version: Int)
case class CanIngest(can: Boolean)

/**
* Truncates all data from a projection of a dataset. Waits for any pending flushes from said
* dataset to finish first, and also clears the columnStore cache for that dataset.
Expand Down Expand Up @@ -201,6 +208,9 @@ class NodeCoordinatorActor(memTable: MemTable,
_ ! DatasetCoordinatorActor.ClearProjection(sender, projection)
}

case CheckCanIngest(dataset, version) =>
withDsCoord(sender, dataset, version) { _.forward(DatasetCoordinatorActor.CanIngest) }

case AddDatasetCoord(dataset, version, dsCoordRef) =>
dsCoordinators((dataset, version)) = dsCoordRef
}
Expand Down
86 changes: 68 additions & 18 deletions coordinator/src/main/scala/filodb.coordinator/RowSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package filodb.coordinator
import akka.actor.{Actor, ActorRef, PoisonPill}
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.velvia.filo.RowReader
import scala.concurrent.duration._

import filodb.core._

object RowSource {
case object Start
case object GetMoreRows
case object AllDone
case object CheckCanIngest
case class SetupError(err: ErrorResponse)
}

Expand All @@ -21,12 +23,12 @@ object RowSource {
*
* To start initialization and reading from source, send the Start message.
*
* TODO: Right now this doesn't work reliably because nobody keeps refreshing this actor
* and checking with the memTable to see if we can write again. Changes needed:
* 1) Follow similar model to code in CsvImportExport: use a FSM, have two states - writing and waiting
* waiting is after memTable returns PleaseWait. In waiting, periodically call memTable.canIngest()
* 2) In waiting -> writing transition, RowSource needs to rewind back to the last Acked sequence number
* and replay incoming rows
* Backpressure and at least once mechanism: RowSource sends rows to the NodeCoordinator,
* but does not send more than maxUnackedRows before getting an ack back. As long as it receives
* acks it will keep sending, but will stop once maxUnackedRows is reached. It then goes into a waiting
* state, waiting for Acks to come back, or periodically pinging with a CheckCanIngest message. If it
* is allowed to ingest again, then it proceeds but rewinds from the last acked message. This ensures
* that we do not skip any incoming messages.
*/
trait RowSource extends Actor with StrictLogging {
import RowSource._
Expand All @@ -37,6 +39,9 @@ trait RowSource extends Actor with StrictLogging {
// rows to read at a time
def rowsToRead: Int

def waitingPeriod: FiniteDuration
= 5.seconds

def coordinatorActor: ActorRef

// Returns the SetupIngestion message needed for initialization
Expand All @@ -53,24 +58,32 @@ trait RowSource extends Actor with StrictLogging {
// Anything additional to do when we hit end of data and it's all acked, before killing oneself
def allDoneAndGood(): Unit = {}

// Rewinds the input source to the given sequence number, usually called after a timeout
def rewindTo(seqNo: Long): Unit = {}

// Needs to be initialized to the first sequence # at the beginning
var lastAckedSeqNo: Long

private var currentHiSeqNo: Long = lastAckedSeqNo
private var isDoneReading: Boolean = false
private var whoStartedMe: ActorRef = _

def receive: Receive = {
import context.dispatcher

def start: Receive = {
case Start =>
whoStartedMe = sender
coordinatorActor ! getStartMessage()

case NodeCoordinatorActor.IngestionReady =>
self ! GetMoreRows
logger.info(s" ==> Setup is all done, starting ingestion...")
context.become(reading)

case e: ErrorResponse =>
whoStartedMe ! SetupError(e)
}

def reading: Receive = {
case GetMoreRows =>
val rows = (1 to rowsToRead).iterator
.map(i => getNewRow())
Expand All @@ -84,22 +97,59 @@ trait RowSource extends Actor with StrictLogging {
if (currentHiSeqNo - lastAckedSeqNo < maxUnackedRows) {
self ! GetMoreRows
} else {
logger.debug(s"Over high water mark: currentHi = $currentHiSeqNo, lastAcked = $lastAckedSeqNo")
logger.debug(s" ==> waiting: currentHi = $currentHiSeqNo, lastAcked = $lastAckedSeqNo")
context.system.scheduler.scheduleOnce(waitingPeriod, self, CheckCanIngest)
context.become(waiting)
}
} else {
logger.debug(s"Marking isDoneReading as true: HiSeqNo = $currentHiSeqNo, lastAcked = $lastAckedSeqNo")
isDoneReading = true
logger.debug(s" ==> doneReading: HiSeqNo = $currentHiSeqNo, lastAcked = $lastAckedSeqNo")
if (currentHiSeqNo == lastAckedSeqNo) { finish() }
else { context.become(doneReading) }
}

case NodeCoordinatorActor.Ack(lastSequenceNo) =>
lastAckedSeqNo = lastSequenceNo
if (!isDoneReading && (currentHiSeqNo - lastAckedSeqNo < maxUnackedRows)) self ! GetMoreRows
if (isDoneReading && currentHiSeqNo == lastAckedSeqNo) {
logger.info(s"Ingestion is all done")
coordinatorActor ! NodeCoordinatorActor.Flush(dataset, version)
allDoneAndGood()
whoStartedMe ! AllDone
self ! PoisonPill

case CheckCanIngest =>
}

def waiting: Receive = {
case NodeCoordinatorActor.Ack(lastSequenceNo) =>
lastAckedSeqNo = lastSequenceNo
if (currentHiSeqNo - lastAckedSeqNo < maxUnackedRows) {
logger.debug(s" ==> reading")
self ! GetMoreRows
context.become(reading)
}

case CheckCanIngest =>
coordinatorActor ! NodeCoordinatorActor.CheckCanIngest(dataset, version)

case NodeCoordinatorActor.CanIngest(can) =>
if (can) {
logger.debug(s"Yay, we're allowed to ingest again! Rewinding to $lastAckedSeqNo")
rewindTo(lastAckedSeqNo)
self ! GetMoreRows
context.become(reading)
} else {
logger.debug(s"Still waiting...")
context.system.scheduler.scheduleOnce(waitingPeriod, self, CheckCanIngest)
}
}

def doneReading: Receive = {
case NodeCoordinatorActor.Ack(lastSequenceNo) =>
lastAckedSeqNo = lastSequenceNo
if (currentHiSeqNo == lastAckedSeqNo) finish()
}

def finish(): Unit = {
logger.info(s"Ingestion is all done")
coordinatorActor ! NodeCoordinatorActor.Flush(dataset, version)
allDoneAndGood()
whoStartedMe ! AllDone
self ! PoisonPill
}

val receive = start
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ with CoordinatorSetup with AllTablesTest {
probe.send(coordActor, SetupIngestion(largeDataset.name, schemaWithPartCol.map(_.name), 0))
probe.expectMsg(IngestionReady)

probe.send(coordActor, CheckCanIngest(largeDataset.name, 0))
probe.expectMsg(CanIngest(true))

probe.send(coordActor, IngestRows(largeDataset.name, 0, lotLotNames.map(TupleRowReader), 1L))
probe.expectMsg(Ack(1L))

Expand Down

0 comments on commit 4b609d1

Please sign in to comment.