[SPARK-29150][CORE] Update RDD API for Stage level scheduling to be public

### What changes were proposed in this pull request?

This PR makes the RDD API for stage-level scheduling public. All the other JIRAs for the underlying functionality have been merged, so the API can now be made public for people to use and tagged properly.

### Why are the changes needed?

To make the stage-level scheduling API usable by end users.

### Does this PR introduce _any_ user-facing change?

Yes, the API is now public.
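
For illustration, a minimal sketch of how the now-public API fits together; `sc`, the `gpu` resource name, and the discovery-script path are assumptions for the example, not part of this patch:

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Executor-side requirements for the stages that use this profile.
val execReqs = new ExecutorResourceRequests()
  .cores(4)
  .memory("6g")
  .resource("gpu", 2, discoveryScript = "/opt/spark/getGpus.sh")

// Task-side requirements; fractional amounts let tasks share one resource.
val taskReqs = new TaskResourceRequests()
  .cpus(1)
  .resource("gpu", 0.5)

val rp = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build

// Stages computing this RDD run on executors acquired with the
// requested resources (dynamic allocation must be enabled).
val doubled = sc.parallelize(1 to 100, 8)
  .withResources(rp)
  .map(_ * 2)
  .collect()
```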

### How was this patch tested?

Unit tests, plus manual testing.

Closes apache#28697 from tgravescs/SPARK-29150.

Lead-authored-by: Thomas Graves <[email protected]>
Co-authored-by: Thomas Graves <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
2 people authored and HyukjinKwon committed Jun 2, 2020
1 parent 283814a commit ff4a97d
Showing 7 changed files with 29 additions and 25 deletions.
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1780,10 +1780,9 @@ abstract class RDD[T: ClassTag](
    * It will result in new executors with the resources specified being acquired to
    * calculate the RDD.
    */
-  // PRIVATE for now, added for testing purposes, will be made public with SPARK-29150
   @Experimental
-  @Since("3.0.0")
-  private[spark] def withResources(rp: ResourceProfile): this.type = {
+  @Since("3.1.0")
+  def withResources(rp: ResourceProfile): this.type = {
     resourceProfile = Option(rp)
     sc.resourceProfileManager.addResourceProfile(resourceProfile.get)
     this
@@ -1794,10 +1793,9 @@ abstract class RDD[T: ClassTag](
    * @return the user specified ResourceProfile or null (for Java compatibility) if
    *         none was specified
    */
-  // PRIVATE for now, added for testing purposes, will be made public with SPARK-29150
   @Experimental
-  @Since("3.0.0")
-  private[spark] def getResourceProfile(): ResourceProfile = resourceProfile.getOrElse(null)
+  @Since("3.1.0")
+  def getResourceProfile(): ResourceProfile = resourceProfile.getOrElse(null)
 
   // =======================================================================
   // Other internal methods and fields
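
For illustration, the round trip the two methods above provide once public; `rdd` and `rp` are assumed placeholders from the earlier sketch:

```scala
// Attaching a profile and reading it back.
val tagged = rdd.withResources(rp)
assert(tagged.getResourceProfile() == rp)

// An RDD with no profile attached returns null, for Java compatibility.
assert(sc.parallelize(1 to 10).getResourceProfile() == null)
```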
core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.resource
 
+import org.apache.spark.annotation.{Evolving, Since}
+
 /**
  * An Executor resource request. This is used in conjunction with the ResourceProfile to
  * programmatically specify the resources needed for an RDD that will be applied at the
@@ -46,11 +48,10 @@ package org.apache.spark.resource
  *                 allocated. The script runs on Executors startup to discover the addresses
  *                 of the resources available.
  * @param vendor Optional vendor, required for some cluster managers
- *
- * This api is currently private until the rest of the pieces are in place and then it
- * will become public.
  */
-private[spark] class ExecutorResourceRequest(
+@Evolving
+@Since("3.1.0")
+class ExecutorResourceRequest(
     val resourceName: String,
     val amount: Long,
     val discoveryScript: String = "",
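
As a sketch, constructing a request for a custom resource directly; the resource name and script path are invented for the example (`ExecutorResourceRequests` below is the more convenient entry point):

```scala
import org.apache.spark.resource.ExecutorResourceRequest

// Two "gpu" addresses per executor, located at startup by a user script.
val gpuReq = new ExecutorResourceRequest(
  "gpu", 2, discoveryScript = "/opt/spark/getGpus.sh")
```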
core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -22,18 +22,18 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.annotation.{Evolving, Since}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.resource.ResourceProfile._
 
 /**
  * A set of Executor resource requests. This is used in conjunction with the ResourceProfile to
  * programmatically specify the resources needed for an RDD that will be applied at the
  * stage level.
- *
- * This api is currently private until the rest of the pieces are in place and then it
- * will become public.
  */
-private[spark] class ExecutorResourceRequests() extends Serializable {
+@Evolving
+@Since("3.1.0")
+class ExecutorResourceRequests() extends Serializable {
 
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
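
A minimal sketch of the fluent style this class supports; the amounts are illustrative:

```scala
import org.apache.spark.resource.ExecutorResourceRequests

// Each setter returns this, so requests compose in one expression.
val execReqs = new ExecutorResourceRequests()
  .cores(4)             // cores per executor
  .memory("6g")         // executor heap
  .memoryOverhead("1g") // additional off-heap overhead
  .resource("gpu", 2)   // custom resource amount per executor
```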
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.annotation.Evolving
+import org.apache.spark.annotation.{Evolving, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
@@ -37,6 +37,7 @@ import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
  * This is meant to be immutable so user can't change it after building.
  */
 @Evolving
+@Since("3.1.0")
 class ResourceProfile(
     val executorResources: Map[String, ExecutorResourceRequest],
     val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging {
core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
@@ -22,16 +22,19 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.annotation.Evolving
+import org.apache.spark.annotation.{Evolving, Since}
 
 
 /**
  * Resource profile builder to build a Resource profile to associate with an RDD.
  * A ResourceProfile allows the user to specify executor and task requirements for an RDD
  * that will get applied during a stage. This allows the user to change the resource
  * requirements between stages.
- *
  */
 @Evolving
-private[spark] class ResourceProfileBuilder() {
+@Since("3.1.0")
+class ResourceProfileBuilder() {
 
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
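
Putting the pieces together, a sketch of building the immutable profile (`execReqs` as in the earlier sketch):

```scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val taskReqs = new TaskResourceRequests().cpus(2).resource("gpu", 1)

// build snapshots the requests into an immutable ResourceProfile;
// the builder itself stays mutable and reusable.
val rp = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build
```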
core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -17,17 +17,18 @@
 
 package org.apache.spark.resource
 
+import org.apache.spark.annotation.{Evolving, Since}
+
 /**
  * A task resource request. This is used in conjunction with the ResourceProfile to
  * programmatically specify the resources needed for an RDD that will be applied at the
  * stage level.
  *
  * Use TaskResourceRequests class as a convenience API.
- *
- * This api is currently private until the rest of the pieces are in place and then it
- * will become public.
  */
-private[spark] class TaskResourceRequest(val resourceName: String, val amount: Double)
+@Evolving
+@Since("3.1.0")
+class TaskResourceRequest(val resourceName: String, val amount: Double)
   extends Serializable {
 
   assert(amount <= 0.5 || amount % 1 == 0,
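
The assert above encodes the sharing rule: the amount must be a whole number, or at most 0.5 so that a whole number of tasks can share one resource address. A sketch with an illustrative resource name:

```scala
import org.apache.spark.resource.TaskResourceRequest

new TaskResourceRequest("gpu", 0.25) // four tasks share one gpu address: OK
new TaskResourceRequest("gpu", 2.0)  // two whole gpus per task: OK
// new TaskResourceRequest("gpu", 0.7) trips the assertion: neither whole
// nor <= 0.5, so tasks could not evenly share an address.
```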
core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -22,17 +22,17 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.annotation.{Evolving, Since}
 import org.apache.spark.resource.ResourceProfile._
 
 /**
  * A set of task resource requests. This is used in conjunction with the ResourceProfile to
  * programmatically specify the resources needed for an RDD that will be applied at the
  * stage level.
- *
- * This api is currently private until the rest of the pieces are in place and then it
- * will become public.
  */
-private[spark] class TaskResourceRequests() extends Serializable {
+@Evolving
+@Since("3.1.0")
+class TaskResourceRequests() extends Serializable {
 
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
 
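
And the convenience wrapper, sketched with illustrative amounts:

```scala
import org.apache.spark.resource.TaskResourceRequests

// Mirrors ExecutorResourceRequests: each call returns this.
val taskReqs = new TaskResourceRequests()
  .cpus(1)              // CPUs per task
  .resource("gpu", 0.5) // half a gpu per task, i.e. two tasks per gpu
```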
