[SPARK-14082][MESOS] Enable GPU support with Mesos
## What changes were proposed in this pull request?

Enable GPU resources to be used when running in coarse-grained mode with Mesos.
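
For context, a minimal sketch of how an application might opt into this once the patch is in place; the master URL and the values below are illustrative placeholders, not part of the commit:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver setup: ask for at most 2 GPUs across the job.
// "spark.mesos.gpus.max" is the option added by this patch; coarse-grained
// mode is Spark's default scheduling mode on Mesos.
val conf = new SparkConf()
  .setAppName("gpu-example")
  .setMaster("mesos://zk://zk-host:2181/mesos") // placeholder master URL
  .set("spark.mesos.gpus.max", "2")             // upper bound, not a guarantee
  .set("spark.cores.max", "8")

val sc = new SparkContext(conf)
```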

## How was this patch tested?

Manual test with GPU.

Author: Timothy Chen <[email protected]>

Closes apache#14644 from tnachen/gpu_mesos.
tnachen authored and srowen committed Oct 10, 2016
1 parent 3f8a022 commit 29f186b
Showing 5 changed files with 96 additions and 23 deletions.
9 changes: 9 additions & 0 deletions docs/running-on-mesos.md
@@ -498,6 +498,15 @@ See the [configuration page](configuration.html) for information on Spark config
in the history server.
</td>
</tr>
<tr>
<td><code>spark.mesos.gpus.max</code></td>
<td><code>0</code></td>
<td>
Set the maximum number of GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found,
since this configuration is just an upper limit and not a guaranteed amount.
</td>
</tr>


</table>
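
To make the "upper limit, not a guarantee" wording above concrete, here is a small self-contained sketch of the per-offer clamping rule; it mirrors the `Math.min`/`Math.max` expression in the scheduler backend change below, with illustrative names:

```scala
// How many GPUs a single task takes from an offer: never more than the offer
// contains, never more than the remaining budget under spark.mesos.gpus.max,
// and possibly zero (the executor still launches in that case).
def gpusForTask(maxGpus: Int, totalGpusAcquired: Int, offeredGpus: Int): Int =
  math.min(math.max(0, maxGpus - totalGpusAcquired), offeredGpus)

// gpusForTask(maxGpus = 0, totalGpusAcquired = 0, offeredGpus = 4)  // 0: GPUs ignored
// gpusForTask(maxGpus = 5, totalGpusAcquired = 4, offeredGpus = 4)  // 1: clamped by the budget
// gpusForTask(maxGpus = 5, totalGpusAcquired = 0, offeredGpus = 2)  // 2: clamped by the offer
```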

@@ -59,6 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt

val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)

private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
.ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
@@ -72,7 +74,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

// Cores we have acquired with each Mesos task ID
val coresByTaskId = new mutable.HashMap[String, Int]
val gpusByTaskId = new mutable.HashMap[String, Int]
var totalCoresAcquired = 0
var totalGpusAcquired = 0

// SlaveID -> Slave
// This map accumulates entries for the duration of the job. Slaves are never deleted, because
@@ -396,14 +400,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
launchTasks = true
val taskId = newMesosTaskId()
val offerCPUs = getResource(resources, "cpus").toInt
val taskGPUs = Math.min(
Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)

val taskCPUs = executorCores(offerCPUs)
val taskMemory = executorMemory(sc)

slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)

val (resourcesLeft, resourcesToUse) =
partitionTaskResources(resources, taskCPUs, taskMemory)
partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)

val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
@@ -425,28 +431,39 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
remainingResources(offerId) = resourcesLeft.asJava
totalCoresAcquired += taskCPUs
coresByTaskId(taskId) = taskCPUs
if (taskGPUs > 0) {
totalGpusAcquired += taskGPUs
gpusByTaskId(taskId) = taskGPUs
}
}
}
}
tasks.toMap
}

/** Extracts task needed resources from a list of available resources. */
private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
private def partitionTaskResources(
resources: JList[Resource],
taskCPUs: Int,
taskMemory: Int,
taskGPUs: Int)
: (List[Resource], List[Resource]) = {

// partition cpus & mem
val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
val (afterMemResources, memResourcesToUse) =
partitionResources(afterCPUResources.asJava, "mem", taskMemory)
val (afterGPUResources, gpuResourcesToUse) =
partitionResources(afterMemResources.asJava, "gpus", taskGPUs)

// If user specifies port numbers in SparkConfig then consecutive tasks will not be launched
// on the same host. This essentially means one executor per host.
// TODO: handle network isolator case
val (nonPortResources, portResourcesToUse) =
partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)
partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterGPUResources)

(nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
(nonPortResources,
cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
}

private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
@@ -513,6 +530,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
totalCoresAcquired -= cores
coresByTaskId -= taskId
}
// Also remove the gpus we have remembered for this task, if it's in the hashmap
for (gpus <- gpusByTaskId.get(taskId)) {
totalGpusAcquired -= gpus
gpusByTaskId -= taskId
}
// If it was a failure, mark the slave as failed for blacklisting purposes
if (TaskState.isFailed(state)) {
slave.taskFailures += 1
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
import com.google.common.base.Splitter
import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.FrameworkInfo.Capability
import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}

import org.apache.spark.{SparkConf, SparkContext, SparkException}
@@ -93,6 +94,10 @@ trait MesosSchedulerUtils extends Logging {
conf.getOption("spark.mesos.role").foreach { role =>
fwInfoBuilder.setRole(role)
}
val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
if (maxGpus > 0) {
fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
}
if (credBuilder.hasPrincipal) {
new MesosSchedulerDriver(
scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
@@ -67,7 +67,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

val minMem = backend.executorMemory(sc)
val minCpu = 4
val offers = List((minMem, minCpu))
val offers = List(Resources(minMem, minCpu))

// launches a task on a valid offer
offerResources(offers)
@@ -95,8 +95,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
// launches a task on a valid offer
val minMem = backend.executorMemory(sc) + 1024
val minCpu = 4
val offer1 = (minMem, minCpu)
val offer2 = (minMem, 1)
val offer1 = Resources(minMem, minCpu)
val offer2 = Resources(minMem, 1)
offerResources(List(offer1, offer2))
verifyTaskLaunched(driver, "o1")

@@ -115,7 +115,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map("spark.executor.cores" -> executorCores.toString))

val executorMemory = backend.executorMemory(sc)
val offers = List((executorMemory * 2, executorCores + 1))
val offers = List(Resources(executorMemory * 2, executorCores + 1))
offerResources(offers)

val taskInfos = verifyTaskLaunched(driver, "o1")
@@ -130,7 +130,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

val executorMemory = backend.executorMemory(sc)
val offerCores = 10
offerResources(List((executorMemory * 2, offerCores)))
offerResources(List(Resources(executorMemory * 2, offerCores)))

val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
@@ -144,7 +144,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map("spark.cores.max" -> maxCores.toString))

val executorMemory = backend.executorMemory(sc)
offerResources(List((executorMemory, maxCores + 1)))
offerResources(List(Resources(executorMemory, maxCores + 1)))

val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
@@ -153,9 +153,38 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == maxCores)
}

test("mesos does not acquire gpus if not specified") {
setBackend()

val executorMemory = backend.executorMemory(sc)
offerResources(List(Resources(executorMemory, 1, 1)))

val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)

val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus")
assert(gpus == 0.0)
}


test("mesos does not acquire more than spark.mesos.gpus.max") {
val maxGpus = 5
setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString))

val executorMemory = backend.executorMemory(sc)
offerResources(List(Resources(executorMemory, 1, maxGpus + 1)))

val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)

val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus")
assert(gpus == maxGpus)
}


test("mesos declines offers that violate attribute constraints") {
setBackend(Map("spark.mesos.constraints" -> "x:true"))
offerResources(List((backend.executorMemory(sc), 4)))
offerResources(List(Resources(backend.executorMemory(sc), 4)))
verifyDeclinedOffer(driver, createOfferId("o1"), true)
}

@@ -165,8 +194,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

val executorMemory = backend.executorMemory(sc)
offerResources(List(
(executorMemory, maxCores + 1),
(executorMemory, maxCores + 1)))
Resources(executorMemory, maxCores + 1),
Resources(executorMemory, maxCores + 1)))

verifyTaskLaunched(driver, "o1")
verifyDeclinedOffer(driver, createOfferId("o2"), true)
@@ -180,8 +209,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

val executorMemory = backend.executorMemory(sc)
offerResources(List(
(executorMemory * 2, executorCores * 2),
(executorMemory * 2, executorCores * 2)))
Resources(executorMemory * 2, executorCores * 2),
Resources(executorMemory * 2, executorCores * 2)))

verifyTaskLaunched(driver, "o1")
verifyTaskLaunched(driver, "o2")
@@ -193,7 +222,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

// offer with room for two executors
val executorMemory = backend.executorMemory(sc)
offerResources(List((executorMemory * 2, executorCores * 2)))
offerResources(List(Resources(executorMemory * 2, executorCores * 2)))

// verify two executors were started on a single offer
val taskInfos = verifyTaskLaunched(driver, "o1")
@@ -397,7 +426,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend()

// launches a task on a valid offer
val offers = List((backend.executorMemory(sc), 1))
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
verifyTaskLaunched(driver, "o1")

@@ -434,6 +463,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
}

private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)

private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -444,9 +475,9 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}
}

private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
private def offerResources(offers: List[Resources], startId: Int = 1): Unit = {
val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus)}

backend.resourceOffers(driver, mesosOffers.asJava)
}
@@ -32,8 +32,9 @@ object Utils {
offerId: String,
slaveId: String,
mem: Int,
cpu: Int,
ports: Option[(Long, Long)] = None): Offer = {
cpus: Int,
ports: Option[(Long, Long)] = None,
gpus: Int = 0): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
@@ -42,14 +43,20 @@ builder.addResourcesBuilder()
builder.addResourcesBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpu))
.setScalar(Scalar.newBuilder().setValue(cpus))
ports.foreach { resourcePorts =>
builder.addResourcesBuilder()
.setName("ports")
.setType(Value.Type.RANGES)
.setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
.setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
}
if (gpus > 0) {
builder.addResourcesBuilder()
.setName("gpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(gpus))
}
builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))
@@ -82,4 +89,3 @@ object Utils {
TaskID.newBuilder().setValue(taskId).build()
}
}
