Commit 8e7d5ba

Committed Aug 4, 2014
SPARK-2792. Fix reading too much or too little data from each stream in ExternalMap / Sorter
All these changes are from mridulm's work in apache#1609, but extracted here to fix this specific issue and make it easier to merge into 1.1.

This particular set of changes makes sure that we read exactly the right range of bytes from each spill file in ExternalAppendOnlyMap: some serializers can write bytes after the last object (e.g. the TC_RESET flag in Java serialization), and that would confuse the previous code into reading them as part of the next batch. There are also improvements to cleanup that make sure files are closed. In addition to bringing in the changes to ExternalAppendOnlyMap, I also copied them to the corresponding code in ExternalSorter and updated its test suite to test for the same issues.

Author: Matei Zaharia <[email protected]>

Closes apache#1722 from mateiz/spark-2792 and squashes the following commits:

5d4bfb5 [Matei Zaharia] Make objectStreamReset counter count the last object written too
18fe865 [Matei Zaharia] Update docs on objectStreamReset
576ee83 [Matei Zaharia] Allow objectStreamReset to be 0
0374217 [Matei Zaharia] Remove super paranoid code to close file handles
bda37bb [Matei Zaharia] Implement Mridul's ExternalAppendOnlyMap fixes in ExternalSorter too
0d6dad7 [Matei Zaharia] Added Mridul's test changes for ExternalAppendOnlyMap
9a78e4b [Matei Zaharia] Add @mridulm's fixes to ExternalAppendOnlyMap for batch sizes
1 parent 59f84a9 commit 8e7d5ba
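
The core of the bug is that java.io.ObjectOutputStream.reset() writes a TC_RESET marker into the stream after the last object of a batch, so a batch occupies more bytes than its objects alone. The stand-alone sketch below is not part of the commit (the object and names in it are made up for illustration); it only shows the bookkeeping the fix relies on: record how many bytes each batch occupied, then deserialize every batch from exactly its own byte range instead of trusting the previous deserializer to have stopped in the right place.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Illustrative only: mimics objectStreamReset=1 (reset after every object) and the
// batchSizes / batchOffsets bookkeeping that ExternalAppendOnlyMap now uses.
object BatchOffsetSketch {
  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    val batchSizes = new ArrayBuffer[Long]()
    var written = 0L

    // Two batches of three objects each; every batch is its own serialization stream.
    for (batch <- 0 until 2) {
      val objOut = new ObjectOutputStream(out)
      for (i <- 0 until 3) {
        objOut.writeObject(Integer.valueOf(batch * 3 + i))
        objOut.reset()  // appends a TC_RESET marker, even after the last object
      }
      objOut.flush()
      batchSizes += out.size() - written
      written = out.size()
    }

    // Read each batch from exactly its own byte range.
    val bytes = out.toByteArray
    val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // length = batchSizes.length + 1
    for (b <- batchSizes.indices) {
      val start = batchOffsets(b).toInt
      val length = batchSizes(b).toInt
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes, start, length))
      val objs = (0 until 3).map(_ => in.readObject())
      in.close()
      println(s"batch $b: $objs")
    }
  }
}

If a reader instead kept pulling from one shared stream, the trailing TC_RESET bytes of batch 0 would be consumed as the start of batch 1, which is exactly the corruption described in SPARK-2792.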

File tree

6 files changed: +194 -83 lines changed


‎core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala

+2 -3

@@ -35,16 +35,15 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
   /**
    * Calling reset to avoid memory leak:
    * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
-   * But only call it every 10,000th time to avoid bloated serialization streams (when
+   * But only call it every 100th time to avoid bloated serialization streams (when
    * the stream 'resets' object class descriptions have to be re-written)
    */
   def writeObject[T: ClassTag](t: T): SerializationStream = {
     objOut.writeObject(t)
+    counter += 1
     if (counterReset > 0 && counter >= counterReset) {
       objOut.reset()
       counter = 0
-    } else {
-      counter += 1
     }
     this
   }
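
For reference, a stand-alone sketch of the patched counting logic (writeAll is an illustrative name, not Spark API): the object just written is counted before the threshold check, so with counterReset = 1 every object is immediately followed by a reset and the TC_RESET marker is flushed inside the same batch as the object it follows. A counterReset of zero or below now simply disables the periodic reset, which is why the docs change further down recommends -1.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Sketch only: mirrors the patched JavaSerializationStream counter handling.
def writeAll(values: Seq[AnyRef], counterReset: Int): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val objOut = new ObjectOutputStream(bytes)
  var counter = 0
  for (v <- values) {
    objOut.writeObject(v)
    counter += 1                                  // count the object just written
    if (counterReset > 0 && counter >= counterReset) {
      objOut.reset()                              // drop back-references; writes TC_RESET
      counter = 0
    }
  }
  objOut.flush()
  bytes.toByteArray
}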

‎core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

+65 -21

@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.collection
 
-import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
+import java.io._
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -28,7 +28,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
 
@@ -199,13 +199,16 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Flush the disk writer's contents to disk, and update relevant variables
    def flush() = {
-      writer.commitAndClose()
-      val bytesWritten = writer.bytesWritten
+      val w = writer
+      writer = null
+      w.commitAndClose()
+      val bytesWritten = w.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
       objectsWritten = 0
     }
 
+    var success = false
     try {
       val it = currentMap.destructiveSortedIterator(keyComparator)
       while (it.hasNext) {
@@ -215,16 +218,28 @@
 
         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
           writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
         }
       }
       if (objectsWritten > 0) {
         flush()
+      } else if (writer != null) {
+        val w = writer
+        writer = null
+        w.revertPartialWritesAndClose()
       }
+      success = true
     } finally {
-      // Partial failures cannot be tolerated; do not revert partial writes
-      writer.close()
+      if (!success) {
+        // This code path only happens if an exception was thrown above before we set success;
+        // close our stuff and let the exception be thrown further
+        if (writer != null) {
+          writer.revertPartialWritesAndClose()
+        }
+        if (file.exists()) {
+          file.delete()
+        }
+      }
     }
 
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
@@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
    * An iterator that returns (K, C) pairs in sorted order from an on-disk map
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
-    extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    extends Iterator[(K, C)]
+  {
+    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size will be batchSize.length + 1
+    assert(file.length() == batchOffsets(batchOffsets.length - 1))
+
+    private var batchIndex = 0  // Which batch we're in
+    private var fileStream: FileInputStream = null
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    private var batchStream = nextBatchStream()
-    private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
-    private var deserializeStream = ser.deserializeStream(compressedStream)
+    private var deserializeStream = nextBatchStream()
    private var nextItem: (K, C) = null
    private var objectsRead = 0
 
     /**
      * Construct a stream that reads only from the next batch.
      */
-    private def nextBatchStream(): InputStream = {
-      if (batchSizes.length > 0) {
-        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
+    private def nextBatchStream(): DeserializationStream = {
+      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
+      // we're still in a valid batch.
+      if (batchIndex < batchOffsets.length - 1) {
+        if (deserializeStream != null) {
+          deserializeStream.close()
+          fileStream.close()
+          deserializeStream = null
+          fileStream = null
+        }
+
+        val start = batchOffsets(batchIndex)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
+        batchIndex += 1
+
+        val end = batchOffsets(batchIndex)
+
+        assert(end >= start, "start = " + start + ", end = " + end +
+          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+        ser.deserializeStream(compressedStream)
       } else {
         // No more batches left
-        bufferedStream
+        cleanup()
+        null
       }
     }
 
@@ -424,10 +463,8 @@ class ExternalAppendOnlyMap[K, V, C](
         val item = deserializeStream.readObject().asInstanceOf[(K, C)]
         objectsRead += 1
         if (objectsRead == serializerBatchSize) {
-          batchStream = nextBatchStream()
-          compressedStream = blockManager.wrapForCompression(blockId, batchStream)
-          deserializeStream = ser.deserializeStream(compressedStream)
           objectsRead = 0
+          deserializeStream = nextBatchStream()
         }
         item
       } catch {
@@ -439,6 +476,9 @@ class ExternalAppendOnlyMap[K, V, C](
 
     override def hasNext: Boolean = {
       if (nextItem == null) {
+        if (deserializeStream == null) {
+          return false
+        }
         nextItem = readNextItem()
       }
       nextItem != null
@@ -455,7 +495,11 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
-      deserializeStream.close()
+      batchIndex = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      deserializeStream = null
+      fileStream = null
+      ds.close()
       file.delete()
     }
   }
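
The essential reading pattern introduced here, pulled out of the diff into a small sketch (openBatch is a made-up helper, not a Spark method): compute cumulative batch offsets with scanLeft, reopen the spill file for each batch, seek to the batch's start, and hard-limit the stream to the batch's length so nothing before or after it can be consumed.

import java.io.{BufferedInputStream, File, FileInputStream, InputStream}
import com.google.common.io.ByteStreams

// Illustration of the DiskMapIterator reading strategy (simplified, not the Spark class).
def openBatch(file: File, batchSizes: Seq[Long], batchIndex: Int): InputStream = {
  val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // length = batchSizes.length + 1
  require(batchIndex >= 0 && batchIndex < batchSizes.length, "no such batch")
  val start = batchOffsets(batchIndex)
  val end = batchOffsets(batchIndex + 1)
  val fileStream = new FileInputStream(file)
  fileStream.getChannel.position(start)              // skip everything before this batch
  // Even if the deserializer or compression codec reads ahead, it cannot cross `end`.
  new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
}

Compression and deserialization are then layered on top of this bounded stream, exactly as nextBatchStream() does above.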

‎core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

+75 -29

@@ -26,7 +26,7 @@ import scala.collection.mutable
 import com.google.common.io.ByteStreams
 
 import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.BlockId
 
 /**
@@ -273,13 +273,16 @@ private[spark] class ExternalSorter[K, V, C](
     // Flush the disk writer's contents to disk, and update relevant variables.
     // The writer is closed at the end of this process, and cannot be reused.
     def flush() = {
-      writer.commitAndClose()
-      val bytesWritten = writer.bytesWritten
+      val w = writer
+      writer = null
+      w.commitAndClose()
+      val bytesWritten = w.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
       objectsWritten = 0
     }
 
+    var success = false
     try {
       val it = collection.destructiveSortedIterator(partitionKeyComparator)
       while (it.hasNext) {
@@ -299,13 +302,23 @@ private[spark] class ExternalSorter[K, V, C](
       }
       if (objectsWritten > 0) {
         flush()
+      } else if (writer != null) {
+        val w = writer
+        writer = null
+        w.revertPartialWritesAndClose()
+      }
+      success = true
+    } finally {
+      if (!success) {
+        // This code path only happens if an exception was thrown above before we set success;
+        // close our stuff and let the exception be thrown further
+        if (writer != null) {
+          writer.revertPartialWritesAndClose()
+        }
+        if (file.exists()) {
+          file.delete()
+        }
       }
-      writer.close()
-    } catch {
-      case e: Exception =>
-        writer.close()
-        file.delete()
-        throw e
     }
 
     if (usingMap) {
@@ -472,36 +485,58 @@ private[spark] class ExternalSorter[K, V, C](
    * partitions to be requested in order.
    */
   private[this] class SpillReader(spill: SpilledFile) {
-    val fileStream = new FileInputStream(spill.file)
-    val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    // Serializer batch offsets; size will be batchSize.length + 1
+    val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _)
 
     // Track which partition and which batch stream we're in. These will be the indices of
     // the next element we will read. We'll also store the last partition read so that
     // readNextPartition() can figure out what partition that was from.
     var partitionId = 0
     var indexInPartition = 0L
-    var batchStreamsRead = 0
+    var batchId = 0
     var indexInBatch = 0
     var lastPartitionId = 0
 
     skipToNextPartition()
 
-    // An intermediate stream that reads from exactly one batch
+
+    // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var batchStream = nextBatchStream()
-    var compressedStream = blockManager.wrapForCompression(spill.blockId, batchStream)
-    var deserStream = serInstance.deserializeStream(compressedStream)
+    var fileStream: FileInputStream = null
+    var deserializeStream = nextBatchStream()  // Also sets fileStream
+
     var nextItem: (K, C) = null
     var finished = false
 
     /** Construct a stream that only reads from the next batch */
-    def nextBatchStream(): InputStream = {
-      if (batchStreamsRead < spill.serializerBatchSizes.length) {
-        batchStreamsRead += 1
-        ByteStreams.limit(bufferedStream, spill.serializerBatchSizes(batchStreamsRead - 1))
+    def nextBatchStream(): DeserializationStream = {
+      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
+      // we're still in a valid batch.
+      if (batchId < batchOffsets.length - 1) {
+        if (deserializeStream != null) {
+          deserializeStream.close()
+          fileStream.close()
+          deserializeStream = null
+          fileStream = null
+        }
+
+        val start = batchOffsets(batchId)
+        fileStream = new FileInputStream(spill.file)
+        fileStream.getChannel.position(start)
+        batchId += 1
+
+        val end = batchOffsets(batchId)
+
+        assert(end >= start, "start = " + start + ", end = " + end +
+          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val compressedStream = blockManager.wrapForCompression(spill.blockId, bufferedStream)
+        serInstance.deserializeStream(compressedStream)
       } else {
-        // No more batches left; give an empty stream
-        bufferedStream
+        // No more batches left
+        cleanup()
+        null
       }
     }
 
@@ -525,27 +560,27 @@ private[spark] class ExternalSorter[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      if (finished) {
+      if (finished || deserializeStream == null) {
         return null
       }
-      val k = deserStream.readObject().asInstanceOf[K]
-      val c = deserStream.readObject().asInstanceOf[C]
+      val k = deserializeStream.readObject().asInstanceOf[K]
+      val c = deserializeStream.readObject().asInstanceOf[C]
       lastPartitionId = partitionId
       // Start reading the next batch if we're done with this one
       indexInBatch += 1
       if (indexInBatch == serializerBatchSize) {
-        batchStream = nextBatchStream()
-        compressedStream = blockManager.wrapForCompression(spill.blockId, batchStream)
-        deserStream = serInstance.deserializeStream(compressedStream)
         indexInBatch = 0
+        deserializeStream = nextBatchStream()
       }
       // Update the partition location of the element we're reading
       indexInPartition += 1
       skipToNextPartition()
       // If we've finished reading the last partition, remember that we're done
       if (partitionId == numPartitions) {
         finished = true
-        deserStream.close()
+        if (deserializeStream != null) {
+          deserializeStream.close()
+        }
       }
       (k, c)
     }
@@ -578,6 +613,17 @@ private[spark] class ExternalSorter[K, V, C](
         item
       }
     }
+
+    // Clean up our open streams and put us in a state where we can't read any more data
+    def cleanup() {
+      batchId = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      deserializeStream = null
+      fileStream = null
+      ds.close()
+      // NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop().
+      // This should also be fixed in ExternalAppendOnlyMap.
+    }
   }
 
   /**
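
Both spill paths now share the same error-handling shape, shown below as a generic sketch (the function and parameter names are placeholders, not Spark API): a success flag plus finally replaces the old catch-and-rethrow, so a failure anywhere in the write loop reverts the partial batch and deletes the incomplete spill file while the original exception keeps propagating.

// Sketch of the success-flag cleanup pattern used in both ExternalAppendOnlyMap's and
// ExternalSorter's spill code after this commit.
def spillSafely(writeAllBatches: () => Unit,
                revertPartialWrites: () => Unit,
                deleteSpillFile: () => Unit): Unit = {
  var success = false
  try {
    writeAllBatches()
    success = true
  } finally {
    if (!success) {
      // Only reached when writeAllBatches threw; the exception continues to propagate.
      revertPartialWrites()
      deleteSpillFile()
    }
  }
}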

‎core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala

+22 -11

@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
   private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
 
+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+    val conf = new SparkConf(loadDefaults)
+    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+    conf.set("spark.serializer.objectStreamReset", "1")
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    // Ensure that we actually have multiple batches per spill file
+    conf.set("spark.shuffle.spill.batchSize", "10")
+    conf
+  }
+
   test("simple insert") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -57,7 +68,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("insert with collision") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -80,7 +91,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -125,7 +136,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("null keys and values") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -166,7 +177,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("simple aggregator") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     // reduceByKey
@@ -181,7 +192,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("simple cogroup") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
     val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
     val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
@@ -199,7 +210,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -249,7 +260,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -304,7 +315,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with many hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -329,7 +340,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
    conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -347,7 +358,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with null keys and values") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
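As a hypothetical usage example (not part of the commit), any new test in this suite that can spill would now go through the helper so that multi-batch spill files and the Java serializer's trailing TC_RESET bytes are always exercised:

// Hypothetical additional test case for ExternalAppendOnlyMapSuite, illustrative only.
test("spilling exercises batch boundaries") {
  val conf = createSparkConf(true)  // load defaults so SPARK_HOME is found on local-cluster
  conf.set("spark.shuffle.memoryFraction", "0.001")
  sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
  val grouped = sc.parallelize(1 to 1000).map(i => (i % 10, i)).groupByKey().collect()
  assert(grouped.length === 10)
}
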
‎core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala

+29 -18

@@ -25,6 +25,17 @@ import org.apache.spark._
 import org.apache.spark.SparkContext._
 
 class ExternalSorterSuite extends FunSuite with LocalSparkContext {
+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+    val conf = new SparkConf(loadDefaults)
+    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+    conf.set("spark.serializer.objectStreamReset", "1")
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    // Ensure that we actually have multiple batches per spill file
+    conf.set("spark.shuffle.spill.batchSize", "10")
+    conf
+  }
+
   test("empty data stream") {
     val conf = new SparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
@@ -60,7 +71,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("few elements per partition") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -102,7 +113,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("empty partitions with spilling") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -127,7 +138,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling in local cluster") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -198,7 +209,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling in local cluster with many reduce tasks") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
@@ -269,7 +280,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("cleanup of intermediate files in sorter") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -290,7 +301,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("cleanup of intermediate files in sorter if there are errors") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -311,7 +322,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("cleanup of intermediate files in shuffle") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -326,7 +337,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("cleanup of intermediate files in shuffle with errors") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -348,7 +359,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("no partial aggregation or sorting") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -363,7 +374,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("partial aggregation without spill") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -379,7 +390,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("partial aggregation with spill, no ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -395,7 +406,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("partial aggregation with spill, with ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -412,7 +423,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("sorting without aggregation, no spill") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -429,7 +440,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("sorting without aggregation, with spill") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -446,7 +457,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -503,7 +514,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with many hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -526,7 +537,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -547,7 +558,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with null keys and values") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
‎docs/configuration.md

+1 -1

@@ -385,7 +385,7 @@ Apart from these, the following properties are also available, and may be useful
     When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches
     objects to prevent writing redundant data, however that stops garbage collection of those
     objects. By calling 'reset' you flush that info from the serializer, and allow old
-    objects to be collected. To turn off this periodic reset set it to a value &lt;= 0.
+    objects to be collected. To turn off this periodic reset set it to -1.
     By default it will reset the serializer every 100 objects.
   </td>
 </tr>
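
For completeness, a configuration example (not from the commit itself): with the patched JavaSerializationStream, any spark.serializer.objectStreamReset value of zero or below disables the periodic reset, the documentation now recommends -1 for that, and 100 remains the default.

import org.apache.spark.SparkConf

// Example only: reset after every object, as the new test suites do.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  .set("spark.serializer.objectStreamReset", "1")
// ...or disable periodic resets entirely:
// conf.set("spark.serializer.objectStreamReset", "-1")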
