[SPARK-16414][YARN] Fix bugs for "Can not get user config when calling SparkHadoopUtil.get.conf on yarn cluster mode"

## What changes were proposed in this pull request?

When deploying Spark in yarn cluster mode, the `SparkHadoopUtil` singleton was instantiated in `ApplicationMaster.main` before the `ApplicationMaster` instance, whose constructor was what loaded the Spark properties file; as a result, the `conf` in the `SparkHadoopUtil` singleton did not include the user's configuration.

So we should load the properties file with the Spark configuration and set its entries as system properties before `SparkHadoopUtil` is first instantiated.
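To illustrate the ordering problem, here is a minimal sketch using a hypothetical simplified singleton (a stand-in, not the actual Spark internals): a singleton that snapshots the system properties at first use cannot see entries set afterwards.

```scala
object ConfSingletonSketch {
  // Hypothetical stand-in for SparkHadoopUtil: the configuration is a
  // snapshot of the system properties taken when the singleton is first used.
  lazy val conf: Map[String, String] = sys.props.toMap

  def main(args: Array[String]): Unit = {
    // Buggy order: the singleton is touched before the properties are loaded.
    println(conf.get("spark.hadoop.key"))   // None

    // Loading the properties afterwards does not help; the snapshot is fixed.
    sys.props("spark.hadoop.key") = "value"
    println(conf.get("spark.hadoop.key"))   // still None
  }
}
```

Setting the system properties before the first access to `conf` is what makes the user configuration visible, which is exactly the reordering this patch performs.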

## How was this patch tested?

Added a test case to `YarnClusterSuite` that verifies `SparkHadoopUtil.get.conf` picks up a user-set `spark.hadoop.*` entry in yarn cluster mode (see the diff below).
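The test relies on Spark copying every `spark.hadoop.*` entry of the Spark configuration into the Hadoop `Configuration` with the prefix stripped, so `spark.hadoop.key` becomes `key`. A minimal sketch of that mapping, as an assumed simplification rather than the actual `SparkHadoopUtil` code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

object HadoopConfMappingSketch {
  // Assumed simplification: every "spark.hadoop.foo" entry in the SparkConf
  // becomes an entry named "foo" in the Hadoop Configuration.
  def newConfiguration(conf: SparkConf): Configuration = {
    val hadoopConf = new Configuration()
    conf.getAll.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
      }
    }
    hadoopConf
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false).set("spark.hadoop.key", "value")
    println(newConfiguration(conf).get("key"))  // prints "value"
  }
}
```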

Author: sharkd <[email protected]>
Author: sharkdtu <[email protected]>

Closes apache#14088 from sharkdtu/master.

(cherry picked from commit d513c99)
sharkdtu authored and Marcelo Vanzin committed Jul 12, 2016
1 parent f419476 commit 2f47b37
Showing 2 changed files with 54 additions and 8 deletions.
ApplicationMaster.scala

@@ -50,14 +50,6 @@ private[spark] class ApplicationMaster(
     client: YarnRMClient)
   extends Logging {
 
-  // Load the properties file with the Spark configuration and set entries as system properties,
-  // so that user code run inside the AM also has access to them.
-  if (args.propertiesFile != null) {
-    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
-      sys.props(k) = v
-    }
-  }
-
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
 
@@ -743,6 +735,15 @@ object ApplicationMaster extends Logging {
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
+
+    // Load the properties file with the Spark configuration and set entries as system properties,
+    // so that user code run inside the AM also has access to them.
+    // Note: we must do this before SparkHadoopUtil is instantiated.
+    if (amArgs.propertiesFile != null) {
+      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
+        sys.props(k) = v
+      }
+    }
     SparkHadoopUtil.get.runAsSparkUser { () =>
       master = new ApplicationMaster(amArgs, new YarnRMClient)
       System.exit(master.run())
YarnClusterSuite.scala

@@ -32,6 +32,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
@@ -106,6 +107,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     ))
   }
 
+  test("run Spark in yarn-cluster mode with using SparkHadoopUtil.conf") {
+    testYarnAppUseSparkHadoopUtilConf()
+  }
+
   test("run Spark in yarn-client mode with additional jar") {
     testWithAddJar(true)
   }
@@ -181,6 +186,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
+  private def testYarnAppUseSparkHadoopUtilConf(): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(false,
+      mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
+      appArgs = Seq("key=value", result.getAbsolutePath()),
+      extraConf = Map("spark.hadoop.key" -> "value"))
+    checkResult(finalState, result)
+  }
+
   private def testWithAddJar(clientMode: Boolean): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
     val driverResult = File.createTempFile("driver", null, tempDir)
@@ -274,6 +288,37 @@ private object YarnClusterDriverWithFailure extends Logging with Matchers {
   }
 }
 
+private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      // scalastyle:off println
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] [result file]
+        """.stripMargin)
+      // scalastyle:on println
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+      .setAppName("yarn test using SparkHadoopUtil's conf"))
+
+    val kv = args(0).split("=")
+    val status = new File(args(1))
+    var result = "failure"
+    try {
+      SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1))
+      result = "success"
+    } finally {
+      Files.write(result, status, StandardCharsets.UTF_8)
+      sc.stop()
+    }
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000
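To show the user-facing symptom this fixes, a hedged sketch of an application submitted in yarn cluster mode with `--conf spark.hadoop.key=value` (the `key`/`value` pair is illustrative, mirroring the new test):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil

object UserAppSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkHadoopUtil demo"))
    // Submitted with:
    //   spark-submit --master yarn --deploy-mode cluster \
    //     --conf spark.hadoop.key=value ...
    // Before this patch the AM-side singleton missed the user configuration
    // and this lookup returned null; with the patch it returns "value".
    println(SparkHadoopUtil.get.conf.get("key"))
    sc.stop()
  }
}
```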
