Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: hortonworks-spark/shc
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: master
Choose a base ref
...
head repository: vim89/shc
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
Able to merge. These branches can be automatically merged.
  • 1 commit
  • 4 files changed
  • 1 contributor

Commits on Jun 19, 2020

  1. Added enhancement in the API to support for Google BigTable.

    Basically Google BigTable doesn't have namespaces & name descriptors Check this - https://cloud.google.com/bigtable/docs/hbase-differences#namespaces.
    Hence, during createRelation task we have to skip getter/setter methods of namespaces & name descriptors.
    
    I had to create an if else branch for that based on "tableType" property in HBaseTableCatalog class.
    "tableType" variable takes value "bigtable" when using with Google BigTable and by default value will be "hbase" if not explicitly specified.
    vim89 committed Jun 19, 2020
    Copy the full SHA
    57379fd View commit details
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -88,6 +88,18 @@ The above defines a schema for a HBase table with name as table1, row key as key
Given a DataFrame with specified schema, above will create an HBase table with 5 regions and save the DataFrame inside. Note that if HBaseTableCatalog.newTable is not specified, the table has to be pre-created.

### Write to Google BigTable to populate data

sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableType -> "bigtable", HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()

Given a DataFrame with specified schema, above will create an Google BigTable with 5 regions and save the DataFrame inside.
Note that if HBaseTableCatalog.newTable is not specified, the table has to be pre-created.
HBaseTableCatalog.tableType -> "bigtable" must be explicitly set for writing into Google BigTable, otherwise by default API assumes writing to HBase table


### Perform DataFrame operation on top of HBase table

def withCatalog(cat: String): DataFrame = {
@@ -97,7 +109,9 @@ Given a DataFrame with specified schema, above will create an HBase table with 5
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}


Note: The above task remain same in case of writing to Google BigTable

### Complicated query

val df = withCatalog(catalog)
Original file line number Diff line number Diff line change
@@ -124,18 +124,26 @@ case class HBaseRelation(
def createTableIfNotExist() {
val cfs = catalog.getColumnFamilies
val connection = HBaseConnectionCache.getConnection(hbaseConf)

// Get Table Type from HBaseTableCatalog expected values = "hbase" or "bigtable"
val tableType = catalog.getTableType

// Initialize hBase table if necessary
val admin = connection.getAdmin
val isNameSpaceExist = try {
admin.getNamespaceDescriptor(catalog.namespace)
true
if(tableType.equals("hbase")) { // Google BigTable do not have namespaces, hence skip calling getNamespaceDescriptor if tableType is "bigtable"
admin.getNamespaceDescriptor(catalog.namespace)
true
}
else
false
} catch {
case e: NamespaceNotFoundException => false
case NonFatal(e) =>
logError("Unexpected error", e)
false
}
if (!isNameSpaceExist) {
if (!isNameSpaceExist && tableType.equals("hbase")) { // Google BigTable do not have namespaces, hence skip calling createNamespace
admin.createNamespace(NamespaceDescriptor.create(catalog.namespace).build)
}
val tName = TableName.valueOf(s"${catalog.namespace}:${catalog.name}")
Original file line number Diff line number Diff line change
@@ -169,6 +169,11 @@ case class HBaseTableCatalog(
coderSet: Set[String],
val numReg: Int,
val splitRange: (String, String)) extends Logging {
var tableType = "hbase"
def getTableType = this.tableType
// Setter method to over write default value (hbase) for tableType class variable, in case of Google BigTable
def setTableType(tableType: String) = this.tableType = tableType

def toDataType = StructType(sMap.toFields)
def getField(name: String) = sMap.getField(name)
def getRowKey: Seq[Field] = row.fields
@@ -234,6 +239,8 @@ object HBaseTableCatalog {
val rowKey = "rowkey"
// The key for hbase table whose value specify namespace and table name
val table = "table"
// The table type Hbase or Google bigtable
val tableType = "tableType"
// The namespace of hbase table
val nameSpace = "namespace"
// The name of hbase table
@@ -300,8 +307,9 @@ object HBaseTableCatalog {

val minSplit = parameters.get(minTableSplitPoint).getOrElse("aaaaaa")
val maxSplit = parameters.get(maxTableSplitPoint).getOrElse("zzzzzz")

HBaseTableCatalog(nSpace, tName, rKey, SchemaMap(schemaMap), tCoder, coderSet, numReg, (minSplit, maxSplit))
val hbaseTableCatalog = HBaseTableCatalog(nSpace, tName, rKey, SchemaMap(schemaMap), tCoder, coderSet, numReg, (minSplit, maxSplit))
hbaseTableCatalog.setTableType(parameters.get(tableType).getOrElse("hbase"))
hbaseTableCatalog
}

/**
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.spark.sql

import org.apache.spark.sql.execution.datasources.hbase.{HBaseTableCatalog, Logging}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

class HBaseTableCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll with Logging {
def catalog = s"""{
|"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
|"rowkey":"key1:key2",
|"columns":{
|"col00":{"cf":"rowkey", "col":"key1", "type":"string", "length":"6"},
|"col01":{"cf":"rowkey", "col":"key2", "type":"int"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"}
|}
|}""".stripMargin

test("HBaseTableCatalog tableType class variable test") {
var hbasetablecatalogobject = HBaseTableCatalog(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.nameSpace -> "default",
HBaseTableCatalog.newTable -> "3"))
assert(hbasetablecatalogobject.getTableType.equals("hbase"))

hbasetablecatalogobject = HBaseTableCatalog(Map(HBaseTableCatalog.tableType -> "bigtable", HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.nameSpace -> "default",
HBaseTableCatalog.newTable -> "3"))
assert(hbasetablecatalogobject.getTableType.equals("bigtable"))
}
}