Skip to content

Commit

Permalink
Flush out MemTable and ReprojectorActor a little bit
Browse files Browse the repository at this point in the history
  • Loading branch information
velvia committed Sep 5, 2015
1 parent 55c4523 commit 4761e6d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 155 deletions.
49 changes: 47 additions & 2 deletions core/src/main/scala/filodb.core/reprojector/MemTable.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,58 @@
package filodb.core.reprojector

import filodb.core.KeyRange
import filodb.core.Types._
import filodb.core.metadata.Dataset
import filodb.core.columnstore.RowReader

object MemTable {
// Ingestion responses
trait IngestionResponse
case object Ingested extends IngestionResponse
case object PleaseWait extends IngestionResponse // Cannot quite ingest yet
}

/**
* The MemTable serves these purposes:
* 1) Holds incoming rows of data before being flushed
* 2) Can extract rows of data in a given sort order, and remove them
* 3) Can read rows of data in a given sort order for queries
*
* It definitely must be multithread safe, and very very fast.
* It is flushed using a FlushPolicy which may be based on things like how stale the rows
* for a given dataset are, or memory size, or other parameters.
*
* Data written to a MemTable should be logged via WAL or some other mechanism so it can be recovered in
* case of failure.
*/
trait MemTable {
def numRows: Long
def tables: Set[String]
import MemTable._

def totalNumRows: Long
def totalBytesUsed: Long
def datasets: Set[String]

val DefaultTopK = 10 // # of top results to return

def mostStaleDatasets(k: Int = DefaultTopK): Seq[String]
def mostStalePartitions(dataset: String, k: Int = DefaultTopK): Seq[PartitionKey]

/**
* === Row ingest, read, delete operations ===
*/

/**
* Ingests a bunch of new rows for a given table. The rows will be grouped by partition key and then
* sorted based on the superprojection sort order.
* @param dataset the Dataset to ingest
* @param rows the rows to ingest
* @param timestamp the write timestamp to associate with the rows, used to calculate staleness
* @returns Ingested or PleaseWait, if the MemTable is too full.
*/
def ingestRows(dataset: Dataset, rows: Seq[RowReader], timestamp: Long): IngestionResponse

def readRows[K](keyRange: KeyRange[K], sortOrder: SortOrder): Iterator[RowReader]

def removeRows[K](keyRange: KeyRange[K]): Unit
}

32 changes: 0 additions & 32 deletions core/src/main/scala/filodb.core/reprojector/Reprojector.scala

This file was deleted.

48 changes: 48 additions & 0 deletions core/src/main/scala/filodb.core/reprojector/ReprojectorActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package filodb.core.ingest

import akka.actor.{Actor, ActorRef, Props}

import filodb.core.BaseActor
import filodb.core.metadata.{Dataset, MetaStore}
import filodb.core.columnstore.ColumnStore

/**
* The Reprojector flushes rows out of the MemTable and writes out Segments to the ColumnStore.
*
* It can be triggered by a timer to regularly flush or by events (such as MemTable getting full).
* It works on one partition at a time, going through rows, creating segments, flushing them to
* the ColumnStore, then deleting the rows from the memtable.
*
* It is an actor, working asynchronously, and multiple reprojectors can be in action at the same time.
*/
object ReprojectorActor {
/**
* Sent to periodically check if flushing is required.
*/
case object Check

/**
* Flushes data for a particular dataset. Usually called explicitly when ingestion has finished.
* Usually starts with the most stale partitions.
* NOTE: This may take a long time! After a segment has been flushed this actor may call flush again.
*/
case class Flush(dataset: Dataset)

/**
* Creates a new ReprojectorActor (to be used in system.actorOf(....))
* Note: partition does not need to include shard info, just chunkSize, dataset and partition name.
*/
def props[R](metaStore: MetaStore,
columnStore: ColumnStore): Props =
Props(classOf[ReprojectorActor[R]], metaStore)
}

class ReprojectorActor[R](metaStore: MetaStore,
columnStore: ColumnStore) extends BaseActor {
import ReprojectorActor._

def receive: Receive = {
case Check =>
logger.debug("Checking for anything that needs to be flushed...")
}
}
120 changes: 0 additions & 120 deletions core/src/main/scala/filodb.core/reprojector/RowIngesterActor.scala

This file was deleted.

2 changes: 1 addition & 1 deletion scalastyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
<parameter name="maxParameters"><![CDATA[8]]></parameter>
</parameters>
</check>
<check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
<check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="false">
<parameters>
<parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
</parameters>
Expand Down

0 comments on commit 4761e6d

Please sign in to comment.