[SPARK-1720][SPARK-1719] use LD_LIBRARY_PATH instead of -Djava.library.path

- [X] Standalone
- [X] YARN
- [X] Mesos
- [X] Mac OS X
- [X] Linux
- [ ] Windows

This is an alternative implementation of apache#1031.
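
The new code paths rely on two small helpers in `org.apache.spark.util.Utils`, `libraryPathEnvName` and `libraryPathEnvPrefix`, whose bodies are not shown in this diff. A rough sketch of what such helpers could look like, assuming OS detection via the `os.name` system property (names and details here are illustrative, not the committed implementation):

```scala
import java.io.File

// Illustrative sketch only -- not the committed Utils implementation.
object LibraryPathEnv {
  /** Name of the native-library search path variable for the current OS. */
  def libraryPathEnvName: String = {
    val os = System.getProperty("os.name").toLowerCase
    if (os.contains("windows")) "PATH"
    else if (os.contains("mac")) "DYLD_LIBRARY_PATH"
    else "LD_LIBRARY_PATH"
  }

  /**
   * Shell prefix such as LD_LIBRARY_PATH="/a:/b:$LD_LIBRARY_PATH" that prepends
   * the given paths while preserving whatever the environment already has.
   */
  def libraryPathEnvPrefix(libraryPaths: Seq[String]): String = {
    val name = libraryPathEnvName
    val joined = (libraryPaths :+ s"$$$name").mkString(File.pathSeparator)
    s"""$name="$joined""""
  }
}
```

On Windows the dynamic loader already searches PATH, which is why no separate variable is needed there.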

Author: GuoQiang Li <[email protected]>

Closes apache#2711 from witgo/SPARK-1719 and squashes the following commits:

c7b26f6 [GuoQiang Li] review commits
4488e41 [GuoQiang Li] Refactoring CommandUtils
a444094 [GuoQiang Li] review commits
40c0b4a [GuoQiang Li] Add buildLocalCommand method
c1a0ddd [GuoQiang Li] fix comments
156ce88 [GuoQiang Li] review commit
38aa377 [GuoQiang Li] Refactor CommandUtils.scala
4269e00 [GuoQiang Li] Refactor SparkSubmitDriverBootstrapper.scala
7a1d634 [GuoQiang Li] use LD_LIBRARY_PATH instead of -Djava.library.path
witgo authored and Andrew Or committed Oct 30, 2014
1 parent 1234258 commit cd739bd
Showing 13 changed files with 221 additions and 81 deletions.
6 changes: 5 additions & 1 deletion bin/spark-class
@@ -81,7 +81,11 @@ case "$1" in
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
if [[ $OSTYPE == darwin* ]]; then
export DYLD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$DYLD_LIBRARY_PATH"
else
export LD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$LD_LIBRARY_PATH"
fi
fi
if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -244,6 +244,19 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
val executorClasspathKey = "spark.executor.extraClassPath"
val driverOptsKey = "spark.driver.extraJavaOptions"
val driverClassPathKey = "spark.driver.extraClassPath"
val driverLibraryPathKey = "spark.driver.extraLibraryPath"

// Used by Yarn in 1.1 and before
sys.props.get("spark.driver.libraryPath").foreach { value =>
val warning =
s"""
|spark.driver.libraryPath was detected (set to '$value').
|This is deprecated in Spark 1.2+.
|
|Please instead use: $driverLibraryPathKey
""".stripMargin
logWarning(warning)
}

// Validate spark.executor.extraJavaOptions
settings.get(executorOptsKey).map { javaOpts =>
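
For reference, switching from the deprecated system property to the new key looks like this on a SparkConf (the path is a placeholder); the same key can also be passed to spark-submit via `--conf spark.driver.extraLibraryPath=...`:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Replaces the pre-1.2 `spark.driver.libraryPath` system property.
  .set("spark.driver.extraLibraryPath", "/opt/native/lib")
```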
core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -82,17 +82,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
.orElse(confDriverMemory)
.getOrElse(defaultDriverMemory)

val newLibraryPath =
if (submitLibraryPath.isDefined) {
// SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
""
} else {
confLibraryPath.map("-Djava.library.path=" + _).getOrElse("")
}

val newClasspath =
if (submitClasspath.isDefined) {
// SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
classpath
} else {
classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
@@ -114,7 +105,6 @@
val command: Seq[String] =
Seq(runner) ++
Seq("-cp", newClasspath) ++
Seq(newLibraryPath) ++
filteredJavaOpts ++
Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
Seq("org.apache.spark.deploy.SparkSubmit") ++
@@ -130,6 +120,13 @@
// Start the driver JVM
val filteredCommand = command.filter(_.nonEmpty)
val builder = new ProcessBuilder(filteredCommand)
val env = builder.environment()

if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) {
val libraryPaths = confLibraryPath ++ sys.env.get(Utils.libraryPathEnvName)
env.put(Utils.libraryPathEnvName, libraryPaths.mkString(sys.props("path.separator")))
}

val process = builder.start()

// Redirect stdout and stderr from the child JVM
core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.deploy.worker
import java.io.{File, FileOutputStream, InputStream, IOException}
import java.lang.System._

import scala.collection.Map

import org.apache.spark.Logging
import org.apache.spark.deploy.Command
import org.apache.spark.util.Utils
@@ -29,7 +31,29 @@ import org.apache.spark.util.Utils
*/
private[spark]
object CommandUtils extends Logging {
def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {

/**
* Build a ProcessBuilder based on the given parameters.
* The `env` argument is exposed for testing.
*/
def buildProcessBuilder(
command: Command,
memory: Int,
sparkHome: String,
substituteArguments: String => String,
classPaths: Seq[String] = Seq[String](),
env: Map[String, String] = sys.env): ProcessBuilder = {
val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env)
val commandSeq = buildCommandSeq(localCommand, memory, sparkHome)
val builder = new ProcessBuilder(commandSeq: _*)
val environment = builder.environment()
for ((key, value) <- localCommand.environment) {
environment.put(key, value)
}
builder
}

private def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")

// SPARK-698: do not call the run.cmd script, as process.destroy()
@@ -38,11 +62,41 @@ object CommandUtils extends Logging {
command.arguments
}

/**
* Build a command based on the given one, taking into account the local environment
* of where this command is expected to run, substitute any placeholders, and append
* any extra class paths.
*/
private def buildLocalCommand(
command: Command,
substituteArguments: String => String,
classPath: Seq[String] = Seq[String](),
env: Map[String, String]): Command = {
val libraryPathName = Utils.libraryPathEnvName
val libraryPathEntries = command.libraryPathEntries
val cmdLibraryPath = command.environment.get(libraryPathName)

val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
} else {
command.environment
}

Command(
command.mainClass,
command.arguments.map(substituteArguments),
newEnvironment,
command.classPathEntries ++ classPath,
Seq[String](), // library path already captured in environment variable
command.javaOpts)
}

/**
* Attention: this must always be aligned with the environment variables in the run scripts and
* the way the JAVA_OPTS are assembled there.
*/
def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
private def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")

// Exists for backwards compatibility with older Spark versions
@@ -53,14 +107,6 @@
logWarning("Set SPARK_LOCAL_DIRS for node-specific storage locations.")
}

val libraryOpts =
if (command.libraryPathEntries.size > 0) {
val joined = command.libraryPathEntries.mkString(File.pathSeparator)
Seq(s"-Djava.library.path=$joined")
} else {
Seq()
}

// Figure out our classpath with the external compute-classpath script
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
@@ -71,7 +117,7 @@
val javaVersion = System.getProperty("java.version")
val permGenOpt = if (!javaVersion.startsWith("1.8")) Some("-XX:MaxPermSize=128m") else None
Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
permGenOpt ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
}

/** Spawn a thread that will redirect a given stream to a file */
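
Since `buildProcessBuilder` takes `env` as a parameter ("exposed for testing", per its comment), a test inside the `org.apache.spark` packages can verify that `libraryPathEntries` end up in the child environment without depending on the real `sys.env`. A hypothetical test-style sketch, with all class names and paths as placeholders:

```scala
package org.apache.spark.deploy.worker

import org.apache.spark.deploy.Command
import org.apache.spark.util.Utils

object BuildProcessBuilderSketch {
  def main(args: Array[String]): Unit = {
    // All class names and paths below are placeholders.
    val cmd = Command(
      "org.apache.spark.examples.FakeMain",  // mainClass
      Seq("{{WORKER_URL}}"),                  // arguments (placeholder to be substituted)
      Map.empty,                              // environment
      Seq.empty,                              // classPathEntries
      Seq("/opt/native/lib"),                 // libraryPathEntries
      Seq.empty)                              // javaOpts

    // sparkHome must point at a real distribution, since buildJavaOpts shells out
    // to bin/compute-classpath.sh (or .cmd) underneath it.
    val builder = CommandUtils.buildProcessBuilder(
      cmd,
      memory = 512,
      sparkHome = "/opt/spark",
      substituteArguments = identity,
      env = Map(Utils.libraryPathEnvName -> "/usr/lib"))

    // The command's libraryPathEntries are merged with the stub env, so the child
    // process sees e.g. LD_LIBRARY_PATH=/opt/native/lib:/usr/lib (name varies by OS).
    println(builder.environment().get(Utils.libraryPathEnvName))
  }
}
```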
core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -76,17 +76,9 @@ private[spark] class DriverRunner(

// Make sure user application jar is on the classpath
// TODO: If we add ability to submit multiple jars they should also be added here
val classPath = driverDesc.command.classPathEntries ++ Seq(s"$localJarFilename")
val newCommand = Command(
driverDesc.command.mainClass,
driverDesc.command.arguments.map(substituteVariables),
driverDesc.command.environment,
classPath,
driverDesc.command.libraryPathEntries,
driverDesc.command.javaOpts)
val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
sparkHome.getAbsolutePath)
launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename))
launchDriver(builder, driverDir, driverDesc.supervise)
}
catch {
case e: Exception => finalException = Some(e)
@@ -165,19 +157,16 @@
localJarFilename
}

private def launchDriver(command: Seq[String], envVars: Map[String, String], baseDir: File,
supervise: Boolean) {
val builder = new ProcessBuilder(command: _*).directory(baseDir)
envVars.map{ case(k,v) => builder.environment().put(k, v) }

private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
builder.directory(baseDir)
def initialize(process: Process) = {
// Redirect stdout and stderr to files
val stdout = new File(baseDir, "stdout")
CommandUtils.redirectStream(process.getInputStream, stdout)

val stderr = new File(baseDir, "stderr")
val header = "Launch Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
builder.command.mkString("\"", "\" \"", "\""), "=" * 40)
Files.append(header, stderr, UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.worker

import java.io._

import scala.collection.JavaConversions._

import akka.actor.ActorRef
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -115,33 +117,21 @@ private[spark] class ExecutorRunner(
case other => other
}

def getCommandSeq = {
val command = Command(
appDesc.command.mainClass,
appDesc.command.arguments.map(substituteVariables),
appDesc.command.environment,
appDesc.command.classPathEntries,
appDesc.command.libraryPathEntries,
appDesc.command.javaOpts)
CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
}

/**
* Download and run the executor described in our ApplicationDescription
*/
def fetchAndRunExecutor() {
try {
// Launch the process
val command = getCommandSeq
val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) <- appDesc.command.environment) {
env.put(key, value)
}

builder.directory(executorDir)
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -120,16 +121,18 @@ private[spark] class CoarseMesosSchedulerBackend(
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
}
val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")

val libraryPathOption = "spark.executor.extraLibraryPath"
val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
// Set the environment variable through a command prefix
// to append to the existing value of the variable
val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p =>
Utils.libraryPathEnvPrefix(Seq(p))
}.getOrElse("")

environment.addVariables(
Environment.Variable.newBuilder()
.setName("SPARK_EXECUTOR_OPTS")
.setValue(extraOpts)
.setValue(extraJavaOpts)
.build())

sc.executorEnvs.foreach { case (key, value) =>
@@ -150,16 +153,17 @@
if (uri == null) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId))
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue,
offer.getHostname, numCores, appId))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
("cd %s*; " +
("cd %s*; %s " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
.format(basename, driverUrl, offer.getSlaveId.getValue,
.format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue,
offer.getHostname, numCores, appId))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -98,15 +98,16 @@ private[spark] class MesosSchedulerBackend(
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
}
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
val extraLibraryPath = sc.conf.getOption("spark.executor.extraLibraryPath").map { lp =>
s"-Djava.library.path=$lp"
}
val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")

val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
Utils.libraryPathEnvPrefix(Seq(p))
}.getOrElse("")

environment.addVariables(
Environment.Variable.newBuilder()
.setName("SPARK_EXECUTOR_OPTS")
.setValue(extraOpts)
.setValue(extraJavaOpts)
.build())
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -118,12 +119,13 @@
.setEnvironment(environment)
val uri = sc.conf.get("spark.executor.uri", null)
if (uri == null) {
command.setValue(new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath)
val executorPath = new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath
command.setValue("%s %s".format(prefixEnv, executorPath))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.setValue("cd %s*; %s ./sbin/spark-executor".format(basename, prefixEnv))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val cpus = Resource.newBuilder()