Skip to content

Commit

Permalink
[SPARK-34018][K8S] NPE in ExecutorPodsSnapshot
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Label both the statuses and ensure the ExecutorPodSnapshot starts with the default config to match.

### Why are the changes needed?

The current test depends on the order rather than testing the desired property.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Labeled the containers statuses, observed failures, added the default label as the initialization point, tests passed again.

Built Spark, ran on K8s cluster verified no NPE in driver log.

Closes apache#31071 from holdenk/SPARK-34018-finishedExecutorWithRunningSidecar-doesnt-correctly-constructt-the-test-case.

Authored-by: Holden Karau <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
holdenk authored and dongjoon-hyun committed Jan 8, 2021
1 parent 5b16d70 commit 8e11ce5
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorP

object ExecutorPodsSnapshot extends Logging {
private var shouldCheckAllContainers: Boolean = _
private var sparkContainerName: String = _
private var sparkContainerName: String = DEFAULT_EXECUTOR_CONTAINER_NAME

def apply(executorPods: Seq[Pod]): ExecutorPodsSnapshot = {
ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
Expand Down Expand Up @@ -80,24 +80,21 @@ object ExecutorPodsSnapshot extends Logging {
.anyMatch(t => t != null && t.getExitCode != 0)) {
PodFailed(pod)
} else {
// Otherwise look for the Spark container
val sparkContainerStatusOpt = pod.getStatus.getContainerStatuses.asScala
.find(_.getName() == sparkContainerName)
sparkContainerStatusOpt match {
case Some(sparkContainerStatus) =>
sparkContainerStatus.getState.getTerminated match {
case t if t.getExitCode != 0 =>
PodFailed(pod)
case t if t.getExitCode == 0 =>
// Otherwise look for the Spark container and get the exit code if present.
val sparkContainerExitCode = pod.getStatus.getContainerStatuses.asScala
.find(_.getName() == sparkContainerName).flatMap(x => Option(x.getState))
.flatMap(x => Option(x.getTerminated)).flatMap(x => Option(x.getExitCode))
.map(_.toInt)
sparkContainerExitCode match {
case Some(t) =>
t match {
case 0 =>
PodSucceeded(pod)
case _ =>
PodRunning(pod)
PodFailed(pod)
}
// If we can't find the Spark container status, fall back to the pod status. This is
// expected to occur during pod startup and other situations.
// No exit code means we are running.
case _ =>
logDebug(s"Unable to find container ${sparkContainerName} in pod ${pod} " +
"defaulting to entire pod status (running).")
PodRunning(pod)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,16 @@ object ExecutorLifecycleTestUtils {
.editOrNewStatus()
.withPhase("running")
.addNewContainerStatus()
.withName(DEFAULT_EXECUTOR_CONTAINER_NAME)
.withNewState()
.withNewTerminated()
.withMessage("message")
.withExitCode(exitCode)
.endTerminated()
.endState()
.endContainerStatus()
.addNewContainerStatus()
.withName("SIDECARFRIEND")
.withNewState()
.withNewRunning()
.endRunning()
Expand Down

0 comments on commit 8e11ce5

Please sign in to comment.