[SPARK-32037][CORE] Rename blacklisting feature
### What changes were proposed in this pull request?

This PR renames the blacklisting feature. I ended up using "excludeOnFailure" or "excluded" in most cases, but there is a mix. I renamed BlacklistTracker to HealthTracker, but for TaskSetBlacklist the name HealthTracker didn't make sense, since it tracks not the health of the task set itself but the things the task set has excluded, so I renamed it TaskSetExcludeList. Everywhere else I tried to follow the context, and in most cases "excluded" made sense. It made more sense to me than "blocked", since you are basically excluding those executors and nodes from having tasks scheduled on them; they can be unexcluded later after timeouts and such. For the configs I changed the names to use "excludeOnFailure", which I thought explained the behavior.
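For illustration, a minimal sketch of the rename as a user sees it in configuration; the `DeprecatedConfig` entries in the `SparkConf.scala` hunk below are the authoritative old-to-new mapping:

```scala
import org.apache.spark.SparkConf

// New names (the old spark.blacklist.* keys still work but are deprecated):
val conf = new SparkConf()
  .set("spark.excludeOnFailure.enabled", "true") // was spark.blacklist.enabled
  .set("spark.excludeOnFailure.timeout", "1h")   // was spark.blacklist.timeout
```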

I unfortunately couldn't get rid of some of the old names because they are part of the event listener API and history files. To keep backwards compatibility I kept the old events and some of the parsing so that the history server still reads older history files properly. It is not forward compatible, though: a new application writes the "Excluded" events, so an older history server won't properly display them as blacklisted.
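A minimal sketch of the backwards-compatible read path, with hypothetical helper and class names (the PR's actual parsing lives in the event-log JSON handling):

```scala
// Map both the legacy "Blacklisted" event name and the new "Excluded" name
// onto the same parsed representation, so pre-3.1 history files still replay.
case class ExecutorExcludedEvent(time: Long, executorId: String)

def parseEventName(name: String, time: Long, execId: String): Option[ExecutorExcludedEvent] =
  name match {
    case "SparkListenerExecutorBlacklisted" => Some(ExecutorExcludedEvent(time, execId)) // pre-3.1 files
    case "SparkListenerExecutorExcluded"    => Some(ExecutorExcludedEvent(time, execId)) // 3.1+ files
    case _                                  => None
  }
```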

A few of the files below are showing up as deleted and recreated even though I did a git mv on them. I'm not sure why.

### Why are the changes needed?

Get rid of problematic language.

### Does this PR introduce _any_ user-facing change?

Config names change; the old configs still work but are deprecated.
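A sketch of the fallback behavior, assuming a simple new-key to legacy-key map; the real handling goes through the deprecated-config table added to `SparkConf.scala` below and logs a deprecation warning:

```scala
// Hypothetical illustration; see the DeprecatedConfig entries below for the full list.
val legacyKeys = Map(
  "spark.excludeOnFailure.enabled" -> "spark.blacklist.enabled",
  "spark.excludeOnFailure.timeout" -> "spark.blacklist.timeout")

// Prefer the new key; fall back to the deprecated key if only it is set.
def lookup(settings: Map[String, String], key: String): Option[String] =
  settings.get(key).orElse(legacyKeys.get(key).flatMap(settings.get))
```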

### How was this patch tested?

Updated tests; also manually tested the UI changes, and manually tested the history server reading older versions of history files and vice versa.

Closes apache#29906 from tgravescs/SPARK-32037.

Lead-authored-by: Thomas Graves <[email protected]>
Co-authored-by: Thomas Graves <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
tgravescs committed Oct 30, 2020
1 parent 491a0fb commit 72ad9dc
Showing 72 changed files with 2,557 additions and 2,075 deletions.
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -17,6 +17,7 @@

package org.apache.spark;

import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.scheduler.*;

/**
@@ -27,7 +28,11 @@
* new methods to SparkListener: forgetting to add a method will result in a compilation error (if
* this was a concrete Scala class, default implementations of new event handlers would be inherited
* from the SparkListener trait).
*
* Please note until Spark 3.1.0 this was missing the DeveloperApi annotation; this needs to be
* taken into account if changing this API before a major release.
*/
@DeveloperApi
public class SparkFirehoseListener implements SparkListenerInterface {

public void onEvent(SparkListenerEvent event) { }
@@ -124,34 +129,67 @@ public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executo
onEvent(executorBlacklisted);
}

@Override
public final void onExecutorExcluded(SparkListenerExecutorExcluded executorExcluded) {
onEvent(executorExcluded);
}

@Override
public void onExecutorBlacklistedForStage(
SparkListenerExecutorBlacklistedForStage executorBlacklistedForStage) {
onEvent(executorBlacklistedForStage);
}

@Override
public void onExecutorExcludedForStage(
SparkListenerExecutorExcludedForStage executorExcludedForStage) {
onEvent(executorExcludedForStage);
}

@Override
public void onNodeBlacklistedForStage(
SparkListenerNodeBlacklistedForStage nodeBlacklistedForStage) {
onEvent(nodeBlacklistedForStage);
}

@Override
public void onNodeExcludedForStage(
SparkListenerNodeExcludedForStage nodeExcludedForStage) {
onEvent(nodeExcludedForStage);
}

@Override
public final void onExecutorUnblacklisted(
SparkListenerExecutorUnblacklisted executorUnblacklisted) {
onEvent(executorUnblacklisted);
}

@Override
public final void onExecutorUnexcluded(
SparkListenerExecutorUnexcluded executorUnexcluded) {
onEvent(executorUnexcluded);
}

@Override
public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
onEvent(nodeBlacklisted);
}

@Override
public final void onNodeExcluded(SparkListenerNodeExcluded nodeExcluded) {
onEvent(nodeExcluded);
}

@Override
public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
onEvent(nodeUnblacklisted);
}

@Override
public final void onNodeUnexcluded(SparkListenerNodeUnexcluded nodeUnexcluded) {
onEvent(nodeUnexcluded);
}

@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
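For reference, a sketch of consuming the new events through this class from Scala, assuming the `Excluded` events expose the same `executorId`/`hostId` fields as their `Blacklisted` counterparts:

```scala
import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler._

// Every listener callback above funnels into the single onEvent hook,
// so one match suffices to observe the new exclusion events.
class ExclusionLogger extends SparkFirehoseListener {
  override def onEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerExecutorExcluded => println(s"executor excluded: ${e.executorId}")
    case e: SparkListenerNodeExcluded     => println(s"node excluded: ${e.hostId}")
    case _ => // ignore all other events
  }
}
// Register with SparkContext#addSparkListener(new ExclusionLogger)
```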
@@ -56,8 +56,8 @@ <h4 class="title-table">Summary</h4>
</th>
<th>
<span data-toggle="tooltip" data-placement="top"
title="Number of executors blacklisted by the scheduler due to task failures.">
Blacklisted</span>
title="Number of executors excluded by the scheduler due to task failures.">
Excluded</span>
</th>
</tr>
</thead>
28 changes: 14 additions & 14 deletions core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -26,15 +26,15 @@ function getThreadDumpEnabled() {
}

function formatStatus(status, type, row) {
if (row.isBlacklisted) {
return "Blacklisted";
if (row.isExcluded) {
return "Excluded";
}

if (status) {
if (row.blacklistedInStages.length == 0) {
if (row.excludedInStages.length == 0) {
return "Active"
}
return "Active (Blacklisted in Stages: [" + row.blacklistedInStages.join(", ") + "])";
return "Active (Excluded in Stages: [" + row.excludedInStages.join(", ") + "])";
}
return "Dead"
}
@@ -168,7 +168,7 @@ $(document).ready(function () {
var allTotalInputBytes = 0;
var allTotalShuffleRead = 0;
var allTotalShuffleWrite = 0;
var allTotalBlacklisted = 0;
var allTotalExcluded = 0;

var activeExecCnt = 0;
var activeRDDBlocks = 0;
@@ -190,7 +190,7 @@
var activeTotalInputBytes = 0;
var activeTotalShuffleRead = 0;
var activeTotalShuffleWrite = 0;
var activeTotalBlacklisted = 0;
var activeTotalExcluded = 0;

var deadExecCnt = 0;
var deadRDDBlocks = 0;
@@ -212,7 +212,7 @@
var deadTotalInputBytes = 0;
var deadTotalShuffleRead = 0;
var deadTotalShuffleWrite = 0;
var deadTotalBlacklisted = 0;
var deadTotalExcluded = 0;

response.forEach(function (exec) {
var memoryMetrics = {
@@ -246,7 +246,7 @@ $(document).ready(function () {
allTotalInputBytes += exec.totalInputBytes;
allTotalShuffleRead += exec.totalShuffleRead;
allTotalShuffleWrite += exec.totalShuffleWrite;
allTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
allTotalExcluded += exec.isExcluded ? 1 : 0;
if (exec.isActive) {
activeExecCnt += 1;
activeRDDBlocks += exec.rddBlocks;
@@ -268,7 +268,7 @@
activeTotalInputBytes += exec.totalInputBytes;
activeTotalShuffleRead += exec.totalShuffleRead;
activeTotalShuffleWrite += exec.totalShuffleWrite;
activeTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
activeTotalExcluded += exec.isExcluded ? 1 : 0;
} else {
deadExecCnt += 1;
deadRDDBlocks += exec.rddBlocks;
@@ -290,7 +290,7 @@
deadTotalInputBytes += exec.totalInputBytes;
deadTotalShuffleRead += exec.totalShuffleRead;
deadTotalShuffleWrite += exec.totalShuffleWrite;
deadTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
deadTotalExcluded += exec.isExcluded ? 1 : 0;
}
});

@@ -315,7 +315,7 @@
"allTotalInputBytes": allTotalInputBytes,
"allTotalShuffleRead": allTotalShuffleRead,
"allTotalShuffleWrite": allTotalShuffleWrite,
"allTotalBlacklisted": allTotalBlacklisted
"allTotalExcluded": allTotalExcluded
};
var activeSummary = {
"execCnt": ( "Active(" + activeExecCnt + ")"),
@@ -338,7 +338,7 @@
"allTotalInputBytes": activeTotalInputBytes,
"allTotalShuffleRead": activeTotalShuffleRead,
"allTotalShuffleWrite": activeTotalShuffleWrite,
"allTotalBlacklisted": activeTotalBlacklisted
"allTotalExcluded": activeTotalExcluded
};
var deadSummary = {
"execCnt": ( "Dead(" + deadExecCnt + ")" ),
@@ -361,7 +361,7 @@
"allTotalInputBytes": deadTotalInputBytes,
"allTotalShuffleRead": deadTotalShuffleRead,
"allTotalShuffleWrite": deadTotalShuffleWrite,
"allTotalBlacklisted": deadTotalBlacklisted
"allTotalExcluded": deadTotalExcluded
};

var data = {executors: response, "execSummary": [activeSummary, deadSummary, totalSummary]};
@@ -547,7 +547,7 @@ $(document).ready(function () {
{data: 'allTotalInputBytes', render: formatBytes},
{data: 'allTotalShuffleRead', render: formatBytes},
{data: 'allTotalShuffleWrite', render: formatBytes},
{data: 'allTotalBlacklisted'}
{data: 'allTotalExcluded'}
],
"paging": false,
"searching": false,
@@ -433,7 +433,7 @@ $(document).ready(function () {
{data : "failedTasks"},
{data : "killedTasks"},
{data : "succeededTasks"},
{data : "isBlacklistedForStage"},
{data : "isExcludedForStage"},
{
data : function (row, type) {
return row.inputRecords != 0 ? formatBytes(row.inputBytes, type) + " / " + row.inputRecords : "";
@@ -50,8 +50,8 @@ <h4 class="title-table">Aggregated Metrics by Executor</h4>
<th>Succeeded Tasks</th>
<th>
<span data-toggle="tooltip" data-placement="top"
title="Shows if this executor has been blacklisted by the scheduler due to task failures.">
Blacklisted</span>
title="Shows if this executor has been excluded by the scheduler due to task failures.">
Excluded</span>
</th>
<th><span id="executor-summary-input">Input Size / Records</span></th>
<th><span id="executor-summary-output">Output Size / Records</span></th>
@@ -312,8 +312,8 @@ private[spark] class ExecutorAllocationManager(

if (unschedulableTaskSets > 0) {
// Request additional executors to account for task sets having tasks that are unschedulable
// due to blacklisting when the active executor count has already reached the max needed
// which we would normally get.
// due to executors excluded for failures when the active executor count has already reached
// the max needed which we would normally get.
val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
tasksPerExecutor).toInt
math.max(maxNeededWithSpeculationLocalityOffset,
@@ -662,10 +662,10 @@ private[spark] class ExecutorAllocationManager(
private val resourceProfileIdToStageAttempt =
new mutable.HashMap[Int, mutable.Set[StageAttempt]]

// Keep track of unschedulable task sets due to blacklisting. This is a Set of StageAttempt's
// because we'll only take the last unschedulable task in a taskset although there can be more.
// This is done in order to avoid costly loops in the scheduling.
// Check TaskSetManager#getCompletelyBlacklistedTaskIfAny for more details.
// Keep track of unschedulable task sets because of executor/node exclusions from too many task
// failures. This is a Set of StageAttempt's because we'll only take the last unschedulable task
// in a taskset although there can be more. This is done in order to avoid costly loops in the
// scheduling. Check TaskSetManager#getCompletelyExcludedTaskIfAny for more details.
private val unschedulableTaskSets = new mutable.HashSet[StageAttempt]

// stageAttempt to tuple (the number of task with locality preferences, a map where each pair
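To make the sizing formula in the `ExecutorAllocationManager` hunk above concrete, a worked example with illustrative numbers (not Spark defaults):

```scala
val unschedulableTaskSets   = 3    // task sets blocked because their executors are all excluded
val executorAllocationRatio = 1.0  // spark.dynamicAllocation.executorAllocationRatio
val tasksPerExecutor        = 4    // executor cores / cores per task

val maxNeededForUnschedulables =
  math.ceil(unschedulableTaskSets * executorAllocationRatio / tasksPerExecutor).toInt
// ceil(3 * 1.0 / 4) = ceil(0.75) = 1 extra executor is requested
```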
28 changes: 26 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -603,7 +603,7 @@ private[spark] object SparkConf extends Logging {
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used anymore."),
DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
"Please use the new blacklisting options, spark.blacklist.*"),
"Please use the new excludedOnFailure options, spark.excludeOnFailure.*"),
DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used anymore"),
DeprecatedConfig("spark.executor.port", "2.0.0", "Not used anymore"),
DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
@@ -612,7 +612,31 @@
DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore."),
DeprecatedConfig("spark.yarn.services", "3.0.0", "Feature no longer available."),
DeprecatedConfig("spark.executor.plugins", "3.0.0",
"Feature replaced with new plugin API. See Monitoring documentation.")
"Feature replaced with new plugin API. See Monitoring documentation."),
DeprecatedConfig("spark.blacklist.enabled", "3.1.0",
"Please use spark.excludeOnFailure.enabled"),
DeprecatedConfig("spark.blacklist.task.maxTaskAttemptsPerExecutor", "3.1.0",
"Please use spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor"),
DeprecatedConfig("spark.blacklist.task.maxTaskAttemptsPerNode", "3.1.0",
"Please use spark.excludeOnFailure.task.maxTaskAttemptsPerNode"),
DeprecatedConfig("spark.blacklist.application.maxFailedTasksPerExecutor", "3.1.0",
"Please use spark.excludeOnFailure.application.maxFailedTasksPerExecutor"),
DeprecatedConfig("spark.blacklist.stage.maxFailedTasksPerExecutor", "3.1.0",
"Please use spark.excludeOnFailure.stage.maxFailedTasksPerExecutor"),
DeprecatedConfig("spark.blacklist.application.maxFailedExecutorsPerNode", "3.1.0",
"Please use spark.excludeOnFailure.application.maxFailedExecutorsPerNode"),
DeprecatedConfig("spark.blacklist.stage.maxFailedExecutorsPerNode", "3.1.0",
"Please use spark.excludeOnFailure.stage.maxFailedExecutorsPerNode"),
DeprecatedConfig("spark.blacklist.timeout", "3.1.0",
"Please use spark.excludeOnFailure.timeout"),
DeprecatedConfig("spark.blacklist.application.fetchFailure.enabled", "3.1.0",
"Please use spark.excludeOnFailure.application.fetchFailure.enabled"),
DeprecatedConfig("spark.scheduler.blacklist.unschedulableTaskSetTimeout", "3.1.0",
"Please use spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout"),
DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0",
"Please use spark.excludeOnFailure.killExcludedExecutors"),
DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
"Please use spark.yarn.executor.launch.excludeOnFailure.enabled")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -98,10 +98,11 @@ case class FetchFailed(
/**
* Fetch failures lead to a different failure handling path: (1) we don't abort the stage after
* 4 task failures, instead we immediately go back to the stage which generated the map output,
* and regenerate the missing data. (2) we don't count fetch failures for blacklisting, since
* presumably its not the fault of the executor where the task ran, but the executor which
* stored the data. This is especially important because we might rack up a bunch of
* fetch-failures in rapid succession, on all nodes of the cluster, due to one bad node.
* and regenerate the missing data. (2) we don't count fetch failures from executors excluded
* due to too many task failures, since presumably its not the fault of the executor where
* the task ran, but the executor which stored the data. This is especially important because
* we might rack up a bunch of fetch-failures in rapid succession, on all nodes of the cluster,
* due to one bad node.
*/
override def countTowardsTaskFailures: Boolean = false
}
@@ -160,6 +160,8 @@ private[spark] class BasicEventFilter(
case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorExcluded => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorUnexcluded => liveExecutors.contains(e.executorId)
case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId)
case e: SparkListenerBlockManagerAdded => acceptBlockManagerEvent(e.blockManagerId)
case e: SparkListenerBlockManagerRemoved => acceptBlockManagerEvent(e.blockManagerId)
@@ -73,7 +73,7 @@ private[spark] class HistoryAppStatusStore(
source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources,
source.resourceProfileId)
source.resourceProfileId, source.isExcluded, source.excludedInStages)
}

}
