Skip to content

Commit

Permalink
[SPARK-25196][SPARK-27251][SQL][FOLLOWUP] Add synchronized for InMemo…
Browse files Browse the repository at this point in the history
…ryRelation.statsOfPlanToCache

## What changes were proposed in this pull request?
This is a follow-up of apache#24047; to follow the `CacheManager.cachedData` lock semantics, this pr wrapped the `statsOfPlanToCache` update with `synchronized`.

## How was this patch tested?
Pass Jenkins

Closes apache#24178 from maropu/SPARK-24047-FOLLOWUP.

Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
maropu authored and dongjoon-hyun committed Mar 24, 2019
1 parent 6242885 commit 01e6305
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.execution

import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.immutable.IndexedSeq

import org.apache.hadoop.fs.{FileSystem, Path}
Expand Down Expand Up @@ -163,12 +161,7 @@ class CacheManager extends Logging {
val relation = cachedData.cachedRepresentation
val (rowCount, newColStats) =
CommandUtils.computeColumnStats(sparkSession, relation, column)
val oldStats = relation.statsOfPlanToCache
val newStats = oldStats.copy(
rowCount = Some(rowCount),
attributeStats = AttributeMap((oldStats.attributeStats ++ newColStats).toSeq)
)
relation.statsOfPlanToCache = newStats
relation.updateStats(rowCount, newColStats)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -145,37 +145,60 @@ object InMemoryRelation {
tableName: Option[String],
logicalPlan: LogicalPlan): InMemoryRelation = {
val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)
new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)(
statsOfPlanToCache = logicalPlan.stats)
val relation = new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)
relation.statsOfPlanToCache = logicalPlan.stats
relation
}

def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = {
new InMemoryRelation(cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)(
statsOfPlanToCache = logicalPlan.stats)
val relation = new InMemoryRelation(
cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)
relation.statsOfPlanToCache = logicalPlan.stats
relation
}

def apply(
output: Seq[Attribute],
cacheBuilder: CachedRDDBuilder,
outputOrdering: Seq[SortOrder],
statsOfPlanToCache: Statistics): InMemoryRelation = {
val relation = InMemoryRelation(output, cacheBuilder, outputOrdering)
relation.statsOfPlanToCache = statsOfPlanToCache
relation
}
}

case class InMemoryRelation(
output: Seq[Attribute],
@transient cacheBuilder: CachedRDDBuilder,
override val outputOrdering: Seq[SortOrder])(
@volatile var statsOfPlanToCache: Statistics)
override val outputOrdering: Seq[SortOrder])
extends logical.LeafNode with MultiInstanceRelation {

@volatile var statsOfPlanToCache: Statistics = null

override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)

override def doCanonicalize(): logical.LogicalPlan =
copy(output = output.map(QueryPlan.normalizeExprId(_, cachedPlan.output)),
cacheBuilder,
outputOrdering)(
statsOfPlanToCache)
outputOrdering)

override def producedAttributes: AttributeSet = outputSet

@transient val partitionStatistics = new PartitionStatistics(output)

def cachedPlan: SparkPlan = cacheBuilder.cachedPlan

private[sql] def updateStats(
rowCount: Long,
newColStats: Map[Attribute, ColumnStat]): Unit = this.synchronized {
val newStats = statsOfPlanToCache.copy(
rowCount = Some(rowCount),
attributeStats = AttributeMap((statsOfPlanToCache.attributeStats ++ newColStats).toSeq)
)
statsOfPlanToCache = newStats
}

override def computeStats(): Statistics = {
if (cacheBuilder.sizeInBytesStats.value == 0L) {
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
Expand All @@ -185,20 +208,17 @@ case class InMemoryRelation(
}
}

def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
InMemoryRelation(newOutput, cacheBuilder, outputOrdering)(statsOfPlanToCache)
}
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)

override def newInstance(): this.type = {
new InMemoryRelation(
InMemoryRelation(
output.map(_.newInstance()),
cacheBuilder,
outputOrdering)(
statsOfPlanToCache).asInstanceOf[this.type]
outputOrdering,
statsOfPlanToCache).asInstanceOf[this.type]
}

override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)

override def simpleString(maxFields: Int): String =
s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-25727 - otherCopyArgs in InMemoryRelation does not include outputOrdering") {
val data = Seq(100).toDF("count").cache()
val json = data.queryExecution.optimizedPlan.toJSON
assert(json.contains("outputOrdering") && json.contains("statsOfPlanToCache"))
assert(json.contains("outputOrdering"))
}

test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
Expand Down

0 comments on commit 01e6305

Please sign in to comment.