Commit 72e3369

Committed Aug 1, 2014
SPARK-983. Support external sorting in sortByKey()
This patch simply uses the ExternalSorter class from sort-based shuffle.

Closes apache#931 and closes apache#1090.

Author: Matei Zaharia <[email protected]>

Closes apache#1677 from mateiz/spark-983 and squashes the following commits:

96b3fda [Matei Zaharia] SPARK-983. Support external sorting in sortByKey()
1 parent: 8ff4417

File tree: 2 files changed, +20 -12 lines


core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala (+10, -12)
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.hash
 import org.apache.spark.{InterruptibleIterator, TaskContext}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
+import org.apache.spark.util.collection.ExternalSorter
 
 private[spark] class HashShuffleReader[K, C](
     handle: BaseShuffleHandle[K, _, C],
@@ -35,8 +36,8 @@ private[spark] class HashShuffleReader[K, C](
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
-    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context,
-      Serializer.getSerializer(dep.serializer))
+    val ser = Serializer.getSerializer(dep.serializer)
+    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
 
     val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
@@ -54,16 +55,13 @@ private[spark] class HashShuffleReader[K, C](
     // Sort the output if there is a sort ordering defined.
     dep.keyOrdering match {
       case Some(keyOrd: Ordering[K]) =>
-        // Define a Comparator for the whole record based on the key Ordering.
-        val cmp = new Ordering[Product2[K, C]] {
-          override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int = {
-            keyOrd.compare(o1._1, o2._1)
-          }
-        }
-        val sortBuffer: Array[Product2[K, C]] = aggregatedIter.toArray
-        // TODO: do external sort.
-        scala.util.Sorting.quickSort(sortBuffer)(cmp)
-        sortBuffer.iterator
+        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
+        // the ExternalSorter won't spill to disk.
+        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
+        sorter.write(aggregatedIter)
+        context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
+        context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
+        sorter.iterator
       case None =>
         aggregatedIter
     }
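
What this change does: the old code materialized every record into an in-memory array and quick-sorted it (the removed TODO even said "do external sort"); the new code hands the aggregated iterator to an ExternalSorter, which buffers records, spills sorted runs to disk when memory fills, and merge-reads the runs back in key order. For readers unfamiliar with the pattern, below is a minimal self-contained Scala sketch of the same spill-and-merge idea. It is illustrative only: ExternalSortSketch, maxInMemory, and the Java-serialization spill format are assumptions of this sketch, not Spark's implementation, and it assumes keys and values are java.io.Serializable.

import java.io.{EOFException, File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object ExternalSortSketch {
  // Sort (K, V) records by key, spilling a sorted run to a temp file whenever
  // the in-memory buffer reaches maxInMemory records, then k-way merging the runs.
  // Illustrative sketch only, not Spark's ExternalSorter.
  def externalSort[K, V](records: Iterator[(K, V)], maxInMemory: Int)
                        (implicit ord: Ordering[K]): Iterator[(K, V)] = {
    val spills = mutable.ArrayBuffer.empty[File]
    val buffer = mutable.ArrayBuffer.empty[(K, V)]

    // Write the current buffer to a temp file as one sorted run.
    def spill(): Unit = {
      val file = File.createTempFile("sort-spill", ".bin")
      file.deleteOnExit()
      val out = new ObjectOutputStream(new FileOutputStream(file))
      try buffer.sortBy(_._1).foreach(out.writeObject) finally out.close()
      spills += file
      buffer.clear()
    }

    records.foreach { kv =>
      buffer += kv
      if (buffer.size >= maxInMemory) spill()
    }
    if (spills.isEmpty) return buffer.sortBy(_._1).iterator  // everything fit in memory
    if (buffer.nonEmpty) spill()

    // Stream each spilled run back lazily.
    def readRun(file: File): Iterator[(K, V)] = {
      val in = new ObjectInputStream(new FileInputStream(file))
      Iterator.continually {
        try Some(in.readObject().asInstanceOf[(K, V)])
        catch { case _: EOFException => in.close(); None }
      }.takeWhile(_.isDefined).map(_.get)
    }

    // Merge: repeatedly emit from the run whose head has the smallest key.
    val runs = spills.map(f => readRun(f).buffered)
    new Iterator[(K, V)] {
      def hasNext: Boolean = runs.exists(_.hasNext)
      def next(): (K, V) = runs.filter(_.hasNext).minBy(_.head._1).next()
    }
  }
}

Spark's ExternalSorter is considerably more involved (it cooperates with the shuffle memory manager, can aggregate and partition while sorting, and reports the memoryBytesSpilled / diskBytesSpilled metrics the patch adds to task metrics above), but the buffer/spill/merge structure is the same, and with spark.shuffle.spill disabled it degenerates to the in-memory path, as the new comment notes.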

core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala (+10, -0)
@@ -190,6 +190,11 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
         fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
       }
     }
+
+    // sortByKey - should spill ~17 times
+    val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i))
+    val resultE = rddE.sortByKey().collect().toSeq
+    assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq)
   }
 
   test("spilling in local cluster with many reduce tasks") {
@@ -256,6 +261,11 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
         fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
       }
     }
+
+    // sortByKey - should spill ~8 times per executor
+    val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i))
+    val resultE = rddE.sortByKey().collect().toSeq
+    assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq)
   }
 
   test("cleanup of intermediate files in sorter") {
