Skip to content

Commit

Permalink
SPARK-1162 Added top in python.
Browse files Browse the repository at this point in the history
Author: Prashant Sharma <[email protected]>

Closes apache#93 from ScrapCodes/SPARK-1162/pyspark-top-takeOrdered and squashes the following commits:

ece1fa4 [Prashant Sharma] Added top in python.
  • Loading branch information
ScrapCodes authored and mateiz committed Mar 12, 2014
1 parent 5d1ec64 commit b8afe30
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
from heapq import heappush, heappop, heappushpop

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
Expand Down Expand Up @@ -660,6 +661,30 @@ def mergeMaps(m1, m2):
m1[k] += v
return m1
return self.mapPartitions(countPartition).reduce(mergeMaps)

def top(self, num):
"""
Get the top N elements from a RDD.
Note: It returns the list sorted in ascending order.
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
[12]
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
[5, 6]
"""
def topIterator(iterator):
q = []
for k in iterator:
if len(q) < num:
heappush(q, k)
else:
heappushpop(q, k)
yield q

def merge(a, b):
return next(topIterator(a + b))

return sorted(self.mapPartitions(topIterator).reduce(merge))

def take(self, num):
"""
Expand Down

0 comments on commit b8afe30

Please sign in to comment.