[SPARK-5645] Added local read bytes/time to task metrics
@ksakellis I stumbled on your JIRA for this yesterday; I know it's assigned to you, but I'd already done this for my own use a while ago, so I thought I could save you the work of doing it! Hopefully this doesn't duplicate any work you've already done.

Here's a screenshot of what the UI looks like:
![image](https://cloud.githubusercontent.com/assets/1108612/6135352/c03e7276-b11c-11e4-8f11-c6aefe1f35b9.png)
Based on a discussion with pwendell, I added the remotely read data as an additional metric rather than showing it in brackets as you'd suggested, Kostas. The assumption here is that the average user doesn't care about the local / remote distinction, so it's better not to pollute the UI with it.

I also added data about the local read time, which I've found very helpful for debugging, but I didn't put it in the UI because I suspect it's not something many people will need.
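Even without a UI column, the local read time is easy to get at from a listener. Below is a rough, hypothetical sketch (the `LocalReadLogger` class and its wiring are mine, not part of this change) that prints the two new metrics as tasks finish; register it with `sc.addSparkListener(new LocalReadLogger)`:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical helper, not part of this patch: prints the new local-read
// metrics for each finished task, assuming the listener API shown in this diff.
class LocalReadLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    for {
      metrics <- Option(taskEnd.taskMetrics)    // may be absent for failed tasks
      shuffleRead <- metrics.shuffleReadMetrics // Option[ShuffleReadMetrics]
    } {
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"localBytesRead=${shuffleRead.localBytesRead} " +
        s"localReadTime=${shuffleRead.localReadTime} ms")
    }
  }
}
```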

With this change, the total shuffle read and total shuffle write shown in the UI will be equal, fixing a long-standing source of user confusion:
![image](https://cloud.githubusercontent.com/assets/1108612/6135399/25f14490-b11d-11e4-8086-20be5f4002e6.png)
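To make the accounting fix concrete, here's a toy calculation with invented numbers: if a stage's tasks fetch 60 MB of shuffle data from remote executors and read another 40 MB from local disk, the UI used to report 60 MB of shuffle read against 100 MB of shuffle write; counting local bytes closes the gap.

```scala
// Toy numbers, purely illustrative of the accounting change:
val remoteBytesRead = 60L * 1024 * 1024  // fetched from other executors
val localBytesRead  = 40L * 1024 * 1024  // read from this executor's own disk
val shuffleWritten  = 100L * 1024 * 1024 // written by the upstream stage

val oldUiShuffleRead = remoteBytesRead                  // 60 MB: looked smaller than the writes
val newUiShuffleRead = remoteBytesRead + localBytesRead // 100 MB: matches shuffleWritten
assert(newUiShuffleRead == shuffleWritten)
```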

Author: Kay Ousterhout <[email protected]>

Closes apache#4510 from kayousterhout/SPARK-5645 and squashes the following commits:

4a0182c [Kay Ousterhout] oops
5f5da1b [Kay Ousterhout] Small style fix
5da04cf [Kay Ousterhout] Addressed more comments from Kostas
ba05149 [Kay Ousterhout] Remove parens
a9dc685 [Kay Ousterhout] Kostas comment, test fix
33d2e2d [Kay Ousterhout] Merge remote-tracking branch 'upstream/master' into SPARK-5645
347e2cd [Kay Ousterhout] [SPARK-5645] Added local read bytes/time to task metrics
kayousterhout authored and Andrew Or committed Feb 12, 2015
1 parent aa4ca8b commit 893d6fd
Showing 13 changed files with 125 additions and 28 deletions.
4 changes: 2 additions & 2 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -196,7 +196,7 @@ span.additional-metric-title {

/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
-.scheduler_delay, .deserialization_time, .fetch_wait_time, .serialization_time,
-.getting_result_time {
+.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
+.serialization_time, .getting_result_time {
display: none;
}
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -202,6 +202,8 @@ class TaskMetrics extends Serializable {
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
merged.incLocalBytesRead(depMetrics.localBytesRead)
merged.incLocalReadTime(depMetrics.localReadTime)
merged.incRecordsRead(depMetrics.recordsRead)
}
_shuffleReadMetrics = Some(merged)
@@ -343,6 +345,25 @@ class ShuffleReadMetrics extends Serializable {
private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value

/**
* Time the task spent (in milliseconds) reading local shuffle blocks (from the local disk).
*/
private var _localReadTime: Long = _
def localReadTime = _localReadTime
private[spark] def incLocalReadTime(value: Long) = _localReadTime += value

/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
private var _localBytesRead: Long = _
def localBytesRead = _localBytesRead
private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value

/**
* Total bytes fetched in the shuffle by this task (both remote and local).
*/
def totalBytesRead = _remoteBytesRead + _localBytesRead

/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
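For readers skimming the diff: the merge loop at the top of this file folds per-dependency read metrics into one task-level object, and `totalBytesRead` is derived rather than stored. A minimal standalone sketch of that pattern, with simplified stand-in types rather than the real class:

```scala
// Simplified stand-in for ShuffleReadMetrics, only to illustrate the pattern:
case class ReadMetrics(remoteBytesRead: Long, localBytesRead: Long, localReadTime: Long) {
  def totalBytesRead: Long = remoteBytesRead + localBytesRead // derived, never stored
}

// Mirrors the merge loop in TaskMetrics: sum each field across the task's
// shuffle dependencies, including the two fields this patch adds.
def mergeShuffleReadMetrics(perDependency: Seq[ReadMetrics]): ReadMetrics =
  perDependency.foldLeft(ReadMetrics(0L, 0L, 0L)) { (merged, dep) =>
    ReadMetrics(
      merged.remoteBytesRead + dep.remoteBytesRead,
      merged.localBytesRead + dep.localBytesRead,
      merged.localReadTime + dep.localReadTime)
  }
```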
@@ -169,7 +169,9 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
" LOCAL_READ_TIME=" + metrics.localReadTime +
" LOCAL_BYTES_READ=" + metrics.localBytesRead
case None => ""
}
val writeMetrics = taskMetrics.shuffleWriteMetrics match {
@@ -228,12 +228,14 @@ final class ShuffleBlockFetcherIterator(
* track in-memory are the ManagedBuffer references themselves.
*/
private[this] def fetchLocalBlocks() {
val startTime = System.currentTimeMillis
val iter = localBlocks.iterator
while (iter.hasNext) {
val blockId = iter.next()
try {
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.incLocalBlocksFetched(1)
shuffleMetrics.incLocalBytesRead(buf.size)
buf.retain()
results.put(new SuccessFetchResult(blockId, 0, buf))
} catch {
@@ -244,6 +246,7 @@ final class ShuffleBlockFetcherIterator(
return
}
}
shuffleMetrics.incLocalReadTime(System.currentTimeMillis - startTime)
}

private[this] def initialize(): Unit = {
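The timing added here brackets the whole local-fetch loop with two wall-clock reads instead of timing each block individually. A self-contained sketch of that trade-off, using a hypothetical helper rather than the real iterator:

```scala
// Hypothetical distillation of the timing in fetchLocalBlocks(): a single
// currentTimeMillis pair around the loop costs only two clock reads, at the
// price of folding per-iteration bookkeeping into the measured read time.
def timeLocalReads(blockIds: Seq[String], readBlock: String => Long): (Long, Long) = {
  val startTime = System.currentTimeMillis
  var bytesRead = 0L
  for (blockId <- blockIds) {
    bytesRead += readBlock(blockId) // readBlock returns the block's size in bytes
  }
  val elapsedMillis = System.currentTimeMillis - startTime
  (bytesRead, elapsedMillis)
}
```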
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -37,8 +37,12 @@ private[spark] object ToolTips {
"Bytes and records written to disk in order to be read by a shuffle in a future stage."

val SHUFFLE_READ =
"""Bytes and records read from remote executors. Typically less than shuffle write bytes
because this does not include shuffle data read locally."""
"""Total shuffle bytes and records read (includes both data read locally and data read from
remote executors). """

val SHUFFLE_READ_REMOTE_SIZE =
"""Total shuffle bytes read from remote executors. This is a subset of the shuffle
read bytes; the remaining shuffle data is read locally. """

val GETTING_RESULT_TIME =
"""Time that the driver spends fetching task results from workers. If this is large, consider
@@ -401,9 +401,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta

val shuffleReadDelta =
-(taskMetrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L)
-- oldMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L))
-stageData.shuffleReadBytes += shuffleReadDelta
+(taskMetrics.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L)
+- oldMetrics.flatMap(_.shuffleReadMetrics).map(_.totalBytesRead).getOrElse(0L))
+stageData.shuffleReadTotalBytes += shuffleReadDelta
execSummary.shuffleRead += shuffleReadDelta

val shuffleReadRecordsDelta =
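The `shuffleReadDelta` computation above follows the listener's usual pattern of applying deltas (new value minus the last recorded value), so a task whose metrics are reported more than once is not double-counted. A minimal sketch with simplified types:

```scala
// Simplified version of the delta update in JobProgressListener: only the
// difference from the previous report is added to the stage total.
def updatedStageTotal(
    stageTotal: Long,
    newTaskValue: Option[Long],
    oldTaskValue: Option[Long]): Long =
  stageTotal + (newTaskValue.getOrElse(0L) - oldTaskValue.getOrElse(0L))
```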
67 changes: 54 additions & 13 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -85,7 +85,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{if (stageData.hasShuffleRead) {
<li>
<strong>Shuffle read: </strong>
{s"${Utils.bytesToString(stageData.shuffleReadBytes)} / " +
{s"${Utils.bytesToString(stageData.shuffleReadTotalBytes)} / " +
s"${stageData.shuffleReadRecords}"}
</li>
}}
@@ -143,6 +143,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Shuffle Read Blocked Time</span>
</span>
</li>
<li>
<span data-toggle="tooltip"
title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
<input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}/>
<span class="additional-metric-title">Shuffle Remote Reads</span>
</span>
</li>
}}
<li>
<span data-toggle="tooltip"
@@ -181,7 +188,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
{if (stageData.hasShuffleRead) {
Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
("Shuffle Read Size / Records", ""))
("Shuffle Read Size / Records", ""),
("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
} else {
Nil
}} ++
@@ -320,19 +328,41 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
}
-val shuffleReadBlockedQuantiles = <td>Shuffle Read Blocked Time</td> +:
+val shuffleReadBlockedQuantiles =
+<td>
+<span data-toggle="tooltip"
+title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
+Shuffle Read Blocked Time
+</span>
+</td> +:
getFormattedTimeQuantiles(shuffleReadBlockedTimes)

-val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
-metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+val shuffleReadTotalSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
}

-val shuffleReadRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
+val shuffleReadTotalRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
val shuffleReadTotalQuantiles =
<td>
<span data-toggle="tooltip"
title={ToolTips.SHUFFLE_READ} data-placement="right">
Shuffle Read Size / Records
</span>
</td> +:
getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)

-val shuffleReadQuantiles = <td>Shuffle Read Size / Records (Remote)</td> +:
-getFormattedSizeQuantilesWithRecords(shuffleReadSizes, shuffleReadRecords)
val shuffleReadRemoteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
val shuffleReadRemoteQuantiles =
<td>
<span data-toggle="tooltip"
title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
Shuffle Remote Reads
</span>
</td> +:
getFormattedSizeQuantiles(shuffleReadRemoteSizes)

val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
@@ -374,7 +404,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
{shuffleReadBlockedQuantiles}
</tr>
-<tr>{shuffleReadQuantiles}</tr>
+<tr>{shuffleReadTotalQuantiles}</tr>
+<tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+{shuffleReadRemoteQuantiles}
+</tr>
} else {
Nil
},
@@ -454,11 +487,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadBlockedTimeReadable =
maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")

-val shuffleReadSortable = maybeShuffleRead.map(_.remoteBytesRead.toString).getOrElse("")
-val shuffleReadReadable = maybeShuffleRead
-.map(m => s"${Utils.bytesToString(m.remoteBytesRead)}").getOrElse("")
+val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
+val shuffleReadSortable = totalShuffleBytes.map(_.toString).getOrElse("")
+val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")

val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
val shuffleReadRemoteSortable = remoteShuffleBytes.map(_.toString).getOrElse("")
val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")

val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
val shuffleWriteReadable = maybeShuffleWrite
@@ -536,6 +573,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<td sorttable_customkey={shuffleReadSortable}>
{s"$shuffleReadReadable / $shuffleReadRecords"}
</td>
<td sorttable_customkey={shuffleReadRemoteSortable}
class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
{shuffleReadRemoteReadable}
</td>
}}
{if (hasShuffleWrite) {
<td sorttable_customkey={writeTimeSortable}>
@@ -138,7 +138,7 @@ private[ui] class StageTableBase(
val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
val outputWrite = stageData.outputBytes
val outputWriteWithUnit = if (outputWrite > 0) Utils.bytesToString(outputWrite) else ""
-val shuffleRead = stageData.shuffleReadBytes
+val shuffleRead = stageData.shuffleReadTotalBytes
val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
val shuffleWrite = stageData.shuffleWriteBytes
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
@@ -28,6 +28,7 @@ private[spark] object TaskDetailsClassNames {
val SCHEDULER_DELAY = "scheduler_delay"
val TASK_DESERIALIZATION_TIME = "deserialization_time"
val SHUFFLE_READ_BLOCKED_TIME = "fetch_wait_time"
val SHUFFLE_READ_REMOTE_SIZE = "shuffle_read_remote"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -80,7 +80,7 @@ private[jobs] object UIData {
var inputRecords: Long = _
var outputBytes: Long = _
var outputRecords: Long = _
-var shuffleReadBytes: Long = _
+var shuffleReadTotalBytes: Long = _
var shuffleReadRecords : Long = _
var shuffleWriteBytes: Long = _
var shuffleWriteRecords: Long = _
@@ -96,7 +96,7 @@

def hasInput = inputBytes > 0
def hasOutput = outputBytes > 0
-def hasShuffleRead = shuffleReadBytes > 0
+def hasShuffleRead = shuffleReadTotalBytes > 0
def hasShuffleWrite = shuffleWriteBytes > 0
def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
}
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -294,6 +294,8 @@ private[spark] object JsonProtocol {
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
("Local Read Time" -> shuffleReadMetrics.localReadTime) ~
("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~
("Total Records Read" -> shuffleReadMetrics.recordsRead)
}

@@ -674,6 +676,8 @@ private[spark] object JsonProtocol {
metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
metrics.incLocalReadTime((json \ "Local Read Time").extractOpt[Long].getOrElse(0))
metrics.incLocalBytesRead((json \ "Local Bytes Read").extractOpt[Long].getOrElse(0))
metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0))
metrics
}
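The `extractOpt(...).getOrElse(0)` calls above are what keep old event logs readable: logs written before this change simply lack the two new fields, so they parse with a default of zero. A small self-contained sketch of the same json4s idiom (the event snippet and values are invented for the example):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Sketch of the backward-compatibility idiom in JsonProtocol: an event written
// by an older Spark has no "Local Bytes Read" field, so the optional extractor
// falls back to 0 instead of throwing.
object BackCompatSketch extends App {
  implicit val formats: Formats = DefaultFormats

  val oldEvent = parse("""{"Remote Bytes Read": 1000}""")
  val remoteBytes = (oldEvent \ "Remote Bytes Read").extract[Long]
  val localBytes = (oldEvent \ "Local Bytes Read").extractOpt[Long].getOrElse(0L)
  assert(remoteBytes == 1000L && localBytes == 0L)
}
```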
@@ -227,6 +227,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
taskMetrics.setExecutorRunTime(base + 4)
@@ -260,8 +261,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc

var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
-assert(stage0Data.shuffleReadBytes == 102)
-assert(stage1Data.shuffleReadBytes == 201)
+assert(stage0Data.shuffleReadTotalBytes == 220)
+assert(stage1Data.shuffleReadTotalBytes == 410)
assert(stage0Data.shuffleWriteBytes == 106)
assert(stage1Data.shuffleWriteBytes == 203)
assert(stage0Data.executorRunTime == 108)
@@ -290,8 +291,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc

stage0Data = listener.stageIdToData.get((0, 0)).get
stage1Data = listener.stageIdToData.get((1, 0)).get
-assert(stage0Data.shuffleReadBytes == 402)
-assert(stage1Data.shuffleReadBytes == 602)
+// Task 1235 contributed (100+1)+(100+9) = 210 shuffle bytes, and task 1234 contributed
+// (300+1)+(300+9) = 610 total shuffle bytes, so the total for the stage is 820.
+assert(stage0Data.shuffleReadTotalBytes == 820)
+// Task 1236 contributed 410 shuffle bytes, and task 1237 contributed 810 shuffle bytes.
+assert(stage1Data.shuffleReadTotalBytes == 1220)
assert(stage0Data.shuffleWriteBytes == 406)
assert(stage1Data.shuffleWriteBytes == 606)
assert(stage0Data.executorRunTime == 408)
17 changes: 17 additions & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -260,6 +260,19 @@ class JsonProtocolSuite extends FunSuite {
assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
}

test("ShuffleReadMetrics: Local bytes read and time taken backwards compatibility") {
// Metrics about local shuffle bytes read and local read time were added in 1.3.1.
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
hasHadoopInput = false, hasOutput = false, hasRecords = false)
assert(metrics.shuffleReadMetrics.nonEmpty)
val newJson = JsonProtocol.taskMetricsToJson(metrics)
val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
.removeField { case (field, _) => field == "Local Read Time" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
assert(newMetrics.shuffleReadMetrics.get.localReadTime == 0)
}

test("SparkListenerApplicationStart backwards compatibility") {
// SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
@@ -695,6 +708,8 @@ class JsonProtocolSuite extends FunSuite {
sr.incFetchWaitTime(a + d)
sr.incRemoteBlocksFetched(f)
sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
sr.incLocalReadTime(a + e)
sr.incLocalBytesRead(a + f)
t.setShuffleReadMetrics(Some(sr))
}
if (hasOutput) {
@@ -941,6 +956,8 @@
| "Local Blocks Fetched": 700,
| "Fetch Wait Time": 900,
| "Remote Bytes Read": 1000,
| "Local Read Time": 1000,
| "Local Bytes Read": 1100,
| "Total Records Read" : 10
| },
| "Shuffle Write Metrics": {
