Merge branch 'master' into scala-2.10
Conflicts:
	core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
	docs/_config.yml
	project/SparkBuild.scala
	repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
ScrapCodes committed Oct 1, 2013
2 parents 9865fd6 + 714fdab commit 5829692
Showing 57 changed files with 252 additions and 136 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -108,3 +108,4 @@ project's open source license. Whether or not you state this explicitly, by
submitting any copyrighted material via pull request, email, or other means
you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.

2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,9 +58,9 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
- ClusterScheduler, Schedulable, SchedulingMode}
+ ClusterScheduler}
import org.apache.spark.scheduler.local.LocalScheduler
- import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -68,6 +68,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def map[R](f: JFunction[T, R]): JavaRDD[R] =
new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())

+ /**
+ * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+ * of the original partition.
+ */
+ def mapPartitionsWithIndex[R: ClassManifest](
+ f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
+ preservesPartitioning: Boolean = false): JavaRDD[R] =
+ new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
+ preservesPartitioning))

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
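
Aside (not part of this commit): the Java wrapper added above delegates to the Scala RDD.mapPartitionsWithIndex. A minimal usage sketch of that underlying Scala method follows; it assumes a local SparkContext of this era, and the names and printed output are illustrative only.

    import org.apache.spark.SparkContext

    object MapPartitionsWithIndexExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "mapPartitionsWithIndex example")
        val rdd = sc.parallelize(1 to 10, 3)
        // Tag each element with the index of the partition it came from.
        val tagged = rdd.mapPartitionsWithIndex(
          (partitionIndex, iter) => iter.map(x => (partitionIndex, x)),
          preservesPartitioning = false)
        tagged.collect().foreach(println)  // e.g. (0,1), (0,2), (0,3), (1,4), ...
        sc.stop()
      }
    }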
38 changes: 28 additions & 10 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -756,24 +756,42 @@ abstract class RDD[T: ClassTag](
}

/**
- * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
- * it will be slow if a lot of partitions are required. In that case, use collect() to get the
- * whole RDD instead.
+ * Take the first num elements of the RDD. It works by first scanning one partition, and use the
+ * results from that partition to estimate the number of additional partitions needed to satisfy
+ * the limit.
*/
def take(num: Int): Array[T] = {
if (num == 0) {
return new Array[T](0)
}

val buf = new ArrayBuffer[T]
- var p = 0
- while (buf.size < num && p < partitions.size) {
+ val totalParts = this.partitions.length
+ var partsScanned = 0
+ while (buf.size < num && partsScanned < totalParts) {
+ // The number of partitions to try in this iteration. It is ok for this number to be
+ // greater than totalParts because we actually cap it at totalParts in runJob.
+ var numPartsToTry = 1
+ if (partsScanned > 0) {
+ // If we didn't find any rows after the first iteration, just try all partitions next.
+ // Otherwise, interpolate the number of partitions we need to try, but overestimate it
+ // by 50%.
+ if (buf.size == 0) {
+ numPartsToTry = totalParts - 1
+ } else {
+ numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
+ }
+ }
+ numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions

val left = num - buf.size
- val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
- buf ++= res(0)
- if (buf.size == num)
- return buf.toArray
- p += 1
+ val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
+ val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)

+ res.foreach(buf ++= _.take(num - buf.size))
+ partsScanned += numPartsToTry
}

return buf.toArray
}

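
Aside (not part of this commit): the new take() above grows the number of partitions scanned per pass instead of scanning them strictly one by one. A small, self-contained sketch of that estimation step, with purely illustrative numbers:

    object TakeEstimateSketch {
      // Mirrors the numPartsToTry logic from the hunk above.
      def nextPartsToTry(num: Int, partsScanned: Int, bufSize: Int, totalParts: Int): Int = {
        var numPartsToTry = 1
        if (partsScanned > 0) {
          if (bufSize == 0) {
            numPartsToTry = totalParts - 1                              // found nothing yet: try everything next
          } else {
            numPartsToTry = (1.5 * num * partsScanned / bufSize).toInt  // interpolate, with a 50% margin
          }
        }
        math.max(0, numPartsToTry)
      }

      def main(args: Array[String]) {
        // Suppose take(1000) found 10 rows in the first of 200 partitions:
        // the next pass tries (1.5 * 1000 * 1 / 10).toInt = 150 partitions (runJob caps at totalParts).
        println(nextPartsToTry(num = 1000, partsScanned = 1, bufSize = 10, totalParts = 200))
      }
    }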
@@ -29,7 +29,6 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
- import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}

@@ -19,7 +19,6 @@ package org.apache.spark.scheduler

import java.util.Properties

- import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection.mutable.Map

import org.apache.spark._
@@ -30,7 +30,6 @@ import scala.io.Source
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
- import org.apache.spark.scheduler.cluster.TaskInfo

// Used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside
@@ -15,13 +15,13 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.cluster
+ package org.apache.spark.scheduler

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

import org.apache.spark.Logging
- import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode

/**
* An Schedulable entity that represent collection of Pools or TaskSetManagers
@@ -15,9 +15,9 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.cluster
+ package org.apache.spark.scheduler

- import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode

import scala.collection.mutable.ArrayBuffer
/**
@@ -15,7 +15,7 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.cluster
+ package org.apache.spark.scheduler

import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
@@ -15,7 +15,7 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.cluster
+ package org.apache.spark.scheduler

/**
* An interface for sort algorithm
@@ -15,7 +15,7 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.cluster
+ package org.apache.spark.scheduler

/**
* "FAIR" and "FIFO" determines which policy is used
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler

import java.util.Properties
- import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.{Logging, SparkContext, TaskEndReason}
import org.apache.spark.executor.TaskMetrics
@@ -17,8 +17,8 @@

package org.apache.spark.scheduler

- import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection._

import org.apache.spark.executor.TaskMetrics

case class StageInfo(
@@ -15,7 +15,7 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.cluster
+ package org.apache.spark.scheduler

import java.nio.ByteBuffer
import org.apache.spark.util.SerializableBuffer
@@ -15,7 +15,7 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.cluster
+ package org.apache.spark.scheduler

import org.apache.spark.util.Utils

@@ -15,7 +15,7 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.cluster
+ package org.apache.spark.scheduler


private[spark] object TaskLocality
@@ -17,10 +17,11 @@

package org.apache.spark.scheduler

- import org.apache.spark.scheduler.cluster.Pool
- import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode

/**
* Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
* Each TaskScheduler schedulers task for a single SparkContext.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
* and are responsible for sending the tasks to the cluster, running them, retrying if there
* are failures, and mitigating stragglers. They return events to the DAGScheduler through
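
Aside (not part of this commit): the doc comment above summarizes the contract between the DAGScheduler and a TaskScheduler implementation. A deliberately simplified, hypothetical sketch of that flow is below; the names are illustrative and do not match the real org.apache.spark.scheduler.TaskScheduler trait.

    // Hypothetical sketch only: one task set per stage comes in, results are reported back.
    trait MiniTaskScheduler {
      def start(): Unit
      def submitTasks(stageId: Int, tasks: Seq[() => Any], onTaskDone: (Int, Any) => Unit): Unit
      def stop(): Unit
    }

    // A trivial in-process implementation, standing in for a local scheduler.
    class MiniLocalScheduler extends MiniTaskScheduler {
      def start() {}
      def submitTasks(stageId: Int, tasks: Seq[() => Any], onTaskDone: (Int, Any) => Unit) {
        tasks.zipWithIndex.foreach { case (task, i) => onTaskDone(i, task()) }
      }
      def stop() {}
    }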
@@ -17,7 +17,6 @@

package org.apache.spark.scheduler

- import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection.mutable.Map

import org.apache.spark.TaskEndReason
@@ -15,12 +15,11 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.cluster
+ package org.apache.spark.scheduler

import java.nio.ByteBuffer

import org.apache.spark.TaskState.TaskState
- import org.apache.spark.scheduler.TaskSet

/**
* Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
@@ -26,7 +26,7 @@ import scala.collection.mutable.HashSet
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
- import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
@@ -25,15 +25,12 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min
- import scala.Some

- import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState}
- import org.apache.spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
+ import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv,
+ SparkException, Success, TaskEndReason, TaskResultTooBigFailure, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
- import scala.Some
- import org.apache.spark.FetchFailed
- import org.apache.spark.ExceptionFailure
- import org.apache.spark.TaskResultTooBigFailure
import org.apache.spark.util.{SystemClock, Clock}


@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer

import org.apache.spark.TaskState.TaskState
+ import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{Utils, SerializableBuffer}


@@ -28,6 +28,7 @@ import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

import org.apache.spark.{SparkException, Logging, TaskState}
+ import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
import org.apache.spark.util.Utils

@@ -15,22 +15,22 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.mesos
+ package org.apache.spark.scheduler.cluster.mesos

- import com.google.protobuf.ByteString
+ import java.io.File
+ import java.util.{ArrayList => JArrayList, List => JList}
+ import java.util.Collections

+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+ import scala.collection.JavaConversions._

+ import com.google.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}

- import org.apache.spark.{SparkException, Logging, SparkContext}
- import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
- import scala.collection.JavaConversions._
- import java.io.File
- import org.apache.spark.scheduler.cluster._
- import java.util.{ArrayList => JArrayList, List => JList}
- import java.util.Collections
- import org.apache.spark.TaskState
+ import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
+ import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -15,22 +15,24 @@
* limitations under the License.
*/

- package org.apache.spark.scheduler.mesos
+ package org.apache.spark.scheduler.cluster.mesos

- import com.google.protobuf.ByteString
+ import java.io.File
+ import java.util.{ArrayList => JArrayList, List => JList}
+ import java.util.Collections

+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+ import scala.collection.JavaConversions._

+ import com.google.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}

- import org.apache.spark.{SparkException, Logging, SparkContext}
- import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
- import scala.collection.JavaConversions._
- import java.io.File
- import org.apache.spark.scheduler.cluster._
- import java.util.{ArrayList => JArrayList, List => JList}
- import java.util.Collections
- import org.apache.spark.TaskState
+ import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
+ import org.apache.spark.scheduler.TaskDescription
+ import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason}
+ import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer}
import org.apache.spark.util.Utils

/**
@@ -31,8 +31,7 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.scheduler._
- import org.apache.spark.scheduler.cluster._
- import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import akka.actor._
import org.apache.spark.util.Utils
