SPARK-554. Add aggregateByKey.
Author: Sandy Ryza <[email protected]>

Closes apache#705 from sryza/sandy-spark-554 and squashes the following commits:

2302b8f [Sandy Ryza] Add MIMA exclude
f52e0ad [Sandy Ryza] Fix Python tests for real
2f3afa3 [Sandy Ryza] Fix Python test
0b735e9 [Sandy Ryza] Fix line lengths
ae56746 [Sandy Ryza] Fix doc (replace T with V)
c2be415 [Sandy Ryza] Java and Python aggregateByKey
23bf400 [Sandy Ryza] SPARK-554.  Add aggregateByKey.
sryza authored and pwendell committed Jun 12, 2014
1 parent 43d53d5 commit ce92a9c
Showing 8 changed files with 179 additions and 2 deletions.
44 changes: 44 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -228,6 +228,50 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U],
combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.aggregateByKey(zeroValue, partitioner)(seqFunc, combFunc))
}

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U],
combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc))
}

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's.
* The former operation is used for merging values within a partition, and the latter is used for
* merging values between partitions. To avoid memory allocation, both of these functions are
* allowed to modify and return their first argument instead of creating a new U.
*/
def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]):
JavaPairRDD[K, U] = {
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
}

/**
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
50 changes: 50 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -118,6 +118,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
}

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)

lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
def createZero() = cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
}

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = {
aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = {
aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}

/**
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
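To make the seqOp/combOp contract described in the Scaladoc above concrete, here is a minimal usage sketch against the new Scala API (the sample data and names are illustrative, not part of this commit; an existing SparkContext `sc` is assumed). seqOp folds a value of type V into an accumulator of type U within a partition, and combOp merges two accumulators across partitions.

// Hedged sketch: per-key averages via aggregateByKey, assuming an existing SparkContext `sc`.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 4)))

// U = (Int, Int): a running (sum, count) per key; the tuple zero value is immutable,
// so there is no mutable state shared between keys.
val sumCounts = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),      // seqOp: merge a value into the accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))      // combOp: merge two accumulators

val averages = sumCounts.mapValues { case (sum, count) => sum.toDouble / count }
// averages.collect() would yield Array(("a", 2.0), ("b", 4.0)) (order may vary)
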
31 changes: 31 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -317,6 +317,37 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(33, sum);
}

@Test
public void aggregateByKey() {
JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(
Arrays.asList(
new Tuple2<Integer, Integer>(1, 1),
new Tuple2<Integer, Integer>(1, 1),
new Tuple2<Integer, Integer>(3, 2),
new Tuple2<Integer, Integer>(5, 1),
new Tuple2<Integer, Integer>(5, 3)), 2);

Map<Integer, Set<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
new Function2<Set<Integer>, Integer, Set<Integer>>() {
@Override
public Set<Integer> call(Set<Integer> a, Integer b) {
a.add(b);
return a;
}
},
new Function2<Set<Integer>, Set<Integer>, Set<Integer>>() {
@Override
public Set<Integer> call(Set<Integer> a, Set<Integer> b) {
a.addAll(b);
return a;
}
}).collectAsMap();
Assert.assertEquals(3, sets.size());
Assert.assertEquals(new HashSet<Integer>(Arrays.asList(1)), sets.get(1));
Assert.assertEquals(new HashSet<Integer>(Arrays.asList(2)), sets.get(3));
Assert.assertEquals(new HashSet<Integer>(Arrays.asList(1, 3)), sets.get(5));
}

@SuppressWarnings("unchecked")
@Test
public void foldByKey() {
13 changes: 13 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -30,6 +30,19 @@ import org.apache.spark.SparkContext._
import org.apache.spark.{Partitioner, SharedSparkContext}

class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("aggregateByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)), 2)

val sets = pairs.aggregateByKey(new HashSet[Int]())(_ += _, _ ++= _).collect()
assert(sets.size === 3)
val valuesFor1 = sets.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1))
val valuesFor3 = sets.find(_._1 == 3).get._2
assert(valuesFor3.toList.sorted === List(2))
val valuesFor5 = sets.find(_._1 == 5).get._2
assert(valuesFor5.toList.sorted === List(1, 3))
}

test("groupByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
val groups = pairs.groupByKey().collect()
4 changes: 4 additions & 0 deletions docs/programming-guide.md
@@ -890,6 +890,10 @@ for details.
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) </td>
<td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
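As a hedged illustration of the new programming-guide entry (sample data and variable names are made up, and a SparkContext `sc` is assumed): the result type U, here a mutable set of values, differs from the input value type V (Int), and the optional numTasks argument corresponds to the numPartitions overload added in PairRDDFunctions.

import scala.collection.mutable.HashSet

val pairs = sc.parallelize(Seq((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)))

// The empty HashSet zero value is cloned per key by the implementation, so the
// seqOp/combOp functions can safely mutate and return their first argument.
val valueSets = pairs.aggregateByKey(new HashSet[Int](), 4)(
  (set, v) => set += v,     // add a value within a partition
  (s1, s2) => s1 ++= s2)    // union partial sets across partitions

// valueSets.collect() would contain (1, Set(1)), (3, Set(2)), (5, Set(1, 3))
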
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -52,7 +52,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1")
"org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
+ "createZero$1")
) ++
Seq( // Ignore some private methods in ALS.
ProblemFilters.exclude[MissingMethodProblem](
19 changes: 18 additions & 1 deletion python/pyspark/rdd.py
@@ -1178,6 +1178,20 @@ def _mergeCombiners(iterator):
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)

def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
"""
Aggregate the values of each key, using given combine functions and a neutral "zero value".
This function can return a different result type, U, than the type of the values in this RDD,
V. Thus, we need one operation for merging a V into a U and one operation for merging two U's.
The former operation is used for merging values within a partition, and the latter is used
for merging values between partitions. To avoid memory allocation, both of these functions are
allowed to modify and return their first argument instead of creating a new U.
"""
def createZero():
return copy.deepcopy(zeroValue)

return self.combineByKey(lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)

def foldByKey(self, zeroValue, func, numPartitions=None):
"""
@@ -1190,7 +1204,10 @@ def foldByKey(self, zeroValue, func, numPartitions=None):
>>> rdd.foldByKey(0, add).collect()
[('a', 2), ('b', 1)]
"""
return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
def createZero():
return copy.deepcopy(zeroValue)

return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)


# TODO: support variant with custom partitioner
15 changes: 15 additions & 0 deletions python/pyspark/tests.py
@@ -188,6 +188,21 @@ def test_deleting_input_files(self):
os.unlink(tempFile.name)
self.assertRaises(Exception, lambda: filtered_data.count())

def testAggregateByKey(self):
data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2)
def seqOp(x, y):
x.add(y)
return x

def combOp(x, y):
x |= y
return x

sets = dict(data.aggregateByKey(set(), seqOp, combOp).collect())
self.assertEqual(3, len(sets))
self.assertEqual(set([1]), sets[1])
self.assertEqual(set([2]), sets[3])
self.assertEqual(set([1, 3]), sets[5])

class TestIO(PySparkTestCase):

