
Commit 4494cd9

cloud-fan authored and gatorsmile committed
[SPARK-18243][SQL] Port Hive writing to use FileFormat interface
## What changes were proposed in this pull request?

Inserting data into Hive tables has its own implementation that is distinct from data sources: `InsertIntoHiveTable`, `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.

One other major difference is that data source tables write directly to the final destination without using a staging directory, and then Spark itself adds the partitions/tables to the catalog. Hive tables write to a staging directory and then call the Hive metastore's loadPartition/loadTable functions to load that data in, so we still need to keep `InsertIntoHiveTable` to hold this special logic. In the future, we should consider writing to the Hive table location directly, so that we don't need to call `loadTable`/`loadPartition` at the end and can remove `InsertIntoHiveTable`.

This PR removes `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`, and creates a `HiveFileFormat` to implement the write logic. In the future, we should also implement the read logic in `HiveFileFormat`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes apache#16517 from cloud-fan/insert-hive.
1 parent e7f982b commit 4494cd9
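
The staging-then-load flow described in the commit message is the reason `InsertIntoHiveTable` survives this refactoring. As a rough, self-contained sketch of the two write flows (this is not Spark code; the local paths and the rename standing in for the metastore's loadTable/loadPartition call are assumptions for illustration):

import java.nio.file.{Files, Path, StandardCopyOption}

object StagingWriteSketch {
  // Data-source style: tasks write straight into the final table directory and
  // Spark registers the new partitions/files in the catalog afterwards.
  def writeDirect(dest: Path, fileName: String, bytes: Array[Byte]): Unit = {
    Files.createDirectories(dest)
    Files.write(dest.resolve(fileName), bytes)
  }

  // Hive style: tasks write into a staging directory, and a separate "load" step
  // moves the results into place (in Spark this is the Hive metastore's
  // loadTable/loadPartition call rather than a local move).
  def writeViaStaging(staging: Path, dest: Path, fileName: String, bytes: Array[Byte]): Unit = {
    Files.createDirectories(staging)
    Files.write(staging.resolve(fileName), bytes)
    Files.createDirectories(dest)
    Files.move(
      staging.resolve(fileName), dest.resolve(fileName), StandardCopyOption.REPLACE_EXISTING)
  }
}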

15 files changed: +318 -533 lines

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

+1 -1

@@ -99,7 +99,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   }
 
   private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskContext.getTaskAttemptID.getTaskID.getId
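
A small illustration of the %05d point made in the comment above (this helper is hypothetical, not the real getFilename):

object SplitFormattingSketch {
  // %05d pads but never truncates: split 3 -> "00003", split 123456 -> "123456",
  // so file names stay distinct even with more than 100000 tasks.
  def formatSplit(split: Int): String = f"$split%05d"
}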

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

+14 -19

@@ -108,35 +108,30 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
 
 
   /**
-   * Returns the result as a hive compatible sequence of strings. For native commands, the
-   * execution is simply passed back to Hive.
+   * Returns the result as a hive compatible sequence of strings. This is for testing only.
    */
   def hiveResultString(): Seq[String] = executedPlan match {
     case ExecutedCommandExec(desc: DescribeTableCommand) =>
-      SQLExecution.withNewExecutionId(sparkSession, this) {
-        // If it is a describe command for a Hive table, we want to have the output format
-        // be similar with Hive.
-        desc.run(sparkSession).map {
-          case Row(name: String, dataType: String, comment) =>
-            Seq(name, dataType,
-              Option(comment.asInstanceOf[String]).getOrElse(""))
-              .map(s => String.format(s"%-20s", s))
-              .mkString("\t")
-        }
+      // If it is a describe command for a Hive table, we want to have the output format
+      // be similar with Hive.
+      desc.run(sparkSession).map {
+        case Row(name: String, dataType: String, comment) =>
+          Seq(name, dataType,
+            Option(comment.asInstanceOf[String]).getOrElse(""))
+            .map(s => String.format(s"%-20s", s))
+            .mkString("\t")
       }
     // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp.
     case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
       command.executeCollect().map(_.getString(1))
     case command: ExecutedCommandExec =>
       command.executeCollect().map(_.getString(0))
     case other =>
-      SQLExecution.withNewExecutionId(sparkSession, this) {
-        val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
-        // We need the types so we can output struct field names
-        val types = analyzed.output.map(_.dataType)
-        // Reformat to match hive tab delimited output.
-        result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq
-      }
+      val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
+      // We need the types so we can output struct field names
+      val types = analyzed.output.map(_.dataType)
+      // Reformat to match hive tab delimited output.
+      result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
   }
 
   /** Formats a datum (based on the given data type) and returns the string representation. */
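
The DescribeTableCommand branch above pads each of name, type, and comment to 20 characters and joins them with tabs so the output resembles Hive's. A tiny stand-alone sketch of that formatting (the object name is illustrative):

object DescribeFormatSketch {
  // Mirrors the formatting in the DescribeTableCommand branch: pad each field to
  // 20 characters, substitute "" for a missing comment, and join with tabs.
  def formatRow(name: String, dataType: String, comment: Option[String]): String =
    Seq(name, dataType, comment.getOrElse(""))
      .map(s => String.format("%-20s", s))
      .mkString("\t")
}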

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala

+1 -1

@@ -66,6 +66,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
         new DetermineHiveSerde(conf) ::
+        new HiveAnalysis(sparkSession) ::
         new ResolveDataSource(sparkSession) :: Nil
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
@@ -88,7 +89,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
         SpecialLimits,
         InMemoryScans,
         HiveTableScans,
-        DataSinks,
         Scripts,
         Aggregation,
         JoinSelection,

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

+46 -31

@@ -21,14 +21,14 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -86,6 +86,47 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
   }
 }
 
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists)
+        if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) =>
+      InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+    case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
+      // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde
+      // tables yet.
+      if (mode == SaveMode.Append) {
+        throw new AnalysisException(
+          "CTAS for hive serde tables does not support append semantics.")
+      }
+
+      val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
+      CreateHiveTableAsSelectCommand(
+        tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
+        query,
+        mode == SaveMode.Ignore)
+  }
+
+  /**
+   * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule
+   * [[PreprocessTableInsertion]]. It is important that this rule([[HiveAnalysis]]) has to
+   * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and
+   * fix the schema mismatch by adding Cast.
+   */
+  private def hasBeenPreprocessed(
+      tableOutput: Seq[Attribute],
+      partSchema: StructType,
+      partSpec: Map[String, Option[String]],
+      query: LogicalPlan): Boolean = {
+    val partColNames = partSchema.map(_.name).toSet
+    query.resolved && partSpec.keys.forall(partColNames.contains) && {
+      val staticPartCols = partSpec.filter(_._2.isDefined).keySet
+      val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
+      expectedColumns.toStructType.sameType(query.schema)
+    }
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>
@@ -94,35 +135,9 @@ private[hive] trait HiveStrategies {
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.ScriptTransformation(input, script, output, child, ioschema) =>
+      case ScriptTransformation(input, script, output, child, ioschema) =>
         val hiveIoSchema = HiveScriptIOSchema(ioschema)
-        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
-      case _ => Nil
-    }
-  }
-
-  object DataSinks extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.InsertIntoTable(
-          table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
-        InsertIntoHiveTable(
-          table, partition, planLater(child), overwrite, ifNotExists) :: Nil
-
-      case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-        // Currently `DataFrameWriter.saveAsTable` doesn't support
-        // the Append mode of hive serde tables yet.
-        if (mode == SaveMode.Append) {
-          throw new AnalysisException(
-            "CTAS for hive serde tables does not support append semantics.")
-        }
-
-        val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-        val cmd = CreateHiveTableAsSelectCommand(
-          tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
-          query,
-          mode == SaveMode.Ignore)
-        ExecutedCommandExec(cmd) :: Nil
-
+        ScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
       case _ => Nil
     }
   }
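
For context on the hasBeenPreprocessed check added above: once PreprocessTableInsertion has normalized an INSERT, the query should produce exactly the table's columns minus the statically specified partition columns, with matching types. A simplified, self-contained sketch of that check (plain name/type pairs instead of Spark's Attribute/StructType; all names here are illustrative):

object PreprocessedInsertCheckSketch {
  final case class Col(name: String, dataType: String)

  // Columns the query must produce: every table column except the partition
  // columns whose values are given statically in the partition spec.
  def expectedColumns(tableCols: Seq[Col], partSpec: Map[String, Option[String]]): Seq[Col] = {
    val staticPartCols = partSpec.collect { case (name, Some(_)) => name }.toSet
    tableCols.filterNot(c => staticPartCols.contains(c.name))
  }

  // E.g. a table (key INT, value STRING, p STRING) partitioned by p, inserted with
  // static partition spec p=1: the query only needs to produce (key, value).
  def schemasLineUp(
      tableCols: Seq[Col],
      partSpec: Map[String, Option[String]],
      queryCols: Seq[Col]): Boolean =
    expectedColumns(tableCols, partSpec).map(_.dataType) == queryCols.map(_.dataType)
}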

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

+2 -2

@@ -311,10 +311,10 @@ private[hive] object HiveTableUtil {
   // that calls Hive.get() which tries to access metastore, but it's not valid in runtime
   // it would be fixed in next version of hive but till then, we should use this instead
   def configureJobPropertiesForStorageHandler(
-      tableDesc: TableDesc, jobConf: JobConf, input: Boolean) {
+      tableDesc: TableDesc, conf: Configuration, input: Boolean) {
     val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE)
     val storageHandler =
-      org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
+      org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(conf, property)
     if (storageHandler != null) {
       val jobProperties = new java.util.LinkedHashMap[String, String]
       if (input) {

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala

+1 -1

@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 /** Support for interacting with different versions of the HiveMetastoreClient */
 package object client {
-  private[client] abstract class HiveVersion(
+  private[hive] abstract class HiveVersion(
     val fullVersion: String,
     val extraDeps: Seq[String] = Nil,
     val exclusions: Seq[String] = Nil)
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala (new file)

+149 -0

@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging {
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    throw new UnsupportedOperationException(s"inferSchema is not supported for hive data source.")
+  }
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    val tableDesc = fileSinkConf.getTableInfo
+    conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)
+
+    // When speculation is on and output committer class name contains "Direct", we should warn
+    // users that they may loss data if they are using a direct output committer.
+    val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
+    val outputCommitterClass = conf.get("mapred.output.committer.class", "")
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
+    }
+
+    // Add table properties from storage handler to hadoopConf, so any custom storage
+    // handler settings can be set to hadoopConf
+    HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false)
+    Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
+
+    // Avoid referencing the outer object.
+    val fileSinkConfSer = fileSinkConf
+    new OutputWriterFactory {
+      private val jobConf = new SerializableJobConf(new JobConf(conf))
+      @transient private lazy val outputFormat =
+        jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat)
+      }
+
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, dataSchema)
+      }
+    }
+  }
+}
+
+class HiveOutputWriter(
+    path: String,
+    fileSinkConf: FileSinkDesc,
+    jobConf: JobConf,
+    dataSchema: StructType) extends OutputWriter with HiveInspectors {
+
+  private def tableDesc = fileSinkConf.getTableInfo
+
+  private val serializer = {
+    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+    serializer.initialize(null, tableDesc.getProperties)
+    serializer
+  }
+
+  private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
+    jobConf,
+    tableDesc,
+    serializer.getSerializedClass,
+    fileSinkConf,
+    new Path(path),
+    Reporter.NULL)
+
+  private val standardOI = ObjectInspectorUtils
+    .getStandardObjectInspector(
+      tableDesc.getDeserializer.getObjectInspector,
+      ObjectInspectorCopyOption.JAVA)
+    .asInstanceOf[StructObjectInspector]
+
+  private val fieldOIs =
+    standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
+  private val dataTypes = dataSchema.map(_.dataType).toArray
+  private val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
+  private val outputData = new Array[Any](fieldOIs.length)
+
+  override def write(row: InternalRow): Unit = {
+    var i = 0
+    while (i < fieldOIs.length) {
+      outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+      i += 1
+    }
+    hiveWriter.write(serializer.serialize(outputData, standardOI))
+  }
+
+  override def close(): Unit = {
+    // Seems the boolean value passed into close does not matter.
+    hiveWriter.close(false)
+  }
+}
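
To see how a HiveOutputWriter like the one above gets driven, here is a hedged sketch of the per-task loop the generic file-writing path runs against any output writer: create one writer per task via the factory, feed it every row, then close it. The SimpleWriter trait and Row alias are illustrative stand-ins, not Spark's actual OutputWriter/InternalRow API, and commit/abort handling by the commit protocol is omitted.

object WriterTaskSketch {
  type Row = Seq[Any]

  // Stand-in for OutputWriter: one instance per task/output file.
  trait SimpleWriter {
    def write(row: Row): Unit
    def close(): Unit
  }

  // Stand-in for a write task: newInstance(...) once via the factory returned by
  // prepareWrite, write every row, and always close the writer.
  def runTask(rows: Iterator[Row], newWriter: () => SimpleWriter): Unit = {
    val writer = newWriter()
    try {
      rows.foreach(writer.write)
    } finally {
      writer.close()
    }
  }
}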
