layout | title |
---|---|
global |
Running Spark on Mesos |
- This will become a table of contents (this text will be scraped). {:toc}
Spark can run on hardware clusters managed by Apache Mesos.
The advantages of deploying Spark with Mesos include:
- dynamic partitioning between Spark and other frameworks
- scalable partitioning between multiple instances of Spark
In a standalone cluster deployment, the cluster manager in the below diagram is a Spark master instance. When using Mesos, the Mesos master replaces the Spark master as the cluster manager.
Now when a driver creates a job and starts issuing tasks for scheduling, Mesos determines what machines handle what tasks. Because it takes into account other frameworks when scheduling these many short-lived tasks, multiple frameworks can coexist on the same cluster without resorting to a static partitioning of resources.
To get started, follow the steps below to install Mesos and deploy Spark jobs via Mesos.
Spark {{site.SPARK_VERSION}} is designed for use with Mesos {{site.MESOS_VERSION}} or newer and does not require any special patches of Mesos.
If you already have a Mesos cluster running, you can skip this Mesos installation step.
Otherwise, installing Mesos for Spark is no different than installing Mesos for use by other frameworks. You can install Mesos either from source or using prebuilt packages.
To install Apache Mesos from source, follow these steps:
- Download a Mesos release from a mirror
- Follow the Mesos Getting Started page for compiling and installing Mesos
Note: If you want to run Mesos without installing it into the default paths on your system
(e.g., if you lack administrative privileges to install it), pass the
--prefix
option to configure
to tell it where to install. For example, pass
--prefix=/home/me/mesos
. By default the prefix is /usr/local
.
The Apache Mesos project only publishes source releases, not binary packages. But other third party projects publish binary releases that may be helpful in setting Mesos up.
One of those is Mesosphere. To install Mesos using the binary releases provided by Mesosphere:
- Download Mesos installation package from downloads page
- Follow their instructions for installation and configuration
The Mesosphere installation documents suggest setting up ZooKeeper to handle Mesos master failover, but Mesos can be run without ZooKeeper using a single master as well.
To verify that the Mesos cluster is ready for Spark, navigate to the Mesos master webui at port
:5050
Confirm that all expected machines are present in the slaves tab.
To use Mesos from Spark, you need a Spark binary package available in a place accessible by Mesos, and a Spark driver program configured to connect to Mesos.
Alternatively, you can also install Spark in the same location in all the Mesos slaves, and configure
spark.mesos.executor.home
(defaults to SPARK_HOME) to point to that location.
When Mesos runs a task on a Mesos slave for the first time, that slave must have a Spark binary
package for running the Spark Mesos executor backend.
The Spark package can be hosted at any Hadoop-accessible URI, including HTTP via http://
,
Amazon Simple Storage Service via s3n://
, or HDFS via hdfs://
.
To use a precompiled package:
- Download a Spark binary package from the Spark download page
- Upload to hdfs/http/s3
To host on HDFS, use the Hadoop fs put command: hadoop fs -put spark-{{site.SPARK_VERSION}}.tar.gz /path/to/spark-{{site.SPARK_VERSION}}.tar.gz
Or if you are using a custom-compiled version of Spark, you will need to create a package using
the dev/make-distribution.sh
script included in a Spark source tarball/checkout.
- Download and build Spark using the instructions here
- Create a binary package using
./dev/make-distribution.sh --tgz
. - Upload archive to http/s3/hdfs
The Master URLs for Mesos are in the form mesos://host:5050
for a single-master Mesos
cluster, or mesos://zk://host1:2181,host2:2181,host3:2181/mesos
for a multi-master Mesos cluster using ZooKeeper.
In client mode, a Spark Mesos framework is launched directly on the client machine and waits for the driver output.
The driver needs some configuration in spark-env.sh
to interact properly with Mesos:
- In
spark-env.sh
set some environment variables:
export MESOS_NATIVE_JAVA_LIBRARY=<path to libmesos.so>
. This path is typically<prefix>/lib/libmesos.so
where the prefix is/usr/local
by default. See Mesos installation instructions above. On Mac OS X, the library is calledlibmesos.dylib
instead oflibmesos.so
.export SPARK_EXECUTOR_URI=<URL of spark-{{site.SPARK_VERSION}}.tar.gz uploaded above>
.
- Also set
spark.executor.uri
to<URL of spark-{{site.SPARK_VERSION}}.tar.gz>
.
Now when starting a Spark application against the cluster, pass a mesos://
URL as the master when creating a SparkContext
. For example:
{% highlight scala %} val conf = new SparkConf() .setMaster("mesos://HOST:5050") .setAppName("My app") .set("spark.executor.uri", "<path to spark-{{site.SPARK_VERSION}}.tar.gz uploaded above>") val sc = new SparkContext(conf) {% endhighlight %}
(You can also use spark-submit
and configure spark.executor.uri
in the conf/spark-defaults.conf file.)
When running a shell, the spark.executor.uri
parameter is inherited from SPARK_EXECUTOR_URI
, so
it does not need to be redundantly passed in as a system property.
{% highlight bash %} ./bin/spark-shell --master mesos://host:5050 {% endhighlight %}
Spark on Mesos also supports cluster mode, where the driver is launched in the cluster and the client can find the results of the driver from the Mesos Web UI.
To use cluster mode, you must start the MesosClusterDispatcher
in your cluster via the sbin/start-mesos-dispatcher.sh
script,
passing in the Mesos master URL (e.g: mesos://host:5050). This starts the MesosClusterDispatcher
as a daemon running on the host.
If you like to run the MesosClusterDispatcher
with Marathon, you need to run the MesosClusterDispatcher
in the foreground (i.e: bin/spark-class org.apache.spark.deploy.mesos.MesosClusterDispatcher
). Note that the MesosClusterDispatcher
not yet supports multiple instances for HA.
The MesosClusterDispatcher
also supports writing recovery state into Zookeeper. This will allow the MesosClusterDispatcher
to be able to recover all submitted and running containers on relaunch. In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env by configuring spark.deploy.recoveryMode
and related spark.deploy.zookeeper.* configurations.
For more information about these configurations please refer to the configurations (doc)[configurations.html#deploy].
From the client, you can submit a job to Mesos cluster by running spark-submit
and specifying the master URL
to the URL of the MesosClusterDispatcher
(e.g: mesos://dispatcher:7077). You can view driver statuses on the
Spark cluster Web UI.
For example:
{% highlight bash %}
./bin/spark-submit
--class org.apache.spark.examples.SparkPi
--master mesos://207.184.161.138:7077
--deploy-mode cluster
--supervise
--executor-memory 20G
--total-executor-cores 100
http://path/to/examples.jar
1000
{% endhighlight %}
Note that jars or python files that are passed to spark-submit should be URIs reachable by Mesos slaves, as the Spark driver doesn't automatically upload local jars.
Spark can run over Mesos in two modes: "coarse-grained" (default) and "fine-grained".
The "coarse-grained" mode will launch only one long-running Spark task on each Mesos machine, and dynamically schedule its own "mini-tasks" within it. The benefit is much lower startup overhead, but at the cost of reserving the Mesos resources for the complete duration of the application.
Coarse-grained is the default mode. You can also set spark.mesos.coarse
property to true
to turn it on explicitly in SparkConf:
{% highlight scala %} conf.set("spark.mesos.coarse", "true") {% endhighlight %}
In addition, for coarse-grained mode, you can control the maximum number of resources Spark will
acquire. By default, it will acquire all cores in the cluster (that get offered by Mesos), which
only makes sense if you run just one application at a time. You can cap the maximum number of cores
using conf.set("spark.cores.max", "10")
(for example).
In "fine-grained" mode, each Spark task runs as a separate Mesos task. This allows multiple instances of Spark (and other frameworks) to share machines at a very fine granularity, where each application gets more or fewer machines as it ramps up and down, but it comes with an additional overhead in launching each task. This mode may be inappropriate for low-latency requirements like interactive queries or serving web requests.
To run in fine-grained mode, set the spark.mesos.coarse
property to false in your
SparkConf:
{% highlight scala %} conf.set("spark.mesos.coarse", "false") {% endhighlight %}
You may also make use of spark.mesos.constraints
to set attribute based constraints on mesos resource offers. By default, all resource offers will be accepted.
{% highlight scala %} conf.set("spark.mesos.constraints", "os:centos7;us-east-1:false") {% endhighlight %}
For example, Let's say spark.mesos.constraints
is set to os:centos7;us-east-1:false
, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors.
Spark can make use of a Mesos Docker containerizer by setting the property spark.mesos.executor.docker.image
in your SparkConf.
The Docker image used must have an appropriate version of Spark already part of the image, or you can have Mesos download Spark via the usual methods.
Requires Mesos version 0.20.1 or later.
You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
separate service on the machines. To access Hadoop data from Spark, a full hdfs://
URL is required
(typically hdfs://<namenode>:9000/path
, but you can find the right URL on your Hadoop Namenode web
UI).
In addition, it is possible to also run Hadoop MapReduce on Mesos for better resource isolation and sharing between the two. In this case, Mesos will act as a unified scheduler that assigns cores to either Hadoop or Spark, as opposed to having them share resources via the Linux scheduler on each node. Please refer to Hadoop on Mesos.
In either case, HDFS runs separately from Hadoop MapReduce, without being scheduled through Mesos.
Mesos supports dynamic allocation only with coarse-grain mode, which can resize the number of executors based on statistics of the application. For general information, see Dynamic Resource Allocation.
The External Shuffle Service to use is the Mesos Shuffle Service. It provides shuffle data cleanup functionality
on top of the Shuffle Service since Mesos doesn't yet support notifying another framework's
termination. To launch it, run $SPARK_HOME/sbin/start-mesos-shuffle-service.sh
on all slave nodes, with spark.shuffle.service.enabled
set to true
.
This can also be achieved through Marathon, using a unique host constraint, and the following command: bin/spark-class org.apache.spark.deploy.mesos.MesosExternalShuffleService
.
See the configuration page for information on Spark configurations. The following configs are specific for Spark on Mesos.
Property Name | Default | Meaning |
---|---|---|
spark.mesos.coarse |
true |
If set to true , runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine.
If set to false , runs over Mesos cluster in "fine-grained" sharing mode, where one Mesos task is created per Spark task.
Detailed information in 'Mesos Run Modes'.
|
spark.mesos.extra.cores |
0 |
Set the extra number of cores for an executor to advertise. This does not result in more cores allocated. It instead means that an executor will "pretend" it has more cores, so that the driver will send it more tasks. Use this to increase parallelism. This setting is only used for Mesos coarse-grained mode. |
spark.mesos.mesosExecutor.cores |
1.0 |
(Fine-grained mode only) Number of cores to give each Mesos executor. This does not include the cores used to run the Spark tasks. In other words, even if no Spark task is being run, each Mesos executor will occupy the number of cores configured here. The value can be a floating point number. |
spark.mesos.executor.docker.image |
(none) |
Set the name of the docker image that the Spark executors will run in. The selected
image must have Spark installed, as well as a compatible version of the Mesos library.
The installed path of Spark in the image can be specified with spark.mesos.executor.home ;
the installed path of the Mesos library can be specified with spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY .
|
spark.mesos.executor.docker.volumes |
(none) |
Set the list of volumes which will be mounted into the Docker image, which was set using
spark.mesos.executor.docker.image . The format of this property is a comma-separated list of
mappings following the form passed to docker run -v . That is they take the form:
|
spark.mesos.executor.docker.portmaps |
(none) |
Set the list of incoming ports exposed by the Docker image, which was set using
spark.mesos.executor.docker.image . The format of this property is a comma-separated list of
mappings which take the form:
|
spark.mesos.executor.home |
driver side SPARK_HOME |
Set the directory in which Spark is installed on the executors in Mesos. By default, the
executors will simply use the driver's Spark home directory, which may not be visible to
them. Note that this is only relevant if a Spark binary package is not specified through
spark.executor.uri .
|
spark.mesos.executor.memoryOverhead |
executor memory * 0.10, with minimum of 384 |
The amount of additional memory, specified in MB, to be allocated per executor. By default,
the overhead will be larger of either 384 or 10% of spark.executor.memory . If set,
the final overhead will be this value.
|
spark.mesos.uris |
(none) | A comma-separated list of URIs to be downloaded to the sandbox when driver or executor is launched by Mesos. This applies to both coarse-grained and fine-grained mode. |
spark.mesos.principal |
(none) | Set the principal with which Spark framework will use to authenticate with Mesos. |
spark.mesos.secret |
(none) | Set the secret with which Spark framework will use to authenticate with Mesos. |
spark.mesos.role |
* |
Set the role of this Spark framework for Mesos. Roles are used in Mesos for reservations and resource weight sharing. |
spark.mesos.constraints |
(none) |
Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to Mesos Attributes & Resources for more information on attributes.
|
spark.mesos.driver.webui.url |
(none) |
Set the Spark Mesos driver webui_url for interacting with the framework. If unset it will point to Spark's internal web UI. |
spark.mesos.dispatcher.webui.url |
(none) |
Set the Spark Mesos dispatcher webui_url for interacting with the framework. If unset it will point to Spark's internal web UI. |
A few places to look during debugging:
- Mesos master on port
:5050
- Slaves should appear in the slaves tab
- Spark applications should appear in the frameworks tab
- Tasks should appear in the details of a framework
- Check the stdout and stderr of the sandbox of failed tasks
- Mesos logs
- Master and slave logs are both in
/var/log/mesos
by default
- Master and slave logs are both in
And common pitfalls:
- Spark assembly not reachable/accessible
- Slaves must be able to download the Spark binary package from the
http://
,hdfs://
ors3n://
URL you gave
- Slaves must be able to download the Spark binary package from the
- Firewall blocking communications
- Check for messages about failed connections
- Temporarily disable firewalls for debugging and then poke appropriate holes