[SPARK-36664][CORE] Log time waiting for cluster resources
### What changes were proposed in this pull request?

Keep track of, and communicate to the listener bus, how long we wait for executors to be allocated by the underlying cluster manager.

Replaces the previous PR with GHA issues (apache#35172) and the WIP PRs apache#34650 and apache#35881.

### Why are the changes needed?

Sometimes the cluster manager may choke or otherwise fail to allocate resources, and we have no good way of detecting this situation. That makes it difficult for the user to debug and to tell apart from Spark itself not scaling up correctly.

### Does this PR introduce _any_ user-facing change?

A new field in the listener bus message that is posted when an executor is allocated.
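
For illustration, a user-side listener could surface the new timing information like this. This is a minimal sketch, not part of the patch: the listener class name and log line are made up, while `requestTime` and `registrationTime` are the `ExecutorInfo` fields exposed by this change.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

// Hypothetical listener: logs how long each executor waited between being
// requested from the cluster manager and registering with the driver.
class ExecutorWaitTimeListener extends SparkListener {
  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    val info = event.executorInfo
    for {
      requested <- info.requestTime        // new field from this patch
      registered <- info.registrationTime  // new field from this patch
    } {
      val waitMs = registered - requested
      println(s"Executor ${event.executorId} waited $waitMs ms for cluster resources")
    }
  }
}

// Registered like any other listener:
// sc.addSparkListener(new ExecutorWaitTimeListener)
```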

### How was this patch tested?

New unit test in the listener suite.

Closes apache#36185 from holdenk/SPARK-36664-Log-time-waiting-for-cluster-resources-r4.

Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
holdenk and holdenk committed Apr 26, 2022
1 parent 5056c6c commit 148918b
Showing 8 changed files with 266 additions and 12 deletions.
@@ -21,7 +21,7 @@ import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import javax.annotation.concurrent.GuardedBy

-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{HashMap, HashSet, Queue}
import scala.concurrent.Future

import org.apache.hadoop.security.UserGroupInformation
@@ -82,6 +82,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
@GuardedBy("CoarseGrainedSchedulerBackend.this")
private val requestedTotalExecutorsPerResourceProfile = new HashMap[ResourceProfile, Int]

// Profile IDs to the times that executors were requested for.
// The operations we do on queue are all amortized constant cost
// see https://www.scala-lang.org/api/2.13.x/scala/collection/mutable/ArrayDeque.html
@GuardedBy("CoarseGrainedSchedulerBackend.this")
private val execRequestTimes = new HashMap[Int, Queue[(Int, Long)]]

private val listenerBus = scheduler.sc.listenerBus

// Executors we have requested the cluster manager to kill that have not died yet; maps
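
To sketch the bookkeeping above (illustrative code, not part of the patch): each resource profile id maps to a FIFO of `(executors requested, request timestamp)` pairs, and since Scala 2.13's `mutable.Queue` is backed by `ArrayDeque`, the append, dequeue, and prepend operations used here are all amortized constant time.

```scala
import scala.collection.mutable.{HashMap, Queue}

// profile id -> queue of (outstanding executor count, request time in ms)
val execRequestTimes = new HashMap[Int, Queue[(Int, Long)]]

val defaultProfileId = 0  // illustrative profile id
val q = execRequestTimes.getOrElseUpdate(defaultProfileId, Queue[(Int, Long)]())

// "We just asked the cluster manager for 4 more executors of this profile."
q += ((4, System.currentTimeMillis()))

// When an executor of this profile later registers, the head of the queue
// tells us when the oldest still-outstanding request was made.
val oldestRequestTs: Option[Long] = q.headOption.map(_._2)
```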
@@ -260,9 +266,27 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
.resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
(info.name, new ExecutorResourceInfo(info.name, info.addresses, numParts))
}
// If we've requested the executor figure out when we did.
val reqTs: Option[Long] = CoarseGrainedSchedulerBackend.this.synchronized {
execRequestTimes.get(resourceProfileId).flatMap {
times =>
times.headOption.map {
h =>
// Take off the top element
times.dequeue()
// If we requested more than one exec reduce the req count by 1 and prepend it back
if (h._1 > 1) {
((h._1 - 1, h._2)) +=: times
}
h._2
}
}
}

val data = new ExecutorData(executorRef, executorAddress, hostname,
0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
-resourcesInfo, resourceProfileId, registrationTs = System.currentTimeMillis())
+resourcesInfo, resourceProfileId, registrationTs = System.currentTimeMillis(),
+requestTs = reqTs)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
@@ -742,6 +766,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val numExisting = requestedTotalExecutorsPerResourceProfile.getOrElse(defaultProf, 0)
requestedTotalExecutorsPerResourceProfile(defaultProf) = numExisting + numAdditionalExecutors
// Account for executors pending to be added or removed
updateExecRequestTime(defaultProf.id, numAdditionalExecutors)
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
}

@@ -780,15 +805,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
(scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
}
val response = synchronized {
val oldResourceProfileToNumExecutors = requestedTotalExecutorsPerResourceProfile.map {
case (rp, num) =>
(rp.id, num)
}.toMap
this.requestedTotalExecutorsPerResourceProfile.clear()
this.requestedTotalExecutorsPerResourceProfile ++= resourceProfileToNumExecutors
this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
this.rpHostToLocalTaskCount = hostToLocalTaskCount
updateExecRequestTimes(oldResourceProfileToNumExecutors, resourceProfileIdToNumExecutors)
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
}
defaultAskTimeout.awaitResult(response)
}

private def updateExecRequestTimes(oldProfile: Map[Int, Int], newProfile: Map[Int, Int]): Unit = {
newProfile.map {
case (k, v) =>
val delta = v - oldProfile.getOrElse(k, 0)
if (delta != 0) {
updateExecRequestTime(k, delta)
}
}
}

private def updateExecRequestTime(profileId: Int, delta: Int) = {
val times = execRequestTimes.getOrElseUpdate(profileId, Queue[(Int, Long)]())
if (delta > 0) {
// Add the request to the end, constant time op
times += ((delta, System.currentTimeMillis()))
} else if (delta < 0) {
// Consume as if |delta| had been allocated
var toConsume = -delta
// Note: it's possible that something else allocated an executor and we have
// a negative delta, we can just avoid mutating the queue.
while (toConsume > 0 && times.nonEmpty) {
val h = times.dequeue
if (h._1 > toConsume) {
// Prepend updated first req to times, constant time op
((h._1 - toConsume, h._2)) +=: times
toConsume = 0
} else {
toConsume = toConsume - h._1
}
}
}
}

/**
* Request executors from the cluster manager by specifying the total number desired,
* including existing pending and running executors.
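
To make the queue discipline concrete, here is an illustrative trace, not part of the patch and with invented timestamps, of how one profile's entry in `execRequestTimes` evolves across two requests, one executor registration, and a decrease of the requested total:

```scala
import scala.collection.mutable.Queue

val times = Queue[(Int, Long)]()   // (executors still expected, request time)

times += ((3, 1000L))  // updateExecRequestTime(id, 3)  -> Queue((3,1000))
times += ((2, 2000L))  // updateExecRequestTime(id, 2)  -> Queue((3,1000), (2,2000))

// An executor registers (RegisterExecutor): pop the head, and if that request
// covered more than one executor, put the remainder back at the front.
val h = times.dequeue()                          // h = (3, 1000)
if (h._1 > 1) ((h._1 - 1, h._2)) +=: times       // -> Queue((2,1000), (2,2000))
val requestTs = h._2                             // 1000L, stored as ExecutorData.requestTs

// The requested total drops by one (delta = -1): consume one slot the same way,
// without producing a request timestamp for anyone.
var toConsume = 1
while (toConsume > 0 && times.nonEmpty) {
  val head = times.dequeue()
  if (head._1 > toConsume) {
    ((head._1 - toConsume, head._2)) +=: times   // -> Queue((1,1000), (2,2000))
    toConsume = 0
  } else {
    toConsume -= head._1
  }
}
```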
@@ -31,6 +31,7 @@ import org.apache.spark.scheduler.ExecutorResourceInfo
* @param resourcesInfo The information of the currently available resources on the executor
* @param resourceProfileId The id of the ResourceProfile being used by this executor
* @param registrationTs The registration timestamp of this executor
* @param requestTs What time this executor was most likely requested at
*/
private[cluster] class ExecutorData(
val executorEndpoint: RpcEndpointRef,
@@ -42,6 +43,7 @@ private[cluster] class ExecutorData(
override val attributes: Map[String, String],
override val resourcesInfo: Map[String, ExecutorResourceInfo],
override val resourceProfileId: Int,
-val registrationTs: Long
+val registrationTs: Long,
+val requestTs: Option[Long]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
-resourcesInfo, resourceProfileId)
+resourcesInfo, resourceProfileId, Some(registrationTs), requestTs)
@@ -31,18 +31,28 @@ class ExecutorInfo(
val logUrlMap: Map[String, String],
val attributes: Map[String, String],
val resourcesInfo: Map[String, ResourceInformation],
-val resourceProfileId: Int) {
+val resourceProfileId: Int,
+val registrationTime: Option[Long],
+val requestTime: Option[Long]) {

def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, String],
attributes: Map[String, String], resourcesInfo: Map[String, ResourceInformation],
resourceProfileId: Int) = {
this(executorHost, totalCores, logUrlMap, attributes, resourcesInfo, resourceProfileId,
None, None)
}
def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, String]) = {
-this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
+this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID,
+None, None)
}

def this(
executorHost: String,
totalCores: Int,
logUrlMap: Map[String, String],
attributes: Map[String, String]) = {
-this(executorHost, totalCores, logUrlMap, attributes, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
+this(executorHost, totalCores, logUrlMap, attributes, Map.empty, DEFAULT_RESOURCE_PROFILE_ID,
+None, None)
}

def this(
@@ -52,7 +62,7 @@
attributes: Map[String, String],
resourcesInfo: Map[String, ResourceInformation]) = {
this(executorHost, totalCores, logUrlMap, attributes, resourcesInfo,
-DEFAULT_RESOURCE_PROFILE_ID)
+DEFAULT_RESOURCE_PROFILE_ID, None, None)
}

def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@@ -72,6 +82,6 @@
override def hashCode(): Int = {
val state = Seq(executorHost, totalCores, logUrlMap, attributes, resourcesInfo,
resourceProfileId)
-state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+state.filter(_ != null).map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -527,7 +527,9 @@ private[spark] object JsonProtocol {
("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
("Attributes" -> mapToJson(executorInfo.attributes)) ~
("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo)) ~
("Resource Profile Id" -> executorInfo.resourceProfileId)
("Resource Profile Id" -> executorInfo.resourceProfileId) ~
("Registration Time" -> executorInfo.registrationTime) ~
("Request Time" -> executorInfo.requestTime)
}

def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
@@ -1223,8 +1225,15 @@
case Some(id) => id.extract[Int]
case None => ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
}
val registrationTs = jsonOption(json \ "Registration Time") map { ts =>
ts.extract[Long]
}
val requestTs = jsonOption(json \ "Request Time") map { ts =>
ts.extract[Long]
}

new ExecutorInfo(executorHost, totalCores, logUrls, attributes.toMap, resources.toMap,
-resourceProfileId)
+resourceProfileId, registrationTs, requestTs)
}

def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
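
For reference, a rough sketch of what the two new fields look like on the wire, using the same json4s DSL that `JsonProtocol` builds its JSON with. The timestamps are invented, and a `None` simply drops out of the rendered object, which is why the reader goes through `jsonOption`:

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

val withTimes =
  ("Registration Time" -> Some(1650000000123L)) ~
  ("Request Time" -> Some(1650000000045L))
println(compact(render(withTimes)))
// {"Registration Time":1650000000123,"Request Time":1650000000045}

val withoutTimes =
  ("Registration Time" -> (None: Option[Long])) ~
  ("Request Time" -> (None: Option[Long]))
println(compact(render(withoutTimes)))
// {} -- events written before this change deserialize with both fields as None
```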
@@ -40,6 +40,7 @@ import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.{RpcUtils, SerializableBuffer, Utils}

class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
@@ -189,6 +190,8 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo

test("extra resources from executor") {

val testStartTime = System.currentTimeMillis()

val execCores = 3
val conf = new SparkConf()
.set(EXECUTOR_CORES, execCores)
@@ -207,15 +210,23 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
sc.resourceProfileManager.addResourceProfile(rp)
assert(rp.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val backend = sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
// Note we get two in default profile and one in the new rp
// we need to put a req time in for all of them.
backend.requestTotalExecutors(Map((rp.id, 1)), Map(), Map())
backend.requestExecutors(3)
val mockEndpointRef = mock[RpcEndpointRef]
val mockAddress = mock[RpcAddress]
when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) => {})

val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", "3")))

var executorAddedCount: Int = 0
val infos = scala.collection.mutable.ArrayBuffer[ExecutorInfo]()
val listener = new SparkListener() {
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
// Lets check that the exec allocation times "make sense"
val info = executorAdded.executorInfo
infos += info
executorAddedCount += 1
}
}
@@ -271,8 +282,128 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
}
sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
assert(executorAddedCount === 3)
infos.foreach { info =>
assert(info.requestTime.get > 0,
"Exec allocation and request times don't make sense")
assert(info.requestTime.get > testStartTime,
"Exec allocation and request times don't make sense")
assert(info.registrationTime.get > info.requestTime.get,
"Exec allocation and request times don't make sense")
}
}

test("exec alloc decrease.") {

val testStartTime = System.currentTimeMillis()

val execCores = 3
val conf = new SparkConf()
.set(EXECUTOR_CORES, execCores)
.set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
.set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
.setMaster(
"coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
.setAppName("test")
conf.set(TASK_GPU_ID.amountConf, "1")
conf.set(EXECUTOR_GPU_ID.amountConf, "1")

sc = new SparkContext(conf)
val execGpu = new ExecutorResourceRequests().cores(1).resource(GPU, 3)
val taskGpu = new TaskResourceRequests().cpus(1).resource(GPU, 1)
val rp = new ResourceProfile(execGpu.requests, taskGpu.requests)
sc.resourceProfileManager.addResourceProfile(rp)
assert(rp.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val backend = sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
// Note we get two in default profile and one in the new rp
// we need to put a req time in for all of them.
backend.requestTotalExecutors(Map((rp.id, 1)), Map(), Map())
// Decrease the number of execs requested in the new rp.
backend.requestTotalExecutors(Map((rp.id, 0)), Map(), Map())
// Request execs in the default profile.
backend.requestExecutors(3)
val mockEndpointRef = mock[RpcEndpointRef]
val mockAddress = mock[RpcAddress]
when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) => {})

val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", "3")))

var executorAddedCount: Int = 0
val infos = scala.collection.mutable.ArrayBuffer[ExecutorInfo]()
val listener = new SparkListener() {
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
// Lets check that the exec allocation times "make sense"
val info = executorAdded.executorInfo
infos += info
executorAddedCount += 1
}
}

sc.addSparkListener(listener)

backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
rp.id))

val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
val buffer = new SerializableBuffer(bytebuffer)

var execResources = backend.getExecutorAvailableResources("1")
assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))

val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
assert(exec3ResourceProfileId === rp.id)

val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
"t1", 0, 1, mutable.Map.empty[String, Long],
mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
new Properties(), 1, taskResources, bytebuffer)))
val ts = backend.getTaskSchedulerImpl()
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)

backend.driverEndpoint.send(ReviveOffers)

eventually(timeout(5 seconds)) {
execResources = backend.getExecutorAvailableResources("1")
assert(execResources(GPU).availableAddrs.sorted === Array("1", "3"))
assert(execResources(GPU).assignedAddrs === Array("0"))
}

// To avoid allocating any resources immediately after releasing the resource from the task to
// make sure that `availableAddrs` below won't change
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
backend.driverEndpoint.send(
StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskResources))

eventually(timeout(5 seconds)) {
execResources = backend.getExecutorAvailableResources("1")
assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
assert(execResources(GPU).assignedAddrs.isEmpty)
}
sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
assert(executorAddedCount === 3)
infos.foreach { info =>
info.requestTime.map { t =>
assert(t > 0,
"Exec request times don't make sense")
assert(t >= testStartTime,
"Exec allocation and request times don't make sense")
assert(t <= info.registrationTime.get,
"Exec allocation and request times don't make sense")
}
}
assert(infos.filter(_.requestTime.isEmpty).length === 1,
"Our unexpected executor does not have a request time.")
}


private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
sc.submitJob(
rdd,
@@ -52,6 +52,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
assert(listener.addedExecutorInfo.size == 2)
assert(listener.addedExecutorInfo("0").totalCores == 1)
assert(listener.addedExecutorInfo("1").totalCores == 1)
assert(listener.addedExecutorInfo("0").registrationTime.get > 0 )
}

private class SaveExecutorInfo extends SparkListener {
@@ -285,7 +285,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
knownExecs ++= Set("1", "2", "3")

val execInfoRp1 = new ExecutorInfo("host1", 1, Map.empty,
-Map.empty, Map.empty, 1)
+Map.empty, Map.empty, 1, None, None)

monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))