Separate execution context for ColumnStore read paths
This prevents a deadlock when appending segments: getSegFromCache issues a read future, and if that future runs on the same ExecutionContext as the writes (which may be blocked for backpressure), the write deadlocks waiting for the read future, which in turn is waiting on other futures.
velvia committed Feb 2, 2016
1 parent 6d8dc5f commit 042e380
Showing 10 changed files with 300 additions and 247 deletions.
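
To illustrate the failure mode described in the commit message, here is a minimal, self-contained sketch (not FiloDB code; pool sizes and names are illustrative). A write running on a single-thread write pool blocks on a read future; if that read were scheduled on the same pool it could never run, but scheduling it on a separate read pool lets it complete:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SeparateReadContextSketch extends App {
  // Stand-in for a write context whose threads are all tied up by backpressure.
  val writeEc = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
  // Dedicated read context, as introduced by this commit.
  val readEc  = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

  // Stand-in for getSegFromCache issuing a read future.
  def readSegment()(implicit ec: ExecutionContext): Future[String] =
    Future { "cached segment" }

  // The write occupies the only write thread and then blocks on the read.
  val write = Future {
    // If this were readSegment()(writeEc), the read could only run on the
    // thread we are currently blocking, and the Await would never return.
    Await.result(readSegment()(readEc), 5.seconds)
  }(writeEc)

  println(Await.result(write, 10.seconds))
  writeEc.shutdown()
  readEc.shutdown()
}
```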
@@ -8,7 +8,7 @@ import spray.caching._

import filodb.cassandra.FiloCassandraConnector
import filodb.core._
import filodb.core.store.CachedMergingColumnStore
import filodb.core.store.{CachedMergingColumnStore, ColumnStoreScanner}
import filodb.core.metadata.{Column, Projection}

/**
@@ -31,23 +31,20 @@ import filodb.core.metadata.{Column, Projection}
*
* ==Constructor Args==
* @param config see the Configuration section above for the needed config
* @param ec An ExecutionContext for futures. See this for a way to do backpressure with futures:
* @param readEc An ExecutionContext for reads. This must be separate from writes to prevent deadlocks.
* @param ec An ExecutionContext for futures for writes. See this for a way to do backpressure with futures:
* http://quantifind.com/blog/2015/06/throttling-instantiations-of-scala-futures-1/
*/
class CassandraColumnStore(config: Config)
class CassandraColumnStore(val config: Config, val readEc: ExecutionContext)
(implicit val ec: ExecutionContext)
extends CachedMergingColumnStore with StrictLogging {
extends CachedMergingColumnStore with CassandraColumnStoreScanner with StrictLogging {
import filodb.core.store._
import Types._
import collection.JavaConverters._

val cassandraConfig = config.getConfig("cassandra")
val tableCacheSize = config.getInt("columnstore.tablecache-size")
val segmentCacheSize = config.getInt("columnstore.segment-cache-size")
logger.info(s"Starting CassandraColumnStore with config $cassandraConfig")

val chunkTableCache = LruCache[ChunkTable](tableCacheSize)
val rowMapTableCache = LruCache[ChunkRowMapTable](tableCacheSize)
val segmentCache = LruCache[Segment](segmentCacheSize)

val mergingStrategy = new AppendingChunkMergingStrategy(this)
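
With the new signature above, callers supply the read context explicitly and the write context implicitly. A hypothetical wiring might look like the following (pool sizes are assumptions, not part of this commit; the config file name is taken from the test changes below):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.typesafe.config.ConfigFactory

val config = ConfigFactory.load("application_test.conf")

// Dedicated pool for ColumnStore reads.
val readEc = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

// Write context, picked up by the implicit parameter list.
implicit val writeEc: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

val colStore = new CassandraColumnStore(config, readEc)
```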
@@ -94,10 +91,64 @@ extends CachedMergingColumnStore with StrictLogging {
yield { resp }
}

def shutdown(): Unit = {
clusterConnector.shutdown()
}

/**
* Splits scans of a dataset across multiple token ranges.
* params:
* splits_per_node - how much parallelism or ways to divide a token range on each node
*
* @return each split will have token_start, token_end, replicas filled in
*/
def getScanSplits(dataset: TableName,
params: Map[String, String] = Map.empty): Seq[Map[String, String]] = {
val metadata = clusterConnector.session.getCluster.getMetadata
val splitsPerNode = params.getOrElse("splits_per_node", "1").toInt
require(splitsPerNode >= 1, s"Must specify at least 1 splits_per_node, got $splitsPerNode")
val tokensByReplica = metadata.getTokenRanges.asScala.toSeq.groupBy { tokenRange =>
metadata.getReplicas(clusterConnector.keySpace.name, tokenRange)
}
val tokenRanges = for { key <- tokensByReplica.keys } yield {
if (tokensByReplica(key).size > 1) {
tokensByReplica(key).reduceLeft(_.mergeWith(_)).splitEvenly(splitsPerNode).asScala
}
else {
tokensByReplica(key).flatMap { range => range.splitEvenly(splitsPerNode).asScala }
}
}
val tokensComplete = tokenRanges.flatMap { token => token } .toSeq
tokensComplete.map { tokenRange =>
val replicas = metadata.getReplicas(clusterConnector.keySpace.name, tokenRange).asScala
Map("token_start" -> tokenRange.getStart.toString,
"token_end" -> tokenRange.getEnd.toString,
"replicas" -> replicas.map(_.toString).mkString(","))
}
}
}
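
A hypothetical use of getScanSplits, reusing the colStore from the earlier sketch (the dataset name and split count are examples only, and TableName is assumed to be a String alias). Each returned map describes one token range plus the replicas that own it, so a distributed reader can schedule one scan task per split close to the data:

```scala
val splits: Seq[Map[String, String]] =
  colStore.getScanSplits("gdelt", Map("splits_per_node" -> "4"))

splits.foreach { split =>
  // token_start / token_end bound the Cassandra token range for this split;
  // replicas lists the nodes that own it, for locality-aware scheduling.
  println(s"${split("token_start")}..${split("token_end")} on ${split("replicas")}")
}
```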

trait CassandraColumnStoreScanner extends ColumnStoreScanner with StrictLogging {
import filodb.core.store._
import Types._
import collection.JavaConverters._

def config: Config

val cassandraConfig = config.getConfig("cassandra")
val tableCacheSize = config.getInt("columnstore.tablecache-size")

val chunkTableCache = LruCache[ChunkTable](tableCacheSize)
val rowMapTableCache = LruCache[ChunkRowMapTable](tableCacheSize)

protected val clusterConnector = new FiloCassandraConnector {
def config: Config = cassandraConfig
}

def readChunks(dataset: TableName,
columns: Set[ColumnId],
keyRange: BinaryKeyRange,
version: Int): Future[Seq[ChunkedData]] = {
version: Int)(implicit ec: ExecutionContext): Future[Seq[ChunkedData]] = {
for { (chunkTable, rowMapTable) <- getSegmentTables(dataset)
data <- Future.sequence(columns.toSeq.map(
chunkTable.readChunks(keyRange.partition, version, _,
@@ -108,7 +159,8 @@ extends CachedMergingColumnStore with StrictLogging {

def readChunkRowMaps(dataset: TableName,
keyRange: BinaryKeyRange,
version: Int): Future[Iterator[ChunkMapInfo]] = {
version: Int)
(implicit ec: ExecutionContext): Future[Iterator[ChunkMapInfo]] = {
for { (chunkTable, rowMapTable) <- getSegmentTables(dataset)
cassRowMaps <- rowMapTable.getChunkMaps(keyRange, version) }
yield {
@@ -127,7 +179,8 @@ extends CachedMergingColumnStore with StrictLogging {
*/
def scanChunkRowMaps(dataset: TableName,
version: Int,
params: Map[String, String]): Future[Iterator[ChunkMapInfo]] = {
params: Map[String, String])
(implicit ec: ExecutionContext): Future[Iterator[ChunkMapInfo]] = {
val tokenStart = params("token_start")
val tokenEnd = params("token_end")
for { (chunkTable, rowMapTable) <- getSegmentTables(dataset)
@@ -139,49 +192,10 @@ extends CachedMergingColumnStore with StrictLogging {
}
}
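
The token_start/token_end params expected by scanChunkRowMaps are exactly what getScanSplits produces, so a split map can be passed straight through. A hedged sketch, reusing colStore, readEc, and splits from the sketches above:

```scala
val split = splits.head
// Runs on the dedicated read context and yields the chunk row maps for one token range.
val rowMaps = colStore.scanChunkRowMaps("gdelt", version = 0, params = split)(readEc)
```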

private val clusterConnector = new FiloCassandraConnector {
def config: Config = cassandraConfig
}

def shutdown(): Unit = {
clusterConnector.shutdown()
}

/**
* Splits scans of a dataset across multiple token ranges.
* params:
* splits_per_node - how much parallelism or ways to divide a token range on each node
*
* @return each split will have token_start, token_end, replicas filled in
*/
def getScanSplits(dataset: TableName,
params: Map[String, String] = Map.empty): Seq[Map[String, String]] = {
val metadata = clusterConnector.session.getCluster.getMetadata
val splitsPerNode = params.getOrElse("splits_per_node", "1").toInt
require(splitsPerNode >= 1, s"Must specify at least 1 splits_per_node, got $splitsPerNode")
val tokensByReplica = metadata.getTokenRanges.asScala.toSeq.groupBy { tokenRange =>
metadata.getReplicas(clusterConnector.keySpace.name, tokenRange)
}
val tokenRanges = for { key <- tokensByReplica.keys } yield {
if (tokensByReplica(key).size > 1) {
tokensByReplica(key).reduceLeft(_.mergeWith(_)).splitEvenly(splitsPerNode).asScala
}
else {
tokensByReplica(key).flatMap { range => range.splitEvenly(splitsPerNode).asScala }
}
}
val tokensComplete = tokenRanges.flatMap { token => token } .toSeq
tokensComplete.map { tokenRange =>
val replicas = metadata.getReplicas(clusterConnector.keySpace.name, tokenRange).asScala
Map("token_start" -> tokenRange.getStart.toString,
"token_end" -> tokenRange.getEnd.toString,
"replicas" -> replicas.map(_.toString).mkString(","))
}
}

// Retrieve handles to the tables for a particular dataset from the cache, creating the instances
// if necessary
def getSegmentTables(dataset: TableName): Future[(ChunkTable, ChunkRowMapTable)] = {
def getSegmentTables(dataset: TableName)
(implicit ec: ExecutionContext): Future[(ChunkTable, ChunkRowMapTable)] = {
val chunkTableFuture = chunkTableCache(dataset) {
logger.debug(s"Creating a new ChunkTable for dataset $dataset")
new ChunkTable(dataset, clusterConnector)
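
The getSegmentTables cache above uses spray-caching's future-based LruCache: the block runs only on a miss, and because the cached value is a Future, concurrent callers for the same dataset share a single table creation. A minimal sketch of that pattern (names and types are placeholders, not the FiloDB tables):

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import spray.caching.LruCache

val tableCache = LruCache[String](maxCapacity = 50)

def tableFor(dataset: String): Future[String] =
  tableCache(dataset) {
    Future(s"handle-for-$dataset")  // stand-in for creating a new ChunkTable
  }
```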
@@ -24,7 +24,7 @@ trait AllTablesTest extends SimpleCassandraTest {
val config = ConfigFactory.load("application_test.conf")
implicit val keySpace = KeySpace(config.getString("cassandra.keyspace"))

lazy val columnStore = new CassandraColumnStore(config)
lazy val columnStore = new CassandraColumnStore(config, context)
lazy val metaStore = new CassandraMetaStore(config.getConfig("cassandra"))

import Column.ColumnType._
@@ -14,7 +14,7 @@ class CassandraColumnStoreSpec extends CassandraFlatSpec with ColumnStoreSpec {
import filodb.core.store._
import NamesTestData._

val colStore = new CassandraColumnStore(config)
val colStore = new CassandraColumnStore(config, global)
implicit val keySpace = KeySpace(config.getString("cassandra.keyspace"))

"getScanSplits" should "return splits from Cassandra" in {
2 changes: 2 additions & 0 deletions core/src/main/scala/filodb.core/store/Analyzer.scala
@@ -61,6 +61,8 @@ object Analyzer {
val NumChunksPerSegmentBucketKeys = Array(0, 5, 10, 25, 50, 100)
val NumRowsPerSegmentBucketKeys = Array(0, 10, 100, 1000, 5000, 10000, 50000)

import scala.concurrent.ExecutionContext.Implicits.global

def analyze(cs: CachedMergingColumnStore, dataset: TableName, version: Int): ColumnStoreAnalysis = {
var numSegments = 0
var rowsInSegment: Histogram = Histogram.empty
@@ -32,7 +32,8 @@ trait ChunkMergingStrategy {
*/
def readSegmentForCache(projection: RichProjection,
version: Int)(
segInfo: SegmentInfo[projection.PK, projection.SK]): Future[Segment]
segInfo: SegmentInfo[projection.PK, projection.SK])
(implicit ec: ExecutionContext): Future[Segment]

/**
* Merges an existing segment cached using readSegmentForCache with a new partial segment to be inserted.
@@ -60,7 +61,6 @@ trait ChunkMergingStrategy {
* @param getSortColumn a function that returns the sort column given a dataset
*/
class AppendingChunkMergingStrategy(columnStore: ColumnStore)
(implicit ec: ExecutionContext)
extends ChunkMergingStrategy with StrictLogging {
/**
* Reads only the row key-relevant columns from a segment.
@@ -70,7 +70,8 @@ extends ChunkMergingStrategy with StrictLogging {
*/
def readSegmentForCache(projection: RichProjection,
version: Int)(
segInfo: SegmentInfo[projection.PK, projection.SK]): Future[Segment] = {
segInfo: SegmentInfo[projection.PK, projection.SK])
(implicit ec: ExecutionContext): Future[Segment] = {
val keyRange = KeyRange(segInfo.partition, segInfo.segment, segInfo.segment, endExclusive = false)
columnStore.readSegments(projection, projection.columns, version)(keyRange).map { iter =>
iter.toSeq.headOption match {
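
Moving the ExecutionContext off the AppendingChunkMergingStrategy constructor and onto each method, as in the readSegmentForCache change above, is what lets read-path and write-path callers of the same object run on different pools. A toy illustration of that pattern (class, method, and pool names are placeholders):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

val readEc  = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
val writeEc = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

class MergingStrategySketch {
  // The ExecutionContext is now chosen per call, not fixed at construction time.
  def readSegmentForCache()(implicit ec: ExecutionContext): Future[String] =
    Future { "segment read on whichever pool the caller supplies" }
}

val strategy = new MergingStrategySketch
strategy.readSegmentForCache()(readEc)   // read path uses the dedicated read pool
strategy.readSegmentForCache()(writeEc)  // a write-side caller can still pass its own
```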
