
Commit

[GH-17] Debug log partition md5 in shuffle write (#18)
Add logic to write the MD5 of each shuffle partition during shuffle write when
debug logging is enabled. Developers can compare the MD5 of the same partition
during shuffle read and shuffle write to find out whether the data was
corrupted during the shuffle process or the storage process.
jealous committed Jan 28, 2019
1 parent bdea6a7 commit dae37a5
Showing 2 changed files with 28 additions and 8 deletions.
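The debugging technique in this commit boils down to comparing MD5 hex digests: hash each partition's bytes at write time, hash them again at read time, and diff the logged values. A minimal sketch of the digest step (the md5Hex helper below is illustrative, not code from this commit):

import java.security.MessageDigest

// Illustrative helper: hex-encoded MD5 of one partition's bytes.
// Logged at shuffle write and again at shuffle read, equal digests
// mean the bytes survived the shuffle and storage steps intact.
def md5Hex(partitionBytes: Array[Byte]): String =
  MessageDigest.getInstance("MD5")
    .digest(partitionBytes)
    .map("%02x".format(_))
    .mkString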
2 changes: 1 addition & 1 deletion pom.xml
@@ -7,7 +7,7 @@

<groupId>com.memverge</groupId>
<artifactId>splash</artifactId>
- <version>0.2.5</version>
+ <version>0.2.6</version>
<name>splash</name>
<description>A shuffle manager that contains a storage interface.</description>
<url>https://github.com/MemVerge/splash/</url>
34 changes: 27 additions & 7 deletions SplashShuffleBlockResolver.scala
@@ -177,6 +177,7 @@ private[spark] class SplashShuffleBlockResolver(

var indexFileOpt: Option[ShuffleFile] = None
lockMap.computeIfAbsent((shuffleId, mapId), lockSupplier).synchronized {
+ // check data in the shuffle output that already exists
val existingLengths = checkIndexAndDataFile(shuffleId, mapId)
if (existingLengths != null) {
val blocks = lengths.length
@@ -190,6 +191,10 @@ private[spark] class SplashShuffleBlockResolver(
logDebug(s"commit shuffle index: " +
s"${indexTmp.getCommitTarget.getPath}, " +
s"size: ${indexTmp.getCommitTarget.getSize}")
+ if (log.isDebugEnabled()) {
+ // check data in the shuffle output we just committed
+ checkIndexAndDataFile(shuffleId, mapId)
+ }
}
}
}
@@ -229,13 +234,7 @@ private[spark] class SplashShuffleBlockResolver(
val offsets = 0L until numOfLong map { _ => in.readLong() }

if (offsets.nonEmpty) {
- // calculate lengths from offsets
- val lengths = offsets zip offsets.tail map (i => i._2 - i._1)
- // the size of data file should match with index file
- // first element must be 0
- if (offsets(0) == 0 && data.getSize == lengths.sum) {
- ret = lengths.toArray
- }
+ ret = validateData(offsets, data)
} else {
log.warn("offsets length is zero, {} is empty & corrupt.", index.getPath)
}
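For reference, the offset-to-length conversion above pairs each offset with its successor, so an index recording offsets 0, 10, 25, 40 for a 40-byte data file yields partition lengths 10, 15, 15. A tiny sketch with hypothetical values:

// Hypothetical offsets as they would be read from an index file.
val offsets = IndexedSeq(0L, 10L, 25L, 40L)
val lengths = offsets zip offsets.tail map (i => i._2 - i._1)
// lengths == Vector(10, 15, 15); the index is considered valid when
// offsets(0) == 0 and the data file size equals lengths.sum (40 here).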
@@ -244,6 +243,27 @@ private[spark] class SplashShuffleBlockResolver(
ret
}

+ private def validateData(offsets: IndexedSeq[Long], data: ShuffleFile): Array[Long] = {
+ var ret: Array[Long] = null
+
+ // calculate lengths from offsets
+ val lengths = offsets zip offsets.tail map (i => i._2 - i._1)
+ // the size of data file should match with index file
+ // first element must be 0
+ if (offsets(0) == 0 && data.getSize == lengths.sum) {
+ ret = lengths.toArray
+
+ if (log.isDebugEnabled) {
+ log.debug("log md5 for {} during shuffle write.", data.getPath)
+ // print MD5 for each partition
+ (0 to offsets.length - 2).foreach { i =>
+ logPartitionMd5(data, offsets(i), offsets(i + 1))
+ }
+ }
+ }
+ ret
+ }

def writeData(dataFile: TmpShuffleFile, data: Array[Byte]): Unit = {
SplashUtils.withResources(
new BufferedOutputStream(
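The new validateData method calls logPartitionMd5(data, offsets(i), offsets(i + 1)), but that helper's body falls outside the shown hunks. One plausible shape, sketched under the assumption that the shuffle data file can be opened as a java.io.InputStream (the openStream parameter below stands in for whatever accessor the real ShuffleFile interface provides):

import java.io.InputStream
import java.security.MessageDigest

// Sketch only: the real logPartitionMd5 is not shown in this commit.
// Reads the byte range [start, end) of one partition and returns its
// hex-encoded MD5, ready to pass to a log.debug call.
def partitionMd5(openStream: () => InputStream, start: Long, end: Long): String = {
  val md = MessageDigest.getInstance("MD5")
  val in = openStream()
  try {
    in.skip(start) // seek to the partition start (a robust version loops until fully skipped)
    val buf = new Array[Byte](8192)
    var remaining = end - start // byte count of this partition
    while (remaining > 0) {
      val read = in.read(buf, 0, math.min(buf.length.toLong, remaining).toInt)
      if (read < 0) remaining = 0 // EOF guard
      else {
        md.update(buf, 0, read)
        remaining -= read
      }
    }
  } finally {
    in.close()
  }
  md.digest().map("%02x".format(_)).mkString
}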
