[SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes

### What changes were proposed in this pull request?

Allow users to have Spark attempt to decommission excluded executors.
Since excluded executors may be flaky, this also adds the ability for users to specify a time limit after which a decommissioning executor will be killed by Spark.

### Why are the changes needed?

This may help prevent fetch failures from excluded executors, and it also handles the case where an excluded executor is flaky and never finishes decommissioning: after the configured time limit, Spark kills the executor forcefully.

### Does this PR introduce _any_ user-facing change?

Yes, two new configuration flags: `spark.excludeOnFailure.killExcludedExecutors.decommission` to decommission excluded executors instead of killing them, and `spark.executor.decommission.forceKillTimeout` to bound how long a decommissioning executor may keep running before it is killed.
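
For illustration only, a minimal sketch of how a job might opt in to both behaviours. The two new keys are the ones added by this PR; the surrounding decommission and exclusion settings are assumptions based on the existing configuration and the integration test below.

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration sketch (not part of this PR's diff):
// decommission excluded executors instead of killing them outright, and
// force-kill any executor that has not finished decommissioning after 5 minutes.
val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")                                // assumed prerequisite
  .set("spark.excludeOnFailure.enabled", "true")                            // assumed prerequisite
  .set("spark.excludeOnFailure.killExcludedExecutors", "true")              // existing flag
  .set("spark.excludeOnFailure.killExcludedExecutors.decommission", "true") // new in this PR
  .set("spark.executor.decommission.forceKillTimeout", "300s")              // new in this PR
```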

### How was this patch tested?

Extended unit and integration tests.

Closes apache#31249 from holdenk/configure-inaccessibleList-kill-to-use-decommissioning.

Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
holdenk and holdenk committed Feb 9, 2021
1 parent 2b51843 commit 50641d2
Showing 8 changed files with 187 additions and 14 deletions.
@@ -129,6 +129,12 @@ private[spark] trait ExecutorAllocationClient {
decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
}

/**
* Request that the cluster manager decommission every executor on the specified host.
*
* @return whether the request is acknowledged by the cluster manager.
*/
def decommissionExecutorsOnHost(host: String): Boolean

/**
* Request that the cluster manager kill every executor on the specified host.
@@ -827,6 +827,13 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
ConfigBuilder("spark.excludeOnFailure.killExcludedExecutors.decommission")
.doc("Attempt decommission of excluded nodes instead of going directly to kill")
.version("3.2.0")
.booleanConf
.createWithDefault(false)

private[spark] val EXCLUDE_ON_FAILURE_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskExcludeOnFailureTime")
.internal()
@@ -1958,7 +1965,8 @@ package object config {

private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
ConfigBuilder("spark.executor.decommission.killInterval")
.doc("Duration after which a decommissioned executor will be killed forcefully." +
.doc("Duration after which a decommissioned executor will be killed forcefully " +
"*by an outside* (e.g. non-spark) service. " +
"This config is useful for cloud environments where we know in advance when " +
"an executor is going to go down after decommissioning signal i.e. around 2 mins " +
"in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
@@ -1967,6 +1975,15 @@
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT =
ConfigBuilder("spark.executor.decommission.forceKillTimeout")
.doc("Duration after which a Spark will force a decommissioning executor to exit." +
" this should be set to a high value in most situations as low values will prevent " +
" block migrations from having enough time to complete.")
.version("3.2.0")
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val EXECUTOR_DECOMMISSION_SIGNAL =
ConfigBuilder("spark.executor.decommission.signal")
.doc("The signal that used to trigger the executor to start decommission.")
35 changes: 28 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
@@ -40,6 +40,7 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
* stage, but still many failures over the entire application
* * "flaky" executors -- they don't fail every task, but are still faulty enough to merit
* excluding
* * missing shuffle files -- may trigger fetch failures on healthy executors.
*
* See the design doc on SPARK-8425 for a more in-depth discussion. Note SPARK-32037 renamed
* the feature.
@@ -64,6 +65,8 @@ private[scheduler] class HealthTracker (
val EXCLUDE_ON_FAILURE_TIMEOUT_MILLIS = HealthTracker.getExludeOnFailureTimeout(conf)
private val EXCLUDE_FETCH_FAILURE_ENABLED =
conf.get(config.EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED)
private val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)

/**
* A map from executorId to information on task failures. Tracks the time of each task failure,
@@ -154,11 +157,21 @@
}

private def killExecutor(exec: String, msg: String): Unit = {
val fullMsg = if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
s"${msg} (actually decommissioning)"
} else {
msg
}
allocationClient match {
case Some(a) =>
logInfo(msg)
a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
force = true)
logInfo(fullMsg)
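// When decommissioning is enabled for excluded executors, ask the cluster manager to
// decommission the executor (giving its blocks a chance to migrate) instead of killing it.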
if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg),
adjustTargetNumExecutors = false)
} else {
a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
force = true)
}
case None =>
logInfo(s"Not attempting to kill excluded executor id $exec " +
s"since allocation client is not defined.")
@@ -182,10 +195,18 @@
if (conf.get(config.EXCLUDE_ON_FAILURE_KILL_ENABLED)) {
allocationClient match {
case Some(a) =>
logInfo(s"Killing all executors on excluded host $node " +
s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
if (a.killExecutorsOnHost(node) == false) {
logError(s"Killing executors on node $node failed.")
if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
logInfo(s"Decommissioning all executors on excluded host $node " +
s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
if (!a.decommissionExecutorsOnHost(node)) {
logError(s"Decommissioning executors on $node failed.")
}
} else {
logInfo(s"Killing all executors on excluded host $node " +
s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
if (!a.killExecutorsOnHost(node)) {
logError(s"Killing executors on node $node failed.")
}
}
case None =>
logWarning(s"Not attempting to kill executors on excluded host $node " +
@@ -49,6 +49,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillExecutorsOnHost(host: String)
extends CoarseGrainedClusterMessage

case class DecommissionExecutorsOnHost(host: String)
extends CoarseGrainedClusterMessage

case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage

@@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster

import java.util.concurrent.TimeUnit
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import javax.annotation.concurrent.GuardedBy

@@ -115,6 +115,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

private val cleanupService: Option[ScheduledExecutorService] =
conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { _ =>
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cleanup-decommission-execs")
}

class DriverEndpoint extends IsolatedRpcEndpoint with Logging {

override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
@@ -176,11 +181,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
}

case KillExecutorsOnHost(host) =>
scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
killExecutors(exec.toSeq, adjustTargetNumExecutors = false, countFailures = false,
scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
killExecutors(execs.toSeq, adjustTargetNumExecutors = false, countFailures = false,
force = true)
}

case DecommissionExecutorsOnHost(host) =>
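// Sent by decommissionExecutorsOnHost(); handled inside the event loop so it cannot
// race with new executors registering on the excluded host.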
val reason = ExecutorDecommissionInfo(s"Decommissioning all executors on $host.")
scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
val execsWithReasons = execs.map(exec => (exec, reason)).toArray

decommissionExecutors(execsWithReasons, adjustTargetNumExecutors = false,
triggeredByExecutor = false)
}

case UpdateDelegationTokens(newDelegationTokens) =>
updateDelegationTokens(newDelegationTokens)

@@ -506,6 +520,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
}
}

conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { cleanupInterval =>
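// Schedule a one-shot cleanup task: once the force-kill timeout elapses, any executors
// still pending decommission are killed forcefully.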
val cleanupTask = new Runnable() {
override def run(): Unit = Utils.tryLogNonFatalError {
val stragglers = CoarseGrainedSchedulerBackend.this.synchronized {
executorsToDecommission.filter(executorsPendingDecommission.contains)
}
if (stragglers.nonEmpty) {
logInfo(s"${stragglers.toList} failed to decommission in ${cleanupInterval}, killing.")
killExecutors(stragglers, false, false, true)
}
}
}
cleanupService.map(_.schedule(cleanupTask, cleanupInterval, TimeUnit.SECONDS))
}

executorsToDecommission
}

@@ -548,6 +577,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)

override def stop(): Unit = {
reviveThread.shutdownNow()
cleanupService.foreach(_.shutdownNow())
stopExecutors()
delegationTokenManager.foreach(_.stop())
try {
@@ -850,14 +880,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
Future.successful(false)

/**
* Request that the cluster manager decommissions all executors on a given host.
* @return whether the decommission request is acknowledged.
*/
final override def decommissionExecutorsOnHost(host: String): Boolean = {
logInfo(s"Requesting to kill any and all executors on host $host")
// A potential race exists if a new executor attempts to register on a host
// that is on the exclude list and is no longer valid. To avoid this race,
// all executor registration and decommissioning happens in the event loop. This way, either
// an executor will fail to register, or will be decommed when all executors on a host
// are decommed.
// Decommission all the executors on this host in an event loop to ensure serialization.
driverEndpoint.send(DecommissionExecutorsOnHost(host))
true
}

/**
* Request that the cluster manager kill all executors on a given host.
* @return whether the kill request is acknowledged.
*/
final override def killExecutorsOnHost(host: String): Boolean = {
logInfo(s"Requesting to kill any and all executors on host ${host}")
logInfo(s"Requesting to kill any and all executors on host $host")
// A potential race exists if a new executor attempts to register on a host
// that is on the exclude list and is no no longer valid. To avoid this race,
// that is on the exclude list and is no longer valid. To avoid this race,
// all executor registration and killing happens in the event loop. This way, either
// an executor will fail to register, or will be killed when all executors on a host
// are killed.
@@ -554,6 +554,51 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar
verify(allocationClientMock).killExecutorsOnHost("hostA")
}

test("excluding decommission and kills executors when enabled") {
val allocationClientMock = mock[ExecutorAllocationClient]

// verify we decommission when configured
conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
conf.set(config.DECOMMISSION_ENABLED.key, "true")
conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock)

// Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole
// application.
val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
(0 until 4).foreach { partition =>
taskSetExclude2.updateExcludedForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures)

val msg1 =
"Killing excluded executor id 1 since spark.excludeOnFailure.killExcludedExecutors is set." +
" (actually decommissioning)"

verify(allocationClientMock).decommissionExecutor(
"1", ExecutorDecommissionInfo(msg1), false)

val taskSetExclude3 = createTaskSetExcludelist(stageId = 1)
// Fail 4 tasks in one task set on executor 2, so that executor gets excluded for the whole
// application. Since that's the second executor that is excluded on the same node, we also
// exclude that node.
(0 until 4).foreach { partition =>
taskSetExclude3.updateExcludedForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude3.execToFailures)

val msg2 =
"Killing excluded executor id 2 since spark.excludeOnFailure.killExcludedExecutors is set." +
" (actually decommissioning)"
verify(allocationClientMock).decommissionExecutor(
"2", ExecutorDecommissionInfo(msg2), false, false)
verify(allocationClientMock).decommissionExecutorsOnHost("hostA")
}

test("fetch failure excluding kills executors, configured by EXCLUDE_ON_FAILURE_KILL_ENABLED") {
val allocationClientMock = mock[ExecutorAllocationClient]
when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
@@ -116,6 +116,38 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
executorPatience = None,
decommissioningTest = false)
}

test("Test decommissioning timeouts", k8sTestTag) {
sparkAppConf
.set(config.DECOMMISSION_ENABLED.key, "true")
.set("spark.kubernetes.container.image", pyImage)
.set(config.STORAGE_DECOMMISSION_ENABLED.key, "true")
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key, "true")
.set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key, "true")
// Ensure we have somewhere to migrate our data to
.set("spark.executor.instances", "3")
// Set super high so the timeout is triggered
.set("spark.storage.decommission.replicationReattemptInterval", "8640000")
// Set super low so the timeout is triggered
.set(config.EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT.key, "10")

runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_DECOMISSIONING,
mainClass = "",
expectedDriverLogOnCompletion = Seq(
"Finished waiting, stopping Spark",
"Decommission executors",
"failed to decommission in 10, killing",
"killed by driver."),
appArgs = Array.empty[String],
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false,
pyFiles = None,
executorPatience = None,
decommissioningTest = true)
}
}

private[spark] object DecommissionSuite {
@@ -345,8 +345,11 @@ class KubernetesSuite extends SparkFunSuite
}
// Delete the pod to simulate cluster scale down/migration.
// This will allow the pod to remain up for the grace period
// We set an intentionally long grace period to test that Spark
// exits once the blocks are done migrating and doesn't wait for the
// entire grace period if it does not need to.
kubernetesTestComponents.kubernetesClient.pods()
.withName(name).delete()
.withName(name).withGracePeriod(Int.MaxValue).delete()
logDebug(s"Triggered pod decom/delete: $name deleted")
// Make sure this pod is deleted
Eventually.eventually(TIMEOUT, INTERVAL) {
