[SPARK-2784][SQL] Deprecate hql() method in favor of a config option, 'spark.sql.dialect'

Many users have reported being confused by the distinction between the `sql` and `hql` methods.  Specifically, many users think that `sql(...)` cannot be used to read Hive tables.  In this PR I introduce a new configuration option, `spark.sql.dialect`, that picks which dialect will be used for parsing.  For `SQLContext` this must be set to `sql`.  In `HiveContext` it defaults to `hiveql` but can also be set to `sql`.

The `hql` and `hiveql` methods continue to act the same but are now marked as deprecated.

**This is a potentially breaking change for users who call `sql(...)` on a `HiveContext` and expect the simple SQL parser, unless they set the dialect manually, though such usage is likely rare.**

For example, `hiveContext.sql("SELECT 1")` will now throw a parsing exception by default.
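
For illustration, here is a minimal sketch of switching dialects at runtime, assuming an existing `HiveContext` named `hiveContext` and a Hive table `src` (as in the examples below). A `SET` command issued through `sql()` updates the `SQLConf`, so no separate API call is needed:

```scala
// HiveContext defaults to the 'hiveql' dialect, so a bare "SELECT 1" no longer parses.
// Switch to Spark SQL's simple parser for this context:
hiveContext.sql("SET spark.sql.dialect=sql")
hiveContext.sql("SELECT 1").collect()                    // parsed with the 'sql' dialect

// Switch back to the default HiveQL dialect:
hiveContext.sql("SET spark.sql.dialect=hiveql")
hiveContext.sql("FROM src SELECT key, value").collect()  // parsed with HiveQL

// The old methods still behave the same, but are now marked @deprecated:
hiveContext.hql("FROM src SELECT key, value")            // alias for hiveql()
```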

Author: Michael Armbrust <[email protected]>

Closes apache#1746 from marmbrus/sqlLanguageConf and squashes the following commits:

ad375cc [Michael Armbrust] Merge remote-tracking branch 'apache/master' into sqlLanguageConf
20c43f8 [Michael Armbrust] override function instead of just setting the value
7e4ae93 [Michael Armbrust] Deprecate hql() method in favor of a config option, 'spark.sql.dialect'
marmbrus committed Aug 3, 2014
1 parent 2998e38 commit 236dfac
Showing 21 changed files with 199 additions and 133 deletions.
8 changes: 4 additions & 4 deletions dev/audit-release/sbt_app_hive/src/main/scala/HiveApp.scala
@@ -37,10 +37,10 @@ object SparkSqlExample {
val hiveContext = new HiveContext(sc)

import hiveContext._
hql("DROP TABLE IF EXISTS src")
hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'data.txt' INTO TABLE src")
val results = hql("FROM src SELECT key, value WHERE key >= 0 AND KEY < 5").collect()
sql("DROP TABLE IF EXISTS src")
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'data.txt' INTO TABLE src")
val results = sql("FROM src SELECT key, value WHERE key >= 0 AND KEY < 5").collect()
results.foreach(println)

def test(f: => Boolean, failureMsg: String) = {
18 changes: 9 additions & 9 deletions docs/sql-programming-guide.md
@@ -495,11 +495,11 @@ directory.
// sc is an existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
hiveContext.hql("FROM src SELECT key, value").collect().foreach(println)
hiveContext.sql("FROM src SELECT key, value").collect().foreach(println)
{% endhighlight %}

</div>
@@ -515,11 +515,11 @@ expressed in HiveQL.
// sc is an existing JavaSparkContext.
JavaHiveContext hiveContext = new org.apache.spark.sql.hive.api.java.HiveContext(sc);

hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL.
Row[] results = hiveContext.hql("FROM src SELECT key, value").collect();
Row[] results = hiveContext.sql("FROM src SELECT key, value").collect();

{% endhighlight %}

@@ -537,11 +537,11 @@ expressed in HiveQL.
from pyspark.sql import HiveContext
hiveContext = HiveContext(sc)

hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = hiveContext.hql("FROM src SELECT key, value").collect()
results = hiveContext.sql("FROM src SELECT key, value").collect()

{% endhighlight %}

@@ -34,20 +34,20 @@ object HiveFromSpark {
val hiveContext = new HiveContext(sc)
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src")
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
println("Result of 'SELECT *': ")
hql("SELECT * FROM src").collect.foreach(println)
sql("SELECT * FROM src").collect.foreach(println)

// Aggregation queries are also supported.
val count = hql("SELECT COUNT(*) FROM src").collect().head.getLong(0)
val count = sql("SELECT COUNT(*) FROM src").collect().head.getLong(0)
println(s"COUNT(*): $count")

// The results of SQL queries are themselves RDDs and support all normal RDD functions. The
// items in the RDD are of type Row, which allows you to access each column by ordinal.
val rddFromSql = hql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

println("Result of RDD.map:")
val rddAsStrings = rddFromSql.map {
@@ -60,6 +60,6 @@ object HiveFromSpark {

// Queries can then join RDD data with data stored in Hive.
println("Result of SELECT *:")
hql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)
}
}
20 changes: 12 additions & 8 deletions python/pyspark/sql.py
@@ -1291,16 +1291,20 @@ def _get_hive_ctx(self):

def hiveql(self, hqlQuery):
"""
Runs a query expressed in HiveQL, returning the result as
a L{SchemaRDD}.
DEPRECATED: Use sql()
"""
warnings.warn("hiveql() is deprecated as the sql function now parses using HiveQL by" +
"default. The SQL dialect for parsing can be set using 'spark.sql.dialect'",
DeprecationWarning)
return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery), self)

def hql(self, hqlQuery):
"""
Runs a query expressed in HiveQL, returning the result as
a L{SchemaRDD}.
DEPRECATED: Use sql()
"""
warnings.warn("hql() is deprecated as the sql function now parses using HiveQL by" +
"default. The SQL dialect for parsing can be set using 'spark.sql.dialect'",
DeprecationWarning)
return self.hiveql(hqlQuery)


@@ -1313,16 +1317,16 @@ class LocalHiveContext(HiveContext):
>>> import os
>>> hiveCtx = LocalHiveContext(sc)
>>> try:
... supress = hiveCtx.hql("DROP TABLE src")
... supress = hiveCtx.sql("DROP TABLE src")
... except Exception:
... pass
>>> kv1 = os.path.join(os.environ["SPARK_HOME"],
... 'examples/src/main/resources/kv1.txt')
>>> supress = hiveCtx.hql(
>>> supress = hiveCtx.sql(
... "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
>>> supress = hiveCtx.hql("LOAD DATA LOCAL INPATH '%s' INTO TABLE src"
>>> supress = hiveCtx.sql("LOAD DATA LOCAL INPATH '%s' INTO TABLE src"
... % kv1)
>>> results = hiveCtx.hql("FROM src SELECT value"
>>> results = hiveCtx.sql("FROM src SELECT value"
... ).map(lambda r: int(r.value.split('_')[1]))
>>> num = results.count()
>>> reduce_sum = results.reduce(lambda x, y: x + y)
17 changes: 16 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -29,6 +29,7 @@ object SQLConf {
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
val CODEGEN_ENABLED = "spark.sql.codegen"
val DIALECT = "spark.sql.dialect"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -39,7 +40,7 @@ object SQLConf {
* A trait that enables the setting and getting of mutable config parameters/hints.
*
* In the presence of a SQLContext, these can be set and queried by passing SET commands
* into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can
* into Spark SQL's query functions (i.e. sql()). Otherwise, users of this trait can
* modify the hints by programmatically calling the setters and getters of this trait.
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
@@ -53,6 +54,20 @@ trait SQLConf {
/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?

/**
* The SQL dialect that is used when parsing queries. This defaults to 'sql' which uses
* a simple SQL parser provided by Spark SQL. This is currently the only option for users of
* SQLContext.
*
* When using a HiveContext, this value defaults to 'hiveql', which uses the Hive 0.12.0 HiveQL
* parser. Users can change this to 'sql' if they want to run queries that aren't supported by
* HiveQL (e.g., SELECT 1).
*
* Note that the choice of dialect does not affect things like what tables are available or
* how query execution is performed.
*/
private[spark] def dialect: String = get(DIALECT, "sql")

/** When true tables cached using the in-memory columnar caching will be compressed. */
private[spark] def useCompression: Boolean = get(COMPRESS_CACHED, "false").toBoolean

11 changes: 9 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -248,11 +248,18 @@ class SQLContext(@transient val sparkContext: SparkContext)
}

/**
* Executes a SQL query using Spark, returning the result as a SchemaRDD.
* Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is
* used for SQL parsing can be configured with 'spark.sql.dialect'.
*
* @group userf
*/
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
def sql(sqlText: String): SchemaRDD = {
if (dialect == "sql") {
new SchemaRDD(this, parseSql(sqlText))
} else {
sys.error(s"Unsupported SQL dialect: $dialect")
}
}

/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
@@ -39,10 +39,18 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
def this(sparkContext: JavaSparkContext) = this(new SQLContext(sparkContext.sc))

/**
* Executes a query expressed in SQL, returning the result as a JavaSchemaRDD
* Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is
* used for SQL parsing can be configured with 'spark.sql.dialect'.
*
* @group userf
*/
def sql(sqlQuery: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
def sql(sqlText: String): JavaSchemaRDD = {
if (sqlContext.dialect == "sql") {
new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlText))
} else {
sys.error(s"Unsupported SQL dialect: $sqlContext.dialect")
}
}

/**
* :: Experimental ::
@@ -55,7 +55,7 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo
override def run(command: String): CommandProcessorResponse = {
// TODO unify the error code
try {
val execution = context.executePlan(context.hql(command).logicalPlan)
val execution = context.executePlan(context.sql(command).logicalPlan)
hiveResponse = execution.stringResult()
tableSchema = getResultSetSchema(execution)
new CommandProcessorResponse(0)
@@ -128,7 +128,7 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage
logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING)
try {
result = hiveContext.hql(statement)
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
@@ -71,15 +71,29 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
class HiveContext(sc: SparkContext) extends SQLContext(sc) {
self =>

// Change the default SQL dialect to HiveQL
override private[spark] def dialect: String = get(SQLConf.DIALECT, "hiveql")

override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

/**
* Executes a query expressed in HiveQL using Spark, returning the result as a SchemaRDD.
*/
override def sql(sqlText: String): SchemaRDD = {
// TODO: Create a framework for registering parsers instead of just hardcoding if statements.
if (dialect == "sql") {
super.sql(sqlText)
} else if (dialect == "hiveql") {
new SchemaRDD(this, HiveQl.parseSql(sqlText))
} else {
sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'")
}
}

@deprecated("hiveql() is deprecated as the sql function now parses using HiveQL by default. " +
s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))

/** An alias for `hiveql`. */
@deprecated("hql() is deprecated as the sql function now parses using HiveQL by default. " +
s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)

/**
@@ -95,7 +109,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
@transient
protected val outputBuffer = new java.io.OutputStream {
protected lazy val outputBuffer = new java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](10240)
def write(i: Int): Unit = {
@@ -125,7 +139,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/**
* SQLConf and HiveConf contracts: when the hive session is first initialized, params in
* HiveConf will get picked up by the SQLConf. Additionally, any properties set by
* set() or a SET command inside hql() or sql() will be set in the SQLConf *as well as*
* set() or a SET command inside sql() will be set in the SQLConf *as well as*
* in the HiveConf.
*/
@transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.api.java

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD}
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.hive.{HiveContext, HiveQl}

/**
@@ -28,9 +29,21 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa

override val sqlContext = new HiveContext(sparkContext)

override def sql(sqlText: String): JavaSchemaRDD = {
// TODO: Create a framework for registering parsers instead of just hardcoding if statements.
if (sqlContext.dialect == "sql") {
super.sql(sqlText)
} else if (sqlContext.dialect == "hiveql") {
new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText))
} else {
sys.error(s"Unsupported SQL dialect: ${sqlContext.dialect}. Try 'sql' or 'hiveql'")
}
}

/**
* Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD.
* DEPRECATED: Use sql(...) Instead
*/
@Deprecated
def hql(hqlQuery: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
}
@@ -35,17 +35,17 @@ class CachedTableSuite extends HiveComparisonTest {
"SELECT * FROM src LIMIT 1", reset = false)

test("Drop cached table") {
hql("CREATE TABLE test(a INT)")
sql("CREATE TABLE test(a INT)")
cacheTable("test")
hql("SELECT * FROM test").collect()
hql("DROP TABLE test")
sql("SELECT * FROM test").collect()
sql("DROP TABLE test")
intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
hql("SELECT * FROM test").collect()
sql("SELECT * FROM test").collect()
}
}

test("DROP nonexistant table") {
hql("DROP TABLE IF EXISTS nonexistantTable")
sql("DROP TABLE IF EXISTS nonexistantTable")
}

test("check that table is cached and uncache") {
@@ -74,14 +74,14 @@ class CachedTableSuite extends HiveComparisonTest {
}

test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") {
TestHive.hql("CACHE TABLE src")
TestHive.sql("CACHE TABLE src")
TestHive.table("src").queryExecution.executedPlan match {
case _: InMemoryColumnarTableScan => // Found evidence of caching
case _ => fail(s"Table 'src' should be cached")
}
assert(TestHive.isCached("src"), "Table 'src' should be cached")

TestHive.hql("UNCACHE TABLE src")
TestHive.sql("UNCACHE TABLE src")
TestHive.table("src").queryExecution.executedPlan match {
case _: InMemoryColumnarTableScan => fail(s"Table 'src' should not be cached")
case _ => // Found evidence of uncaching
@@ -27,7 +27,7 @@ import org.apache.spark.sql.hive.test.TestHive._
class StatisticsSuite extends QueryTest {

test("estimates the size of a test MetastoreRelation") {
val rdd = hql("""SELECT * FROM src""")
val rdd = sql("""SELECT * FROM src""")
val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
mr.statistics.sizeInBytes
}
@@ -45,7 +45,7 @@ class StatisticsSuite extends QueryTest {
ct: ClassTag[_]) = {
before()

var rdd = hql(query)
var rdd = sql(query)

// Assert src has a size smaller than the threshold.
val sizes = rdd.queryExecution.analyzed.collect {
@@ -65,16 +65,16 @@ class StatisticsSuite extends QueryTest {
TestHive.settings.synchronized {
val tmp = autoBroadcastJoinThreshold

hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
rdd = hql(query)
sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
rdd = sql(query)
bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")

val shj = rdd.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j }
assert(shj.size === 1,
"ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")

hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
}

after()