Skip to content

Commit

Permalink
[SPARK-17696][SPARK-12330][CORE] Partial backport of to branch-1.6.
Browse files Browse the repository at this point in the history
From the original commit message:

This PR also fixes a regression caused by [SPARK-10987] whereby submitting a shutdown causes a race between the local shutdown procedure and the notification of the scheduler driver disconnection. If the scheduler driver disconnection wins the race, the coarse executor incorrectly exits with status 1 (instead of the proper status 0)

Author: Charles Allen <charlesallen-net.com>

(cherry picked from commit 2eaeafe)

Author: Charles Allen <[email protected]>

Closes apache#15270 from vanzin/SPARK-17696.
  • Loading branch information
drcrallen authored and zsxwing committed Sep 28, 2016
1 parent e2ce0ca commit b999fa4
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.executor

import java.net.URL
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.hadoop.conf.Configuration

Expand All @@ -45,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend(
env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None

Expand Down Expand Up @@ -106,19 +108,23 @@ private[spark] class CoarseGrainedExecutorBackend(
}

case StopExecutor =>
stopping.set(true)
logInfo("Driver commanded a shutdown")
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
// a message to self to actually do the shutdown.
self.send(Shutdown)

case Shutdown =>
stopping.set(true)
executor.stop()
stop()
rpcEnv.shutdown()
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (driver.exists(_.address == remoteAddress)) {
if (stopping.get()) {
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
logError(s"Driver $remoteAddress disassociated! Shutting down.")
System.exit(1)
} else {
Expand Down

0 comments on commit b999fa4

Please sign in to comment.