Skip to content

Commit

Permalink
Renamed stageIdToActiveJob to jobIdToActiveJob.
Browse files Browse the repository at this point in the history
This data structure was misused and, as a result, later renamed to an incorrect name.

This data structure seems to have gotten into this tangled state as a result of @henrydavidge using the stageID instead of the job Id to index into it and later @andrewor14 renaming the data structure to reflect this misunderstanding.

This patch renames it and removes an incorrect indexing into it.  The incorrect indexing into it meant that the code added by @henrydavidge to warn when a task size is too large (added here apache/spark@5757993) was not always executed; this commit fixes that.

Author: Kay Ousterhout <[email protected]>

Closes #301 from kayousterhout/fixCancellation and squashes the following commits:

bd3d3a4 [Kay Ousterhout] Renamed stageIdToActiveJob to jobIdToActiveJob.
  • Loading branch information
kayousterhout authored and pwendell committed Apr 2, 2014
1 parent ea9de65 commit 11973a7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
21 changes: 10 additions & 11 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class DAGScheduler(
private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]

Expand Down Expand Up @@ -536,7 +536,7 @@ class DAGScheduler(
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
stageIdToActiveJob(jobId) = job
jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(
Expand All @@ -559,7 +559,7 @@ class DAGScheduler(
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation)
activeJobs.clear() // These should already be empty by this point,
stageIdToActiveJob.clear() // but just in case we lost track of some jobs...
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...

case ExecutorAdded(execId, host) =>
handleExecutorAdded(execId, host)
Expand All @@ -569,7 +569,6 @@ class DAGScheduler(

case BeginEvent(task, taskInfo) =>
for (
job <- stageIdToActiveJob.get(task.stageId);
stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage)
) {
Expand Down Expand Up @@ -697,7 +696,7 @@ class DAGScheduler(
private def activeJobForStage(stage: Stage): Option[Int] = {
if (stageIdToJobIds.contains(stage.id)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
jobsThatUseStage.find(stageIdToActiveJob.contains)
jobsThatUseStage.find(jobIdToActiveJob.contains)
} else {
None
}
Expand Down Expand Up @@ -750,8 +749,8 @@ class DAGScheduler(
}
}

val properties = if (stageIdToActiveJob.contains(jobId)) {
stageIdToActiveJob(stage.jobId).properties
val properties = if (jobIdToActiveJob.contains(jobId)) {
jobIdToActiveJob(stage.jobId).properties
} else {
// this stage will be assigned to "default" pool
null
Expand Down Expand Up @@ -827,7 +826,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
stageIdToActiveJob -= stage.jobId
jobIdToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
Expand Down Expand Up @@ -986,11 +985,11 @@ class DAGScheduler(
val independentStages = removeJobAndIndependentStages(jobId)
independentStages.foreach(taskScheduler.cancelTasks)
val error = new SparkException("Job %d cancelled".format(jobId))
val job = stageIdToActiveJob(jobId)
val job = jobIdToActiveJob(jobId)
job.listener.jobFailed(error)
jobIdToStageIds -= jobId
activeJobs -= job
stageIdToActiveJob -= jobId
jobIdToActiveJob -= jobId
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
}
}
Expand All @@ -1011,7 +1010,7 @@ class DAGScheduler(
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
jobIdToStageIdsRemove(job.jobId)
stageIdToActiveJob -= resultStage.jobId
jobIdToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(scheduler.pendingTasks.isEmpty)
assert(scheduler.activeJobs.isEmpty)
assert(scheduler.failedStages.isEmpty)
assert(scheduler.stageIdToActiveJob.isEmpty)
assert(scheduler.jobIdToActiveJob.isEmpty)
assert(scheduler.jobIdToStageIds.isEmpty)
assert(scheduler.stageIdToJobIds.isEmpty)
assert(scheduler.stageIdToStage.isEmpty)
Expand Down

0 comments on commit 11973a7

Please sign in to comment.