Skip to content

Commit

Permalink
Merge pull request gorillalabs#33 from retnuh/wrap_some_zip_fns
Browse files Browse the repository at this point in the history
wrap RDD fns zip-with-index and zip-with-unique-id
  • Loading branch information
chrisbetz committed Jan 29, 2016
2 parents da1f291 + f771927 commit fbfdd6d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 1 deletion.
2 changes: 2 additions & 0 deletions articles/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ Sparkling supports the following RDD transformations:
* `group-by`: returns an RDD of items grouped by the return value of a function.
* `group-by-key`: groups the values for each key in an RDD into a single sequence.
* `flat-map-to-pair`: returns a new `JavaPairRDD` by first applying a function to all elements of the RDD, and then flattening the results.
* `zip-with-index`: when called on an RDD of type T, returns a new `JavaPairRDD` of (T, index) pairs where the index is a `long` element index, starting from 0.
* `zip-with-unique-id`: when called on an RDD of type T, returns a new `JavaPairRDD` of (T, id) pairs where the id is a unique `long`. Items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a Spark job.



Expand Down
6 changes: 6 additions & 0 deletions src/clojure/sparkling/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@
([rdd n shuffle?]
(sc/coalesce n shuffle? rdd)))

(defn zip-with-index
  "Returns a pair RDD of (item, index) tuples for `rdd`.
  Thin wrapper that forwards to `sc/zip-with-index`."
  [rdd]
  (sc/zip-with-index rdd))

(defn zip-with-unique-id
  "Returns a pair RDD of (item, unique-id) tuples for `rdd`.
  Forwards to `sc/zip-with-unique-id`; it must not be bound to
  `sc/zip-with-index`, which yields consecutive element indices instead of
  the per-partition unique ids."
  [rdd]
  (sc/zip-with-unique-id rdd))

(defn repartition
"Returns a new `rdd` with exactly `n` partitions."
[rdd n]
Expand Down
12 changes: 12 additions & 0 deletions src/clojure/sparkling/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,18 @@
(.coalesce rdd (min n (count-partitions rdd)) shuffle?) n shuffle?)))


(defn zip-with-index
  "Pairs every element of `rdd` with its element index by calling
  `.zipWithIndex` on it, giving an RDD of (item, index) tuples. The result
  is handed to `u/set-auto-name` before being returned."
  [rdd]
  (-> rdd
      .zipWithIndex
      u/set-auto-name))


(defn zip-with-unique-id
  "Pairs every element of `rdd` with a generated unique Long id by calling
  `.zipWithUniqueId` on it, giving an RDD of (item, uniqueId) tuples. The
  result is handed to `u/set-auto-name` before being returned."
  [rdd]
  (-> rdd
      .zipWithUniqueId
      u/set-auto-name))


;; functions on multiple rdds

(defn cogroup
Expand Down
21 changes: 20 additions & 1 deletion test/sparkling/api_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
(let [conf (-> (conf/spark-conf)
(conf/set-sparkling-registrator)
(conf/set "spark.kryo.registrationRequired" "true")
(conf/master "local[*]")
(conf/master "local[4]")
(conf/app-name "api-test"))]
(s/with-context c conf
(testing
Expand Down Expand Up @@ -433,6 +433,25 @@
vec)
[#sparkling/tuple[2 5]]))))

(testing
  "zip-with-index returns an RDD of (T, index) pairs"
  ;; Collect the (item, index) tuples as plain vectors, then compare them
  ;; order-insensitively against the expected pairs.
  (let [indexed (-> (s/parallelize c ["a" "b" "c" "d"])
                    s/zip-with-index
                    (s/map (sd/key-value-fn identity-vec))
                    s/collect
                    vec)]
    (is (equals-ignore-order? indexed [["a" 0] ["b" 1] ["c" 2] ["d" 3]]))))

(testing
  "zip-with-unique-id returns an RDD of (T, unique-id) pairs"
  ;; Each element is first tagged with the index of the partition it lives in,
  ;; so the expected value shows both the partition and the id it received.
  ;; Assumes 4 partitions (master local[4]), i.e. a | b c | d e | f g.
  ;; Unique ids follow k, n+k, 2*n+k, ... for partition k of n partitions, so
  ;; the second element of partitions 1, 2 and 3 gets 5, 6 and 7. Consecutive
  ;; ids 0..6 here would mean the function behaves like zip-with-index.
  (is (equals-ignore-order? (-> (s/parallelize c ["a" "b" "c" "d" "e" "f" "g"])
                                (s/map-partition-with-index
                                  (fn [ind it]
                                    (.iterator (map identity-vec (iterator-seq it) (repeat ind)))))
                                s/zip-with-unique-id
                                (s/map (sd/key-value-fn identity-vec))
                                s/collect
                                vec)
                            [[["a" 0] 0] [["b" 1] 1] [["c" 1] 5] [["d" 2] 2] [["e" 2] 6]
                             [["f" 3] 3] [["g" 3] 7]])))

;TODO: (future-fact "repartition returns a new RDD with exactly n partitions")

Expand Down

0 comments on commit fbfdd6d

Please sign in to comment.