Revert "[SPARK-35437][SQL] Use expressions to filter Hive partitions …
Browse files Browse the repository at this point in the history
…at client side"

This reverts commit f1727a6.
HyukjinKwon committed Oct 27, 2021
1 parent 677aba2 commit fb9d6ae
Showing 5 changed files with 30 additions and 167 deletions.
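
For background, the reverted change (SPARK-35437) added a second fallback to Hive partition pruning: when the metastore's getPartitionsByFilter cannot be used, Spark would fetch only partition names and evaluate the filter expressions itself instead of fetching every partition. The sketch below only illustrates that decision flow under the removed spark.sql.hive.metastorePartitionPruningFastFallback flag; the three function parameters are hypothetical stand-ins, not Spark or Hive APIs.

// Illustrative decision flow; pushDownToMetastore, filterNamesOnClient and
// fetchAllPartitions are placeholders for the real metastore calls.
def prunePartitions(
    filter: String,
    fastFallbackEnabled: Boolean,
    pushDownToMetastore: String => Seq[String],
    filterNamesOnClient: () => Seq[String],
    fetchAllPartitions: () => Seq[String]): Seq[String] = {
  try {
    // Preferred path: the Hive metastore evaluates the filter string.
    pushDownToMetastore(filter)
  } catch {
    case _: RuntimeException if fastFallbackEnabled =>
      // Behaviour removed by this revert: prune on the client from names only.
      filterNamesOnClient()
    case _: RuntimeException =>
      // Behaviour restored by this revert: fetch all partitions and filter later.
      fetchAllPartitions()
  }
}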
@@ -24,11 +24,10 @@ import org.apache.hadoop.util.Shell

import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BasePredicate, BoundReference, Expression, Predicate}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, Predicate}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

object ExternalCatalogUtils {
// This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't
@@ -157,34 +156,27 @@ object ExternalCatalogUtils {
} else {
val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
catalogTable.partitionSchema)
val boundPredicate = generatePartitionPredicateByFilter(catalogTable,
partitionSchema, predicates)
val partitionColumnNames = catalogTable.partitionColumnNames.toSet

inputPartitions.filter { p =>
boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
val nonPartitionPruningPredicates = predicates.filterNot {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}
if (nonPartitionPruningPredicates.nonEmpty) {
throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
nonPartitionPruningPredicates)
}
}
}

def generatePartitionPredicateByFilter(
catalogTable: CatalogTable,
partitionSchema: StructType,
predicates: Seq[Expression]): BasePredicate = {
val partitionColumnNames = catalogTable.partitionColumnNames.toSet
val boundPredicate =
Predicate.createInterpreted(predicates.reduce(And).transform {
case att: AttributeReference =>
val index = partitionSchema.indexWhere(_.name == att.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})

val nonPartitionPruningPredicates = predicates.filterNot {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}
if (nonPartitionPruningPredicates.nonEmpty) {
throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
nonPartitionPruningPredicates)
inputPartitions.filter { p =>
boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
}
}

Predicate.createInterpreted(predicates.reduce(And).transform {
case att: AttributeReference =>
val index = partitionSchema.indexWhere(_.name == att.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
}

private def isNullPartitionValue(value: String): Boolean = {
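
Note that both versions of prunePartitionsByFilter interleaved in the hunk above share the same core mechanism: each partition-column reference is resolved to its index in the partition schema, the combined predicate is bound to those positions with BoundReference and Predicate.createInterpreted, and the result is evaluated once per partition row. A Spark-free sketch of that name-to-position binding, with purely illustrative names and types:

// Simplified stand-in for what BoundReference + Predicate.createInterpreted do:
// turn a predicate over named columns into a predicate over a positional row.
def bindByName(
    partitionColumns: Seq[String],
    predicate: Map[String, Any] => Boolean): Seq[Any] => Boolean = {
  val index = partitionColumns.zipWithIndex.toMap
  row => predicate(index.map { case (name, i) => name -> row(i) })
}

val pred = bindByName(Seq("ds", "h"), spec => spec("ds") == "20170101")
println(pred(Seq("20170101", "0")))  // true
println(pred(Seq("20170102", "0")))  // false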
@@ -1007,17 +1007,6 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
.doc("When this config is enabled, if the predicates are not supported by Hive or Spark " +
"does fallback due to encountering MetaException from the metastore, " +
"Spark will instead prune partitions by getting the partition names first " +
"and then evaluating the filter expressions on the client side. " +
"Note that the predicates with TimeZoneAwareExpression is not supported.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)

val HIVE_MANAGE_FILESOURCE_PARTITIONS =
buildConf("spark.sql.hive.manageFilesourcePartitions")
.doc("When true, enable metastore partition management for file source tables as well. " +
@@ -3721,9 +3710,6 @@ class SQLConf extends Serializable with Logging {
def metastorePartitionPruningFallbackOnException: Boolean =
getConf(HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION)

def metastorePartitionPruningFastFallback: Boolean =
getConf(HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK)

def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)

def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
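
With this hunk reverted, spark.sql.hive.metastorePartitionPruningFastFallback no longer exists. On a build that still contained SPARK-35437 (a 3.3.0 development snapshot prior to this commit) it would have been toggled like any other SQL conf; the session setup below is only an assumed example:

import org.apache.spark.sql.SparkSession

// Assumes a Spark build that still defines the removed config key.
val spark = SparkSession.builder()
  .appName("fast-fallback-example")
  .enableHiveSupport()
  .getOrCreate()

// SQL equivalent: SET spark.sql.hive.metastorePartitionPruningFastFallback=true
spark.conf.set("spark.sql.hive.metastorePartitionPruningFastFallback", "true")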
@@ -762,7 +762,7 @@ private[hive] class HiveClientImpl(
table: CatalogTable,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table, Some(userName))
val parts = shim.getPartitionsByFilter(client, hiveTable, predicates, table)
val parts = shim.getPartitionsByFilter(client, hiveTable, predicates)
.map(fromHivePartition)
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
@@ -40,14 +40,12 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, CatalogTablePartition, CatalogUtils, ExternalCatalogUtils, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateFormatter, TypeUtils}
import org.apache.spark.sql.catalyst.util.{DateFormatter, TypeUtils}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{AtomicType, DateType, IntegralType, StringType}
import org.apache.spark.unsafe.types.UTF8String
@@ -84,8 +82,7 @@ private[client] sealed abstract class Shim {
def getPartitionsByFilter(
hive: Hive,
table: Table,
predicates: Seq[Expression],
catalogTable: CatalogTable): Seq[Partition]
predicates: Seq[Expression]): Seq[Partition]

def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor

@@ -355,8 +352,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
override def getPartitionsByFilter(
hive: Hive,
table: Table,
predicates: Seq[Expression],
catalogTable: CatalogTable): Seq[Partition] = {
predicates: Seq[Expression]): Seq[Partition] = {
// getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
// See HIVE-4888.
logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
@@ -868,15 +864,15 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
override def getPartitionsByFilter(
hive: Hive,
table: Table,
predicates: Seq[Expression],
catalogTable: CatalogTable): Seq[Partition] = {
predicates: Seq[Expression]): Seq[Partition] = {

// Hive getPartitionsByFilter() takes a string that represents partition
// predicates like "str_key=\"value\" and int_key=1 ..."
val filter = convertFilters(table, predicates)

val partitions =
if (filter.isEmpty) {
prunePartitionsFastFallback(hive, table, catalogTable, predicates)
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
} else {
logDebug(s"Hive metastore filter is '$filter'.")
val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
@@ -892,81 +888,25 @@
// occurs and the config`spark.sql.hive.metastorePartitionPruningFallbackOnException` is
// enabled.
getPartitionsByFilterMethod.invoke(hive, table, filter)
.asInstanceOf[JArrayList[Partition]].asScala.toSeq
.asInstanceOf[JArrayList[Partition]]
} catch {
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
shouldFallback =>
logWarning("Caught Hive MetaException attempting to get partition metadata by " +
"filter from Hive. Falling back to fetching all partition metadata, which will " +
"degrade performance. Modifying your Hive metastore configuration to set " +
s"${tryDirectSqlConfVar.varname} to true (if it is not true already) may resolve " +
"this problem. Or you can enable " +
s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key} " +
"to alleviate performance downgrade. " +
"Otherwise, to avoid degraded performance you can set " +
"this problem. Otherwise, to avoid degraded performance you can set " +
s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
" to false and let the query fail instead.", ex)
// HiveShim clients are expected to handle a superset of the requested partitions
prunePartitionsFastFallback(hive, table, catalogTable, predicates)
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex)
}
}

partitions
}

private def prunePartitionsFastFallback(
hive: Hive,
table: Table,
catalogTable: CatalogTable,
predicates: Seq[Expression]): Seq[Partition] = {
val timeZoneId = SQLConf.get.sessionLocalTimeZone

// Because there is no way to know whether the partition properties has timeZone,
// client-side filtering cannot be used with TimeZoneAwareExpression.
def hasTimeZoneAwareExpression(e: Expression): Boolean = {
e.collectFirst {
case t: TimeZoneAwareExpression => t
}.isDefined
}

if (!SQLConf.get.metastorePartitionPruningFastFallback ||
predicates.isEmpty ||
predicates.exists(hasTimeZoneAwareExpression)) {
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
} else {
try {
val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
catalogTable.partitionSchema)
val boundPredicate = ExternalCatalogUtils.generatePartitionPredicateByFilter(
catalogTable, partitionSchema, predicates)

def toRow(spec: TablePartitionSpec): InternalRow = {
InternalRow.fromSeq(partitionSchema.map { field =>
val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
null
} else {
spec(field.name)
}
Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
})
}

val allPartitionNames = hive.getPartitionNames(
table.getDbName, table.getTableName, -1).asScala
val partNames = allPartitionNames.filter { p =>
val spec = PartitioningUtils.parsePathFragment(p)
boundPredicate.eval(toRow(spec))
}
hive.getPartitionsByNames(table, partNames.asJava).asScala.toSeq
} catch {
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
logWarning("Caught Hive MetaException attempting to get partition metadata by " +
"filter from client side. Falling back to fetching all partition metadata", ex)
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
}
}
partitions.asScala.toSeq
}

override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
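
The deleted prunePartitionsFastFallback above worked on partition names of the form ds=20170101/h=0/chunk=aa: it listed every name, parsed each into a column-to-value map (the job of PartitioningUtils.parsePathFragment), evaluated the bound predicate against that map, and only then fetched the matching partitions via getPartitionsByNames. Below is a self-contained sketch of the parse-and-filter step; it is not the Hive or Spark API, and URL-unescaping and type casts are omitted.

// Parse "ds=20170101/h=0" into Map("ds" -> "20170101", "h" -> "0").
def parsePathFragment(fragment: String): Map[String, String] =
  fragment.split("/").map { piece =>
    val Array(key, value) = piece.split("=", 2)
    key -> value
  }.toMap

val allNames = Seq(
  "ds=20170101/h=0/chunk=aa",
  "ds=20170101/h=1/chunk=ab",
  "ds=20170102/h=0/chunk=aa")

// Client-side pruning: keep only the names whose spec satisfies the predicate,
// then ask the metastore for just those partitions.
val matching = allNames.filter(name => parsePathFragment(name)("ds") == "20170101")
println(matching)  // List(ds=20170101/h=0/chunk=aa, ds=20170101/h=1/chunk=ab)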
@@ -41,7 +41,6 @@ class HivePartitionFilteringSuite(version: String)

private val tryDirectSqlKey = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname
private val fallbackKey = SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key
private val pruningFastFallback = SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key

// Support default partition in metastoredirectsql since HIVE-11898(Hive 2.0.0).
private val defaultPartition = if (version >= "2.0") Some(DEFAULT_PARTITION_NAME) else None
@@ -578,60 +577,6 @@ class HivePartitionFilteringSuite(version: String)
dateStrValue)
}

test("getPartitionsByFilter: substr(chunk,0,1)=a") {
Seq("true" -> Seq("aa", "ab"), "false" -> chunkValue).foreach { t =>
withSQLConf(pruningFastFallback -> t._1) {
testMetastorePartitionFiltering(
Substring(attr("chunk"), Literal(0), Literal(1)) === "a",
dsValue,
hValue,
t._2,
dateValue,
dateStrValue)
}
}
}

test("getPartitionsByFilter: year(d)=2019") {
Seq("true" -> Seq("2019-01-01", "2019-01-02", "2019-01-03"),
"false" -> dateValue).foreach { t =>
withSQLConf(pruningFastFallback -> t._1) {
testMetastorePartitionFiltering(
Year(attr("d")) === 2019,
dsValue,
hValue,
chunkValue,
t._2,
dateStrValue)
}
}
}

test("getPartitionsByFilter: datestr=concat(2020-,01-,01)") {
Seq("true" -> Seq("2020-01-01"), "false" -> dateStrValue).foreach { t =>
withSQLConf(pruningFastFallback -> t._1) {
testMetastorePartitionFiltering(
attr("datestr") === Concat(Seq("2020-", "01-", "01")),
dsValue,
hValue,
chunkValue,
dateValue,
t._2)
}
}
}

test(s"getPartitionsByFilter: ds=20170101 when $fallbackKey=true") {
withSQLConf(fallbackKey -> "true", pruningFastFallback -> "true") {
val client = init(false)
val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
Seq(attr("ds") === 20170101))

assert(filteredPartitions.size == 1 * hValue.size * chunkValue.size *
dateValue.size * dateStrValue.size)
}
}

private def testMetastorePartitionFiltering(
filterExpr: Expression,
expectedDs: Seq[Int],
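
The deleted tests exercised predicates the Hive metastore cannot evaluate (substr, year, concat), which is exactly where the removed client-side path pruned partitions while the restored behaviour returns all of them. A plain-Scala illustration of the first case, substr(chunk, 0, 1) = 'a', over assumed chunk values:

// Illustrative partition values for the "chunk" column.
val chunkValues = Seq("aa", "ab", "ba", "bb")

// With client-side pruning only "aa" and "ab" survive; the metastore itself
// cannot push a substring filter down, so otherwise every value comes back.
val pruned = chunkValues.filter(_.take(1) == "a")
println(pruned)  // List(aa, ab)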
