diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9633f9e15baff..f071df75816e3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -107,7 +107,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
     new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
       with HiveStrategies {
       override val sparkSession: SparkSession = self.sparkSession
-      override val hiveconf: HiveConf = self.hiveconf
 
       override def strategies: Seq[Strategy] = {
         experimentalMethods.extraStrategies ++ Seq(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7d1f87f3909a2..71b180e55b58c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.hive.conf.HiveConf
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution._
 
 private[hive] trait HiveStrategies {
@@ -34,13 +30,12 @@ private[hive] trait HiveStrategies {
   self: SparkPlanner =>
 
   val sparkSession: SparkSession
-  val hiveconf: HiveConf
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ScriptTransformation(input, script, output, child, ioschema) =>
         val hiveIoSchema = HiveScriptIOSchema(ioschema)
-        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
+        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
       case _ => Nil
     }
   }
@@ -78,7 +73,7 @@ private[hive] trait HiveStrategies {
           projectList,
           otherPredicates,
           identity[Seq[Expression]],
-          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil
+          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
       case _ =>
         Nil
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index af0317f7a164c..df6abc258b8f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,17 +17,14 @@
 
 package org.apache.spark.sql.hive
 
-import java.util
-
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
-  StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -62,7 +59,7 @@ class HadoopTableReader(
     @transient private val attributes: Seq[Attribute],
     @transient private val relation: MetastoreRelation,
     @transient private val sparkSession: SparkSession,
-    hiveconf: HiveConf)
+    hadoopConf: Configuration)
   extends TableReader with Logging {
 
   // Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
@@ -72,12 +69,15 @@ class HadoopTableReader(
   private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
     0 // will splitted based on block by default.
   } else {
-    math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions)
+    math.max(hadoopConf.getInt("mapred.map.tasks", 1),
+      sparkSession.sparkContext.defaultMinPartitions)
   }
 
-  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf)
-  private val _broadcastedHiveConf =
-    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
+  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(
+    sparkSession.sparkContext.conf, hadoopConf)
+
+  private val _broadcastedHadoopConf =
+    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
@@ -105,7 +105,7 @@ class HadoopTableReader(
     // Create local references to member variables, so that the entire `this` object won't be
     // serialized in the closure below.
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHiveConf
+    val broadcastedHiveConf = _broadcastedHadoopConf
 
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -162,7 +162,7 @@ class HadoopTableReader(
        case (partition, partDeserializer) =>
          def updateExistPathSetByPathPattern(pathPatternStr: String) {
            val pathPattern = new Path(pathPatternStr)
-           val fs = pathPattern.getFileSystem(hiveconf)
+           val fs = pathPattern.getFileSystem(hadoopConf)
            val matches = fs.globStatus(pathPattern)
            matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
          }
@@ -209,7 +209,7 @@ class HadoopTableReader(
 
       // Create local references so that the outer object isn't serialized.
       val tableDesc = relation.tableDesc
-      val broadcastedHiveConf = _broadcastedHiveConf
+      val broadcastedHiveConf = _broadcastedHadoopConf
       val localDeserializer = partDeserializer
       val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
 
@@ -259,7 +259,7 @@ class HadoopTableReader(
   private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
     filterOpt match {
       case Some(filter) =>
-        val fs = path.getFileSystem(hiveconf)
+        val fs = path.getFileSystem(hadoopConf)
         val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
         filteredFiles.mkString(",")
       case None => path.toString
@@ -279,7 +279,7 @@ class HadoopTableReader(
 
     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
-      _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
+      _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
       inputFormatClass,
       classOf[Writable],
@@ -302,7 +302,7 @@ private[hive] object HiveTableUtil {
     val storageHandler =
       org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
     if (storageHandler != null) {
-      val jobProperties = new util.LinkedHashMap[String, String]
+      val jobProperties = new java.util.LinkedHashMap[String, String]
       if (input) {
         storageHandler.configureInputJobProperties(tableDesc, jobProperties)
       } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index cc5bbf59dbccc..007c3384e5701 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
@@ -48,8 +48,7 @@ case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
     partitionPruningPred: Seq[Expression])(
-    @transient private val sparkSession: SparkSession,
-    @transient private val hiveconf: HiveConf)
+    @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
 
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
@@ -77,20 +76,20 @@ case class HiveTableScanExec(
   // Create a local copy of hiveconf,so that scan specific modifications should not impact
   // other queries
   @transient
-  private[this] val hiveExtraConf = new HiveConf(hiveconf)
+  private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
   // append columns ids and names before broadcast
-  addColumnMetadataToConf(hiveExtraConf)
+  addColumnMetadataToConf(hadoopConf)
 
   @transient
   private[this] val hadoopReader =
-    new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf)
+    new HadoopTableReader(attributes, relation, sparkSession, hadoopConf)
 
   private[this] def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }
 
-  private def addColumnMetadataToConf(hiveConf: HiveConf) {
+  private def addColumnMetadataToConf(hiveConf: Configuration) {
     // Specifies needed column IDs for those non-partitioning columns.
     val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index f27337eb36a64..f6e6a75c3ee58 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.exec.{RecordReader, RecordWriter}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.AbstractSerDe
@@ -58,17 +57,14 @@ case class ScriptTransformation(
     script: String,
     output: Seq[Attribute],
     child: SparkPlan,
-    ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf)
+    ioschema: HiveScriptIOSchema)
   extends UnaryExecNode {
 
-  override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil
-
   override def producedAttributes: AttributeSet = outputSet -- inputSet
 
-  private val serializedHiveConf = new SerializableConfiguration(hiveconf)
-
   protected override def doExecute(): RDD[InternalRow] = {
-    def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+    def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
+      : Iterator[InternalRow] = {
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd.asJava)
 
@@ -76,7 +72,6 @@ case class ScriptTransformation(
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
       val errorStream = proc.getErrorStream
-      val localHiveConf = serializedHiveConf.value
 
       // In order to avoid deadlocks, we need to consume the error output of the child process.
       // To avoid issues caused by large error output, we use a circular buffer to limit the amount
@@ -107,7 +102,7 @@ case class ScriptTransformation(
         proc,
         stderrBuffer,
         TaskContext.get(),
-        localHiveConf
+        hadoopConf
       )
 
       // This nullability is a performance optimization in order to avoid an Option.foreach() call
@@ -122,7 +117,7 @@ case class ScriptTransformation(
       val scriptOutputStream = new DataInputStream(inputStream)
 
       @Nullable val scriptOutputReader =
-        ioschema.recordReader(scriptOutputStream, localHiveConf).orNull
+        ioschema.recordReader(scriptOutputStream, hadoopConf).orNull
 
       var scriptOutputWritable: Writable = null
       val reusedWritableObject: Writable = if (null != outputSerde) {
@@ -214,10 +209,13 @@ case class ScriptTransformation(
       outputIterator
     }
 
+    val broadcastedHadoopConf =
+      new SerializableConfiguration(sqlContext.sessionState.newHadoopConf())
+
     child.execute().mapPartitions { iter =>
       if (iter.hasNext) {
         val proj = UnsafeProjection.create(schema)
-        processIterator(iter).map(proj)
+        processIterator(iter, broadcastedHadoopConf.value).map(proj)
       } else {
         // If the input iterator has no rows then do not launch the external script.
         Iterator.empty
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 1a15fb741a4ba..19e8025d6b7c9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -58,7 +58,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = noSerdeIOSchema
-      )(hiveContext.sessionState.hiveconf),
+      ),
       rowsDf.collect())
   }
 
@@ -72,7 +72,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = serdeIOSchema
-      )(hiveContext.sessionState.hiveconf),
+      ),
       rowsDf.collect())
   }
 
@@ -87,7 +87,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
          output = Seq(AttributeReference("a", StringType)()),
          child = ExceptionInjectingOperator(child),
          ioschema = noSerdeIOSchema
-        )(hiveContext.sessionState.hiveconf),
+        ),
        rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))
@@ -104,7 +104,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
          output = Seq(AttributeReference("a", StringType)()),
          child = ExceptionInjectingOperator(child),
          ioschema = serdeIOSchema
-        )(hiveContext.sessionState.hiveconf),
+        ),
        rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))