Commit

Merge segmentless changes
Evan Chan committed Jan 24, 2017
2 parents 798a01e + ee89445 commit 6bb4804
Showing 59 changed files with 1,955 additions and 1,400 deletions.
122 changes: 63 additions & 59 deletions README.md

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion build.sbt
@@ -64,6 +64,7 @@ lazy val stress = (project in file("stress"))
.settings(name := "filodb-stress")
.settings(libraryDependencies ++= stressDeps)
.settings(assemblySettings:_*)
.settings(assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false))
.dependsOn(spark)

val cassDriverVersion = "3.0.2"
@@ -93,13 +94,15 @@ lazy val coreDeps = commonDeps ++ Seq(
"org.slf4j" % "slf4j-api" % "1.7.10",
"com.beachape" %% "enumeratum" % "1.2.1",
"org.velvia.filo" %% "filo-scala" % "0.2.6",
"io.monix" %% "monix" % "2.1.1",
"joda-time" % "joda-time" % "2.2",
"org.joda" % "joda-convert" % "1.2",
"io.spray" %% "spray-caching" % "1.3.2",
"com.googlecode.concurrentlinkedhashmap" % "concurrentlinkedhashmap-lru" % "1.4",
"net.ceedubs" %% "ficus" % "1.1.2",
"org.scodec" %% "scodec-bits" % "1.0.10",
"io.fastjson" % "boon" % "0.33",
"com.github.alexandrnikitin" %% "bloom-filter" % "0.7.0",
"com.github.rholder.fauxflake" % "fauxflake-core" % "1.1.0",
"org.scalactic" %% "scalactic" % "2.2.6",
"com.markatta" %% "futiles" % "1.1.3",
"com.nativelibs4java" %% "scalaxy-loops" % "0.3.3" % "provided",
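The most consequential new dependency here is "io.monix" %% "monix" % "2.1.1": the Cassandra read path below drops Future[Seq[ChunkedData]] results in favour of lazy Monix Observable streams. Two operators recur in this diff: Observable.concat, which the new multiPartScan uses to walk partition indices strictly one after another, and Observable.merge, which readPartitionChunks uses to combine per-column chunk streams. The sketch below is a minimal, self-contained illustration of that difference using hypothetical data, not FiloDB code.

import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object ConcatVsMergeSketch extends App {
  // A lazy "per-partition" stream; nothing is produced until something subscribes
  def partitionStream(name: String): Observable[String] =
    Observable.fromIterator(Iterator(s"$name-a", s"$name-b"))

  // concat exhausts p1 before it even subscribes to p2 (ordered, sequential)
  val concatenated = Observable.concat(partitionStream("p1"), partitionStream("p2"))

  // merge subscribes to both and may interleave elements; ordering across sources is not guaranteed
  val merged = Observable.merge(partitionStream("p1"), partitionStream("p2"))

  println(Await.result(concatenated.toListL.runAsync, 5.seconds))
  println(Await.result(merged.toListL.runAsync, 5.seconds))
}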
CassandraColumnStore.scala
@@ -7,12 +7,15 @@ import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.HashSet
import kamon.trace.{TraceContext, Tracer}
import monix.reactive.Observable

import scala.concurrent.{ExecutionContext, Future}
import filodb.cassandra.{DefaultFiloSessionProvider, FiloCassandraConnector, FiloSessionProvider}
import filodb.core._
import filodb.core.binaryrecord.BinaryRecord
import filodb.core.store._
import filodb.core.metadata.{Column, Projection, RichProjection}
import filodb.core.query.{PartitionChunkIndex, ChunkIDPartitionChunkIndex}

/**
* Implementation of a column store using Apache Cassandra tables.
@@ -122,10 +125,9 @@ extends ColumnStore with CassandraColumnStoreScanner with StrictLogging {
ctx: TraceContext): Future[Response] = {
asyncSubtrace("write-chunks", "ingestion", Some(ctx)) {
val binPartition = segment.binaryPartition
val segmentId = segment.segmentId
val chunkTable = getOrCreateChunkTable(dataset)
Future.traverse(segment.chunkSets) { chunkSet =>
chunkTable.writeChunks(binPartition, version, segmentId, chunkSet.info.id, chunkSet.chunks, stats)
chunkTable.writeChunks(binPartition, version, chunkSet.info.id, chunkSet.chunks, stats)
}.map { responses => responses.head }
}
}
@@ -139,7 +141,7 @@ extends ColumnStore with CassandraColumnStoreScanner with StrictLogging {
val indices = segment.chunkSets.map { case ChunkSet(info, skips, _, _, _) =>
(info.id, ChunkSetInfo.toBytes(projection, info, skips))
}
indexTable.writeIndices(segment.binaryPartition, version, segment.segmentId, indices, stats)
indexTable.writeIndices(segment.binaryPartition, version, indices, stats)
}
}

@@ -150,7 +152,7 @@ extends ColumnStore with CassandraColumnStoreScanner with StrictLogging {
asyncSubtrace("write-filter", "ingestion", Some(ctx)) {
val filterTable = getOrCreateFilterTable(projection.datasetRef)
val filters = segment.chunkSets.map { case ChunkSet(info, _, filter, _, _) => (info.id, filter) }
filterTable.writeFilters(segment.binaryPartition, version, segment.segmentId, filters, stats)
filterTable.writeFilters(segment.binaryPartition, version, filters, stats)
}
}

@@ -238,28 +240,41 @@ trait CassandraColumnStoreScanner extends ColumnStoreScanner with StrictLogging
val sessionProvider = filoSessionProvider.getOrElse(new DefaultFiloSessionProvider(cassandraConfig))
}

def readChunks(dataset: DatasetRef,
columns: Set[ColumnId],
keyRange: BinaryKeyRange,
version: Int)(implicit ec: ExecutionContext): Future[Seq[ChunkedData]] = {
val chunkTable = getOrCreateChunkTable(dataset)
Future.sequence(columns.toSeq.map(
chunkTable.readChunks(keyRange.partition, version, _,
keyRange.start, keyRange.end,
keyRange.endExclusive)))
}
// Produce an empty stream of chunks so that results can still be returned correctly
private def emptyChunkStream(infosSkips: Seq[(ChunkSetInfo, Array[Int])], colNo: Int):
Observable[SingleChunkInfo] =
Observable.fromIterator(infosSkips.toIterator.map { case (info, skips) =>
//scalastyle:off
SingleChunkInfo(info.id, colNo, null.asInstanceOf[ByteBuffer])
//scalastyle:on
})

def readChunks(dataset: DatasetRef,
version: Int,
columns: Seq[ColumnId],
partition: Types.BinaryPartition,
segment: Types.SegmentId,
chunkRange: (Types.ChunkID, Types.ChunkID))
(implicit ec: ExecutionContext): Future[Seq[ChunkedData]] = {
def readPartitionChunks(dataset: DatasetRef,
version: Int,
columns: Seq[Column],
partitionIndex: PartitionChunkIndex,
chunkMethod: ChunkScanMethod): Observable[ChunkPipeItem] = {
val chunkTable = getOrCreateChunkTable(dataset)
Future.sequence(columns.toSeq.map(
chunkTable.readChunks(partition, version, _,
segment, chunkRange)))
val colsWithIndex = columns.map(_.name).zipWithIndex

// For now, use a rowkey-sorted PartitionChunkIndex. If storage layout changes to chunkID order,
// then we'd have to do something else.
logger.debug(s"Reading chunks from columns $columns, ${partitionIndex.binPartition}, method $chunkMethod")
val (rangeQuery, infosSkips) = chunkMethod match {
case AllChunkScan => (true, partitionIndex.allChunks)
case RowKeyChunkScan(k1, k2) => (false, partitionIndex.rowKeyRange(k1.binRec, k2.binRec))
case SingleChunkScan(key, id) => (false, partitionIndex.singleChunk(key.binRec, id))
}

val groupedInfos = infosSkips.grouped(10) // TODO: group by # of rows read

Observable.fromIterator(groupedInfos).flatMap { infosSkipsGroup =>
val groupedIds = infosSkipsGroup.map(_._1.id)
val chunkStreams = colsWithIndex.map { case (col, index) =>
chunkTable.readChunks(partitionIndex.binPartition, version, col, index, groupedIds, rangeQuery)
.switchIfEmpty(emptyChunkStream(infosSkipsGroup, index)) }
Observable.now(ChunkPipeInfos(infosSkipsGroup)) ++ Observable.merge(chunkStreams:_*)
}
}

def readFilters(dataset: DatasetRef,
@@ -269,74 +284,49 @@ trait CassandraColumnStoreScanner extends ColumnStoreScanner with StrictLogging
chunkRange: (Types.ChunkID, Types.ChunkID))
(implicit ec: ExecutionContext): Future[Iterator[SegmentState.IDAndFilter]] = {
val filterTable = getOrCreateFilterTable(dataset)
filterTable.readFilters(partition, version, segment, chunkRange._1, chunkRange._2)
}

def multiPartRangeScan(projection: RichProjection,
keyRanges: Seq[KeyRange[_, _]],
indexTable: IndexTable,
version: Int)
(implicit ec: ExecutionContext): Future[Iterator[IndexRecord]] = {
val futureRows = keyRanges.map { range => {
indexTable.getIndices(projection.toBinaryKeyRange(range), version)
}}
Future.sequence(futureRows).map { rows => rows.flatten.toIterator }
filterTable.readFilters(partition, version, chunkRange._1, chunkRange._2)
}

def multiPartScan(projection: RichProjection,
partitions: Seq[Any],
indexTable: IndexTable,
version: Int)
(implicit ec: ExecutionContext): Future[Iterator[IndexRecord]] = {
val futureRows = partitions.map { partition => {
version: Int): Observable[IndexRecord] = {
// Get each partition index observable concurrently. As observables they are lazy
val its = partitions.map { partition =>
val binPart = projection.partitionType.toBytes(partition.asInstanceOf[projection.PK])
indexTable.getIndices(binPart, version)
}}
Future.sequence(futureRows).map { rows => rows.flatten.toIterator }
}
Observable.concat(its :_*)
}

def scanIndices(projection: RichProjection,
version: Int,
method: ScanMethod)
(implicit ec: ExecutionContext):
Future[Iterator[SegmentIndex[projection.PK, projection.SK]]] = {
def scanPartitions(projection: RichProjection,
version: Int,
partMethod: PartitionScanMethod): Observable[PartitionChunkIndex] = {
val indexTable = getOrCreateIndexTable(projection.datasetRef)
val filterFunc = method match {
case FilteredPartitionScan(_, filterFunc) => filterFunc
case FilteredPartitionRangeScan(_, segRange, filterFunc) => filterFunc
case other: ScanMethod => (x: Any) => true
}
val futIndexRecords = method match {
logger.debug(s"Scanning partitions for ${projection.datasetRef} with method $partMethod...")
val (filterFunc, indexRecords) = partMethod match {
case SinglePartitionScan(partition) =>
val binPart = projection.partitionType.toBytes(partition.asInstanceOf[projection.PK])
indexTable.getIndices(binPart, version)

case SinglePartitionRangeScan(k) =>
indexTable.getIndices(projection.toBinaryKeyRange(k), version)
((x: Any) => true, indexTable.getIndices(binPart, version))

case MultiPartitionScan(partitions) =>
multiPartScan(projection, partitions, indexTable, version)

case MultiPartitionRangeScan(keyRanges) =>
multiPartRangeScan(projection, keyRanges, indexTable, version)
((x: Any) => true, multiPartScan(projection, partitions, indexTable, version))

case FilteredPartitionScan(CassandraTokenRangeSplit(tokens, _), _) =>
indexTable.scanIndices(version, tokens)
case FilteredPartitionScan(CassandraTokenRangeSplit(tokens, _), func) =>
(func, indexTable.scanIndices(version, tokens))

case FilteredPartitionRangeScan(CassandraTokenRangeSplit(tokens, _), segRange, _) =>
val binRange = projection.toBinarySegRange(segRange)
indexTable.scanIndicesRange(version, tokens, binRange.start, binRange.end)

case other: ScanMethod => ???
}
futIndexRecords.map { indexIt =>
indexIt.sortedGroupBy(index => (index.binPartition, index.segmentId))
.filter { case ((binPart, _), _) => filterFunc(projection.partitionType.fromBytes(binPart)) }
.map { case ((binPart, binSeg), records) =>
val skips = records.map { r => ChunkSetInfo.fromBytes(projection, r.data.array) }.toBuffer
toSegIndex(projection, binPart, binSeg, skips)
}
case other: PartitionScanMethod => ???
}
indexRecords.sortedGroupBy(_.binPartition)
.collect { case (binPart, binIndices)
if filterFunc(projection.partitionType.fromBytes(binPart)) =>
val newIndex = new ChunkIDPartitionChunkIndex(binPart, projection)
binIndices.foreach { binIndex =>
val (info, skips) = ChunkSetInfo.fromBytes(projection, binIndex.data.array)
newIndex.add(info, skips)
}
newIndex
}
}

def getOrCreateChunkTable(dataset: DatasetRef): ChunkTable = {
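Read together, the scanner now builds its read path in two Observable stages: scanPartitions folds index records into one ChunkIDPartitionChunkIndex per partition, and readPartitionChunks batches that index's chunk infos (ten at a time for now), emits a ChunkPipeInfos header per batch, then merges one lazy chunk stream per column, falling back to emptyChunkStream via switchIfEmpty so a column with no stored rows still yields one placeholder per expected chunk id. The sketch below mirrors that shape with simplified stand-in types (PipeItem, GroupHeader, Chunk and a fake readColumn); it illustrates the pattern only and is not FiloDB's actual API.

import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object GroupedChunkReadSketch extends App {
  // Simplified stand-ins for the ChunkPipeInfos / SingleChunkInfo pipe items
  sealed trait PipeItem
  case class GroupHeader(chunkIds: Seq[Long]) extends PipeItem
  case class Chunk(chunkId: Long, colNo: Int, data: String) extends PipeItem

  // Pretend per-column read; column 1 has nothing stored and yields an empty stream
  def readColumn(colNo: Int, ids: Seq[Long]): Observable[PipeItem] =
    if (colNo == 1) Observable.empty
    else Observable.fromIterator(ids.toIterator.map(id => Chunk(id, colNo, s"bytes-$id")))

  // Fallback mirroring emptyChunkStream: one placeholder per expected chunk id
  def emptyColumn(colNo: Int, ids: Seq[Long]): Observable[PipeItem] =
    Observable.fromIterator(ids.toIterator.map(id => Chunk(id, colNo, data = null)))

  val allChunkIds = (1L to 25L).toSeq
  val pipeline: Observable[PipeItem] =
    Observable.fromIterator(allChunkIds.grouped(10)).flatMap { ids =>
      val perColumn = (0 until 2).map { colNo =>
        readColumn(colNo, ids).switchIfEmpty(emptyColumn(colNo, ids))
      }
      // Group header first, then the merged per-column chunk streams for that group
      Observable.now(GroupHeader(ids)) ++ Observable.merge(perColumn: _*)
    }

  Await.result(pipeline.foreach(println), 5.seconds)
}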
ChunkTable.scala
@@ -1,13 +1,15 @@
package filodb.cassandra.columnstore

import com.datastax.driver.core.Row
import com.datastax.driver.core.{BoundStatement, Row}
import java.nio.ByteBuffer
import java.lang.{Long => jlLong, Integer => jlInt}
import monix.reactive.Observable
import scala.concurrent.{ExecutionContext, Future}
import scodec.bits._

import filodb.cassandra.FiloCassandraConnector
import filodb.core._
import filodb.core.store.{ColumnStoreStats, ChunkedData, compress, decompress}
import filodb.core.store.{ColumnStoreStats, ChunkSetInfo, SingleChunkInfo, compress, decompress}

/**
* Represents the table which holds the actual columnar chunks for segments
@@ -31,87 +33,65 @@ sealed class ChunkTable(val dataset: DatasetRef, val connector: FiloCassandraCon
| partition blob,
| version int,
| columnname text,
| segmentid blob,
| chunkid int,
| chunkid bigint,
| data blob,
| PRIMARY KEY ((partition, version), columnname, segmentid, chunkid)
| PRIMARY KEY ((partition, version), columnname, chunkid)
|) WITH COMPACT STORAGE AND compression = {
'sstable_compression': '$sstableCompression'}""".stripMargin

lazy val writeChunksCql = session.prepare(
s"""INSERT INTO $tableString (partition, version, segmentid, chunkid, columnname, data
|) VALUES (?, ?, ?, ?, ?, ?)""".stripMargin
s"""INSERT INTO $tableString (partition, version, chunkid, columnname, data
|) VALUES (?, ?, ?, ?, ?)""".stripMargin
)

def writeChunks(partition: Types.BinaryPartition,
version: Int,
segmentId: Types.SegmentId,
chunkId: Types.ChunkID,
chunks: Map[String, ByteBuffer],
stats: ColumnStoreStats): Future[Response] = {
val partBytes = toBuffer(partition)
val segKeyBytes = toBuffer(segmentId)
var chunkBytes = 0L
val statements = chunks.map { case (columnName, bytes) =>
val finalBytes = compressChunk(bytes)
chunkBytes += finalBytes.capacity.toLong
writeChunksCql.bind(partBytes, version: java.lang.Integer, segKeyBytes,
chunkId: java.lang.Integer, columnName, finalBytes)
writeChunksCql.bind(partBytes, version: jlInt, chunkId: jlLong, columnName, finalBytes)
}.toSeq
stats.addChunkWriteStats(statements.length, chunkBytes)
connector.execStmt(unloggedBatch(statements))
}

val readChunksCql = s"""SELECT segmentid, chunkid, data FROM $tableString WHERE
| columnname = ? AND partition = ? AND version = ? AND
| segmentid >= ? AND """.stripMargin

lazy val readChunksCqlExcl = session.prepare(readChunksCql + "segmentid < ?")
lazy val readChunksCqlIncl = session.prepare(readChunksCql + "segmentid <= ?")

// Reads back all the chunks from the requested column for the segments falling within
// the starting and ending segment IDs. No paging is performed - so be sure to not
// ask for too large of a range. Also, beware the starting segment ID must line up with the
// segment boundary.
// endExclusive indicates if the end segment ID is exclusive or not.
def readChunks(partition: Types.BinaryPartition,
version: Int,
column: String,
startSegmentId: Types.SegmentId,
untilSegmentId: Types.SegmentId,
endExclusive: Boolean = true): Future[ChunkedData] = {
val query = (if (endExclusive) readChunksCqlExcl else readChunksCqlIncl).bind(
column, toBuffer(partition), version: java.lang.Integer,
toBuffer(startSegmentId), toBuffer(untilSegmentId))
session.executeAsync(query).toScalaFuture.map { rs =>
val rows = rs.all().asScala
val byteVectorChunks = rows.map { row => (ByteVector(row.getBytes(0)),
row.getInt(1),
decompressChunk(row.getBytes(2))) }
ChunkedData(column, byteVectorChunks)
}
}
lazy val readChunkInCql = session.prepare(
s"""SELECT chunkid, data FROM $tableString WHERE
| columnname = ? AND partition = ? AND version = ?
| AND chunkid IN ?""".stripMargin)

lazy val readChunkRangeCql = session.prepare(
s"""SELECT chunkid, data FROM $tableString WHERE
| columnname = ? AND partition = ? AND version = ? AND
| segmentid = ? AND chunkid >= ? AND chunkid <= ?""".stripMargin)
| columnname = ? AND partition = ? AND version = ?
| AND chunkid >= ? AND chunkid <= ?""".stripMargin)

def readChunks(partition: Types.BinaryPartition,
version: Int,
column: String,
segmentId: Types.SegmentId,
chunkRange: (Types.ChunkID, Types.ChunkID)): Future[ChunkedData] = {
val query = readChunkRangeCql.bind(column, toBuffer(partition), version: java.lang.Integer,
toBuffer(segmentId),
chunkRange._1: java.lang.Integer,
chunkRange._2: java.lang.Integer)
session.executeAsync(query).toScalaFuture.map { rs =>
val rows = rs.all().asScala
val byteVectorChunks = rows.map { row => (segmentId,
row.getInt(0),
decompressChunk(row.getBytes(1))) }
ChunkedData(column, byteVectorChunks)
colNo: Int,
chunkIds: Seq[Types.ChunkID],
rangeQuery: Boolean = false): Observable[SingleChunkInfo] = {
val query = if (rangeQuery) {
readChunkRangeCql.bind(column, toBuffer(partition), version: jlInt,
chunkIds.head: jlLong, chunkIds.last: jlLong)
} else {
readChunkInCql.bind(column, toBuffer(partition), version: jlInt, chunkIds.asJava)
}
Observable.fromFuture(execReadChunk(colNo, query))
.flatMap(it => Observable.fromIterator(it))
}

private def execReadChunk(colNo: Int,
query: BoundStatement): Future[Iterator[SingleChunkInfo]] = {
session.executeAsync(query).toIterator.map { rowIt =>
rowIt.map { row =>
SingleChunkInfo(row.getLong(0), colNo, decompressChunk(row.getBytes(1)))
}
}
}

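This table change is the heart of the segmentless move at the storage level: the segmentid column disappears, chunkid widens from int to bigint, and the clustering key collapses to (columnname, chunkid), so a read either binds an explicit list of chunk ids (the IN prepared statement) or a contiguous inclusive id range. Below is a rough, hedged sketch of what those two reads look like with the DataStax 3.x Java driver; the contact point, the keyspace/table name (filodb.mydataset_chunks) and the bound column name are made-up placeholders, not FiloDB's real naming.

import com.datastax.driver.core.Cluster
import java.nio.ByteBuffer
import scala.collection.JavaConverters._

object SegmentlessReadSketch extends App {
  val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
  val session = cluster.connect()
  val table   = "filodb.mydataset_chunks"   // hypothetical keyspace.table name

  // Arbitrary chunk ids: bind a java.util.List for the IN clause
  val readIn = session.prepare(
    s"SELECT chunkid, data FROM $table WHERE columnname = ? AND partition = ? AND version = ? AND chunkid IN ?")

  // Contiguous ids: bind the first and last id of a group as an inclusive range
  val readRange = session.prepare(
    s"SELECT chunkid, data FROM $table WHERE columnname = ? AND partition = ? AND version = ? AND chunkid >= ? AND chunkid <= ?")

  val partition = ByteBuffer.wrap(Array[Byte](1, 2, 3))      // placeholder binary partition key
  val ids       = Seq(100L, 101L, 105L).map(Long.box).asJava // boxed for the driver

  val rows = session.execute(readIn.bind("timestamp", partition, Int.box(0), ids)).all().asScala
  rows.foreach(r => println(s"chunk ${r.getLong(0)}: ${r.getBytes(1).remaining} bytes"))

  session.execute(readRange.bind("timestamp", partition, Int.box(0), Long.box(100L), Long.box(110L)))
  cluster.close()
}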