Skip to content

Commit

Permalink
[SPARK-4769] [SQL] CTAS does not work when reading from temporary tables
Browse files Browse the repository at this point in the history
This is the code refactor and follow ups for apache#2570

Author: Cheng Hao <[email protected]>

Closes apache#3336 from chenghao-intel/createtbl and squashes the following commits:

3563142 [Cheng Hao] remove the unused variable
e215187 [Cheng Hao] eliminate the compiling warning
4f97f14 [Cheng Hao] fix bug in unittest
5d58812 [Cheng Hao] revert the API changes
b85b620 [Cheng Hao] fix the regression of temp tabl not found in CTAS
  • Loading branch information
chenghao-intel authored and marmbrus committed Dec 9, 2014
1 parent 9443843 commit 51b1fe1
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,37 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
* For example, because of a CREATE TABLE X AS statement.
*/
object CreateTables extends Rule[LogicalPlan] {
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p

case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)

CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
// Get the CreateTableDesc from Hive SemanticAnalyzer
val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
None
} else {
val sa = new SemanticAnalyzer(hive.hiveconf) {
override def analyzeInternal(ast: ASTNode) {
// A hack to intercept the SemanticAnalyzer.analyzeInternal,
// to ignore the SELECT clause of the CTAS
val method = classOf[SemanticAnalyzer].getDeclaredMethod(
"analyzeCreateTable", classOf[ASTNode], classOf[QB])
method.setAccessible(true)
method.invoke(this, ast, this.getQB)
}
}

sa.analyze(extra, new Context(hive.hiveconf))
Some(sa.getQB().getTableDesc)
}

CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.parse.ASTNode
import org.apache.hadoop.hive.ql.plan.CreateTableDesc

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
Expand Down Expand Up @@ -181,13 +182,20 @@ private[hive] trait HiveStrategies {
execution.InsertIntoHiveTable(
table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.CreateTableAsSelect(
Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
CreateTableAsSelect(
Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) =>
execution.CreateTableAsSelect(
database,
tableName,
child,
allowExisting,
extra) :: Nil
Some(desc)) :: Nil
case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) =>
execution.CreateTableAsSelect(
database,
tableName,
child,
allowExisting,
None) :: Nil
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.spark.sql.hive.execution

import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
import org.apache.hadoop.hive.ql.plan.CreateTableDesc

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
Expand All @@ -35,8 +35,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
* @param query the query whose result will be insert into the new relation
* @param allowExisting allow continue working if it's already exists, otherwise
* raise exception
* @param extra the extra information for this Operator, it should be the
* ASTNode object for extracting the CreateTableDesc.
* @param desc the CreateTableDesc, which may contains serde, storage handler etc.
*/
@Experimental
Expand All @@ -45,21 +44,16 @@ case class CreateTableAsSelect(
tableName: String,
query: LogicalPlan,
allowExisting: Boolean,
extra: ASTNode) extends LeafNode with Command {
desc: Option[CreateTableDesc]) extends LeafNode with Command {

def output = Seq.empty

private[this] def sc = sqlContext.asInstanceOf[HiveContext]

// A lazy computing of the metastoreRelation
private[this] lazy val metastoreRelation: MetastoreRelation = {
// Get the CreateTableDesc from Hive SemanticAnalyzer
val sa = new SemanticAnalyzer(sc.hiveconf)

sa.analyze(extra, new Context(sc.hiveconf))
val desc = sa.getQB().getTableDesc
// Create Hive Table
sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc))
sc.catalog.createTable(database, tableName, query.output, allowExisting, desc)

// Get the Metastore Relation
sc.catalog.lookupRelation(Some(database), tableName, None) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ class SQLQuerySuite extends QueryTest {
checkAnswer(
sql("SELECT f1.f2.f3 FROM nested"),
1)
checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"),
Seq.empty[Row])
checkAnswer(
sql("SELECT * FROM test_ctas_1234"),
sql("SELECT * FROM nested").collect().toSeq)

intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
sql("CREATE TABLE test_ctas_12345 AS SELECT * from notexists").collect()
}
}

test("test CTAS") {
Expand Down

0 comments on commit 51b1fe1

Please sign in to comment.