add support for multiple versions (hortonworks-spark#190)
## What changes were proposed in this pull request?
* fix MAX_VERSIONS flag usage (it was previously ignored)
* allow creating a table that holds more than one version per cell
* allow fetching more than one (not just the latest) view of the same entity
* allow setting the number of versions to fetch (see the usage sketch below)
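
A minimal usage sketch of the new read options, assuming a `sqlContext` and a table catalog JSON string `cat` are in scope (as in the test suite below); this snippet is illustrative and not part of the diff itself:

```scala
// Fetch up to 3 stored versions of each cell as separate rows,
// instead of merging everything down to the latest view.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> cat,
    HBaseRelation.MAX_VERSIONS -> "3",        // option key "maxVersions"
    HBaseRelation.MERGE_TO_LATEST -> "false"  // option key "mergeToLatest"
  ))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()
```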

## How was this patch tested?
The change includes its own test suite with a few integration tests.

* add support for multiple versions
* revert unneeded changes
* fix typo
* Cleaned & added comments
* update some comments
* update licence header
btomala authored and weiqingy committed Nov 2, 2017
1 parent b8695a2 commit 42020d8
Showing 3 changed files with 194 additions and 1 deletion.
core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseRelation.scala
@@ -80,6 +80,7 @@ case class HBaseRelation(
val minStamp = parameters.get(HBaseRelation.MIN_STAMP).map(_.toLong)
val maxStamp = parameters.get(HBaseRelation.MAX_STAMP).map(_.toLong)
val maxVersions = parameters.get(HBaseRelation.MAX_VERSIONS).map(_.toInt)
val mergeToLatest = parameters.get(HBaseRelation.MERGE_TO_LATEST).map(_.toBoolean).getOrElse(true)

val catalog = HBaseTableCatalog(parameters)

@@ -155,6 +156,7 @@ case class HBaseRelation(
cfs.foreach { x =>
val cf = new HColumnDescriptor(x.getBytes())
logDebug(s"add family $x to ${catalog.name}")
maxVersions.foreach(v => cf.setMaxVersions(v))
tableDesc.addFamily(cf)
}
val startKey = catalog.shcTableCoder.toBytes("aaaaaaa")
@@ -322,6 +324,7 @@ object HBaseRelation {
val TIMESTAMP = "timestamp"
val MIN_STAMP = "minStamp"
val MAX_STAMP = "maxStamp"
val MERGE_TO_LATEST = "mergeToLatest"
val MAX_VERSIONS = "maxVersions"
val HBASE_CONFIGURATION = "hbaseConfiguration"
// HBase configuration file such as HBase-site.xml, core-site.xml
core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableScanRDD.scala
@@ -116,6 +116,42 @@ private[hbase] class HBaseTableScanRDD(
Row.fromSeq(fields.map(unioned.get(_).getOrElse(null)))
}

// TODO: This adds a significant performance overhead, since there is a hashmap lookup for every row.
def buildRows(fields: Seq[Field], result: Result): Set[Row] = {
val r = result.getRow
val keySeq: Map[Field, Any] = {
if (relation.isComposite()) {
relation.catalog.shcTableCoder
.decodeCompositeRowKey(r, relation.catalog.getRowKey)
} else {
val f = relation.catalog.getRowKey.head
Seq((f, SHCDataTypeFactory.create(f).fromBytes(r))).toMap
}
}

val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(!_.isRowKey).map { x =>
import scala.collection.JavaConverters.asScalaBufferConverter
val dataType = SHCDataTypeFactory.create(x)
val kvs = result.getColumnCells(
relation.catalog.shcTableCoder.toBytes(x.cf),
relation.catalog.shcTableCoder.toBytes(x.col)).asScala

kvs.map(kv => {
val v = CellUtil.cloneValue(kv)
(kv.getTimestamp, x -> dataType.fromBytes(v))
}).toMap.withDefaultValue(x -> null)
}

val ts = valueSeq.foldLeft(Set.empty[Long])((acc, map) => acc ++ map.keySet)
// We lose duplicate rows here because we don't support attaching the version (timestamp) to the row
ts.map(version => {
keySeq ++ valueSeq.map(_.apply(version)).toMap
}).map { unioned =>
// Return the row ordered by the requested order
Row.fromSeq(fields.map(unioned.get(_).getOrElse(null)))
}
}
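
For intuition, here is a toy sketch of the merge performed above, using plain Scala collections in place of SHC types; the column names and values are invented:

// Each column yields a timestamp-keyed map whose default is a null value.
val colA = Map(100L -> ("a" -> "a-v100"), 200L -> ("a" -> "a-v200")).withDefaultValue("a" -> null)
val colB = Map(200L -> ("b" -> "b-v200")).withDefaultValue("b" -> null)
val valueSeq = Seq(colA, colB)
// Union of all timestamps seen in any column.
val ts = valueSeq.foldLeft(Set.empty[Long])((acc, m) => acc ++ m.keySet)
// One merged map per version; cells missing at a timestamp become null.
val versions = ts.map(version => valueSeq.map(_.apply(version)).toMap)
// versions == Set(Map(a -> a-v100, b -> null), Map(a -> a-v200, b -> b-v200))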

private def toResultIterator(result: GetResource): Iterator[Result] = {
val iterator = new Iterator[Result] {
var idx = 0
@@ -195,6 +231,57 @@
iterator
}

/**
* Converts a Result into a set of rows aggregated by timestamp, then flattens that set
* into a single iterator of rows. This path supports fetching more than one version.
*/
private def toFlattenRowIterator(
it: Iterator[Result]): Iterator[Row] = {

val iterator = new Iterator[Row] {
val start = System.currentTimeMillis()
var rowCount: Int = 0
var rows: Set[Row] = Set.empty[Row]
val indexedFields = relation.getIndexedProjections(requiredColumns).map(_._1)

override def hasNext: Boolean = {
if(!rows.isEmpty || it.hasNext) {
true
}
else {
val end = System.currentTimeMillis()
logInfo(s"returned ${rowCount} rows from hbase in ${end - start} ms")
false
}
}

private def nextRow(): Row = {
val row = rows.head
rows = rows.tail
row
}

override def next(): Row = {
rowCount += 1
if(rows.isEmpty) {
val r = it.next()
rows = buildRows(indexedFields, r)
if(rows.isEmpty) {
// If 'requiredColumns' is empty, 'indexedFields' will be empty, which leads to empty 'rows'.
// This happens when the user's query doesn't require Spark/SHC to return any real data from the HBase table,
// e.g. dataframe.count()
Row.fromSeq(Seq.empty)
} else {
nextRow()
}
} else {
nextRow()
}
}
}
iterator
}

override def getPreferredLocations(split: Partition): Seq[String] = {
split.asInstanceOf[HBaseScanPartition].regions.server.map {
identity
@@ -293,7 +380,11 @@ private[hbase] class HBaseTableScanRDD(
} ++ gIt

ShutdownHookManager.addShutdownHook { () => HBaseConnectionCache.close() }
-    toRowIterator(rIt)
+    if(relation.mergeToLatest) {
+      toRowIterator(rIt)
+    } else {
+      toFlattenRowIterator(rIt)
+    }
}

private def handleTimeSemantics(query: Query): Unit = {
core/src/test/scala/org/apache/spark/sql/MaxVersionsSuite.scala (new file, 99 additions, 0 deletions)
@@ -0,0 +1,99 @@
/*
* (C) 2017 Hortonworks, Inc. All rights reserved. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. This file is licensed to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.execution.datasources.hbase.Logging
import org.apache.spark.sql.execution.datasources.hbase.{HBaseRelation, HBaseTableCatalog}

class MaxVersionsSuite extends SHC with Logging {

def withCatalog(cat: String, options: Map[String, String]): DataFrame = {
sqlContext.read
.options(options ++ Map(HBaseTableCatalog.tableCatalog -> cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}

def persistDataInHBase(cat: String, data: Seq[HBaseRecord], timestamp: Long): Unit = {
val sql = sqlContext
import sql.implicits._
sc.parallelize(data).toDF.write
.options(Map(
HBaseTableCatalog.tableCatalog -> cat,
HBaseTableCatalog.newTable -> "5",
HBaseRelation.MAX_VERSIONS -> "3",
HBaseRelation.TIMESTAMP -> timestamp.toString
))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
}

test("Max Versions semantics") {

val oldestMs = 754869600000L
val oldMs = 754869611111L
val newMs = 754869622222L
val newestMs = 754869633333L

val oldestData = (0 to 2).map(HBaseRecord(_, "ancient"))
val oldData = (0 to 2).map(HBaseRecord(_, "old"))
val newData = (0 to 2).map(HBaseRecord(_, "new"))
val newestData = (0 to 1).map(HBaseRecord(_, "latest"))

persistDataInHBase(catalog, oldestData, oldestMs)
persistDataInHBase(catalog, oldData, oldMs)
persistDataInHBase(catalog, newData, newMs)
persistDataInHBase(catalog, newestData, newestMs)

// Test fetching the two latest versions
val twoVersions: DataFrame = withCatalog(catalog, Map(
HBaseRelation.MAX_VERSIONS -> "2",
HBaseRelation.MERGE_TO_LATEST -> "false"
))

// count() is executed on HBase directly and returns the number of unique row keys
assert(twoVersions.count() == 3)

val rows = twoVersions.take(10)
assert(rows.size == 6)
assert(rows.count(_.getString(7).contains("ancient")) == 0)
assert(rows.count(_.getString(7).contains("old")) == 1)
assert(rows.count(_.getString(7).contains("new")) == 3)
assert(rows.count(_.getString(7).contains("latest")) == 2)

// We cannot fetch more than three versions because the table was created with MAX_VERSIONS = 3
val threeVersions: DataFrame = withCatalog(catalog, Map(
HBaseRelation.MAX_VERSIONS -> "4",
HBaseRelation.MERGE_TO_LATEST -> "false"
))

val threeRows = threeVersions.take(10)
assert(threeRows.size == 9)
assert(threeRows.count(_.getString(7).contains("ancient")) == 1)
assert(threeRows.count(_.getString(7).contains("old")) == 3)
assert(threeRows.count(_.getString(7).contains("new")) == 3)
assert(threeRows.count(_.getString(7).contains("latest")) == 2)

// Test fetching only the latest version (the default behavior)
val lastVersions: DataFrame = withCatalog(catalog, Map.empty)

val lastRows = lastVersions.take(10)
assert(lastRows.size == 3)
assert(lastRows.count(_.getString(7).contains("new")) == 1)
assert(lastRows.count(_.getString(7).contains("latest")) == 2)
}
}
