Commit d73d67f

[SPARK-14944][SPARK-14943][SQL] Remove HiveConf from HiveTableScanExec, HiveTableReader, and ScriptTransformation

## What changes were proposed in this pull request?
This patch removes HiveConf from HiveTableScanExec, HiveTableReader, and ScriptTransformation, and instead uses Spark's own configuration system (a plain Hadoop Configuration obtained from the session state). I'm splitting the larger change of removing HiveConf entirely into multiple independent pull requests, because it is very difficult to debug test failures when they are all combined in one giant change.
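
For reference, the replacement pattern is: obtain a plain Hadoop Configuration from the session instead of constructing a HiveConf, and wrap it in a serializable holder before broadcasting it to executors. The sketch below is illustrative only and not code from this patch: it assumes a local SparkSession, and SerializableConf is a hypothetical stand-in for the internal SerializableConfiguration wrapper used in the diffs.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for Spark's internal SerializableConfiguration wrapper;
// it exists only to keep this sketch self-contained.
class SerializableConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out) // Configuration is a Hadoop Writable, so it can serialize itself
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

object HadoopConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("hadoop-conf-sketch")
      .getOrCreate()

    // The patch swaps HiveConf for a plain Hadoop Configuration; internally it calls
    // sessionState.newHadoopConf(), and the public analogue from user code is:
    val hadoopConf: Configuration = spark.sparkContext.hadoopConfiguration

    // Configuration is not java.io.Serializable, so it is wrapped and broadcast once,
    // mirroring the _broadcastedHadoopConf field introduced in HadoopTableReader below.
    val broadcastedConf = spark.sparkContext.broadcast(new SerializableConf(hadoopConf))

    // Executor-side code then reads settings from the broadcast copy.
    val mapTasksHint = spark.sparkContext
      .parallelize(1 to 4)
      .map(_ => broadcastedConf.value.value.getInt("mapred.map.tasks", 1))
      .first()

    println(s"mapred.map.tasks as seen on an executor: $mapTasksHint")
    spark.stop()
  }
}
```

Once the operators depend only on Configuration, session-level Hadoop settings reach them without a separately constructed HiveConf, which is what the per-file changes below implement.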

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <[email protected]>

Closes apache#12727 from rxin/SPARK-14944.
rxin authored and yhuai committed Apr 27, 2016
1 parent b2a4560 commit d73d67f
Showing 6 changed files with 37 additions and 46 deletions.

@@ -107,7 +107,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
with HiveStrategies {
override val sparkSession: SparkSession = self.sparkSession
override val hiveconf: HiveConf = self.hiveconf

override def strategies: Seq[Strategy] = {
experimentalMethods.extraStrategies ++ Seq(

@@ -17,30 +17,25 @@

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.execution._

private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SparkPlanner =>

val sparkSession: SparkSession
val hiveconf: HiveConf

object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child, ioschema) =>
val hiveIoSchema = HiveScriptIOSchema(ioschema)
ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
case _ => Nil
}
}
@@ -78,7 +73,7 @@ private[hive] trait HiveStrategies {
projectList,
otherPredicates,
identity[Seq[Expression]],
HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil
HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
case _ =>
Nil
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala (16 additions, 16 deletions)

@@ -17,17 +17,14 @@

package org.apache.spark.sql.hive

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -62,7 +59,7 @@ class HadoopTableReader(
@transient private val attributes: Seq[Attribute],
@transient private val relation: MetastoreRelation,
@transient private val sparkSession: SparkSession,
hiveconf: HiveConf)
hadoopConf: Configuration)
extends TableReader with Logging {

// Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
@@ -72,12 +69,15 @@
private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions)
math.max(hadoopConf.getInt("mapred.map.tasks", 1),
sparkSession.sparkContext.defaultMinPartitions)
}

SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf)
private val _broadcastedHiveConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(
sparkSession.sparkContext.conf, hadoopConf)

private val _broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
@@ -105,7 +105,7 @@ class HadoopTableReader(
// Create local references to member variables, so that the entire `this` object won't be
// serialized in the closure below.
val tableDesc = relation.tableDesc
val broadcastedHiveConf = _broadcastedHiveConf
val broadcastedHiveConf = _broadcastedHadoopConf

val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -162,7 +162,7 @@ class HadoopTableReader(
case (partition, partDeserializer) =>
def updateExistPathSetByPathPattern(pathPatternStr: String) {
val pathPattern = new Path(pathPatternStr)
val fs = pathPattern.getFileSystem(hiveconf)
val fs = pathPattern.getFileSystem(hadoopConf)
val matches = fs.globStatus(pathPattern)
matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
}
@@ -209,7 +209,7 @@ class HadoopTableReader(

// Create local references so that the outer object isn't serialized.
val tableDesc = relation.tableDesc
val broadcastedHiveConf = _broadcastedHiveConf
val broadcastedHiveConf = _broadcastedHadoopConf
val localDeserializer = partDeserializer
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))

@@ -259,7 +259,7 @@ class HadoopTableReader(
private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
filterOpt match {
case Some(filter) =>
val fs = path.getFileSystem(hiveconf)
val fs = path.getFileSystem(hadoopConf)
val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
filteredFiles.mkString(",")
case None => path.toString
@@ -279,7 +279,7 @@

val rdd = new HadoopRDD(
sparkSession.sparkContext,
_broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
_broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
@@ -302,7 +302,7 @@ private[hive] object HiveTableUtil {
val storageHandler =
org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
if (storageHandler != null) {
val jobProperties = new util.LinkedHashMap[String, String]
val jobProperties = new java.util.LinkedHashMap[String, String]
if (input) {
storageHandler.configureInputJobProperties(tableDesc, jobProperties)
} else {

@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution

import scala.collection.JavaConverters._

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector._
@@ -48,8 +48,7 @@ case class HiveTableScanExec(
requestedAttributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Seq[Expression])(
@transient private val sparkSession: SparkSession,
@transient private val hiveconf: HiveConf)
@transient private val sparkSession: SparkSession)
extends LeafExecNode {

require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
@@ -77,20 +76,20 @@ case class HiveTableScanExec(
// Create a local copy of hiveconf,so that scan specific modifications should not impact
// other queries
@transient
private[this] val hiveExtraConf = new HiveConf(hiveconf)
private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()

// append columns ids and names before broadcast
addColumnMetadataToConf(hiveExtraConf)
addColumnMetadataToConf(hadoopConf)

@transient
private[this] val hadoopReader =
new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf)
new HadoopTableReader(attributes, relation, sparkSession, hadoopConf)

private[this] def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}

private def addColumnMetadataToConf(hiveConf: HiveConf) {
private def addColumnMetadataToConf(hiveConf: Configuration) {
// Specifies needed column IDs for those non-partitioning columns.
val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)


@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.exec.{RecordReader, RecordWriter}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.AbstractSerDe
@@ -58,25 +57,21 @@ case class ScriptTransformation(
script: String,
output: Seq[Attribute],
child: SparkPlan,
ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf)
ioschema: HiveScriptIOSchema)
extends UnaryExecNode {

override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil

override def producedAttributes: AttributeSet = outputSet -- inputSet

private val serializedHiveConf = new SerializableConfiguration(hiveconf)

protected override def doExecute(): RDD[InternalRow] = {
def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
: Iterator[InternalRow] = {
val cmd = List("/bin/bash", "-c", script)
val builder = new ProcessBuilder(cmd.asJava)

val proc = builder.start()
val inputStream = proc.getInputStream
val outputStream = proc.getOutputStream
val errorStream = proc.getErrorStream
val localHiveConf = serializedHiveConf.value

// In order to avoid deadlocks, we need to consume the error output of the child process.
// To avoid issues caused by large error output, we use a circular buffer to limit the amount
@@ -107,7 +102,7 @@ case class ScriptTransformation(
proc,
stderrBuffer,
TaskContext.get(),
localHiveConf
hadoopConf
)

// This nullability is a performance optimization in order to avoid an Option.foreach() call
@@ -122,7 +117,7 @@ case class ScriptTransformation(
val scriptOutputStream = new DataInputStream(inputStream)

@Nullable val scriptOutputReader =
ioschema.recordReader(scriptOutputStream, localHiveConf).orNull
ioschema.recordReader(scriptOutputStream, hadoopConf).orNull

var scriptOutputWritable: Writable = null
val reusedWritableObject: Writable = if (null != outputSerde) {
@@ -214,10 +209,13 @@ case class ScriptTransformation(
outputIterator
}

val broadcastedHadoopConf =
new SerializableConfiguration(sqlContext.sessionState.newHadoopConf())

child.execute().mapPartitions { iter =>
if (iter.hasNext) {
val proj = UnsafeProjection.create(schema)
processIterator(iter).map(proj)
processIterator(iter, broadcastedHadoopConf.value).map(proj)
} else {
// If the input iterator has no rows then do not launch the external script.
Iterator.empty

@@ -58,7 +58,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = child,
ioschema = noSerdeIOSchema
)(hiveContext.sessionState.hiveconf),
),
rowsDf.collect())
}

@@ -72,7 +72,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = child,
ioschema = serdeIOSchema
)(hiveContext.sessionState.hiveconf),
),
rowsDf.collect())
}

@@ -87,7 +87,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = ExceptionInjectingOperator(child),
ioschema = noSerdeIOSchema
)(hiveContext.sessionState.hiveconf),
),
rowsDf.collect())
}
assert(e.getMessage().contains("intentional exception"))
@@ -104,7 +104,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = ExceptionInjectingOperator(child),
ioschema = serdeIOSchema
)(hiveContext.sessionState.hiveconf),
),
rowsDf.collect())
}
assert(e.getMessage().contains("intentional exception"))
