[SPARK-3277] Fix external spilling with LZ4 assertion error
**Summary of the changes**

The bulk of this PR consists of tests and documentation; the actual fix is just one line of code (see `BlockObjectWriter.scala`). We currently do not run the `External*` test suites with different compression codecs, which is why the bug reported in [SPARK-3277](https://issues.apache.org/jira/browse/SPARK-3277) went uncaught. This PR extends the existing tests to exercise spilling with every compression codec known to Spark, including `LZ4`.
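
As a rough illustration of what exercising the spill path under every codec looks like (a hedged sketch, not the actual suite changed in this PR; the codec list, master URL, memory fraction, and key counts are made up for the example), one can set `spark.io.compression.codec` per run and force the external data structures to spill:

```scala
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Minimal sketch: run the same spilling workload once per compression codec,
 * so that codec-specific bugs (like SPARK-3277 with LZ4) cannot hide behind
 * the default codec. The codec names and config values here are illustrative.
 */
object SpillAcrossCodecsSketch {
  val allCodecs = Seq("lz4", "lzf", "snappy")

  def main(args: Array[String]): Unit = {
    allCodecs.foreach { codec =>
      val conf = new SparkConf()
        .setMaster("local")
        .setAppName(s"spill-test-$codec")
        .set("spark.io.compression.codec", codec)
        .set("spark.shuffle.spill.compress", "true")
        .set("spark.shuffle.memoryFraction", "0.001") // tiny buffer => spill early
      val sc = new SparkContext(conf)
      try {
        // reduceByKey over many keys drives the external append-only map to spill to disk
        val counts = sc.parallelize(1 to 100000, 4)
          .map(i => (i % 1000, 1))
          .reduceByKey(_ + _)
          .collect()
        assert(counts.forall(_._2 == 100), s"wrong counts with codec $codec")
      } finally {
        sc.stop()
      }
    }
  }
}
```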

**The bug itself**

In `DiskBlockObjectWriter`, we only reported the shuffle bytes written before closing the streams. With `LZ4`, the bytes written reported by our metrics were all 0 because `flush()` was not taking effect for some reason. In general, a compression codec may write additional bytes to the file after `close()` is called, so we must also capture those bytes in our shuffle write metrics.
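
The following is a minimal sketch of the idea behind the one-line fix (simplified names, not the real `DiskBlockObjectWriter`): track the file position already covered by the metrics, and after the compressed stream is closed, add whatever trailing bytes the codec wrote, measured from the file length:

```scala
import java.io.{File, FileOutputStream, OutputStream}

/**
 * Sketch of a disk writer that meters compressed bytes correctly.
 * `wrap` stands in for the compression codec's stream wrapper.
 */
class MeteredWriterSketch(file: File, wrap: OutputStream => OutputStream) {
  private val initialPosition = file.length()
  private var reportedPosition = initialPosition // bytes already counted in metrics
  private var bytesWrittenMetric = 0L

  private val fos = new FileOutputStream(file, true)
  private val out = wrap(fos)

  def write(bytes: Array[Byte]): Unit = {
    out.write(bytes)
    // Periodic update while the stream is still open.
    val pos = fos.getChannel.position()
    bytesWrittenMetric += pos - reportedPosition
    reportedPosition = pos
  }

  def commitAndClose(): Long = {
    out.flush()
    out.close()                        // some codecs append trailing bytes here
    val finalPosition = file.length()  // so measure the file *after* close()
    bytesWrittenMetric += finalPosition - reportedPosition
    bytesWrittenMetric
  }
}
```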

Thanks to mridulm and pwendell for their help with debugging.

Author: Andrew Or <[email protected]>
Author: Patrick Wendell <[email protected]>

Closes apache#2187 from andrewor14/fix-lz4-spilling and squashes the following commits:

1b54bdc [Andrew Or] Speed up tests by not compressing everything
1c4624e [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-lz4-spilling
6b2e7d1 [Andrew Or] Fix compilation error
92e251b [Patrick Wendell] Better documentation for BlockObjectWriter.
a1ad536 [Andrew Or] Fix tests
089593f [Andrew Or] Actually fix SPARK-3277 (tests still fail)
4bbcf68 [Andrew Or] Update tests to actually test all compression codecs
b264a84 [Andrew Or] ExternalAppendOnlyMapSuite code style fixes (minor)
1bfa743 [Andrew Or] Add more information to assert for better debugging
andrewor14 authored and pwendell committed Aug 29, 2014
1 parent 92af231 commit a46b8f2
Showing 5 changed files with 144 additions and 96 deletions.
@@ -64,6 +64,7 @@ private[spark] object CompressionCodec {
}

val DEFAULT_COMPRESSION_CODEC = "snappy"
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}


@@ -65,8 +65,6 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {

/**
* BlockObjectWriter which writes directly to a file on disk. Appends to the given file.
* The given write metrics will be updated incrementally, but will not necessarily be current until
* commitAndClose is called.
*/
private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
@@ -75,6 +73,8 @@ private[spark] class DiskBlockObjectWriter(
bufferSize: Int,
compressStream: OutputStream => OutputStream,
syncWrites: Boolean,
// These write metrics are concurrently shared with other active BlockObjectWriters that
// are themselves performing writes. All updates must be relative.
writeMetrics: ShuffleWriteMetrics)
extends BlockObjectWriter(blockId)
with Logging
@@ -94,14 +94,30 @@ private[spark] class DiskBlockObjectWriter(
private var fos: FileOutputStream = null
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var initialized = false

/**
* Cursors used to represent positions in the file.
*
* xxxxxxxx|--------|---       |
*         ^        ^          ^
*         |        |          finalPosition
*         |        reportedPosition
*         initialPosition
*
* initialPosition: Offset in the file where we start writing. Immutable.
* reportedPosition: Position at the time of the last update to the write metrics.
* finalPosition: Offset where we stopped writing. Set on commitAndClose() then never changed.
* -----: Current writes to the underlying file.
* xxxxx: Existing contents of the file.
*/
private val initialPosition = file.length()
private var finalPosition: Long = -1
private var initialized = false
private var reportedPosition = initialPosition

/** Calling channel.position() to update the write metrics can be a little bit expensive, so we
* only call it every N writes */
private var writesSinceMetricsUpdate = 0
private var lastPosition = initialPosition

override def open(): BlockObjectWriter = {
fos = new FileOutputStream(file, true)
@@ -140,17 +156,18 @@ private[spark] class DiskBlockObjectWriter(
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
updateBytesWritten()
close()
}
finalPosition = file.length()
// In certain compression codecs, more bytes are written after close() is called
writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition)
}

// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
writeMetrics.shuffleBytesWritten -= (lastPosition - initialPosition)
writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition)

if (initialized) {
objOut.flush()
@@ -189,10 +206,14 @@ private[spark] class DiskBlockObjectWriter(
new FileSegment(file, initialPosition, finalPosition - initialPosition)
}

/**
* Report the number of bytes written in this writer's shuffle write metrics.
* Note that this is only valid before the underlying streams are closed.
*/
private def updateBytesWritten() {
val pos = channel.position()
writeMetrics.shuffleBytesWritten += (pos - lastPosition)
lastPosition = pos
writeMetrics.shuffleBytesWritten += (pos - reportedPosition)
reportedPosition = pos
}

private def callWithTiming(f: => Unit) = {
@@ -413,7 +413,12 @@ class ExternalAppendOnlyMap[K, V, C](
extends Iterator[(K, C)]
{
private val batchOffsets = batchSizes.scanLeft(0L)(_ + _) // Size will be batchSize.length + 1
assert(file.length() == batchOffsets(batchOffsets.length - 1))
assert(file.length() == batchOffsets.last,
"File length is not equal to the last batch offset:\n" +
s" file length = ${file.length}\n" +
s" last batch offset = ${batchOffsets.last}\n" +
s" all batch offsets = ${batchOffsets.mkString(",")}"
)

private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null
@@ -33,10 +33,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
private val fileSystem = Utils.getHadoopFileSystem("/")
private val allCompressionCodecs = Seq[String](
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"
)
private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private var testDir: File = _

before {
