Skip to content

Commit

Permalink
Clean up tests in core; add InMemoryColumnStore!
Browse files Browse the repository at this point in the history
  • Loading branch information
velvia committed Sep 8, 2015
1 parent c6344d4 commit 4172c15
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 294 deletions.
13 changes: 8 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,26 @@ val excludeShapeless = ExclusionRule(organization = "com.chuusai")
val excludeZK = ExclusionRule(organization = "org.apache.zookeeper")

lazy val coreDeps = Seq(
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2",
"ch.qos.logback" % "logback-classic" % "1.0.7",
"com.beachape" %% "enumeratum" % "1.2.1",
"org.velvia.filo" %% "filo-scala" % "0.1.3" excludeAll(excludeShapeless),
"io.spray" %% "spray-caching" % "1.3.2",
"org.mapdb" % "mapdb" % "2.0-beta6",
"com.typesafe" % "config" % "1.2.0",
"com.nativelibs4java" %% "scalaxy-loops" % "0.3.3" % "provided",
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"com.websudos" %% "phantom-testkit" % phantomVersion % "test" excludeAll(excludeZK)
"org.scalatest" %% "scalatest" % "2.2.4"
)

lazy val cassDeps = Seq(
"com.websudos" %% "phantom-dsl" % phantomVersion
"com.websudos" %% "phantom-dsl" % phantomVersion,
"com.websudos" %% "phantom-testkit" % phantomVersion % "test" excludeAll(excludeZK)
)

lazy val coordDeps = Seq(
"com.opencsv" % "opencsv" % "3.3"
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.opencsv" % "opencsv" % "3.3",
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test"
)

lazy val cliDeps = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class CassandraColumnStoreSpec extends CassandraFlatSpec with BeforeAndAfter {
}
}

it should "return empty iterator if cannot find partition or version" in (pending)

