Commit 76c155d
[SPARK-7837] [SQL] Avoids double closing output writers when commitTask() fails

When inserting data into a `HadoopFsRelation`, if `commitTask()` of the writer container fails, `abortTask()` will be invoked. However, both `commitTask()` and `abortTask()` try to close the output writer(s). The problem is that closing the underlying writers may not be an idempotent operation; e.g., `ParquetRecordWriter.close()` throws an NPE when called twice.

Author: Cheng Lian <[email protected]>

Closes apache#8236 from liancheng/spark-7837/double-closing.

1 parent f7efda3 commit 76c155d
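
The failure mode described above boils down to a commit step that closes the writer and then fails, an abort step that closes the same writer again, and a boolean guard that turns the second close into a no-op. Below is a minimal, hypothetical sketch of that flow; none of these classes are Spark APIs, and `FragileWriter` only stands in for an `OutputWriter` wrapping a non-idempotent `ParquetRecordWriter`. It merely mirrors the shape of the `DefaultWriterContainer` change in the diff that follows.

    // Hypothetical, self-contained sketch of the SPARK-7837 scenario.
    object DoubleCloseSketch {

      // Stands in for a writer whose close() is not idempotent.
      class FragileWriter {
        private var closed = false
        def close(): Unit = {
          if (closed) throw new NullPointerException("closed twice") // models ParquetRecordWriter.close()
          closed = true
        }
      }

      def main(args: Array[String]): Unit = {
        val writer = new FragileWriter
        var writerClosed = false // the guard introduced by the fix

        def closeOnce(): Unit = {
          if (!writerClosed) {
            writer.close()
            writerClosed = true
          }
        }

        def commitTask(): Unit = {
          closeOnce()                      // the writer must be closed before committing
          sys.error("commitTask() failed") // simulated committer failure
        }

        def abortTask(): Unit = closeOnce() // previously closed unconditionally -> NPE

        // If commitTask() fails, the task falls back to abortTask(), as in writeRows().
        try commitTask() catch {
          case _: Throwable => abortTask()  // no-op thanks to the guard, no double close
        }
      }
    }
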

2 files changed: +61 −6 lines
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala (+17 −4)

@@ -217,6 +217,8 @@ private[sql] class DefaultWriterContainer(
     val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
     writer.initConverter(dataSchema)
 
+    var writerClosed = false
+
     // If anything below fails, we should abort the task.
     try {
       while (iterator.hasNext) {
@@ -235,7 +237,10 @@
     def commitTask(): Unit = {
       try {
         assert(writer != null, "OutputWriter instance should have been initialized")
-        writer.close()
+        if (!writerClosed) {
+          writer.close()
+          writerClosed = true
+        }
         super.commitTask()
       } catch {
         case cause: Throwable =>
@@ -247,7 +252,10 @@
 
     def abortTask(): Unit = {
       try {
-        writer.close()
+        if (!writerClosed) {
+          writer.close()
+          writerClosed = true
+        }
       } finally {
         super.abortTask()
       }
@@ -275,6 +283,8 @@ private[sql] class DynamicPartitionWriterContainer(
     val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
     executorSideSetup(taskContext)
 
+    var outputWritersCleared = false
+
     // Returns the partition key given an input row
     val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
     // Returns the data columns to be written given an input row
@@ -379,8 +389,11 @@
     }
 
     def clearOutputWriters(): Unit = {
-      outputWriters.asScala.values.foreach(_.close())
-      outputWriters.clear()
+      if (!outputWritersCleared) {
+        outputWriters.asScala.values.foreach(_.close())
+        outputWriters.clear()
+        outputWritersCleared = true
+      }
     }
 
     def commitTask(): Unit = {

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala (+44 −2)

@@ -424,7 +424,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     configuration.set(
       "spark.sql.parquet.output.committer.class",
-      classOf[BogusParquetOutputCommitter].getCanonicalName)
+      classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName)
 
     try {
       val message = intercept[SparkException] {
@@ -450,12 +450,54 @@
     }.toString
     assert(errorMessage.contains("UnknownHostException"))
   }
+
+  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
+    val clonedConf = new Configuration(configuration)
+
+    // Using a output committer that always fail when committing a task, so that both
+    // `commitTask()` and `abortTask()` are invoked.
+    configuration.set(
+      "spark.sql.parquet.output.committer.class",
+      classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName)
+
+    try {
+      // Before fixing SPARK-7837, the following code results in an NPE because both
+      // `commitTask()` and `abortTask()` try to close output writers.
+
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          sqlContext.range(1).coalesce(1).write.parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m1.contains("Intentional exception for testing purposes"))
+      }
+
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1)
+          df.write.partitionBy("a").parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m2.contains("Intentional exception for testing purposes"))
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
 }
 
-class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
 
   override def commitJob(jobContext: JobContext): Unit = {
     sys.error("Intentional exception for testing purposes")
   }
 }
+
+class TaskCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitTask(context: TaskAttemptContext): Unit = {
+    sys.error("Intentional exception for testing purposes")
+  }
+}
