[SPARK-6376][SQL] Avoid eliminating subqueries until optimization
Previously it was safe to throw away subqueries after analysis, as we would never try to use that tree for resolution again. However, with eager analysis in `DataFrame`s, doing so can cause errors for queries such as:

```scala
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count()
```

As a result, in this PR we defer the elimination of subqueries until the optimization phase.
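For context, `EliminateSubQueries` is a one-case rule that unwraps `Subquery` alias nodes. Roughly the following sketch (close to, but not verbatim, the Catalyst source):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch of the rule: strip every Subquery alias node, keeping its child.
// The aliases are what qualified references like $"x.str" resolve against,
// which is why the rule must not run until analysis is complete.
object EliminateSubQueries extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(_, child) => child
  }
}
```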

Author: Michael Armbrust <[email protected]>

Closes apache#5160 from marmbrus/subqueriesInDfs and squashes the following commits:

a9bb262 [Michael Armbrust] Update Optimizer.scala
27d25bf [Michael Armbrust] fix hive tests
9137e03 [Michael Armbrust] add type
81cd597 [Michael Armbrust] Avoid eliminating subqueries until optimization
marmbrus committed Mar 24, 2015
1 parent 046c1e2 commit cbeaf9e
Showing 9 changed files with 34 additions and 17 deletions.
```diff
@@ -64,9 +64,7 @@ class Analyzer(catalog: Catalog,
       UnresolvedHavingClauseAttributes ::
       TrimGroupingAliases ::
       typeCoercionRules ++
-      extendedResolutionRules : _*),
-    Batch("Remove SubQueries", fixedPoint,
-      EliminateSubQueries)
+      extendedResolutionRules : _*)
   )
 
   /**
```
```diff
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
@@ -289,7 +289,7 @@ package object dsl {
     InsertIntoTable(
       analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
 
-    def analyze = analysis.SimpleAnalyzer(logicalPlan)
+    def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer(logicalPlan))
   }
 
 object plans { // scalastyle:ignore
```
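Because the analyzer no longer strips subqueries, the test DSL's `analyze` helper now applies `EliminateSubQueries` on top of `SimpleAnalyzer`, so golden plans written without `Subquery` wrappers keep matching. A hedged usage sketch; `testRelation` and the exact DSL imports are assumptions, not part of this commit:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Subquery}

val testRelation = LocalRelation('a.int)  // hypothetical test relation

// analyze resolves the plan, then EliminateSubQueries removes the alias
// wrapper, so the result compares equal to plans built without one.
val resolved = Subquery("t", testRelation).analyze
assert(resolved.collect { case s: Subquery => s }.isEmpty)
```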
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.immutable.HashSet
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.FullOuter
@@ -32,6 +33,9 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan]
 
 object DefaultOptimizer extends Optimizer {
   val batches =
+    // SubQueries are only needed for analysis and can be removed before execution.
+    Batch("Remove SubQueries", FixedPoint(100),
+      EliminateSubQueries) ::
     Batch("Combine Limits", FixedPoint(100),
       CombineLimits) ::
     Batch("ConstantFolding", FixedPoint(100),
```
```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, Resolver}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -73,12 +73,16 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    * can do better should override this function.
    */
   def sameResult(plan: LogicalPlan): Boolean = {
-    plan.getClass == this.getClass &&
-      plan.children.size == children.size && {
-      logDebug(s"[${cleanArgs.mkString(", ")}] == [${plan.cleanArgs.mkString(", ")}]")
-      cleanArgs == plan.cleanArgs
+    val cleanLeft = EliminateSubQueries(this)
+    val cleanRight = EliminateSubQueries(plan)
+
+    cleanLeft.getClass == cleanRight.getClass &&
+      cleanLeft.children.size == cleanRight.children.size && {
+      logDebug(
+        s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
+      cleanRight.cleanArgs == cleanLeft.cleanArgs
     } &&
-    (plan.children, children).zipped.forall(_ sameResult _)
+    (cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
   }
 
   /** Args that have cleaned such that differences in expression id should not affect equality */
```
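`sameResult` decides whether two plans produce the same answer (for example when matching cached query results), so plans that differ only in `Subquery` aliases should compare equal; cleaning both sides with `EliminateSubQueries` first achieves exactly that. A hedged illustration, with DSL constructors assumed:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Subquery}

val relation = LocalRelation('a.int, 'str.string)
// Alias wrappers no longer break plan-equivalence checks: both sides are
// normalized before classes, child counts, and cleaned args are compared.
assert(Subquery("x", relation).sameResult(relation))
assert(relation.sameResult(Subquery("y", relation)))
```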
```diff
@@ -32,9 +32,13 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
   val caseInsensitiveCatalog = new SimpleCatalog(false)
 
   val caseSensitiveAnalyzer =
-    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
+    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true) {
+      override val extendedResolutionRules = EliminateSubQueries :: Nil
+    }
   val caseInsensitiveAnalyzer =
-    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
+    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false) {
+      override val extendedResolutionRules = EliminateSubQueries :: Nil
+    }
 
   val checkAnalysis = new CheckAnalysis
 
```
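This suite's expected plans were written against an analyzer that removed subqueries itself, so the test analyzers opt back in through the `extendedResolutionRules` hook shown in the `Analyzer` diff above. A hedged sketch of the resulting behavior; `testRelation` and calling the analyzer via its `apply` are assumptions:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Subquery}

val testRelation = LocalRelation('a.int)  // hypothetical
// With the override, analysis output in this suite stays subquery-free,
// so golden plans that never mention Subquery still match.
val analyzed = caseSensitiveAnalyzer(Subquery("t", testRelation))
assert(analyzed.collect { case s: Subquery => s }.isEmpty)
```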
```diff
@@ -108,6 +108,13 @@ class DataFrameSuite extends QueryTest {
     )
   }
 
+  test("self join with aliases") {
+    val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
+    checkAnswer(
+      df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
+      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
+  }
+
   test("explode") {
     val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
     val df2 =
```
4 changes: 2 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
```diff
@@ -34,7 +34,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
   test("equi-join is hash-join") {
     val x = testData2.as("x")
     val y = testData2.as("y")
-    val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.analyzed
+    val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.optimizedPlan
     val planned = planner.HashJoin(join)
     assert(planned.size === 1)
   }
@@ -109,7 +109,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
   test("multiple-key equi-join is hash-join") {
     val x = testData2.as("x")
     val y = testData2.as("y")
-    val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.analyzed
+    val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.optimizedPlan
     val planned = planner.HashJoin(join)
     assert(planned.size === 1)
   }
```
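These tests previously matched on the analyzed plan, but that tree now still contains the `Subquery` wrappers produced by `as("x")`/`as("y")`, which `planner.HashJoin` does not look through; the optimized plan has had them removed by the new first batch. The two Hive test changes below switch to `optimizedPlan` for the same reason. A hedged sketch of the plan shapes involved (join condition elided, constructors assumed):

```scala
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, Subquery}

val r = LocalRelation('a.int, 'b.int)
// Roughly the analyzed shape after this PR: aliases wrap each join side.
val analyzedShape = Join(Subquery("x", r), Subquery("y", r), Inner, None)
// The optimizer's first batch strips them, restoring the shape the
// HashJoin strategy pattern-matches on.
assert(EliminateSubQueries(analyzedShape) == Join(r, r, Inner, None))
```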
```diff
@@ -579,7 +579,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       Row(3) :: Row(4) :: Nil
     )
 
-    table("test_parquet_ctas").queryExecution.analyzed match {
+    table("test_parquet_ctas").queryExecution.optimizedPlan match {
       case LogicalRelation(p: ParquetRelation2) => // OK
       case _ =>
         fail(
```
```diff
@@ -292,7 +292,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
       Seq(Row(1, "str1"))
     )
 
-    table("test_parquet_ctas").queryExecution.analyzed match {
+    table("test_parquet_ctas").queryExecution.optimizedPlan match {
       case LogicalRelation(p: ParquetRelation2) => // OK
       case _ =>
         fail(
```