it should "return segment with empty chunks if cannot find columns" in {
whenReady(colStore.appendSegment(baseSegment, 0)) { response =>
response should equal (Success)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package filodb.core.columnstore

import java.nio.ByteBuffer
import java.util.TreeMap
import scala.collection.mutable.HashMap
import scala.concurrent.{ExecutionContext, Future}
import spray.caching._

import filodb.core._
import filodb.core.metadata.Column

/**
* A ColumnStore implementation which is entirely in memory for speed.
* Good for testing or performance.
* TODO: use thread-safe structures
*/
class InMemoryColumnStore(getSortColumn: Types.TableName => Column)
(implicit val ec: ExecutionContext) extends CachedMergingColumnStore {
import Types._
import filodb.core.columnstore._
import collection.JavaConversions._

val segmentCache = LruCache[Segment[_]](100)

val mergingStrategy = new AppendingChunkMergingStrategy(this, getSortColumn)

type ChunkTree = TreeMap[(ColumnId, ByteBuffer, ChunkID), ByteBuffer]
type RowMapTree = TreeMap[ByteBuffer, (ByteBuffer, ByteBuffer, Int)]

val chunkDb = new HashMap[(TableName, PartitionKey, Int), ChunkTree]
val rowMaps = new HashMap[(TableName, PartitionKey, Int), RowMapTree]

def writeChunks(dataset: TableName,
partition: PartitionKey,
version: Int,
segmentId: ByteBuffer,
chunks: Iterator[(ColumnId, ChunkID, ByteBuffer)]): Future[Response] = Future {
val chunkTree = chunkDb.getOrElseUpdate((dataset, partition, version), new ChunkTree)
chunks.foreach { case (colId, chunkId, bytes) => chunkTree.put((colId, segmentId, chunkId), bytes) }
Success
}

def writeChunkRowMap(dataset: TableName,
partition: PartitionKey,
version: Int,
segmentId: ByteBuffer,
chunkRowMap: ChunkRowMap): Future[Response] = Future {
val rowMapTree = rowMaps.getOrElseUpdate((dataset, partition, version), new RowMapTree)
val (chunkIds, rowNums) = chunkRowMap.serialize()
rowMapTree.put(segmentId, (chunkIds, rowNums, chunkRowMap.nextChunkId))
Success
}

def readChunks[K](columns: Set[ColumnId],
keyRange: KeyRange[K],
version: Int): Future[Seq[ChunkedData]] = Future {
val chunkTree = chunkDb.getOrElseUpdate((keyRange.dataset, keyRange.partition, version), new ChunkTree)
for { column <- columns.toSeq } yield {
val startKey = (column, keyRange.binaryStart, 0)
val endKey = (column, keyRange.binaryEnd, 0) // exclusive end
val it = chunkTree.subMap(startKey, endKey).entrySet.iterator
val chunkList = it.toSeq.map { entry =>
val (colId, segmentId, chunkId) = entry.getKey
(segmentId, chunkId, entry.getValue)
}
ChunkedData(column, chunkList)
}
}

def readChunkRowMaps[K](keyRange: KeyRange[K], version: Int):
Future[Seq[(ByteBuffer, BinaryChunkRowMap)]] = Future {
val rowMapTree = rowMaps.getOrElseUpdate((keyRange.dataset, keyRange.partition, version), new RowMapTree)
val it = rowMapTree.subMap(keyRange.binaryStart, keyRange.binaryEnd).entrySet.iterator
it.toSeq.map { entry =>
val (chunkIds, rowNums, nextChunkId) = entry.getValue
(entry.getKey, new BinaryChunkRowMap(chunkIds, rowNums, nextChunkId))
}
}

def scanChunkRowMaps(dataset: TableName,
partitionFilter: (PartitionKey => Boolean),
params: Map[String, String])
(processFunc: (ChunkRowMap => Unit)): Future[Response] = ???
}
5 changes: 5 additions & 0 deletions core/src/main/scala/filodb.core/metadata/Column.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package filodb.core.metadata
import com.typesafe.scalalogging.slf4j.StrictLogging
import enumeratum.{Enum, EnumEntry}

import filodb.core.Types._

/**
* Defines a column of data and its properties.
*
Expand All @@ -29,6 +31,9 @@ case class Column(name: String,
serializer: Column.Serializer = Column.Serializer.FiloSerializer,
isDeleted: Boolean = false,
isSystem: Boolean = false) {
// More type safe than just using ==, if we ever change the type of ColumnId
def hasId(id: ColumnId): Boolean = name == id

/**
* Has one of the properties other than name, dataset, version changed?
* (Name and dataset have to be constant for comparison to even be valid)
Expand Down
13 changes: 9 additions & 4 deletions core/src/main/scala/filodb.core/reprojector/MemTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package filodb.core.reprojector

import filodb.core.KeyRange
import filodb.core.Types._
import filodb.core.metadata.Dataset
import filodb.core.metadata.{Column, Dataset}
import filodb.core.columnstore.RowReader

object MemTable {
// Ingestion responses
trait IngestionResponse
case object Ingested extends IngestionResponse
case object PleaseWait extends IngestionResponse // Cannot quite ingest yet
case object BadSchema extends IngestionResponse

val DefaultTopK = 10 // # of top results to return
}

/**
Expand All @@ -32,8 +35,6 @@ trait MemTable {
def totalBytesUsed: Long
def datasets: Set[String]

val DefaultTopK = 10 // # of top results to return

def mostStaleDatasets(k: Int = DefaultTopK): Seq[String]
def mostStalePartitions(dataset: String, k: Int = DefaultTopK): Seq[PartitionKey]

Expand All @@ -45,11 +46,15 @@ trait MemTable {
* Ingests a bunch of new rows for a given table. The rows will be grouped by partition key and then
* sorted based on the superprojection sort order.
* @param dataset the Dataset to ingest
* @param schema the columns to be ingested, in order of appearance in a row of data
* @param rows the rows to ingest
* @param timestamp the write timestamp to associate with the rows, used to calculate staleness
* @returns Ingested or PleaseWait, if the MemTable is too full.
*/
def ingestRows(dataset: Dataset, rows: Seq[RowReader], timestamp: Long): IngestionResponse
def ingestRows(dataset: Dataset,
schema: Seq[Column],
rows: Seq[RowReader],
timestamp: Long): IngestionResponse

def readRows[K](keyRange: KeyRange[K], sortOrder: SortOrder): Iterator[RowReader]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import filodb.core.metadata.Dataset

/**
* Holds the current state of a dataset reprojector, especially outstanding ColumnStore tasks.
* One per dataset, typed to the dataset's sort key.
*
* outstandingTasks should hold a future for writing in each fixed keyRange interval.
*/
trait ReprojectorState[K] {
def dataset: Dataset
Expand All @@ -22,7 +25,7 @@ trait Reprojector {
/**
* Does reprojection (columnar flushes from memtable) for a single dataset. Returns an updated copy of
* the ReprojectorState. Side effects:
* - ColumnStore I/O
* - ColumnStore I/O. May split segments and write into multiple segments.
* - May delete rows from MemTable that have been acked successfully as flushed by ColumnStore.
*
* @returns an updated ReprojectorState with current outstanding I/O requests. Stale ones may be cleaned up.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package filodb.core.columnstore

import com.typesafe.config.ConfigFactory
import filodb.core._
import filodb.core.metadata.Column
import java.nio.ByteBuffer
import org.velvia.filo.{ColumnParser, TupleRowIngestSupport}
import filodb.core.cassandra.CassandraColumnStore

import org.scalatest.FunSpec
import org.scalatest.Matchers
Expand All @@ -15,8 +15,7 @@ class ChunkMergingStrategySpec extends FunSpec with Matchers {
val sortKey = "age"

import scala.concurrent.ExecutionContext.Implicits.global
val colStore = new CassandraColumnStore(ConfigFactory.load(),
{ x => schema(2) })
val colStore = new InMemoryColumnStore({ x => schema(2) })
val dataset = "foo"
val mergingStrategy = new AppendingChunkMergingStrategy(colStore, { x => schema(2) })

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package filodb.core.columnstore

import filodb.core._
import java.nio.ByteBuffer

import org.scalatest.FunSpec
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package filodb.core.columnstore

import filodb.core._
import filodb.core.metadata.Column
import java.nio.ByteBuffer
import org.velvia.filo.{ColumnParser, TupleRowIngestSupport}
Expand Down
Loading

0 comments on commit 4172c15

Please sign in to comment.