Skip to content

Commit

Permalink
[SPARK-7204] [SQL] Fix callSite for Dataframe and SQL operations
Browse files Browse the repository at this point in the history
This patch adds SQL to the set of excluded libraries when
generating a callSite. This makes the callSite mechanism work
properly for the data frame API. I also added a small improvement for
JDBC queries where we just use the string "Spark JDBC Server Query"
instead of trying to give a callsite that doesn't make any sense
to the user.

Before (DF):
![screen shot 2015-04-28 at 1 29 26 pm](https://cloud.githubusercontent.com/assets/320616/7380170/ef63bfb0-edae-11e4-989c-f88a5ba6bbee.png)

After (DF):
![screen shot 2015-04-28 at 1 34 58 pm](https://cloud.githubusercontent.com/assets/320616/7380181/fa7f6d90-edae-11e4-9559-26f163ed63b8.png)

After (JDBC):
![screen shot 2015-04-28 at 2 00 10 pm](https://cloud.githubusercontent.com/assets/320616/7380185/02f5b2a4-edaf-11e4-8e5b-99bdc3df66dd.png)

Author: Patrick Wendell <[email protected]>

Closes apache#5757 from pwendell/dataframes and squashes the following commits:

0d931a4 [Patrick Wendell] Attempting to fix PySpark tests
85bf740 [Patrick Wendell] [SPARK-7204] Fix callsite for dataframe operations.
  • Loading branch information
pwendell authored and rxin committed Apr 29, 2015
1 parent fe917f5 commit 1fd6ed9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
28 changes: 19 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1299,16 +1299,18 @@ private[spark] object Utils extends Logging {
}

/** Default filtering function for finding call sites using `getCallSite`. */
private def coreExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the "core" Spark API that we want to skip when
// finding the call site of a method.
private def sparkInternalExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the internal Spark API's
// that we want to skip when finding the call site of a method.
val SPARK_CORE_CLASS_REGEX =
"""^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r
val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r
val SCALA_CORE_CLASS_PREFIX = "scala"
val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined ||
SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined
val isScalaClass = className.startsWith(SCALA_CORE_CLASS_PREFIX)
// If the class is a Spark internal class or a Scala class, then exclude.
isSparkCoreClass || isScalaClass
isSparkClass || isScalaClass
}

/**
Expand All @@ -1318,7 +1320,7 @@ private[spark] object Utils extends Logging {
*
* @param skipClass Function that is used to exclude non-user-code classes.
*/
def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
// Keep crawling up the stack trace until we find the first function not inside of the spark
// package. We track the last (shallowest) contiguous Spark method. This might be an RDD
// transformation, a SparkContext function (such as parallelize), or anything else that leads
Expand Down Expand Up @@ -1357,9 +1359,17 @@ private[spark] object Utils extends Logging {
}

val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
CallSite(
shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
longForm = callStack.take(callStackDepth).mkString("\n"))
val shortForm =
if (firstUserFile == "HiveSessionImpl.java") {
// To be more user friendly, show a nicer string for queries submitted from the JDBC
// server.
"Spark JDBC Server Query"
} else {
s"$lastSparkMethod at $firstUserFile:$firstUserLine"
}
val longForm = callStack.take(callStackDepth).mkString("\n")

CallSite(shortForm, longForm)
}

/** Return a string containing part of a file from byte 'start' to 'end'. */
Expand Down
3 changes: 2 additions & 1 deletion python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ def explain(self, extended=False):
:param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
>>> df.explain()
PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:...
PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\
NativeMethodAccessorImpl.java:...
>>> df.explain(True)
== Parsed Logical Plan ==
Expand Down

0 comments on commit 1fd6ed9

Please sign in to comment.