Skip to content

Commit

Permalink
[SPARK-16163] [SQL] Cache the statistics for logical plans
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This calculation of statistics is not trivial anymore, it could be very slow on large query (for example, TPC-DS Q64 took several minutes to plan).

During the planning of a query, the statistics of any logical plan should not change (even InMemoryRelation), so we should use `lazy val` to cache the statistics.

For InMemoryRelation, the statistics could be updated after materialization, it's only useful when used in another query (before planning), because once we finished the planning, the statistics will not be used anymore.

## How was this patch tested?

Testsed with TPC-DS Q64, it could be planned in a second after the patch.

Author: Davies Liu <[email protected]>

Closes apache#13871 from davies/fix_statistics.
  • Loading branch information
Davies Liu authored and davies committed Jun 23, 2016
1 parent 60398da commit 10396d9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation
}
}

override def statistics: Statistics = {
override lazy val statistics: Statistics = {
val leftSize = left.statistics.sizeInBytes
val rightSize = right.statistics.sizeInBytes
val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
Expand All @@ -184,7 +184,7 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(le
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
duplicateResolved

override def statistics: Statistics = {
override lazy val statistics: Statistics = {
left.statistics.copy()
}
}
Expand Down Expand Up @@ -224,7 +224,7 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
children.length > 1 && childrenResolved && allChildrenCompatible
}

override def statistics: Statistics = {
override lazy val statistics: Statistics = {
val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
Statistics(sizeInBytes = sizeInBytes)
}
Expand Down Expand Up @@ -333,7 +333,7 @@ case class Join(
case _ => resolvedExceptNatural
}

override def statistics: Statistics = joinType match {
override lazy val statistics: Statistics = joinType match {
case LeftAnti | LeftSemi =>
// LeftSemi and LeftAnti won't ever be bigger than left
left.statistics.copy()
Expand All @@ -351,7 +351,7 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output

// set isBroadcastable to true so the child will be broadcasted
override def statistics: Statistics = super.statistics.copy(isBroadcastable = true)
override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
}

case class InsertIntoTable(
Expand Down Expand Up @@ -451,7 +451,7 @@ case class Range(

override def newInstance(): Range = copy(output = output.map(_.newInstance()))

override def statistics: Statistics = {
override lazy val statistics: Statistics = {
val sizeInBytes = LongType.defaultSize * numElements
Statistics( sizeInBytes = sizeInBytes )
}
Expand Down Expand Up @@ -486,7 +486,7 @@ case class Aggregate(
override def validConstraints: Set[Expression] =
child.constraints.union(getAliasedConstraints(aggregateExpressions))

override def statistics: Statistics = {
override lazy val statistics: Statistics = {
if (groupingExpressions.isEmpty) {
super.statistics.copy(sizeInBytes = 1)
} else {
Expand Down Expand Up @@ -586,7 +586,7 @@ case class Expand(
override def references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))

override def statistics: Statistics = {
override lazy val statistics: Statistics = {
val sizeInBytes = super.statistics.sizeInBytes * projections.length
Statistics(sizeInBytes = sizeInBytes)
}
Expand Down Expand Up @@ -706,7 +706,7 @@ case class Sample(

override def output: Seq[Attribute] = child.output

override def statistics: Statistics = {
override lazy val statistics: Statistics = {
val ratio = upperBound - lowerBound
// BigInt can't multiply with Double
var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100
Expand Down Expand Up @@ -753,5 +753,5 @@ case object OneRowRelation extends LeafNode {
*
* [[LeafNode]]s must override this.
*/
override def statistics: Statistics = Statistics(sizeInBytes = 1)
override lazy val statistics: Statistics = Statistics(sizeInBytes = 1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,58 +63,32 @@ private[sql] case class InMemoryRelation(
@transient child: SparkPlan,
tableName: Option[String])(
@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
@transient private[sql] var _statistics: Statistics = null,
private[sql] var _batchStats: CollectionAccumulator[InternalRow] = null)
private[sql] val batchStats: CollectionAccumulator[InternalRow] =
child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
extends logical.LeafNode with MultiInstanceRelation {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)

override def producedAttributes: AttributeSet = outputSet

private[sql] val batchStats: CollectionAccumulator[InternalRow] =
if (_batchStats == null) {
child.sqlContext.sparkContext.collectionAccumulator[InternalRow]
} else {
_batchStats
}

@transient val partitionStatistics = new PartitionStatistics(output)

private def computeSizeInBytes = {
val sizeOfRow: Expression =
BindReferences.bindReference(
output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
partitionStatistics.schema)

batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
}

// Statistics propagation contracts:
// 1. Non-null `_statistics` must reflect the actual statistics of the underlying data
// 2. Only propagate statistics when `_statistics` is non-null
private def statisticsToBePropagated = if (_statistics == null) {
val updatedStats = statistics
if (_statistics == null) null else updatedStats
} else {
_statistics
}

override def statistics: Statistics = {
if (_statistics == null) {
if (batchStats.value.isEmpty) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
// available, return the default statistics.
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
} else {
// Underlying columnar RDD has been materialized, required information has also been
// collected via the `batchStats` accumulator, compute the final statistics,
// and update `_statistics`.
_statistics = Statistics(sizeInBytes = computeSizeInBytes)
_statistics
}
override lazy val statistics: Statistics = {
if (batchStats.value.isEmpty) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
// available, return the default statistics.
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
} else {
// Pre-computed statistics
_statistics
// Underlying columnar RDD has been materialized, required information has also been
// collected via the `batchStats` accumulator.
val sizeOfRow: Expression =
BindReferences.bindReference(
output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
partitionStatistics.schema)

val sizeInBytes =
batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
Statistics(sizeInBytes = sizeInBytes)
}
}

Expand Down Expand Up @@ -187,7 +161,7 @@ private[sql] case class InMemoryRelation(
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
InMemoryRelation(
newOutput, useCompression, batchSize, storageLevel, child, tableName)(
_cachedColumnBuffers, statisticsToBePropagated, batchStats)
_cachedColumnBuffers, batchStats)
}

override def newInstance(): this.type = {
Expand All @@ -199,12 +173,11 @@ private[sql] case class InMemoryRelation(
child,
tableName)(
_cachedColumnBuffers,
statisticsToBePropagated,
batchStats).asInstanceOf[this.type]
}

def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers

override protected def otherCopyArgs: Seq[AnyRef] =
Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
Seq(_cachedColumnBuffers, batchStats)
}
3 changes: 1 addition & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,7 @@ abstract class QueryTest extends PlanTest {
origin.child,
l.tableName)(
origin.cachedColumnBuffers,
l._statistics,
origin._batchStats)
origin.batchStats)
case p =>
p.transformExpressions {
case s: SubqueryExpression =>
Expand Down

0 comments on commit 10396d9

Please sign in to comment.