[SPARK-16637] Unified containerizer
## What changes were proposed in this pull request?

New config var: `spark.mesos.containerizer={"mesos", "docker" (default)}`

This adds support for running Docker containers via the Mesos unified containerizer: http://mesos.apache.org/documentation/latest/container-image/

The benefit is removing the dependency on `dockerd`, along with all the costs that dependency incurs.
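
For illustration, here is a minimal sketch (not part of this patch) of how a job might opt into the unified containerizer; the master URL, Docker image, and app name are placeholders, and it assumes a Mesos 0.28.2+ cluster:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: placeholder master URL and image name.
object UnifiedContainerizerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("unified-containerizer-demo")
      .setMaster("mesos://zk://zk1:2181/mesos")                      // placeholder Mesos master
      .set("spark.mesos.executor.docker.image", "example/spark:2.0") // placeholder image
      .set("spark.mesos.containerizer", "mesos")                     // new config; defaults to "docker"

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).count())  // trivial job to exercise the executors
    sc.stop()
  }
}
```

The same job submitted with `spark.mesos.containerizer=docker` (or with the setting omitted) keeps the existing Docker-daemon-based behavior.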

I've also updated the supported Mesos version to 0.28.2, which provides the protobufs required for this feature.

This is blocked on: apache#14167

## How was this patch tested?

- Manually tested jobs submitted with both the "mesos" and "docker" settings for the new config var.
- Ran the Spark/Mesos integration test suite.

Author: Michael Gummelt <[email protected]>

Closes apache#14275 from mgummelt/unified-containerizer.
Michael Gummelt authored and srowen committed Jul 29, 2016
1 parent 04a2c07 commit 266b92f
Showing 18 changed files with 149 additions and 79 deletions.
13 changes: 9 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
*
* @param loadDefaults whether to also load values from Java system properties
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

import SparkConf._

@@ -370,6 +370,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
}

/** Get all parameters that start with `prefix` */
def getAllWithPrefix(prefix: String): Array[(String, String)] = {
getAll.filter { case (k, v) => k.startsWith(prefix) }
.map { case (k, v) => (k.substring(prefix.length), v) }
}


/** Get a parameter as an integer, falling back to a default if not set */
def getInt(key: String, defaultValue: Int): Int = {
getOption(key).map(_.toInt).getOrElse(defaultValue)
@@ -392,9 +399,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
getAll.filter{case (k, v) => k.startsWith(prefix)}
.map{case (k, v) => (k.substring(prefix.length), v)}
getAllWithPrefix("spark.executorEnv.")
}

/**
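As an aside, the following sketch (not part of the patch) illustrates the intended behavior of the new `SparkConf.getAllWithPrefix` helper added above; the keys and values are made up for illustration:

```scala
import org.apache.spark.SparkConf

// Illustrative only: getAllWithPrefix filters settings by prefix and strips
// the prefix from the returned keys.
object GetAllWithPrefixExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.mesos.driverEnv.PATH", "/usr/local/bin")
      .set("spark.mesos.driverEnv.JAVA_HOME", "/opt/jdk")
      .set("spark.app.name", "demo")

    // Expected output (order not guaranteed):
    //   (PATH,/usr/local/bin)
    //   (JAVA_HOME,/opt/jdk)
    conf.getAllWithPrefix("spark.mesos.driverEnv.").foreach(println)
  }
}
```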
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/TaskState.scala
@@ -41,13 +41,11 @@ private[spark] object TaskState extends Enumeration {
}

def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
case MesosTaskState.TASK_STAGING => LAUNCHING
case MesosTaskState.TASK_STARTING => LAUNCHING
case MesosTaskState.TASK_RUNNING => RUNNING
case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
case MesosTaskState.TASK_FINISHED => FINISHED
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
case MesosTaskState.TASK_LOST => LOST
case MesosTaskState.TASK_ERROR => LOST
case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
}
}
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.mesos

import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.deploy.Command
import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState

@@ -40,24 +41,28 @@ private[spark] class MesosDriverDescription(
val cores: Double,
val supervise: Boolean,
val command: Command,
val schedulerProperties: Map[String, String],
schedulerProperties: Map[String, String],
val submissionId: String,
val submissionDate: Date,
val retryState: Option[MesosClusterRetryState] = None)
extends Serializable {

val conf = new SparkConf(false)
schedulerProperties.foreach {case (k, v) => conf.set(k, v)}

def copy(
name: String = name,
jarUrl: String = jarUrl,
mem: Int = mem,
cores: Double = cores,
supervise: Boolean = supervise,
command: Command = command,
schedulerProperties: Map[String, String] = schedulerProperties,
schedulerProperties: SparkConf = conf,
submissionId: String = submissionId,
submissionDate: Date = submissionDate,
retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, schedulerProperties,

new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap,
submissionId, submissionDate, retryState)
}

@@ -50,7 +50,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
val driverDescription = Iterable.apply(driverState.description)
val submissionState = Iterable.apply(driverState.submissionState)
val command = Iterable.apply(driverState.description.command)
val schedulerProperties = Iterable.apply(driverState.description.schedulerProperties)
val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap)
val commandEnv = Iterable.apply(driverState.description.command.environment)
val driverTable =
UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
@@ -353,19 +353,16 @@ private[spark] class MesosClusterScheduler(
}
}

private def getDriverExecutorURI(desc: MesosDriverDescription) = {
desc.schedulerProperties.get("spark.executor.uri")
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
desc.conf.getOption("spark.executor.uri")
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
}

private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
val env = {
val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)

val prefix = "spark.mesos.driverEnv."
val driverEnv = desc.schedulerProperties.filterKeys(_.startsWith(prefix))
.map { case (k, v) => (k.substring(prefix.length), v) }
val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")

driverEnv ++ executorEnv ++ desc.command.environment
}
@@ -379,8 +376,8 @@

private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
desc.schedulerProperties.get("spark.mesos.uris"),
desc.schedulerProperties.get("spark.submit.pyFiles")).flatMap(
desc.conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.submit.pyFiles")).flatMap(
_.map(_.split(",").map(_.trim))
).flatten

@@ -391,7 +388,7 @@
}

private def getDriverCommandValue(desc: MesosDriverDescription): String = {
val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
val executorUri = getDriverExecutorURI(desc)
// Gets the path to run spark-submit, and the path to the Mesos sandbox.
val (executable, sandboxPath) = if (dockerDefined) {
@@ -411,7 +408,7 @@
// Sandbox path points to the parent folder as we chdir into the folderBasename.
(cmdExecutable, "..")
} else {
val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
.orElse(conf.getOption("spark.home"))
.orElse(Option(System.getenv("SPARK_HOME")))
.getOrElse {
@@ -438,7 +435,7 @@

private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
var options = Seq(
"--name", desc.schedulerProperties("spark.app.name"),
"--name", desc.conf.get("spark.app.name"),
"--master", s"mesos://${conf.get("spark.master")}",
"--driver-cores", desc.cores.toString,
"--driver-memory", s"${desc.mem}M")
@@ -454,19 +451,19 @@
options ++= Seq("--class", desc.command.mainClass)
}

desc.schedulerProperties.get("spark.executor.memory").map { v =>
desc.conf.getOption("spark.executor.memory").foreach { v =>
options ++= Seq("--executor-memory", v)
}
desc.schedulerProperties.get("spark.cores.max").map { v =>
desc.conf.getOption("spark.cores.max").foreach { v =>
options ++= Seq("--total-executor-cores", v)
}
desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
val formattedFiles = pyFiles.split(",")
.map { path => new File(sandboxPath, path.split("/").last).toString() }
.mkString(",")
options ++= Seq("--py-files", formattedFiles)
}
desc.schedulerProperties
desc.conf.getAll
.filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
.foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
options
@@ -476,6 +473,7 @@
* Escape args for Unix-like shells, unless already quoted by the user.
* Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
* and http://www.grymoire.com/Unix/Quote.html
*
* @param value argument
* @return escaped argument
*/
@@ -498,6 +496,33 @@
}
}

private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()

val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.resources, "cpus", desc.cores)
val (finalResources, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", desc.mem)
offer.resources = finalResources.asJava

val appName = desc.conf.get("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
.setSlaveId(offer.slaveId)
.setCommand(buildDriverCommand(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)

desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
desc.conf,
taskInfo.getContainerBuilder)
}

taskInfo.build
}

/**
* This method takes all the possible candidates and attempt to schedule them with Mesos offers.
* Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
@@ -521,32 +546,12 @@
s"cpu: $driverCpu, mem: $driverMem")
} else {
val offer = offerOption.get
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.resources, "cpus", driverCpu)
val (finalResources, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", driverMem)
val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for $appName")
.setSlaveId(offer.slaveId)
.setCommand(commandInfo)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
submission.schedulerProperties.get,
taskInfo.getContainerBuilder())
}
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
val task = createTaskInfo(submission, offer)
queuedTasks += task
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
None, new Date(), None)
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
@@ -410,7 +410,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
sc.conf.getOption,
sc.conf,
taskBuilder.getContainerBuilder
)
}
@@ -153,7 +153,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
sc.conf.getOption,
sc.conf,
executorInfo.getContainerBuilder()
)
}
@@ -17,9 +17,10 @@

package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.{ContainerInfo, Volume}
import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging

/**
@@ -104,19 +105,33 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
containerizer: String,
forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {

val docker = ContainerInfo.DockerInfo.newBuilder()
.setImage(image)
.setForcePullImage(forcePullImage)
containerizer match {
case "docker" =>
container.setType(ContainerInfo.Type.DOCKER)
val docker = ContainerInfo.DockerInfo.newBuilder()
.setImage(image)
.setForcePullImage(forcePullImage)
// TODO (mgummelt): Remove this. Portmaps have no effect,
// as we don't support bridge networking.
portmaps.foreach(_.foreach(docker.addPortMappings))
container.setDocker(docker)
case "mesos" =>
container.setType(ContainerInfo.Type.MESOS)
val imageProto = Image.newBuilder()
.setType(Image.Type.DOCKER)
.setDocker(Image.Docker.newBuilder().setName(image))
.setCached(!forcePullImage)
container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
case _ =>
throw new SparkException(
"spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
}

network.foreach(docker.setNetwork)
portmaps.foreach(_.foreach(docker.addPortMappings))
container.setType(ContainerInfo.Type.DOCKER)
container.setDocker(docker.build())
volumes.foreach(_.foreach(container.addVolumes))
}

@@ -125,18 +140,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
*/
def setupContainerBuilderDockerInfo(
imageName: String,
conf: String => Option[String],
conf: SparkConf,
builder: ContainerInfo.Builder): Unit = {
val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
val forcePullImage = conf
.getOption("spark.mesos.executor.docker.forcePullImage")
.exists(_.equals("true"))
val volumes = conf("spark.mesos.executor.docker.volumes")
val volumes = conf
.getOption("spark.mesos.executor.docker.volumes")
.map(parseVolumesSpec)
val portmaps = conf("spark.mesos.executor.docker.portmaps")
val portmaps = conf
.getOption("spark.mesos.executor.docker.portmaps")
.map(parsePortMappingsSpec)

val containerizer = conf.get("spark.mesos.containerizer", "docker")
addDockerInfo(
builder,
imageName,
containerizer,
forcePullImage = forcePullImage,
volumes = volumes,
portmaps = portmaps)
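For clarity, here is a self-contained sketch (not from the patch; the image name is a placeholder) of the two `ContainerInfo` shapes that `addDockerInfo` now produces for a Docker image, depending on `spark.mesos.containerizer`:

```scala
import org.apache.mesos.Protos.{ContainerInfo, Image}

object ContainerizerShapes {
  def main(args: Array[String]): Unit = {
    val image = "example/spark:2.0"  // placeholder image name

    // containerizer = "docker": the Docker daemon on the agent runs the container.
    val viaDocker = ContainerInfo.newBuilder()
      .setType(ContainerInfo.Type.DOCKER)
      .setDocker(ContainerInfo.DockerInfo.newBuilder()
        .setImage(image)
        .setForcePullImage(false))
      .build()

    // containerizer = "mesos": the unified containerizer runs the image without dockerd.
    val viaMesos = ContainerInfo.newBuilder()
      .setType(ContainerInfo.Type.MESOS)
      .setMesos(ContainerInfo.MesosInfo.newBuilder()
        .setImage(Image.newBuilder()
          .setType(Image.Type.DOCKER)
          .setDocker(Image.Docker.newBuilder().setName(image))
          .setCached(true)))   // cached unless forcePullImage is set
      .build()

    println(viaDocker)
    println(viaMesos)
  }
}
```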
@@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils


/**
* Shared trait for implementing a Mesos Scheduler. This holds common state and helper
* methods and Mesos scheduler will use.
@@ -79,7 +80,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
credBuilder.setPrincipal(principal)
}
conf.getOption("spark.mesos.secret").foreach { secret =>
credBuilder.setSecret(ByteString.copyFromUtf8(secret))
credBuilder.setSecret(secret)
}
if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
throw new SparkException(
