Skip to content

Commit

Permalink
No need to specify 'HBaseTableCatalog.newTable' option when saving data into an existing table (hortonworks-spark#159)
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?
Now users need to specify 'HBaseTableCatalog.newTable' option when saving data into an existing table. It's unnecessary. This PR is to fix this issue.

## How was this patch tested?
Pass the current test cases and add a new unit test.
  • Loading branch information
weiqingy authored Jul 14, 2017
1 parent 523eea3 commit a308f6b
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private[sql] class DefaultSource extends RelationProvider with CreatableRelation
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val relation = HBaseRelation(parameters, Some(data.schema))(sqlContext)
relation.createTable()
relation.createTableIfNotExist()
relation.insert(data, false)
relation
}
Expand Down Expand Up @@ -117,56 +117,53 @@ case class HBaseRelation(

val serializedToken = SHCCredentialsManager.manager.getTokenForCluster(hbaseConf)

/**
 * Creates the HBase table backing this relation if it does not already exist.
 *
 * Callers writing to an existing table no longer need to pass the
 * 'HBaseTableCatalog.newTable' option: the region-count requirement is only
 * enforced when a brand-new table actually has to be created.
 *
 * @throws InvalidRegionNumberException if the table must be created but
 *         `catalog.numReg` is not greater than 3.
 */
def createTableIfNotExist() {
  val cfs = catalog.getColumnFamilies
  val connection = HBaseConnectionCache.getConnection(hbaseConf)
  // Initialize hBase table if necessary
  val admin = connection.getAdmin
  try {
    // Create the namespace first if it is missing; getNamespaceDescriptor
    // signals absence by throwing rather than returning null.
    val isNameSpaceExist = try {
      admin.getNamespaceDescriptor(catalog.namespace)
      true
    } catch {
      case e: NamespaceNotFoundException => false
    }
    if (!isNameSpaceExist) {
      admin.createNamespace(NamespaceDescriptor.create(catalog.namespace).build)
    }

    val tName = TableName.valueOf(s"${catalog.namespace}:${catalog.name}")
    // Tables created by the Examples have the prefix "shcExample"; those are
    // dropped and re-created so example runs always start from a clean table.
    if (admin.isTableAvailable(tName)
        && tName.toString.startsWith(s"${catalog.namespace}:shcExample")) {
      admin.disableTable(tName)
      admin.deleteTable(tName)
    }

    if (!admin.isTableAvailable(tName)) {
      // Only creating a new table requires a region count; writes to an
      // existing table skip this check entirely.
      if (catalog.numReg <= 3) {
        throw new InvalidRegionNumberException("Creating a new table should " +
          "specify the number of regions which must be greater than 3.")
      }
      val tableDesc = new HTableDescriptor(tName)
      cfs.foreach { x =>
        val cf = new HColumnDescriptor(x.getBytes())
        logDebug(s"add family $x to ${catalog.name}")
        tableDesc.addFamily(cf)
      }

      // Pre-split the table across numReg regions between fixed boundary keys.
      val startKey = catalog.shcTableCoder.toBytes("aaaaaaa")
      val endKey = catalog.shcTableCoder.toBytes("zzzzzzz")
      val splitKeys = Bytes.split(startKey, endKey, catalog.numReg - 3)
      admin.createTable(tableDesc, splitKeys)

      // BUG FIX: the original bound the region locations once (`val r`) and
      // then spun on that same stale value — if regions were not yet assigned
      // on the first query, the loop never terminated. Re-query inside the
      // loop so the wait can observe region assignment and finish.
      var r = connection.getRegionLocator(tName).getAllRegionLocations
      while (r == null || r.size() == 0) {
        logDebug(s"region not allocated")
        Thread.sleep(1000)
        r = connection.getRegionLocator(tName).getAllRegionLocations
      }
      logDebug(s"region allocated $r")
    }
  } finally {
    // BUG FIX: close resources even when a namespace/table operation throws,
    // instead of leaking the Admin handle and the cached connection reference.
    admin.close()
    connection.close()
  }
}

/**
Expand Down
20 changes: 20 additions & 0 deletions core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -356,4 +356,24 @@ class DefaultSourceSuite extends SHC with Logging {
assert(keys.contains("row100"))
assert(keys.contains("row57"))
}

test("No need to specify 'HBaseTableCatalog.newTable' option when saving data into an existing table") {
  // Writing to an already-created table must succeed without the 'newTable'
  // option, and the row count must grow by exactly the rows appended.
  val sql = sqlContext
  import sql.implicits._

  val before = withCatalog(catalog)
  assert(before.count() == 101)

  // add three more records to the existing table "table1" which has 101 records
  val extraRecords = (256 to 258).map(i => HBaseRecord(i, "extra"))
  sc.parallelize(extraRecords)
    .toDF
    .write
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save()

  val after = withCatalog(catalog)
  assert(after.count() == 104)
}
}

0 comments on commit a308f6b

Please sign in to comment.