Skip to content

Commit

Permalink
Merge pull request amplab-extras#246 from hlin09/fixCombineByKey
Browse files Browse the repository at this point in the history
Fixes combineByKey
  • Loading branch information
shivaram committed Apr 9, 2015
2 parents 2167eec + 3ced898 commit adbe25d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ setMethod("combineByKey",
pred <- function(item) exists(item$hash, keys)
lapply(part,
function(item) {
item$hash <- as.character(item[[1]])
item$hash <- as.character(hashCode(item[[1]]))
updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
})
convertEnvsToList(keys, combiners)
Expand All @@ -425,7 +425,7 @@ setMethod("combineByKey",
pred <- function(item) exists(item$hash, keys)
lapply(part,
function(item) {
item$hash <- as.character(item[[1]])
item$hash <- as.character(hashCode(item[[1]]))
updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
})
convertEnvsToList(keys, combiners)
Expand Down
12 changes: 12 additions & 0 deletions pkg/inst/tests/test_shuffle.R
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ test_that("combineByKey for doubles", {
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})

# Checks that combineByKey groups and sums values when the keys are
# character strings; the key "max" occurs twice in the input.
test_that("combineByKey for characters", {
  pairs <- list(list("max", 1L), list("min", 2L),
                list("other", 3L), list("max", 4L))
  keyedRDD <- parallelize(sc, pairs, 2L)

  # Identity function followed by "+" twice as the combining functions
  # (presumably createCombiner, mergeValue, mergeCombiners -- confirm
  # against combineByKey's signature).
  combined <- combineByKey(keyedRDD, function(v) { v }, "+", "+", 2L)
  observed <- collect(combined)

  # The two "max" values (1L and 4L) should collapse into a single 5L.
  wanted <- list(list("max", 5L), list("min", 2L), list("other", 3L))
  expect_equal(sortKeyValueList(observed), sortKeyValueList(wanted))
})

test_that("aggregateByKey", {
# test aggregateByKey for int keys
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
Expand Down

0 comments on commit adbe25d

Please sign in to comment.