feat(core): Simplify Cassandra Schema for Index Data Persistence (filodb#551)

The older time-bucketed persistence model for part key data is being replaced. This PR:
* Vastly simplifies partKey/startTime/endTime persistence in Cassandra with a new, simpler schema. We employ one table per shard so that an entire table's contents can be downloaded efficiently using token range scans.
* Vastly simplifies partKey persistence logic in TimeSeriesShard. The earlier time-bucket management and roll-over of keys is gone; keys are simply flushed when necessary (a flush sketch appears further below).
* Simplifies index data recovery. There is no need to build a partId map up front because part keys are guaranteed not to repeat, so bootstrap needs much less memory. This is especially important for long-term data retention (see the bootstrap sketch right after this list).
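
For context, here is a minimal bootstrap sketch in Scala against the API this diff introduces: scanPartKeys streams every PartKeyRecord of a shard out of its per-shard table via token range scans. The CassandraColumnStore class name, the bootstrapShardIndex helper, and the println body are illustrative assumptions; real recovery code would feed each record into the shard's index.

```scala
import monix.execution.Scheduler.Implicits.global

import filodb.cassandra.columnstore.CassandraColumnStore
import filodb.core.DatasetRef

// Rebuild a shard's index state by scanning its partitionkeys table once.
// No time buckets to merge and no partId map to keep, since each part key appears only once.
def bootstrapShardIndex(colStore: CassandraColumnStore, ref: DatasetRef, shard: Int) =
  colStore.scanPartKeys(ref, shard).foreach { pk =>
    // a real implementation would index pk.partKey with (pk.startTime, pk.endTime) here
    println(s"recovered part key of ${pk.partKey.length} bytes, active ${pk.startTime}..${pk.endTime}")
  }
```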

This change is absolutely necessary for the FiloDB cluster that will serve downsampled data: the old schema is operationally very expensive and not viable for long retention times. It is also essential for keeping chunk repair across DCs simple to execute.
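
To make the simplified flush concrete, here is a sketch of what a caller of the new writePartKeys API could look like. The CassandraColumnStore class name, the flushPartKeys helper, and the dirtyKeys parameter are assumptions for illustration, not code from this commit.

```scala
import scala.concurrent.Future

import monix.reactive.Observable

import filodb.cassandra.columnstore.CassandraColumnStore
import filodb.core.{DatasetRef, Response}
import filodb.core.store.PartKeyRecord

// Stream the keys touched since the last flush straight into the shard's partitionkeys table.
// Records with endTime == Long.MaxValue (still ingesting) are written without a TTL by
// writePartKeys, so only completed series ever expire.
def flushPartKeys(colStore: CassandraColumnStore,
                  ref: DatasetRef,
                  shard: Int,
                  dirtyKeys: Seq[PartKeyRecord],  // hypothetical: keys added or ended since last flush
                  diskTTLSeconds: Int): Future[Response] =
  colStore.writePartKeys(ref, shard, Observable.fromIterable(dirtyKeys), diskTTLSeconds)
```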
vishramachandran authored Nov 19, 2019
1 parent 016621e commit 31d603b
Showing 32 changed files with 539 additions and 740 deletions.
@@ -6,14 +6,15 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import com.datastax.driver.core.{ConsistencyLevel, Metadata, TokenRange}
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable

import filodb.cassandra.{DefaultFiloSessionProvider, FiloCassandraConnector, FiloSessionProvider, Util}
import filodb.cassandra.{DefaultFiloSessionProvider, FiloCassandraConnector, FiloSessionProvider}
import filodb.core._
import filodb.core.store._
import filodb.memory.BinaryRegionLarge
@@ -59,42 +60,50 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {

val sinkStats = new ChunkSinkStats

def initialize(dataset: DatasetRef): Future[Response] = {
def initialize(dataset: DatasetRef, numShards: Int): Future[Response] = {
val chunkTable = getOrCreateChunkTable(dataset)
val partIndexTable = getOrCreatePartitionIndexTable(dataset)
val partKeyTablesInit = Observable.fromIterable(0.until(numShards)).map { s =>
getOrCreatePartitionKeysTable(dataset, s)
}.mapAsync(t => Task.fromFuture(t.initialize())).toListL
clusterConnector.createKeyspace(chunkTable.keyspace)
val indexTable = getOrCreateIngestionTimeIndexTable(dataset)
// Important: make sure nodes are in agreement before any schema changes
clusterMeta.checkSchemaAgreement()
for { ctResp <- chunkTable.initialize()
ixResp <- indexTable.initialize()
pitResp <- partIndexTable.initialize() } yield pitResp
for { ctResp <- chunkTable.initialize() if ctResp == Success
ixResp <- indexTable.initialize() if ixResp == Success
partKeyTablesResp <- partKeyTablesInit.runAsync if partKeyTablesResp.forall( _ == Success)
} yield Success
}

def truncate(dataset: DatasetRef): Future[Response] = {
def truncate(dataset: DatasetRef, numShards: Int): Future[Response] = {
logger.info(s"Clearing all data for dataset ${dataset}")
val chunkTable = getOrCreateChunkTable(dataset)
val partKeyTablesTrunc = Observable.fromIterable(0.until(numShards)).map { s =>
getOrCreatePartitionKeysTable(dataset, s)
}.mapAsync(t => Task.fromFuture(t.clearAll())).toListL
val indexTable = getOrCreateIngestionTimeIndexTable(dataset)
val partIndexTable = getOrCreatePartitionIndexTable(dataset)
clusterMeta.checkSchemaAgreement()
for { ctResp <- chunkTable.clearAll()
ixResp <- indexTable.clearAll()
pitResp <- partIndexTable.clearAll() } yield pitResp
for { ctResp <- chunkTable.clearAll() if ctResp == Success
ixResp <- indexTable.clearAll() if ixResp == Success
partKeyTablesResp <- partKeyTablesTrunc.runAsync if partKeyTablesResp.forall( _ == Success)
} yield Success
}

def dropDataset(dataset: DatasetRef): Future[Response] = {
def dropDataset(dataset: DatasetRef, numShards: Int): Future[Response] = {
val chunkTable = getOrCreateChunkTable(dataset)
val indexTable = getOrCreateIngestionTimeIndexTable(dataset)
val partIndexTable = getOrCreatePartitionIndexTable(dataset)
val partKeyTablesDrop = Observable.fromIterable(0.until(numShards)).map { s =>
getOrCreatePartitionKeysTable(dataset, s)
}.mapAsync(t => Task.fromFuture(t.drop())).toListL
clusterMeta.checkSchemaAgreement()
for { ctResp <- chunkTable.drop() if ctResp == Success
ixResp <- indexTable.drop() if ixResp == Success
pitResp <- partIndexTable.drop() if pitResp == Success }
yield {
for {ctResp <- chunkTable.drop() if ctResp == Success
ixResp <- indexTable.drop() if ixResp == Success
partKeyTablesResp <- partKeyTablesDrop.runAsync if partKeyTablesResp.forall(_ == Success)
} yield {
chunkTableCache.remove(dataset)
indexTableCache.remove(dataset)
partitionIndexTableCache.remove(dataset)
pitResp
partitionKeysTableCache.remove(dataset)
Success
}
}

@@ -240,22 +249,26 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
def unwrapTokenRanges(wrappedRanges : Seq[TokenRange]): Seq[TokenRange] =
wrappedRanges.flatMap(_.unwrap().asScala.toSeq)

def getPartKeyTimeBucket(ref: DatasetRef, shardNum: Int, timeBucket: Int): Observable[PartKeyTimeBucketSegment] = {
getOrCreatePartitionIndexTable(ref).getPartKeySegments(shardNum, timeBucket)
def scanPartKeys(ref: DatasetRef, shard: Int): Observable[PartKeyRecord] = {
val table = getOrCreatePartitionKeysTable(ref, shard)
Observable.fromIterable(getScanSplits(ref)).flatMap { tokenRange =>
table.scanPartKeys(tokenRange.asInstanceOf[CassandraTokenRangeSplit].tokens, shard)
}
}

def writePartKeyTimeBucket(ref: DatasetRef,
shardNum: Int,
timeBucket: Int,
partitionIndex: Seq[Array[Byte]],
diskTimeToLive: Int): Future[Response] = {

val table = getOrCreatePartitionIndexTable(ref)
val write = table.writePartKeySegments(shardNum, timeBucket, partitionIndex.map(Util.toBuffer(_)), diskTimeToLive)
write.map { response =>
sinkStats.indexTimeBucketWritten(partitionIndex.map(_.length).sum)
response
}
def writePartKeys(ref: DatasetRef, shard: Int,
partKeys: Observable[PartKeyRecord], diskTTLSeconds: Int): Future[Response] = {
val table = getOrCreatePartitionKeysTable(ref, shard)
val span = Kamon.buildSpan("write-part-keys").start()
val ret = partKeys.mapAsync(writeParallelism) { pk =>
val ttl = if (pk.endTime == Long.MaxValue) -1 else diskTTLSeconds
Task.fromFuture(table.writePartKey(pk, ttl)).map { resp =>
sinkStats.partKeysWrite(1)
resp
}
}.findL(_.isInstanceOf[ErrorResponse]).map(_.getOrElse(Success)).runAsync
ret.onComplete(_ => span.finish())
ret
}
}

@@ -281,7 +294,8 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {

val chunkTableCache = concurrentCache[DatasetRef, TimeSeriesChunksTable](tableCacheSize)
val indexTableCache = concurrentCache[DatasetRef, IngestionTimeIndexTable](tableCacheSize)
val partitionIndexTableCache = concurrentCache[DatasetRef, PartitionIndexTable](tableCacheSize)
val partitionKeysTableCache = concurrentCache[DatasetRef,
ConcurrentLinkedHashMap[Int, PartitionKeysTable]](tableCacheSize)

protected val clusterConnector = new FiloCassandraConnector {
def config: Config = cassandraConfig
@@ -342,9 +356,13 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {
new IngestionTimeIndexTable(dataset, clusterConnector)(readEc) })
}

def getOrCreatePartitionIndexTable(dataset: DatasetRef): PartitionIndexTable = {
partitionIndexTableCache.getOrElseUpdate(dataset, { dataset: DatasetRef =>
new PartitionIndexTable(dataset, clusterConnector, ingestionConsistencyLevel)(readEc)

def getOrCreatePartitionKeysTable(dataset: DatasetRef, shard: Int): PartitionKeysTable = {
val map = partitionKeysTableCache.getOrElseUpdate(dataset, { _ =>
concurrentCache[Int, PartitionKeysTable](tableCacheSize)
})
map.getOrElseUpdate(shard, { shard: Int =>
new PartitionKeysTable(dataset, shard, clusterConnector, ingestionConsistencyLevel)(readEc)
})
}


This file was deleted.

@@ -0,0 +1,66 @@
package filodb.cassandra.columnstore

import java.lang.{Integer => JInt, Long => JLong}

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

import com.datastax.driver.core.ConsistencyLevel
import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core.{DatasetRef, Response}
import filodb.core.store.PartKeyRecord

sealed class PartitionKeysTable(val dataset: DatasetRef,
val shard: Int,
val connector: FiloCassandraConnector,
writeConsistencyLevel: ConsistencyLevel)
(implicit ec: ExecutionContext) extends BaseDatasetTable {

import filodb.cassandra.Util._

val suffix = s"partitionkeys_$shard"

val createCql =
s"""CREATE TABLE IF NOT EXISTS $tableString (
| partKey blob,
| startTime bigint,
| endTime bigint,
| PRIMARY KEY (partKey)
|) WITH compression = {'chunk_length_in_kb': '16', 'sstable_compression': '$sstableCompression'}""".stripMargin

lazy val writePartitionCql =
session.prepare(
s"INSERT INTO ${tableString} (partKey, startTime, endTime) VALUES (?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)

lazy val writePartitionCqlNoTtl =
session.prepare(
s"INSERT INTO ${tableString} (partKey, startTime, endTime) VALUES (?, ?, ?)")
.setConsistencyLevel(writeConsistencyLevel)

def writePartKey(pk: PartKeyRecord, diskTimeToLive: Int): Future[Response] = {
if (diskTimeToLive <= 0) {
connector.execStmtWithRetries(writePartitionCqlNoTtl.bind(
toBuffer(pk.partKey), pk.startTime: JLong, pk.endTime: JLong))
} else {
connector.execStmtWithRetries(writePartitionCql.bind(
toBuffer(pk.partKey), pk.startTime: JLong, pk.endTime: JLong, diskTimeToLive: JInt))
}
}

def scanPartKeys(tokens: Seq[(String, String)], shard: Int): Observable[PartKeyRecord] = {
def cql(start: String, end: String): String =
s"SELECT * FROM ${tableString} " +
s"WHERE TOKEN(partKey) >= $start AND TOKEN(partKey) < $end "
val it = tokens.iterator.flatMap { case (start, end) =>
session.execute(cql(start, end)).iterator.asScala
.map { row => PartKeyRecord(row.getBytes("partKey").array(),
row.getLong("startTime"), row.getLong("endTime")) }
}
Observable.fromIterator(it).handleObservableErrors
}

}

@@ -49,19 +49,4 @@ class CassandraMetaStore(config: Config, filoSessionProvider: Option[FiloSession
}
}

/**
* Record highest time bucket for part key indexable data in meta store
*/
def writeHighestIndexTimeBucket(dataset: DatasetRef, shardNum: Int,
highestTimeBucket: Int): Future[Response] = {
checkpointTable.writeHighestIndexTimeBucket(dataset, shardNum, highestTimeBucket)
}

/**
* Read highest time bucket for part key indexable data in meta store
*/
def readHighestIndexTimeBucket(dataset: DatasetRef,
shardNum: Int): Future[Option[Int]] = {
checkpointTable.readHighestIndexTimeBucket(dataset, shardNum)
}
}
@@ -27,7 +27,6 @@ sealed class CheckpointTable(val config: Config,
| databasename text,
| datasetname text,
| shardnum int,
| highesttimebucket int STATIC,
| groupnum int,
| offset bigint,
| PRIMARY KEY ((databasename, datasetname, shardnum), groupnum)
@@ -41,19 +40,6 @@
| shardnum = ? """.stripMargin).setConsistencyLevel(ConsistencyLevel.QUORUM)
// we want consistent reads during recovery

lazy val readTimeBucketCql =
session.prepare(
s"""SELECT highesttimebucket FROM $tableString WHERE
| databasename = ? AND
| datasetname = ? AND
| shardnum = ? """.stripMargin).setConsistencyLevel(ConsistencyLevel.QUORUM)

lazy val writeTimeBucketCql =
session.prepare(
s"""INSERT INTO $tableString (databasename, datasetname, shardnum, highesttimebucket)
| VALUES (?, ?, ?, ?)""".stripMargin
)

lazy val writeCheckpointCql = {
val statement = session.prepare(
s"""INSERT INTO $tableString (databasename, datasetname, shardnum, groupnum, offset)
@@ -82,16 +68,4 @@
.toIterator // future of Iterator
.map { it => it.map(r => r.getInt(0) -> r.getLong(1)).toMap }
}

def writeHighestIndexTimeBucket(dataset: DatasetRef, shardNum: Int, highestTimeBucket: Int): Future[Response] = {
// TODO database name should not be an optional in internally since there is a default value. Punted for later.
execStmt(writeTimeBucketCql.bind(dataset.database.getOrElse(""),
dataset.dataset, shardNum: JInt, highestTimeBucket: JInt))
}

def readHighestIndexTimeBucket(dataset: DatasetRef, shardNum: Int): Future[Option[Int]] = {
session.executeAsync(readTimeBucketCql.bind(dataset.database.getOrElse(""),
dataset.dataset, shardNum: JInt)).toScalaFuture.map { rs => Option(rs.one()).map(_.getInt("highesttimebucket")) }
}

}
@@ -19,22 +19,23 @@ class MemstoreCassandraSinkSpec extends AllTablesTest {
import MachineMetricsData._

val memStore = new TimeSeriesMemStore(config, columnStore, metaStore)
val numShards = 4

// First create the tables in C*
override def beforeAll(): Unit = {
super.beforeAll()
metaStore.initialize().futureValue
columnStore.initialize(dataset1.ref).futureValue
columnStore.initialize(dataset1.ref, numShards).futureValue
}

before {
columnStore.truncate(dataset1.ref).futureValue
columnStore.truncate(dataset1.ref, numShards).futureValue
metaStore.clearAllData().futureValue
}

it("should flush MemStore data to C*, and be able to read back data from C* directly") {
memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, TestData.storeConf)
memStore.store.sinkStats.chunksetsWritten shouldEqual 0
memStore.store.sinkStats.chunksetsWritten.get shouldEqual 0

// Flush every ~50 records
val start = System.currentTimeMillis
@@ -44,8 +45,8 @@
Thread sleep 1000

// Two flushes and 3 chunksets have been flushed
memStore.store.sinkStats.chunksetsWritten should be >= 3
memStore.store.sinkStats.chunksetsWritten should be <= 4
memStore.store.sinkStats.chunksetsWritten.get should be >= 3
memStore.store.sinkStats.chunksetsWritten.get should be <= 4

memStore.refreshIndexForTesting(dataset1.ref)
// Verify data still in MemStore... all of it