Skip to content

Commit

Permalink
Enable all Cassandra tables to take host/port/keyspace config from a …
Browse files Browse the repository at this point in the history
…Config
  • Loading branch information
velvia committed Sep 18, 2015
1 parent d9d5ccd commit 13ffc15
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package filodb.cassandra

import com.datastax.driver.core.{Session, VersionNumber}
import com.typesafe.config.Config
import com.websudos.phantom.connectors.{ContactPoints, KeySpace}
import net.ceedubs.ficus.Ficus._

trait FiloCassandraConnector {
private[this] lazy val connector =
ContactPoints(config.as[Seq[String]]("hosts"), config.getInt("port")).keySpace(keySpace.name)

// Cassandra config with following keys: keyspace, hosts, port
def config: Config

implicit lazy val keySpace = KeySpace(config.getString("keyspace"))

implicit lazy val session: Session = connector.session

def cassandraVersion: VersionNumber = connector.cassandraVersion

def cassandraVersions: Set[VersionNumber] = connector.cassandraVersions
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import java.nio.ByteBuffer
import scala.concurrent.Future
import scodec.bits._

import filodb.cassandra.FiloCassandraConnector
import filodb.core._

case class ChunkRowMapRecord(segmentId: Types.SegmentId,
Expand All @@ -18,16 +19,16 @@ case class ChunkRowMapRecord(segmentId: Types.SegmentId,
* Represents the table which holds the ChunkRowMap for each segment of a partition.
* This maps sort keys in sorted order to chunks and row number within each chunk.
* The ChunkRowMap is written as two Filo binary vectors.
*
* @param config a Typesafe Config with hosts, port, and keyspace parameters for Cassandra connection
*/
sealed class ChunkRowMapTable(dataset: String, config: Config)
sealed class ChunkRowMapTable(dataset: String, val config: Config)
extends CassandraTable[ChunkRowMapTable, ChunkRowMapRecord]
with SimpleCassandraConnector {
with FiloCassandraConnector {
import filodb.cassandra.Util._
import scala.collection.JavaConversions._

override val tableName = dataset + "_chunkmap"
// TODO: keySpace and other things really belong to a trait
implicit val keySpace = KeySpace(config.getString("keyspace"))

//scalastyle:off
object partition extends StringColumn(this) with PartitionKey[String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import java.nio.ByteBuffer
import scala.concurrent.Future
import scodec.bits._

import filodb.cassandra.FiloCassandraConnector
import filodb.core._
import filodb.core.columnstore.ChunkedData

Expand All @@ -16,15 +17,15 @@ import filodb.core.columnstore.ChunkedData
*
* Data is stored in a columnar fashion similar to Parquet -- grouped by column. Each
* chunk actually stores many many rows grouped together into one binary chunk for efficiency.
*
* @param config a Typesafe Config with hosts, port, and keyspace parameters for Cassandra connection
*/
sealed class ChunkTable(dataset: String, config: Config)
sealed class ChunkTable(dataset: String, val config: Config)
extends CassandraTable[ChunkTable, (String, Types.SegmentId, Int, ByteBuffer)]
with SimpleCassandraConnector {
with FiloCassandraConnector {
import filodb.cassandra.Util._

override val tableName = dataset + "_chunks"
// TODO: keySpace and other things really belong to a trait
implicit val keySpace = KeySpace(config.getString("keyspace"))

//scalastyle:off
object partition extends StringColumn(this) with PartitionKey[String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ import scala.concurrent.{ExecutionContext, Future}
import filodb.core._
import filodb.core.metadata.{Column, Dataset, MetaStore}

/**
* A class for Cassandra implementation of the MetaStore.
*
* @param config a Typesafe Config with hosts, port, and keyspace parameters for Cassandra connection
*/
class CassandraMetaStore(config: Config)
(implicit val ec: ExecutionContext) extends MetaStore {
val datasetTable = DatasetTable
val columnTable = ColumnTable
val datasetTable = new DatasetTable(config)
val columnTable = new ColumnTable(config)

def initialize(): Future[Response] =
for { dtResp <- datasetTable.initialize()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package filodb.cassandra.metastore

import com.datastax.driver.core.Row
import com.typesafe.config.Config
import com.websudos.phantom.dsl._
import play.api.libs.iteratee.Iteratee
import scala.concurrent.Future

import filodb.cassandra.FiloCassandraConnector
import filodb.core.metadata.Column

/**
* Represents the "columns" Cassandra table tracking column and schema changes for a dataset
*
* @param config a Typesafe Config with hosts, port, and keyspace parameters for Cassandra connection
*/
sealed class ColumnTable extends CassandraTable[ColumnTable, Column] {
sealed class ColumnTable(val config: Config) extends CassandraTable[ColumnTable, Column]
with FiloCassandraConnector {
override val tableName = "columns"

// scalastyle:off
object dataset extends StringColumn(this) with PartitionKey[String]
object version extends IntColumn(this) with PrimaryKey[Int]
Expand All @@ -21,6 +28,9 @@ sealed class ColumnTable extends CassandraTable[ColumnTable, Column] {
object isSystem extends BooleanColumn(this)
// scalastyle:on

import filodb.cassandra.Util._
import filodb.core._

// May throw IllegalArgumentException if cannot convert one of the string types to one of the Enums
override def fromRow(row: Row): Column =
Column(name(row),
Expand All @@ -30,20 +40,6 @@ sealed class ColumnTable extends CassandraTable[ColumnTable, Column] {
Column.Serializer.withName(serializer(row)),
isDeleted(row),
isSystem(row))
}

/**
* Asynchronous methods to operate on columns. All normal errors and exceptions are returned
* through ErrorResponse types.
*/
object ColumnTable extends ColumnTable with SimpleCassandraConnector {
override val tableName = "columns"

// TODO: add in Config-based initialization code to find the keyspace, cluster, etc.
implicit val keySpace = KeySpace("unittest")

import filodb.cassandra.Util._
import filodb.core._

def initialize(): Future[Response] = create.ifNotExists.future().toResponse()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package filodb.cassandra.metastore

import com.datastax.driver.core.Row
import com.typesafe.config.Config
import com.websudos.phantom.dsl._
import scala.concurrent.Future

import filodb.cassandra.FiloCassandraConnector
import filodb.core.metadata.{Dataset, DatasetOptions, Projection}

/**
* Represents the "dataset" Cassandra table tracking each dataset and its partitions
*
* @param config a Typesafe Config with hosts, port, and keyspace parameters for Cassandra connection
*/
sealed class DatasetTable extends CassandraTable[DatasetTable, Projection] {
sealed class DatasetTable(val config: Config) extends CassandraTable[DatasetTable, Projection]
with FiloCassandraConnector {
override val tableName = "datasets"

// scalastyle:off
object name extends StringColumn(this) with PartitionKey[String]
object partitionColumn extends StringColumn(this) with StaticColumn[String]
Expand All @@ -20,27 +27,16 @@ sealed class DatasetTable extends CassandraTable[DatasetTable, Projection] {
object projectionSegmentSize extends StringColumn(this)
// scalastyle:on

import filodb.cassandra.Util._
import filodb.core._
import filodb.core.Types._

override def fromRow(row: Row): Projection =
Projection(projectionId(row),
name(row),
projectionSortColumn(row),
projectionReverse(row),
segmentSize = projectionSegmentSize(row))
}

/**
* Asynchronous methods to operate on datasets. All normal errors and exceptions are returned
* through ErrorResponse types.
*/
object DatasetTable extends DatasetTable with SimpleCassandraConnector {
override val tableName = "datasets"

// TODO: add in Config-based initialization code to find the keyspace, cluster, etc.
implicit val keySpace = KeySpace("unittest")

import filodb.cassandra.Util._
import filodb.core._
import filodb.core.Types._

def initialize(): Future[Response] = create.ifNotExists.future().toResponse()

Expand Down
14 changes: 3 additions & 11 deletions cassandra/src/test/scala/filodb.cassandra/AllTablesTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,19 @@ import filodb.core.metadata.{Column, Dataset}
import filodb.core._
import filodb.cassandra.columnstore.CassandraColumnStore

object AllTablesTest {
val CassConfigStr = """
| max-outstanding-futures = 2
""".stripMargin
val CassConfig = ConfigFactory.parseString(CassConfigStr)
}

trait AllTablesTest extends SimpleCassandraTest {
import filodb.cassandra.metastore._

implicit val defaultPatience =
PatienceConfig(timeout = Span(10, Seconds), interval = Span(50, Millis))

implicit val keySpace = KeySpace("unittest")

implicit val context = scala.concurrent.ExecutionContext.Implicits.global

val config = ConfigFactory.load
val config = ConfigFactory.load("application_test.conf")
implicit val keySpace = KeySpace(config.getString("cassandra.keyspace"))

lazy val columnStore = new CassandraColumnStore(config)
lazy val metaStore = new CassandraMetaStore(config)
lazy val metaStore = new CassandraMetaStore(config.getConfig("cassandra"))

import Column.ColumnType._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.scalatest.{FunSpec, BeforeAndAfter}
class CassandraMetaStoreSpec extends FunSpec with BeforeAndAfter with AllTablesTest {
import MetaStore._

val metastore = new CassandraMetaStore(ConfigFactory.load)
val metastore = new CassandraMetaStore(ConfigFactory.load("application_test.conf").getConfig("cassandra"))

override def beforeAll() {
super.beforeAll()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package filodb.cassandra.metastore

import com.typesafe.config.ConfigFactory
import com.websudos.phantom.dsl._
import com.websudos.phantom.testkit._
import org.scalatest.BeforeAndAfter
Expand All @@ -11,45 +12,47 @@ import filodb.core._
import filodb.core.metadata.Column

class ColumnTableSpec extends CassandraFlatSpec with BeforeAndAfter {
implicit val keySpace = KeySpace("unittest")

import Column.ColumnType

val firstColumn = Column("first", "foo", 1, ColumnType.StringColumn)

val config = ConfigFactory.load("application_test.conf").getConfig("cassandra")
val columnTable = new ColumnTable(config)
implicit val keySpace = KeySpace(config.getString("keyspace"))

// First create the columns table
override def beforeAll() {
super.beforeAll()
// Note: This is a CREATE TABLE IF NOT EXISTS
Await.result(ColumnTable.create.ifNotExists.future(), 3 seconds)
columnTable.initialize().futureValue
}

before {
Await.result(ColumnTable.truncate.future(), 3 seconds)
columnTable.clearAll().futureValue
}

import scala.concurrent.ExecutionContext.Implicits.global

"ColumnTable" should "return empty schema if a dataset does not exist in columns table" in {
ColumnTable.getSchema("foo", 1).futureValue should equal (Map())
columnTable.getSchema("foo", 1).futureValue should equal (Map())
}

it should "add the first column and read it back as a schema" in {
ColumnTable.insertColumn(firstColumn).futureValue should equal (Success)
ColumnTable.getSchema("foo", 2).futureValue should equal (Map("first" -> firstColumn))
columnTable.insertColumn(firstColumn).futureValue should equal (Success)
columnTable.getSchema("foo", 2).futureValue should equal (Map("first" -> firstColumn))

// Check that limiting the getSchema query to version 0 does not return the version 1 column
ColumnTable.getSchema("foo", 0).futureValue should equal (Map())
columnTable.getSchema("foo", 0).futureValue should equal (Map())
}

it should "return MetadataException if illegal column type encoded in Cassandra" in {
val f = ColumnTable.insert.value(_.dataset, "bar")
val f = columnTable.insert.value(_.dataset, "bar")
.value(_.name, "age")
.value(_.version, 5)
.value(_.columnType, "_so_not_a_real_type")
.future()
f.futureValue

ColumnTable.getSchema("bar", 7).failed.futureValue shouldBe a [MetadataException]
columnTable.getSchema("bar", 7).failed.futureValue shouldBe a [MetadataException]
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package filodb.cassandra.metastore

import com.typesafe.config.ConfigFactory
import com.websudos.phantom.dsl._
import com.websudos.phantom.testkit._
import org.scalatest.BeforeAndAfter
Expand All @@ -11,60 +12,62 @@ import filodb.core._
import filodb.core.metadata.Dataset

class DatasetTableSpec extends CassandraFlatSpec with BeforeAndAfter {
implicit val keySpace = KeySpace("unittest")
val config = ConfigFactory.load("application_test.conf").getConfig("cassandra")
val datasetTable = new DatasetTable(config)
implicit val keySpace = KeySpace(config.getString("keyspace"))

// First create the datasets table
override def beforeAll() {
super.beforeAll()
// Note: This is a CREATE TABLE IF NOT EXISTS
Await.result(DatasetTable.create.ifNotExists.future(), 3 seconds)
datasetTable.initialize().futureValue
}

before {
Await.result(DatasetTable.truncate.future(), 3 seconds)
datasetTable.clearAll().futureValue
}

val fooDataset = Dataset("foo", "someSortCol")

import scala.concurrent.ExecutionContext.Implicits.global

"DatasetTable" should "create a dataset successfully, then return AlreadyExists" in {
whenReady(DatasetTable.createNewDataset(fooDataset)) { response =>
whenReady(datasetTable.createNewDataset(fooDataset)) { response =>
response should equal (Success)
}

// Second time around, dataset already exists
whenReady(DatasetTable.createNewDataset(fooDataset)) { response =>
whenReady(datasetTable.createNewDataset(fooDataset)) { response =>
response should equal (AlreadyExists)
}
}

// Apparently, deleting a nonexisting dataset also returns success. :/

it should "delete a dataset" in {
whenReady(DatasetTable.createNewDataset(fooDataset)) { response =>
whenReady(datasetTable.createNewDataset(fooDataset)) { response =>
response should equal (Success)
}
whenReady(DatasetTable.deleteDataset("foo")) { response =>
whenReady(datasetTable.deleteDataset("foo")) { response =>
response should equal (Success)
}

whenReady(DatasetTable.getDataset("foo").failed) { err =>
whenReady(datasetTable.getDataset("foo").failed) { err =>
err shouldBe a [NotFoundError]
}
}

it should "return NotFoundError when trying to get nonexisting dataset" in {
whenReady(DatasetTable.getDataset("foo").failed) { err =>
whenReady(datasetTable.getDataset("foo").failed) { err =>
err shouldBe a [NotFoundError]
}
}

it should "return the Dataset if it exists" in {
val barDataset = Dataset("bar", "sortCol")
DatasetTable.createNewDataset(barDataset).futureValue should equal (Success)
datasetTable.createNewDataset(barDataset).futureValue should equal (Success)

whenReady(DatasetTable.getDataset("bar")) { dataset =>
whenReady(datasetTable.getDataset("bar")) { dataset =>
dataset should equal (barDataset)
}
}
Expand Down
Loading

0 comments on commit 13ffc15

Please sign in to comment.