Updated docs for SparkConf and handled review comments
mateiz committed Dec 31, 2013
1 parent 994f080 commit 0fa5809
Showing 23 changed files with 241 additions and 124 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -52,7 +52,7 @@ object Partitioner {
for (r <- bySize if r.partitioner != None) {
return r.partitioner.get
}
if (rdd.context.conf.getOrElse("spark.default.parallelism", null) != null) {
if (rdd.context.conf.contains("spark.default.parallelism")) {
return new HashPartitioner(rdd.context.defaultParallelism)
} else {
return new HashPartitioner(bySize.head.partitions.size)
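
For context, the effect of the new `contains` check is easy to see in a small illustrative sketch (not part of this commit); the master URL, app name, and parallelism value are arbitrary:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions (groupByKey) in this Spark version

// Sketch: with spark.default.parallelism set, defaultPartitioner uses that value;
// otherwise it falls back to the number of partitions of the largest parent RDD.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("partitioner-demo")          // hypothetical app name
  .set("spark.default.parallelism", "8")
val sc = new SparkContext(conf)

val pairs = sc.parallelize(1 to 100).map(x => (x % 10, x))
println(pairs.groupByKey().partitions.size)  // 8 -- HashPartitioner(defaultParallelism)
```
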
31 changes: 19 additions & 12 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -16,6 +16,12 @@ import com.typesafe.config.ConfigFactory
* For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
* get the same configuration no matter what is on the classpath.
*
* All setter methods in this class support chaining. For example, you can write
* `new SparkConf().setMaster("local").setAppName("My app")`.
*
* Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
* by the user. Spark does not support modifying the configuration at runtime.
*
* @param loadDefaults whether to load values from the system properties and classpath
*/
class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
@@ -69,10 +75,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {

/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
def setJars(jars: Array[String]): SparkConf = {
if (!jars.isEmpty) {
settings("spark.jars") = jars.mkString(",")
}
this
setJars(jars.toSeq)
}

/**
@@ -102,15 +105,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
* (Java-friendly version.)
*/
def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
for ((k, v) <- variables) {
setExecutorEnv(k, v)
}
this
setExecutorEnv(variables.toSeq)
}

/**
* Set the location where Spark is installed on worker nodes. This is only needed on Mesos if
* you are not using `spark.executor.uri` to disseminate the Spark binary distribution.
* Set the location where Spark is installed on worker nodes.
*/
def setSparkHome(home: String): SparkConf = {
if (home != null) {
@@ -154,8 +153,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
getAll.filter(pair => pair._1.startsWith(prefix))
.map(pair => (pair._1.substring(prefix.length), pair._2))
getAll.filter{case (k, v) => k.startsWith(prefix)}
.map{case (k, v) => (k.substring(prefix.length), v)}
}

/** Does the configuration contain a given parameter? */
@@ -165,4 +164,12 @@
override def clone: SparkConf = {
new SparkConf(false).setAll(settings)
}

/**
* Return a string listing all keys and values, one per line. This is useful to print the
* configuration out for debugging.
*/
def toDebugString: String = {
settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}
}
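
Taken together, the chained setters described in the updated scaladoc, `contains`, and the new `toDebugString` helper can be exercised as in this hedged sketch (illustrative only; the executor environment variable name is made up):

```scala
import org.apache.spark.SparkConf

// Sketch of the chained-setter style from the updated scaladoc.
val conf = new SparkConf()                     // loads defaults; use new SparkConf(false) to skip them
  .setMaster("local")
  .setAppName("My app")
  .set("spark.executor.memory", "1g")
  .setExecutorEnv("MY_LIB_PATH", "/opt/libs")  // hypothetical variable name

// contains() replaces the old getOrElse(key, null) != null idiom.
assert(conf.contains("spark.app.name"))

// getExecutorEnv strips the "spark.executorEnv." prefix again.
conf.getExecutorEnv.foreach { case (k, v) => println(k + " -> " + v) }  // MY_LIB_PATH -> /opt/libs

// One sorted key=value pair per line, handy for debugging.
println(conf.toDebugString)
```
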
18 changes: 12 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -55,14 +55,14 @@ import org.apache.spark.util._
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* @param conf_ a Spark Config object describing the application configuration. Any settings in
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
class SparkContext(
conf_ : SparkConf,
config: SparkConf,
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
// a map from hostname to a list of input format splits on the host.
@@ -107,7 +107,13 @@ class SparkContext(
preferredNodeLocationData)
}

val conf = conf_.clone()
private[spark] val conf = config.clone()

/**
* Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
* changed at runtime.
*/
def getConf: SparkConf = conf.clone()

if (!conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
@@ -135,11 +141,11 @@ class SparkContext(
initLogging()

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
private[spark] val env = SparkEnv.create(
conf,
"<driver>",
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
conf,
isDriver = true,
isLocal = isLocal)
SparkEnv.set(env)
@@ -730,7 +736,7 @@ class SparkContext(
* (in that order of preference). If neither of these is set, return None.
*/
private[spark] def getSparkHome(): Option[String] = {
if (conf.getOrElse("spark.home", null) != null) {
if (conf.contains("spark.home")) {
Some(conf.get("spark.home"))
} else if (System.getenv("SPARK_HOME") != null) {
Some(System.getenv("SPARK_HOME"))
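
The new `getConf` accessor pairs with the cloning above: callers get a snapshot of the configuration that they can inspect (or even mutate) without affecting the running context. A minimal sketch, assuming a locally constructed context:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: sc.getConf returns a clone, so mutating it cannot change the live configuration.
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("conf-demo"))

val snapshot = sc.getConf                        // cloned SparkConf
snapshot.set("spark.executor.memory", "2g")      // only the copy changes

// The context still reports its original value (or the default if it was never set).
println(sc.getConf.getOrElse("spark.executor.memory", "<unset>"))
```
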
13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,7 @@ import com.google.common.collect.MapMaker
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
class SparkEnv (
class SparkEnv private[spark] (
val executorId: String,
val actorSystem: ActorSystem,
val serializerManager: SerializerManager,
@@ -63,7 +63,7 @@ class SparkEnv (
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

def stop() {
private[spark] def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()
mapOutputTracker.stop()
@@ -79,6 +79,7 @@ class SparkEnv (
//actorSystem.awaitTermination()
}

private[spark]
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
@@ -111,11 +112,11 @@ object SparkEnv extends Logging {
env.get()
}

def createFromSystemProperties(
private[spark] def create(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
conf: SparkConf,
isDriver: Boolean,
isLocal: Boolean): SparkEnv = {

@@ -129,7 +130,7 @@
}

// set only if unset until now.
if (conf.getOrElse("spark.hostPort", null) == null) {
if (!conf.contains("spark.hostPort")) {
if (!isDriver){
// unexpected
Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
@@ -216,7 +217,7 @@ object SparkEnv extends Logging {
}

// Warn about deprecated spark.cache.class property
if (conf.getOrElse("spark.cache.class", null) != null) {
if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
"levels using the RDD.persist() method instead.")
}
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -418,6 +418,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
new JavaRDD(sc.checkpointFile(path))
}

/**
* Return a copy of this JavaSparkContext's configuration. The configuration ''cannot'' be
* changed at runtime.
*/
def getConf: SparkConf = sc.getConf
}

object JavaSparkContext {
core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -190,7 +190,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
if (sc != null) { sc.stop() }
// Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
System.setProperty("spark.driver.port", "0")
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
@@ -417,4 +417,4 @@ private[spark] object Docker extends Logging {
"docker ps -l -q".!(ProcessLogger(line => id = line))
new DockerId(id)
}
}
}
core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -109,7 +109,7 @@ private[spark] class Executor(
// Initialize Spark environment (using system properties read above)
private val env = {
if (!isLocal) {
val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, conf,
val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
isDriver = false, isLocal = false)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -397,12 +397,11 @@ private[spark] object Utils extends Logging {
}

def localHostPort(conf: SparkConf): String = {
val retval = conf.getOrElse("spark.hostPort", null)
if (retval == null) {
logErrorWithStack("spark.hostPort not set but invoking localHostPort")
return localHostName()
}

retval
}

@@ -414,9 +413,12 @@
assert(hostPort.indexOf(':') != -1, message)
}

// Used by DEBUG code : remove when all testing done
def logErrorWithStack(msg: String) {
try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
try {
throw new Exception
} catch {
case ex: Exception => logError(msg, ex)
}
}

// Typically, this will be of order of number of nodes in cluster
2 changes: 2 additions & 0 deletions core/src/test/resources/spark.conf
@@ -1,3 +1,5 @@
# A simple spark.conf file used only in our unit tests

spark.test.intTestProperty = 1

spark.test {
2 changes: 1 addition & 1 deletion docs/_config.yml
@@ -4,7 +4,7 @@ markdown: kramdown
# These allow the documentation to be updated with new releases
# of Spark, Scala, and Mesos.
SPARK_VERSION: 0.9.0-incubating-SNAPSHOT
SPARK_VERSION_SHORT: 0.9.0-SNAPSHOT
SPARK_VERSION_SHORT: 0.9.0
SCALA_VERSION: 2.10
MESOS_VERSION: 0.13.0
SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
71 changes: 56 additions & 15 deletions docs/configuration.md
@@ -3,26 +3,37 @@ layout: global
title: Spark Configuration
---

Spark provides three main locations to configure the system:
Spark provides three locations to configure the system:

* [Java system properties](#system-properties), which control internal configuration parameters and can be set
either programmatically (by calling `System.setProperty` *before* creating a `SparkContext`) or through
JVM arguments.
* [Environment variables](#environment-variables) for configuring per-machine settings such as the IP address,
which can be set in the `conf/spark-env.sh` script.
* [Logging configuration](#configuring-logging), which is done through `log4j.properties`.
* [Spark properties](#spark-properties) control most application parameters and can be set by passing
a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object to SparkContext, or through Java
system properties.
* [Environment variables](#environment-variables) can be used to set per-machine settings, such as
the IP address, through the `conf/spark-env.sh` script on each node.
* [Logging](#configuring-logging) can be configured through `log4j.properties`.


# System Properties
# Spark Properties

To set a system property for configuring Spark, you need to either pass it with a -D flag to the JVM (for example `java -Dspark.cores.max=5 MyProgram`) or call `System.setProperty` in your code *before* creating your Spark context, as follows:
Spark properties control most application settings and are configured separately for each application.
The preferred way to set them is by passing a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
object to your SparkContext constructor.
Alternatively, Spark will also load them from Java system properties (for compatibility with old versions
of Spark) and from a [`spark.conf` file](#configuration-files) on your classpath.

SparkConf lets you configure most of the common properties to initialize a cluster (e.g., master URL and
application name), as well as arbitrary key-value pairs through the `set()` method. For example, we could
initialize an application as follows:

{% highlight scala %}
System.setProperty("spark.cores.max", "5")
val sc = new SparkContext(...)
val conf = new SparkConf()
.setMaster("local")
.setAppName("My application")
.set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
{% endhighlight %}

Most of the configurable system properties control internal settings that have reasonable default values. However,
Most of the properties control internal settings that have reasonable default values. However,
there are at least five properties that you will commonly want to control:

<table class="table">
@@ -385,11 +396,40 @@ Apart from these, the following properties are also available, and may be useful
</tr>
</table>

## Viewing Spark Properties

The application web UI at `http://<driver>:4040` lists Spark properties in the "Environment" tab.
This is a useful place to check that your properties have been set correctly.

## Configuration Files

You can also configure Spark properties through a `spark.conf` file on your Java classpath.
Because these properties are usually application-specific, we recommend putting this file *only* on your
application's classpath, and not in a global Spark classpath.

The `spark.conf` file uses Typesafe Config's [HOCON format](https://github.com/typesafehub/config#json-superset),
which is a superset of Java properties files and JSON. For example, the following is a simple config file:

{% highlight awk %}
# Comments are allowed
spark.executor.memory = 512m
spark.serializer = org.apache.spark.serializer.KryoSerializer
{% endhighlight %}

The format also allows hierarchical nesting, as follows:

{% highlight awk %}
spark.akka {
threads = 8
timeout = 200
}
{% endhighlight %}

# Environment Variables

Certain Spark settings can also be configured through environment variables, which are read from the `conf/spark-env.sh`
Certain Spark settings can be configured through environment variables, which are read from the `conf/spark-env.sh`
script in the directory where Spark is installed (or `conf/spark-env.cmd` on Windows). These variables are meant for machine-specific settings, such
as library search paths. While Java system properties can also be set here, for application settings, we recommend setting
as library search paths. While Spark properties can also be set there through `SPARK_JAVA_OPTS`, for per-application settings, we recommend setting
these properties within the application instead of in `spark-env.sh` so that different applications can use different
settings.

@@ -406,7 +446,8 @@ The following variables can be set in `spark-env.sh`:
Note that applications can also add dependencies for themselves through `SparkContext.addJar` -- we recommend
doing that when possible.
* `SPARK_JAVA_OPTS`, to add JVM options. This includes Java options like garbage collector settings and any system
properties that you'd like to pass with `-D` (e.g., `-Dspark.local.dir=/disk1,/disk2`).
properties that you'd like to pass with `-D`. One use case is to set some Spark properties differently on this
machine, e.g., `-Dspark.local.dir=/disk1,/disk2`.
* Options for the Spark [standalone cluster scripts](spark-standalone.html#cluster-launch-scripts), such as number of cores
to use on each machine and maximum memory.

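
As a companion to the Configuration Files section above, the nested HOCON keys read back as ordinary dotted Spark properties once `spark.conf` is on the application classpath. A hedged sketch, assuming the nested example file shown earlier is present:

```scala
import org.apache.spark.SparkConf

// Sketch: with the nested spark.conf from the Configuration Files section on the
// classpath, HOCON flattens the block into dotted keys that SparkConf can read.
val conf = new SparkConf()               // loadDefaults = true also picks up spark.conf
println(conf.get("spark.akka.threads"))  // "8"
println(conf.get("spark.akka.timeout"))  // "200"
```
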
2 changes: 1 addition & 1 deletion docs/css/bootstrap.min.css

Large diffs are not rendered by default.
