Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix the typo in the figure in Overview
  • Loading branch information
JerryLead committed Oct 12, 2014
2 parents f2618ab + bf3e4cc commit 505d139
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions markdown/4-shuffleDetails.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

**从 high-level 的角度来看,两者并没有大的差别。** 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。

**从 low-level 的角度来看,两者差别不小。** Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过**外排**得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作。
**从 low-level 的角度来看,两者差别不小。** Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过**外排**得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现

**从实现角度来看,两者也有不少差别。** Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。

Expand Down Expand Up @@ -59,14 +59,16 @@ ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算得到 finalR
// MapReduce
reduce(K key, Iterable<V> values) {
result = process(key, values)
return result }
return result
}

// Spark
reduce(K key, Iterable<V> values) {
result = null
for (V value : values)
result = func(result, value)
return result }
return result
}
```
MapReduce 可以在 process 函数里面可以定义任何数据结构,也可以将部分或全部的 values 都 cache 后再进行处理,非常灵活。而 Spark 中的 func 的输入参数是固定的,一个是上一个 record 的处理结果,另一个是当前读入的 record,它们经过 func 处理后的结果被下一个 record 处理时使用。因此一些算法比如求平均数,在 process 里面很好实现,直接`sum(values)/values.length`,而在 Spark 中 func 可以实现`sum(values)`,但不好实现`/values.length`。更多的 func 将会在下面的章节细致分析。
- **fetch 来的数据存放到哪里?**刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。这里我们主要讨论处理后的数据,可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果`spark.shuffle.spill = false`就只用内存。内存使用的是`AppendOnlyMap` ,类似 Java 的`HashMap`,内存+磁盘使用的是`ExternalAppendOnlyMap`,如果内存空间不足时,`ExternalAppendOnlyMap`可以将 \<K, V\> records 进行 sort 后 spill 到磁盘上,等到需要它们的时候再进行归并,后面会详解。**使用“内存+磁盘”的一个主要问题就是如何在两者之间取得平衡?**在 Hadoop MapReduce 中,默认将 reducer 的 70% 的内存空间用于存放 shuffle 来的数据,等到这个空间利用率达到 66% 的时候就开始 merge-combine()-spill。在 Spark 中,也适用同样的策略,一旦 ExternalAppendOnlyMap 达到一个阈值就开始 spill,具体细节下面会讨论。
Expand Down

0 comments on commit 505d139

Please sign in to comment.