[SPARK-32859][SQL] Introduce physical rule to decide bucketing dynamically

### What changes were proposed in this pull request?

This PR adds support for deciding dynamically, based on the actual query plan, whether to do a bucketed table scan. Currently bucketing is enabled by default (`spark.sql.sources.bucketing.enabled`=true), so for every bucketed table in the query plan we use a bucketed table scan (all input files of a bucket are read by the same task). The drawback is that when the bucketed scan provides no benefit (no join/group-by/etc. in the query), it needlessly restricts the number of tasks to the number of buckets and can hurt parallelism.

The feature adds a physical plan rule that runs right after `EnsureRequirements`:

The rule goes through the plan nodes. For each operator with an "interesting partition" (i.e., one that requires `ClusteredDistribution` or `HashClusteredDistribution`), it checks whether the sub-plan under that operator contains an `Exchange` and a bucketed table scan, with only certain operators in between (i.e. `Scan/Filter/Project/Sort/PartialAgg/etc.`; see `DisableUnnecessaryBucketedScan.disableBucketWithInterestingPartition` for details). If so, the bucketed table scan in that sub-plan is disabled. In addition, a bucketed table scan is disabled if there is no operator with an interesting partition anywhere above it in the plan.

The algorithm works because, if there is a shuffle between the bucketed table scan and the operator with the interesting partition, the scan's partitioning is destroyed by the shuffle in the middle, so the bucketed table scan is certainly not needed.

The idea of "interesting partition" is inspired from "interesting order" in "Access Path Selection in a Relational Database Management System"(http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf), after discussion with cloud-fan .

### Why are the changes needed?

To avoid unnecessary bucketed scans in a query. This is also a prerequisite for apache#29625 (deciding bucketed sorted scan dynamically will be added later in that PR).

### Does this PR introduce _any_ user-facing change?

A new config `spark.sql.sources.bucketing.autoBucketedScan.enabled` is introduced, set to false by default (the rule is disabled by default because it can regress cached bucketed table queries; see the discussion in apache#29804 (comment)). Users can opt in or out by enabling or disabling the config. As we found in production, some users rely on the assumption that the number of tasks equals the number of buckets when reading a bucketed table, in order to precisely control the number of tasks. This is a bad assumption, but it does happen on our side, so the config lets them opt out of the feature.
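
For reference, opting in would look roughly like the following (the session setup is illustrative; only the two config keys come from this PR):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; any existing SparkSession works the same way.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("auto-bucketed-scan-demo")
  .getOrCreate()

// Bucketing itself is on by default; the new rule is off by default and only
// takes effect when both flags are true.
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")
```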

### How was this patch tested?

Added unit tests in `DisableUnnecessaryBucketedScanSuite.scala`
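
Not the actual suite code, but a rough sketch (assuming a session configured as above) of the kind of plan check such a test can make, using the `bucketedScan` flag this PR exposes for testing:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.FileSourceScanExec

// Illustrative helper: count the scans that still run as bucketed scans
// after the rule has been applied to the executed plan.
def numBucketedScans(df: DataFrame): Int =
  df.queryExecution.executedPlan.collect {
    case scan: FileSourceScanExec if scan.bucketedScan => scan
  }.size

// Expectation with the rule enabled: a plain filter/project query over a
// bucketed table reports 0, while a join or group-by on the bucket column
// still reports its bucketed scans.
```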

Closes apache#29804 from c21/bucket-rule.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
c21 authored and maropu committed Oct 2, 2020
1 parent 8657742 commit d6f3138
Showing 9 changed files with 454 additions and 18 deletions.
@@ -114,7 +114,7 @@ case class OrderedJoin(
/**
* Reorder the joins using a dynamic programming algorithm. This implementation is based on the
* paper: Access Path Selection in a Relational Database Management System.
* http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf
* https://dl.acm.org/doi/10.1145/582095.582099
*
* First we put all items (basic joined nodes) into level 0, then we build all two-way joins
* at level 1 from plans at level 0 (single items), then build all 3-way joins from plans
@@ -951,6 +951,17 @@ object SQLConf {
.checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets must be greater than 0")
.createWithDefault(100000)

val AUTO_BUCKETED_SCAN_ENABLED =
buildConf("spark.sql.sources.bucketing.autoBucketedScan.enabled")
.doc("When true, decide whether to do bucketed scan on input tables based on query plan " +
"automatically. Do not use bucketed scan if 1. query does not have operators to utilize " +
"bucketing (e.g. join, group-by, etc), or 2. there's an exchange operator between these " +
s"operators and table scan. Note when '${BUCKETING_ENABLED.key}' is set to " +
"false, this configuration does not take any effect.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
.internal()
.doc("When false, we will throw an error if a query contains a cartesian product without " +
@@ -3164,6 +3175,8 @@ class SQLConf extends Serializable with Logging {

def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)

def autoBucketedScanEnabled: Boolean = getConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)

def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)

@@ -156,7 +156,9 @@ case class RowDataSourceScanExec(
* @param optionalBucketSet Bucket ids for bucket pruning.
* @param optionalNumCoalescedBuckets Number of coalesced buckets.
* @param dataFilters Filters on non-partition columns.
* @param tableIdentifier identifier for the table in the metastore.
* @param tableIdentifier Identifier for the table in the metastore.
* @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
* [[DisableUnnecessaryBucketedScan]] for details.
*/
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
@@ -166,7 +168,8 @@ case class FileSourceScanExec(
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
dataFilters: Seq[Expression],
tableIdentifier: Option[TableIdentifier])
tableIdentifier: Option[TableIdentifier],
disableBucketedScan: Boolean = false)
extends DataSourceScanExec {

// Note that some vals referring the file-based relation are lazy intentionally
@@ -257,7 +260,8 @@ case class FileSourceScanExec(

// exposed for testing
lazy val bucketedScan: Boolean = {
if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined) {
if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined
&& !disableBucketedScan) {
val spec = relation.bucketSpec.get
val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
bucketColumns.size == spec.bucketColumnNames.size
@@ -348,20 +352,23 @@ case class FileSourceScanExec(
"DataFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)

val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
val numSelectedBuckets = optionalBucketSet.map { b =>
b.cardinality()
// TODO(SPARK-32986): Add bucketed scan info in explain output of FileSourceScanExec
if (bucketedScan) {
relation.bucketSpec.map { spec =>
val numSelectedBuckets = optionalBucketSet.map { b =>
b.cardinality()
} getOrElse {
spec.numBuckets
}
metadata + ("SelectedBucketsCount" ->
(s"$numSelectedBuckets out of ${spec.numBuckets}" +
optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
} getOrElse {
spec.numBuckets
metadata
}
metadata + ("SelectedBucketsCount" ->
(s"$numSelectedBuckets out of ${spec.numBuckets}" +
optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
} getOrElse {
} else {
metadata
}

withSelectedBucketsCount
}

override def verboseStringWithOperatorId(): String = {
@@ -539,6 +546,7 @@ case class FileSourceScanExec(
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}

// TODO(SPARK-32985): Decouple bucket filter pruning and bucketed table scan
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
filesGroupedToBuckets.filter {
@@ -624,6 +632,7 @@ case class FileSourceScanExec(
optionalBucketSet,
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None)
None,
disableBucketedScan)
}
}
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -344,6 +344,7 @@ object QueryExecution {
PlanSubqueries(sparkSession),
RemoveRedundantProjects(sparkSession.sessionState.conf),
EnsureRequirements(sparkSession.sessionState.conf),
DisableUnnecessaryBucketedScan(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
CollapseCodegenStages(sparkSession.sessionState.conf),
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.bucketing

import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.internal.SQLConf

/**
* Disable unnecessary bucketed table scan based on actual physical query plan.
* NOTE: this rule is designed to be applied right after [[EnsureRequirements]],
* where all [[ShuffleExchangeExec]] and [[SortExec]] have been added to plan properly.
*
* When BUCKETING_ENABLED and AUTO_BUCKETED_SCAN_ENABLED are set to true, go through
* query plan to check where bucketed table scan is unnecessary, and disable bucketed table
* scan if:
*
* 1. The sub-plan from root to bucketed table scan, does not contain
* [[hasInterestingPartition]] operator.
*
* 2. The sub-plan from the nearest downstream [[hasInterestingPartition]] operator
* to the bucketed table scan, contains only [[isAllowedUnaryExecNode]] operators
* and at least one [[Exchange]].
*
* Examples:
* 1. no [[hasInterestingPartition]] operator:
* Project
* |
* Filter
* |
* Scan(t1: i, j)
* (bucketed on column j, DISABLE bucketed scan)
*
* 2. join:
* SortMergeJoin(t1.i = t2.j)
* / \
* Sort(i) Sort(j)
* / \
* Shuffle(i) Scan(t2: i, j)
* / (bucketed on column j, enable bucketed scan)
* Scan(t1: i, j)
* (bucketed on column j, DISABLE bucketed scan)
*
* 3. aggregate:
* HashAggregate(i, ..., Final)
* |
* Shuffle(i)
* |
* HashAggregate(i, ..., Partial)
* |
* Filter
* |
* Scan(t1: i, j)
* (bucketed on column j, DISABLE bucketed scan)
*
* The idea of [[hasInterestingPartition]] is inspired from "interesting order" in
* the paper "Access Path Selection in a Relational Database Management System"
* (https://dl.acm.org/doi/10.1145/582095.582099).
*/
case class DisableUnnecessaryBucketedScan(conf: SQLConf) extends Rule[SparkPlan] {

/**
* Disable bucketed table scan with pre-order traversal of plan.
*
* @param withInterestingPartition The traversed plan has operator with interesting partition.
* @param withExchange The traversed plan has [[Exchange]] operator.
* @param withAllowedNode The traversed plan has only [[isAllowedUnaryExecNode]] operators.
*/
private def disableBucketWithInterestingPartition(
plan: SparkPlan,
withInterestingPartition: Boolean,
withExchange: Boolean,
withAllowedNode: Boolean): SparkPlan = {
plan match {
case p if hasInterestingPartition(p) =>
// Operator with interesting partition, propagates `withInterestingPartition` as true
// to its children, and resets `withExchange` and `withAllowedNode`.
p.mapChildren(disableBucketWithInterestingPartition(_, true, false, true))
case exchange: Exchange =>
// Exchange operator propagates `withExchange` as true to its child.
exchange.mapChildren(disableBucketWithInterestingPartition(
_, withInterestingPartition, true, withAllowedNode))
case scan: FileSourceScanExec =>
if (isBucketedScanWithoutFilter(scan)) {
if (!withInterestingPartition || (withExchange && withAllowedNode)) {
scan.copy(disableBucketedScan = true)
} else {
scan
}
} else {
scan
}
case o =>
o.mapChildren(disableBucketWithInterestingPartition(
_,
withInterestingPartition,
withExchange,
withAllowedNode && isAllowedUnaryExecNode(o)))
}
}

private def hasInterestingPartition(plan: SparkPlan): Boolean = {
plan.requiredChildDistribution.exists {
case _: ClusteredDistribution | _: HashClusteredDistribution => true
case _ => false
}
}

/**
* Check if the operator is allowed single-child operator.
* We may revisit this method later as we probably can
* remove this restriction to allow arbitrary operator between
* bucketed table scan and operator with interesting partition.
*/
private def isAllowedUnaryExecNode(plan: SparkPlan): Boolean = {
plan match {
case _: SortExec | _: ProjectExec | _: FilterExec => true
case partialAgg: BaseAggregateExec =>
partialAgg.requiredChildDistributionExpressions.isEmpty
case _ => false
}
}

private def isBucketedScanWithoutFilter(scan: FileSourceScanExec): Boolean = {
// Do not disable bucketed table scan if it has filter pruning,
// because bucketed table scan is still useful here to save CPU/IO cost with
// only reading selected bucket files.
scan.bucketedScan && scan.optionalBucketSet.isEmpty
}

def apply(plan: SparkPlan): SparkPlan = {
lazy val hasBucketedScanWithoutFilter = plan.find {
case scan: FileSourceScanExec => isBucketedScanWithoutFilter(scan)
case _ => false
}.isDefined

if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScanWithoutFilter) {
plan
} else {
disableBucketWithInterestingPartition(plan, false, false, true)
}
}
}
@@ -348,7 +348,7 @@ class DataFrameJoinSuite extends QueryTest
}
assert(broadcastExchanges.size == 1)
val tables = broadcastExchanges.head.collect {
case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent)) => tableIdent
case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent), _) => tableIdent
}
assert(tables.size == 1)
assert(tables.head === TableIdentifier(table1Name, Some(dbName)))
@@ -1314,7 +1314,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
// need to execute the query before we can examine fs.inputRDDs()
assert(stripAQEPlan(df.queryExecution.executedPlan) match {
case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _)))) =>
fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _, _)))) =>
partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(