[SPARK-1683] Track task read metrics.
This commit adds a new metric to TaskMetrics that records each task's
input data size, and displays this information in the UI.

An earlier version of this commit also added the read time,
which can be useful for diagnosing straggler problems,
but unfortunately that change introduced a significant performance
regression for jobs that don't do much computation. In order to
track read time, we'll need to do sampling.
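
For reference, a minimal sketch of how the new metric could be consumed from a SparkListener, assuming the Option-valued `inputMetrics` field added in this commit (the listener class name is illustrative):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sketch: report each finished task's input size and read method via the new metric.
class InputBytesListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    for {
      metrics <- Option(taskEnd.taskMetrics)   // metrics may be absent for failed tasks
      input   <- metrics.inputMetrics          // only set for Hadoop/cached/remote reads
    } {
      println(s"Task ${taskEnd.taskInfo.taskId} read ${input.bytesRead} bytes " +
        s"(${input.readMethod})")
    }
  }
}

// Hypothetical usage: sc.addSparkListener(new InputBytesListener)
```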

The screenshots below show the UI with the new "Input" field,
which I added to the stage summary page, the executor summary page,
and the per-stage page.

![image](https://cloud.githubusercontent.com/assets/1108612/3167930/2627f92a-eb77-11e3-861c-98ea5bb7a1a2.png)

![image](https://cloud.githubusercontent.com/assets/1108612/3167936/475a889c-eb77-11e3-9706-f11c48751f17.png)

![image](https://cloud.githubusercontent.com/assets/1108612/3167948/80ebcf12-eb77-11e3-87ed-349fce6a770c.png)

Author: Kay Ousterhout <[email protected]>

Closes apache#962 from kayousterhout/read_metrics and squashes the following commits:

f13b67d [Kay Ousterhout] Correctly format input bytes on executor page
8b70cde [Kay Ousterhout] Added comment about potential inaccuracy of bytesRead
d1016e8 [Kay Ousterhout] Updated SparkListenerSuite test
8461492 [Kay Ousterhout] Miniscule style fix
ae04d99 [Kay Ousterhout] Remove input metrics for parallel collections
719f19d [Kay Ousterhout] Style fixes
bb6ec62 [Kay Ousterhout] Small fixes
869ac7b [Kay Ousterhout] Updated Json tests
44a0301 [Kay Ousterhout] Fixed accidentally added line
4bd0568 [Kay Ousterhout] Added input source, renamed Hdfs to Hadoop.
f27e535 [Kay Ousterhout] Updates based on review comments and to fix rebase
bf41029 [Kay Ousterhout] Updated Json tests to pass
0fc33e0 [Kay Ousterhout] Added explicit backward compatibility test
4e52925 [Kay Ousterhout] Added Json output and associated tests.
365400b [Kay Ousterhout] [SPARK-1683] Track task read metrics.
kayousterhout committed Jun 30, 2014
1 parent cdf613f commit 7b71a0e
Showing 20 changed files with 349 additions and 86 deletions.
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashSet}

import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._

@@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(values) =>
case Some(blockResult) =>
// Partition is already materialized, so just return its values
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
// Acquire a lock for loading this partition
@@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
loading.add(id)
}
values.map(_.asInstanceOf[Iterator[T]])
values.map(_.data.asInstanceOf[Iterator[T]])
}
}
}
@@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
* exceptions that can be avoided. */
updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(v) => v.asInstanceOf[Iterator[T]]
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -66,6 +66,12 @@ class TaskMetrics extends Serializable {
*/
var diskBytesSpilled: Long = _

/**
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
* are stored here.
*/
var inputMetrics: Option[InputMetrics] = None

/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
@@ -87,6 +93,29 @@ private[spark] object TaskMetrics {
def empty: TaskMetrics = new TaskMetrics
}

/**
* :: DeveloperApi ::
* Method by which input data was read. Network means that the data was read over the network
* from a remote block manager (which may have stored the data on-disk or in-memory).
*/
@DeveloperApi
object DataReadMethod extends Enumeration with Serializable {
type DataReadMethod = Value
val Memory, Disk, Hadoop, Network = Value
}

/**
* :: DeveloperApi ::
* Metrics about reading input data.
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {
/**
* Total bytes read.
*/
var bytesRead: Long = 0L
}


/**
* :: DeveloperApi ::
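To illustrate how the new `InputMetrics` is meant to be populated, here is a minimal sketch of what a data-source RDD inside Spark could do, mirroring the HadoopRDD change further down (the helper name and byte count are illustrative; it assumes access to `TaskContext.taskMetrics`, as Spark-internal code has):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.executor.{DataReadMethod, InputMetrics}

// Sketch: record how many bytes this task read and how it read them.
// Intended to be called from an RDD's compute() once the input size is known.
def recordInputMetrics(context: TaskContext, bytes: Long) {
  val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
  inputMetrics.bytesRead = bytes
  context.taskMetrics.inputMetrics = Some(inputMetrics)
}
```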
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDPartition].blockId
blockManager.get(blockId) match {
case Some(block) => block.asInstanceOf[Iterator[T]]
case Some(block) => block.data.asInstanceOf[Iterator[T]]
case None =>
throw new Exception("Could not compute split, block " + blockId + " not found")
}
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -38,6 +38,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.util.NextIterator

/**
@@ -196,6 +197,20 @@ class HadoopRDD[K, V](
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()

// Set the task input metrics.
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
try {
/* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
* always at record boundaries, so tasks may need to read into other splits to complete
* a record. */
inputMetrics.bytesRead = split.inputSplit.value.getLength()
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
}
context.taskMetrics.inputMetrics = Some(inputMetrics)

override def getNext() = {
try {
finished = !reader.next(key, value)
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.executor.{DataReadMethod, InputMetrics}

private[spark] class NewHadoopPartition(
rddId: Int,
@@ -112,6 +113,18 @@ class NewHadoopRDD[K, V](
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
try {
/* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
* always at record boundaries, so tasks may need to read into other splits to complete
* a record. */
inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
} catch {
case e: Exception =>
logWarning("Unable to get input split size in order to set task input bytes", e)
}
context.taskMetrics.inputMetrics = Some(inputMetrics)

// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
var havePair = false
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
* :: DeveloperApi ::
@@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
" EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
val readMetrics = taskMetrics.shuffleReadMetrics match {
val inputMetrics = taskMetrics.inputMetrics match {
case Some(metrics) =>
" READ_METHOD=" + metrics.readMethod.toString +
" INPUT_BYTES=" + metrics.bytesRead
case None => ""
}
val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -174,7 +182,8 @@
case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
case None => ""
}
stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + shuffleReadMetrics +
writeMetrics)
}

/**
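For illustration, given the concatenation above, a successful task that read a 64 MB HDFS split would produce a stage-log line along these lines (the values and the elided leading fields are made up):

```
... EXECUTOR_ID=0 HOST=worker-1 EXECUTOR_RUN_TIME=2310 READ_METHOD=Hadoop INPUT_BYTES=67108864
```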
63 changes: 39 additions & 24 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
import sun.nio.ch.DirectBuffer

import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
@@ -39,6 +40,15 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues

/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
inputMetrics.bytesRead = bytes
}

private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
@@ -334,9 +344,9 @@ private[spark] class BlockManager(
/**
* Get block from local block manager.
*/
def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
def getLocal(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}

/**
@@ -355,11 +365,11 @@
blockId, s"Block $blockId not found on disk, though it should be")
}
} else {
doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
}

private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
@@ -386,14 +396,14 @@
// Look for the block in memory
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
val result = if (asValues) {
memoryStore.getValues(blockId)
val result = if (asBlockResult) {
memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
return Some(values)
return result
case None =>
logDebug(s"Block $blockId not found in memory")
}
@@ -405,10 +415,11 @@
if (tachyonStore.contains(blockId)) {
tachyonStore.getBytes(blockId) match {
case Some(bytes) =>
if (!asValues) {
if (!asBlockResult) {
return Some(bytes)
} else {
return Some(dataDeserialize(blockId, bytes))
return Some(new BlockResult(
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
}
case None =>
logDebug(s"Block $blockId not found in tachyon")
@@ -429,14 +440,15 @@

if (!level.useMemory) {
// If the block shouldn't be stored in memory, we can just return it
if (asValues) {
return Some(dataDeserialize(blockId, bytes))
if (asBlockResult) {
return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
info.size))
} else {
return Some(bytes)
}
} else {
// Otherwise, we also have to store something in the memory store
if (!level.deserialized || !asValues) {
if (!level.deserialized || !asBlockResult) {
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
@@ -445,7 +457,7 @@
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
}
if (!asValues) {
if (!asBlockResult) {
return Some(bytes)
} else {
val values = dataDeserialize(blockId, bytes)
@@ -457,12 +469,12 @@
memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
match {
case Left(values2) =>
return Some(values2)
return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
case _ =>
throw new SparkException("Memory store did not return an iterator")
throw new SparkException("Memory store did not return back an iterator")
}
} else {
return Some(values)
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
}
}
@@ -477,29 +489,32 @@
/**
* Get block from remote block managers.
*/
def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
def getRemote(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting remote block $blockId")
doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}

/**
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
logDebug(s"Getting remote block $blockId as bytes")
doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}

private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
val locations = Random.shuffle(master.getLocations(blockId))
for (loc <- locations) {
logDebug(s"Getting remote block $blockId from $loc")
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
if (asValues) {
return Some(dataDeserialize(blockId, data))
if (asBlockResult) {
return Some(new BlockResult(
dataDeserialize(blockId, data),
DataReadMethod.Network,
data.limit()))
} else {
return Some(data)
}
@@ -513,7 +528,7 @@
/**
* Get a block from the block manager (either local or remote).
*/
def get(blockId: BlockId): Option[Iterator[Any]] = {
def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
@@ -792,7 +807,7 @@ private[spark] class BlockManager(
* Read a block consisting of a single object.
*/
def getSingle(blockId: BlockId): Option[Any] = {
get(blockId).map(_.next())
get(blockId).map(_.data.next())
}

/**
core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -78,7 +78,7 @@ private[spark] object ThreadingTest {
val startTime = System.currentTimeMillis()
manager.get(blockId) match {
case Some(retrievedBlock) =>
assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList,
"Block " + blockId + " did not match")
println("Got block " + blockId + " in " +
(System.currentTimeMillis - startTime) + " ms")