Skip to content

Commit d6b58da

Browse files
author
Pedro Bernardo
committed
Added pairRdd/groupbykey/*.py
1 parent 244fe9b commit d6b58da

File tree

3 files changed

+59
-0
lines changed

3 files changed

+59
-0
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from pyspark import SparkContext

if __name__ == "__main__":

    '''
    Create a Spark program to read the airport data from in/airports.text,
    output the list of the names of the airports located in each country.

    Each row of the input file contains the following columns:
    Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
    ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format

    Sample output:

    "Canada", ["Bagotville", "Montreal", "Coronation", ...]
    "Norway" : ["Vigra", "Andenes", "Alta", "Bomoen", "Bronnoy",..]
    "Papua New Guinea", ["Goroka", "Madang", ...]
    ...

    '''
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from pyspark import SparkContext
from commons.Utils import Utils

if __name__ == "__main__":
    '''
    Read the airport data from in/airports.text and print, for each country,
    the list of names of the airports located in that country.
    '''
    sc = SparkContext("local", "airports")
    sc.setLogLevel("ERROR")

    lines = sc.textFile("in/airports.text")

    # Split each CSV row exactly once (the original split every row twice,
    # once per tuple element), then build (country, airport name) pairs.
    # Column 3 is the country, column 1 is the airport name.
    countryAndAirportNamePair = lines \
        .map(lambda line: Utils.COMMA_DELIMITER.split(line)) \
        .map(lambda columns: (columns[3], columns[1]))

    airportsByCountry = countryAndAirportNamePair.groupByKey()

    # collectAsMap() brings the grouped pairs back to the driver as a dict;
    # each value is an iterable of airport names, materialized with list().
    for country, airportNames in airportsByCountry.collectAsMap().items():
        print("{}: {}".format(country, list(airportNames)))
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from pyspark import SparkContext

if __name__ == "__main__":
    '''
    Compute the same word counts two ways — with reduceByKey and with
    groupByKey — and print both results for comparison.
    '''
    sc = SparkContext("local", "GroupByKeyVsReduceByKey")
    sc.setLogLevel("ERROR")

    words = ["one", "two", "two", "three", "three", "three"]
    wordPairs = sc.parallelize(words).map(lambda w: (w, 1))

    # Variant 1: reduceByKey sums the 1s per word.
    countsViaReduce = wordPairs.reduceByKey(lambda a, b: a + b).collect()
    print("wordCountsWithReduceByKey: {}".format(list(countsViaReduce)))

    # Variant 2: groupByKey gathers all 1s per word, then the group size
    # is taken as the count.
    countsViaGroup = wordPairs.groupByKey().mapValues(len).collect()
    print("wordCountsWithGroupByKey: {}".format(list(countsViaGroup)))

0 commit comments

Comments
 (0)