Skip to content

Commit

Permalink
[SPARK-6437][SQL] Use completion iterator to close external sorter
Browse files Browse the repository at this point in the history
Otherwise we will leak files when spilling occurs.

Author: Michael Armbrust <[email protected]>

Closes apache#5161 from marmbrus/cleanupAfterSort and squashes the following commits:

cb13d3c [Michael Armbrust] hint to inferencer
cdebdf5 [Michael Armbrust] Use completion iterator to close external sorter
  • Loading branch information
marmbrus committed Mar 24, 2015
1 parent 32efadd commit 26c6ce3
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.util.MutablePair
import org.apache.spark.util.{CompletionIterator, MutablePair}
import org.apache.spark.util.collection.ExternalSorter

/**
Expand Down Expand Up @@ -194,7 +194,9 @@ case class ExternalSort(
val ordering = newOrdering(sortOrder, child.output)
val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
sorter.insertAll(iterator.map(r => (r, null)))
sorter.iterator.map(_._1)
val baseIterator = sorter.iterator.map(_._1)
// TODO(marmbrus): The complex type signature below thwarts inference for no reason.
CompletionIterator[Row, Iterator[Row]](baseIterator, sorter.stop())
}, preservesPartitioning = true)
}

Expand Down

0 comments on commit 26c6ce3

Please sign in to comment.