Skip to content

Commit

Permalink
SPARK-1686: keep schedule() calling in the main thread
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/SPARK-1686

moved from original JIRA (by @markhamstra):

In deploy.master.Master, the completeRecovery method is the last thing to be called when a standalone Master is recovering from failure. It is responsible for resetting some state, relaunching drivers, and eventually resuming its scheduling duties.

There are currently four places in Master.scala where completeRecovery is called. Three of them are from within the actor's receive method, and aren't problems. The last starts from within receive when the ElectedLeader message is received, but the actual completeRecovery() call is made from the Akka scheduler. That means that it will execute on a different scheduler thread, and Master itself will end up running (i.e., schedule() ) from that Akka scheduler thread.

In this PR, I added a new master message TriggerSchedule to trigger the "local" call of schedule() in the scheduler thread

Author: CodingCat <[email protected]>

Closes apache#639 from CodingCat/SPARK-1686 and squashes the following commits:

81bb4ca [CodingCat] rename variable
69e0a2a [CodingCat] style fix
36a2ac0 [CodingCat] address Aaron's comments
ec9b7bb [CodingCat] address the comments
02b37ca [CodingCat] keep schedule() calling in the main thread
  • Loading branch information
CodingCat authored and aarondav committed May 10, 2014
1 parent 59577df commit 2f452cb
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ private[spark] class Master(

var leaderElectionAgent: ActorRef = _

private var recoveryCompletionTask: Cancellable = _

// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
Expand Down Expand Up @@ -152,6 +154,10 @@ private[spark] class Master(
}

override def postStop() {
// prevent the CompleteRecovery message sending to restarted master
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel()
}
webUi.stop()
fileSystemsUsed.foreach(_.close())
masterMetricsSystem.stop()
Expand All @@ -171,10 +177,13 @@ private[spark] class Master(
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
CompleteRecovery)
}
}

case CompleteRecovery => completeRecovery()

case RevokedLeadership => {
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
Expand Down Expand Up @@ -465,7 +474,7 @@ private[spark] class Master(
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
def schedule() {
private def schedule() {
if (state != RecoveryState.ALIVE) { return }

// First schedule drivers, they take strict precedence over applications
Expand All @@ -485,7 +494,7 @@ private[spark] class Master(
// Try to spread out each app among all the nodes, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
Expand Down

0 comments on commit 2f452cb

Please sign in to comment.