Commit b3ec51b

andrewor14 authored and pwendell committed
[SPARK-2849] Handle driver configs separately in client mode
In client deploy mode, the driver is launched from within `SparkSubmit`'s JVM. This means by the time we parse Spark configs from `spark-defaults.conf`, it is already too late to control certain properties of the driver's JVM. We currently ignore these configs in client mode altogether:

```
spark.driver.memory
spark.driver.extraJavaOptions
spark.driver.extraClassPath
spark.driver.extraLibraryPath
```

This PR handles these properties before launching the driver JVM. It achieves this by spawning a separate JVM that runs a new class called `SparkSubmitDriverBootstrapper`, which spawns `SparkSubmit` as a sub-process with the appropriate classpath, library paths, java opts and memory.

Author: Andrew Or <[email protected]>

Closes apache#1845 from andrewor14/handle-configs-bash and squashes the following commits:

bed4bdf [Andrew Or] Change a few comments / messages (minor)
24dba60 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
08fd788 [Andrew Or] Warn against external usages of SparkSubmitDriverBootstrapper
ff34728 [Andrew Or] Minor comments
51aeb01 [Andrew Or] Filter out JVM memory in Scala rather than Bash (minor)
9a778f6 [Andrew Or] Fix PySpark: actually kill driver on termination
d0f20db [Andrew Or] Don't pass empty library paths, classpath, java opts etc.
a78cb26 [Andrew Or] Revert a few changes in utils.sh (minor)
9ba37e2 [Andrew Or] Don't barf when the properties file does not exist
8867a09 [Andrew Or] A few more naming things (minor)
19464ad [Andrew Or] SPARK_SUBMIT_JAVA_OPTS -> SPARK_SUBMIT_OPTS
d6488f9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
1ea6bbe [Andrew Or] SparkClassLauncher -> SparkSubmitDriverBootstrapper
a91ea19 [Andrew Or] Fix precedence of library paths, classpath, java opts and memory
158f813 [Andrew Or] Remove "client mode" boolean argument
c84f5c8 [Andrew Or] Remove debug print statement (minor)
b71f52b [Andrew Or] Revert a few more changes (minor)
7d94a8d [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
3a8235d [Andrew Or] Only parse the properties file if special configs exist
c37e08d [Andrew Or] Revert a few more changes
a396eda [Andrew Or] Nullify my own hard work to simplify bash
0effa1e [Andrew Or] Add code in Scala that handles special configs
c886568 [Andrew Or] Fix lines too long + a few comments / style (minor)
7a4190a [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
7396be2 [Andrew Or] Explicitly comment that multi-line properties are not supported
fa11ef8 [Andrew Or] Parse the properties file only if the special configs exist
371cac4 [Andrew Or] Add function prefix (minor)
be99eb3 [Andrew Or] Fix tests to not include multi-line configs
bd0d468 [Andrew Or] Simplify parsing config file by ignoring multi-line arguments
56ac247 [Andrew Or] Use eval and set to simplify splitting
8d4614c [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
aeb79c7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
2732ac0 [Andrew Or] Integrate BASH tests into dev/run-tests + log error properly
8d26a5c [Andrew Or] Add tests for bash/utils.sh
4ae24c3 [Andrew Or] Fix bug: escape properly in quote_java_property
b3c4cd5 [Andrew Or] Fix bug: count the number of quotes instead of detecting presence
c2273fc [Andrew Or] Fix typo (minor)
e793e5f [Andrew Or] Handle multi-line arguments
5d8f8c4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra
c7b9926 [Andrew Or] Minor changes to spark-defaults.conf.template
a992ae2 [Andrew Or] Escape spark.*.extraJavaOptions correctly
aabfc7e [Andrew Or] escape -> split (minor)
45a1eb9 [Andrew Or] Fix bug: escape escaped backslashes and quotes properly...
1cdc6b1 [Andrew Or] Fix bug: escape escaped double quotes properly
c854859 [Andrew Or] Add small comment
c13a2cb [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra
8e552b7 [Andrew Or] Include an example of spark.*.extraJavaOptions
de765c9 [Andrew Or] Print spark-class command properly
a4df3c4 [Andrew Or] Move parsing and escaping logic to utils.sh
dec2343 [Andrew Or] Only export variables if they exist
fa2136e [Andrew Or] Escape Java options + parse java properties files properly
ef12f74 [Andrew Or] Minor formatting
4ec22a1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra
e5cfb46 [Andrew Or] Collapse duplicate code + fix potential whitespace issues
4edcaa8 [Andrew Or] Redirect stdout to stderr for python
130f295 [Andrew Or] Handle spark.driver.memory too
98dd8e3 [Andrew Or] Add warning if properties file does not exist
8843562 [Andrew Or] Fix compilation issues...
75ee6b4 [Andrew Or] Remove accidentally added file
63ed2e9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra
0025474 [Andrew Or] Revert SparkSubmit handling of --driver-* options for only cluster mode
a2ab1b0 [Andrew Or] Parse spark.driver.extra* in bash
250cb95 [Andrew Or] Do not ignore spark.driver.extra* for client mode
1 parent c1ba4cd commit b3ec51b

10 files changed (+250 −56 lines)
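Before the per-file diffs, a condensed sketch of the technique the commit message describes may help orient the reader: resolve the `spark.driver.*` settings from the properties file in a small bootstrap JVM first, then start the real main class in a child JVM whose heap and java options already reflect them. Everything in this sketch is illustrative only (the class name, the property-file handling, and the `org.example.RealMain` placeholder are not Spark's); the actual implementation is `SparkSubmitDriverBootstrapper.scala` at the end of this diff, which also copies the child's streams manually via `RedirectThread` instead of using `inheritIO`, so it can react when its own stdin closes.

```scala
import java.io.{File, FileInputStream}
import java.util.Properties
import scala.collection.JavaConverters._

// Illustrative sketch only -- not Spark's actual code. Shows the bootstrap idea:
// resolve driver JVM settings *before* the driver JVM exists, then re-launch.
object BootstrapSketch {
  def main(args: Array[String]): Unit = {
    // 1. Read driver-side settings from a properties file (path is an assumption).
    val props = new Properties()
    val confFile = new File(sys.env.getOrElse("PROPS_FILE", "conf/spark-defaults.conf"))
    if (confFile.isFile) props.load(new FileInputStream(confFile))
    val driverMemory = Option(props.getProperty("spark.driver.memory")).getOrElse("512m")
    // A real implementation tokenizes this with proper quote handling.
    val extraJavaOpts = Option(props.getProperty("spark.driver.extraJavaOptions"))
      .map(_.split("\\s+").toSeq).getOrElse(Seq.empty)

    // 2. Launch the real main class in a child JVM that honors those settings.
    //    "org.example.RealMain" is a placeholder for the class that must see them applied.
    val command = Seq("java", s"-Xms$driverMemory", s"-Xmx$driverMemory") ++ extraJavaOpts ++
      Seq("-cp", sys.props("java.class.path"), "org.example.RealMain") ++ args
    val child = new ProcessBuilder(command.asJava).inheritIO().start()

    // 3. Propagate the child's exit status.
    sys.exit(child.waitFor())
  }
}
```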

bin/spark-class

+38 −11

```diff
@@ -17,6 +17,8 @@
 # limitations under the License.
 #
 
+# NOTE: Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
 cygwin=false
 case "`uname`" in
     CYGWIN*) cygwin=true;;
@@ -39,7 +41,7 @@ fi
 
 if [ -n "$SPARK_MEM" ]; then
   echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2
-  echo -e "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 1>&2
+  echo -e "(e.g., spark.executor.memory or spark.driver.memory)." 1>&2
 fi
 
 # Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
@@ -73,11 +75,17 @@ case "$1" in
     OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
     ;;
 
-  # Spark submit uses SPARK_SUBMIT_OPTS and SPARK_JAVA_OPTS
-  'org.apache.spark.deploy.SparkSubmit')
-    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS \
-      -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
+  # Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
+  # SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
+  'org.apache.spark.deploy.SparkSubmit')
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
     OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
+    if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
+      OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
+    fi
+    if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
+      OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
+    fi
     ;;
 
   *)
@@ -101,11 +109,12 @@ fi
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
 JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
+
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e "$FWDIR/conf/java-opts" ] ; then
   JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
 fi
-export JAVA_OPTS
+
 # Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
 
 TOOLS_DIR="$FWDIR"/tools
@@ -146,10 +155,28 @@ if $cygwin; then
 fi
 export CLASSPATH
 
-if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
-  echo -n "Spark Command: " 1>&2
-  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
-  echo -e "========================================\n" 1>&2
+# In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
+# Here we must parse the properties file for relevant "spark.driver.*" configs before launching
+# the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM
+# to prepare the launch environment of this driver JVM.
+
+if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then
+  # This is used only if the properties file actually contains these special configs
+  # Export the environment variables needed by SparkSubmitDriverBootstrapper
+  export RUNNER
+  export CLASSPATH
+  export JAVA_OPTS
+  export OUR_JAVA_MEM
+  export SPARK_CLASS=1
+  shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own
+  exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@"
+else
+  # Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala
+  if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then
+    echo -n "Spark Command: " 1>&2
+    echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
+    echo -e "========================================\n" 1>&2
+  fi
+  exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
 fi
 
-exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
```

bin/spark-submit

+23 −5

```diff
@@ -17,14 +17,18 @@
 # limitations under the License.
 #
 
+# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
+
 export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
 ORIG_ARGS=("$@")
 
 while (($#)); do
   if [ "$1" = "--deploy-mode" ]; then
-    DEPLOY_MODE=$2
+    SPARK_SUBMIT_DEPLOY_MODE=$2
+  elif [ "$1" = "--properties-file" ]; then
+    SPARK_SUBMIT_PROPERTIES_FILE=$2
   elif [ "$1" = "--driver-memory" ]; then
-    DRIVER_MEMORY=$2
+    export SPARK_SUBMIT_DRIVER_MEMORY=$2
   elif [ "$1" = "--driver-library-path" ]; then
     export SPARK_SUBMIT_LIBRARY_PATH=$2
   elif [ "$1" = "--driver-class-path" ]; then
@@ -35,10 +39,24 @@ while (($#)); do
   shift
 done
 
-DEPLOY_MODE=${DEPLOY_MODE:-"client"}
+DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf"
+export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"}
+export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}
+
+# For client mode, the driver will be launched in the same JVM that launches
+# SparkSubmit, so we may need to read the properties file for any extra class
+# paths, library paths, java options and memory early on. Otherwise, it will
+# be too late by the time the driver JVM has started.
 
-if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
-  export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
+if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
+  # Parse the properties file only if the special configs exist
+  contains_special_configs=$(
+    grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
+    grep -v "^[[:space:]]*#"
+  )
+  if [ -n "$contains_special_configs" ]; then
+    export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+  fi
 fi
 
 exec $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
```
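The grep in the hunk above is the only trigger for the bootstrap path: `SPARK_SUBMIT_BOOTSTRAP_DRIVER` is exported only when the properties file mentions `spark.driver.memory` or a `spark.driver.extra*` key outside a comment. As a rough standalone Scala equivalent of that check (the file path and printed messages below are illustrative only):

```scala
import scala.io.Source

// Rough Scala equivalent of the grep check in bin/spark-submit above: decide whether
// the properties file contains any of the "special" driver configs that require
// bootstrapping the driver JVM. The default path is only an example.
object SpecialConfigCheck {
  def main(args: Array[String]): Unit = {
    val file = args.headOption.getOrElse("conf/spark-defaults.conf")
    val needsBootstrap = Source.fromFile(file).getLines()
      .map(_.trim)
      .filterNot(_.startsWith("#"))                 // ignore commented-out lines
      .exists(line => line.startsWith("spark.driver.memory") ||
                      line.startsWith("spark.driver.extra"))
    println(if (needsBootstrap) "would export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1"
            else "no bootstrap needed")
  }
}
```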

bin/utils.sh

File mode changed from 100644 to 100755.

conf/spark-defaults.conf.template

+6 −4

```diff
@@ -2,7 +2,9 @@
 # This is useful for setting default environmental settings.
 
 # Example:
-# spark.master            spark://master:7077
-# spark.eventLog.enabled  true
-# spark.eventLog.dir      hdfs://namenode:8021/directory
-# spark.serializer        org.apache.spark.serializer.KryoSerializer
+# spark.master                     spark://master:7077
+# spark.eventLog.enabled           true
+# spark.eventLog.dir               hdfs://namenode:8021/directory
+# spark.serializer                 org.apache.spark.serializer.KryoSerializer
+# spark.driver.memory              5g
+# spark.executor.extraJavaOptions  -XX:+PrintGCDetail -Dkey=value -Dnumbers="one two three"
```
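The new `spark.executor.extraJavaOptions` example line carries a value with embedded double quotes. Spark splits such values into individual JVM arguments with its internal `Utils.splitCommandString` (used by the bootstrapper at the end of this diff); the standalone tokenizer below is only an illustration of the expected split and deliberately handles nothing beyond plain double quotes (no escapes, no single quotes):

```scala
import scala.collection.mutable.ArrayBuffer

// Illustration only: a minimal tokenizer for a java-options value like the one in the
// template line above. Spark itself uses Utils.splitCommandString for this; the version
// here exists purely to show the expected split.
object TokenizeOptsSketch {
  def tokenize(s: String): Seq[String] = {
    val tokens = ArrayBuffer.empty[String]
    val current = new StringBuilder
    var inQuotes = false
    for (c <- s) c match {
      case '"' => inQuotes = !inQuotes               // drop the quote, toggle quoted mode
      case ' ' if !inQuotes =>
        if (current.nonEmpty) { tokens += current.toString; current.clear() }
      case other => current += other
    }
    if (current.nonEmpty) tokens += current.toString
    tokens.toSeq
  }

  def main(args: Array[String]): Unit = {
    val raw = "-XX:+PrintGCDetail -Dkey=value -Dnumbers=\"one two three\""
    // Expected: -XX:+PrintGCDetail | -Dkey=value | -Dnumbers=one two three
    tokenize(raw).foreach(println)
  }
}
```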

core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala

+0 −25

```diff
@@ -40,28 +40,3 @@ private[spark] object PythonUtils {
     paths.filter(_ != "").mkString(File.pathSeparator)
   }
 }
-
-
-/**
- * A utility class to redirect the child process's stdout or stderr.
- */
-private[spark] class RedirectThread(
-    in: InputStream,
-    out: OutputStream,
-    name: String)
-  extends Thread(name) {
-
-  setDaemon(true)
-  override def run() {
-    scala.util.control.Exception.ignoring(classOf[IOException]) {
-      // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
-      val buf = new Array[Byte](1024)
-      var len = in.read(buf)
-      while (len != -1) {
-        out.write(buf, 0, len)
-        out.flush()
-        len = in.read(buf)
-      }
-    }
-  }
-}
```
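`RedirectThread` is removed here rather than deleted outright: the import changes in the next two files reference `org.apache.spark.util.RedirectThread`, so the class has presumably been relocated to the util package (the moved file itself is not shown in this diff). For reference, a self-contained sketch of the same byte-copying redirect pattern, with illustrative names, that launches a child process and mirrors its output:

```scala
import java.io.{InputStream, OutputStream}

// Standalone sketch of the redirect pattern removed above (Spark's own version is
// imported below as org.apache.spark.util.RedirectThread). Names are illustrative.
class StreamRedirect(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
  setDaemon(true)
  override def run(): Unit = {
    // Copy raw bytes until EOF, flushing each chunk so child output appears promptly.
    val buf = new Array[Byte](1024)
    var len = in.read(buf)
    while (len != -1) {
      out.write(buf, 0, len)
      out.flush()
      len = in.read(buf)
    }
  }
}

object RedirectExample {
  def main(args: Array[String]): Unit = {
    // Spawn a child process and mirror its stdout/stderr onto this process's streams.
    val process = new ProcessBuilder("echo", "hello from the child process").start()
    new StreamRedirect(process.getInputStream, System.out, "stdout redirect").start()
    new StreamRedirect(process.getErrorStream, System.err, "stderr redirect").start()
    process.waitFor()
  }
}
```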

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

+1 −2

```diff
@@ -17,15 +17,14 @@
 
 package org.apache.spark.api.python
 
-import java.lang.Runtime
 import java.io.{DataOutputStream, DataInputStream, InputStream, OutputStreamWriter}
 import java.net.{InetAddress, ServerSocket, Socket, SocketException}
 
 import scala.collection.mutable
 import scala.collection.JavaConversions._
 
 import org.apache.spark._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{RedirectThread, Utils}
 
 private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
   extends Logging {
```

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

+2 −2

```diff
@@ -22,8 +22,8 @@ import java.net.URI
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 
-import org.apache.spark.api.python.{PythonUtils, RedirectThread}
-import org.apache.spark.util.Utils
+import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.util.{RedirectThread, Utils}
 
 /**
  * A main class used by spark-submit to launch Python applications. It executes python as a
```

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

+10 −7

```diff
@@ -195,18 +195,21 @@ object SparkSubmit {
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
 
       // Other options
-      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
-        sysProp = "spark.driver.extraClassPath"),
-      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
-        sysProp = "spark.driver.extraJavaOptions"),
-      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
-        sysProp = "spark.driver.extraLibraryPath"),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
-        sysProp = "spark.files")
+        sysProp = "spark.files"),
+
+      // Only process driver specific options for cluster mode here,
+      // because they have already been processed in bash for client mode
+      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
+        sysProp = "spark.driver.extraClassPath"),
+      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
+        sysProp = "spark.driver.extraJavaOptions"),
+      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
+        sysProp = "spark.driver.extraLibraryPath")
     )
 
     // In client mode, launch the application main class directly
```
core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala (new file)

+149

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.util.{RedirectThread, Utils}

/**
 * Launch an application through Spark submit in client mode with the appropriate classpath,
 * library paths, java options and memory. These properties of the JVM must be set before the
 * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
 * of parsing the properties file for such relevant configs in Bash.
 *
 * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args>
 */
private[spark] object SparkSubmitDriverBootstrapper {

  // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`.
  // Any changes made there must be reflected in this file.

  def main(args: Array[String]): Unit = {

    // This should be called only from `bin/spark-class`
    if (!sys.env.contains("SPARK_CLASS")) {
      System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!")
      System.exit(1)
    }

    val submitArgs = args
    val runner = sys.env("RUNNER")
    val classpath = sys.env("CLASSPATH")
    val javaOpts = sys.env("JAVA_OPTS")
    val defaultDriverMemory = sys.env("OUR_JAVA_MEM")

    // Spark submit specific environment variables
    val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE")
    val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE")
    val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER")
    val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
    val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
    val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
    val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")

    assume(runner != null, "RUNNER must be set")
    assume(classpath != null, "CLASSPATH must be set")
    assume(javaOpts != null, "JAVA_OPTS must be set")
    assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set")
    assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!")
    assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set")
    assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")

    // Parse the properties file for the equivalent spark.driver.* configs
    val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
    val confDriverMemory = properties.get("spark.driver.memory")
    val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
    val confClasspath = properties.get("spark.driver.extraClassPath")
    val confJavaOpts = properties.get("spark.driver.extraJavaOptions")

    // Favor Spark submit arguments over the equivalent configs in the properties file.
    // Note that we do not actually use the Spark submit values for library path, classpath,
    // and Java opts here, because we have already captured them in Bash.

    val newDriverMemory = submitDriverMemory
      .orElse(confDriverMemory)
      .getOrElse(defaultDriverMemory)

    val newLibraryPath =
      if (submitLibraryPath.isDefined) {
        // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
        ""
      } else {
        confLibraryPath.map("-Djava.library.path=" + _).getOrElse("")
      }

    val newClasspath =
      if (submitClasspath.isDefined) {
        // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
        classpath
      } else {
        classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
      }

    val newJavaOpts =
      if (submitJavaOpts.isDefined) {
        // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
        javaOpts
      } else {
        javaOpts + confJavaOpts.map(" " + _).getOrElse("")
      }

    val filteredJavaOpts = Utils.splitCommandString(newJavaOpts)
      .filterNot(_.startsWith("-Xms"))
      .filterNot(_.startsWith("-Xmx"))

    // Build up command
    val command: Seq[String] =
      Seq(runner) ++
      Seq("-cp", newClasspath) ++
      Seq(newLibraryPath) ++
      filteredJavaOpts ++
      Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
      Seq("org.apache.spark.deploy.SparkSubmit") ++
      submitArgs

    // Print the launch command. This follows closely the format used in `bin/spark-class`.
    if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) {
      System.err.print("Spark Command: ")
      System.err.println(command.mkString(" "))
      System.err.println("========================================\n")
    }

    // Start the driver JVM
    val filteredCommand = command.filter(_.nonEmpty)
    val builder = new ProcessBuilder(filteredCommand)
    val process = builder.start()

    // Redirect stdin, stdout, and stderr to/from the child JVM
    val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
    val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
    val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
    stdinThread.start()
    stdoutThread.start()
    stderrThread.start()

    // Terminate on broken pipe, which signals that the parent process has exited. This is
    // important for the PySpark shell, where Spark submit itself is a python subprocess.
    stdinThread.join()
    process.destroy()
  }

}
```
