Skip to content

Commit

Permalink
spark.shuffle.file.buffer.kb=32KB
Browse files Browse the repository at this point in the history
  • Loading branch information
JerryLead committed Sep 15, 2014
1 parent a2da610 commit ffdf01a
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 3 deletions.
Binary file modified .DS_Store
Binary file not shown.
Binary file modified markdown/.DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions markdown/0-Introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Doc Version: 1.0.2.0
| [@zzl0](https://github.com/zzl0) | 前四章 | 很多 typos,比如 “groupByKey 产生了后面三个 RDD”,应该是两个。详见 [pull request](https://github.com/JerryLead/SparkInternals/pull/3/files)| 已经全部修改 |
| [@左手牵右手TEL](http://weibo.com/w397090770) | Cache 和 Broadcast 两章 | 很多 typos | 已经全部修改 |
| [@cloud-fan](https://github.com/cloud-fan) | JobLogicalPlan | Cogroup() 图中的某些剪头应该是红色的 | 已经全部修改 |
| [@CrazyJvm](http://weibo.com/476691290) | Shuffle details | 从 Spark1.1开始spark.shuffle.file.buffer.kb的默认值为32k,而不是100k | 已经全部修改 |

特别感谢 [@明风Andy](http://weibo.com/mingfengandy) 同学给予的大力支持。

Expand Down
6 changes: 3 additions & 3 deletions markdown/4-shuffleDetails.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ shuffle write 的任务很简单,那么实现也很简单:将 shuffle write

![shuffle-write-no-consolidation](PNGfigures/shuffle-write-no-consolidation.png)

上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,可以同时运行两个 task。每个 task 的执行结果(该 stage 的 finalRDD 中某个 partition 包含的 records)被逐一写到本地磁盘上。每个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket,其大小为`spark.shuffle.file.buffer.kb` ,默认是 100KB。
上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,可以同时运行两个 task。每个 task 的执行结果(该 stage 的 finalRDD 中某个 partition 包含的 records)被逐一写到本地磁盘上。每个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket,其大小为`spark.shuffle.file.buffer.kb` ,默认是 32KB(Spark 1.1 版本以前是 100KB

> 其实 bucket 是一个广义的概念,代表 ShuffleMapTask 输出结果经过 partition 后要存放的地方,这里为了细化数据存放位置和数据名称,仅仅用 bucket 表示缓冲区。
Expand All @@ -30,7 +30,7 @@ ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算得到 finalR
这样的实现很简单,但有几个问题:

1. **产生的 FileSegment 过多。**每个 ShuffleMapTask 产生 R(reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。
2. **缓冲区占用内存空间大。**每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M * R 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores * R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了`cores * R * 100KB`。对于 8 核 1000 个 reducer 来说,占用内存就是 800MB
2. **缓冲区占用内存空间大。**每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M * R 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores * R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了`cores * R * 32 KB`。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB

目前来看,第二个问题还没有好的方法解决,因为写磁盘终究是要开缓冲区的,缓冲区太小会影响 IO 速度。但第一个问题有一些方法去解决,下面介绍已经在 Spark 里面实现的 FileConsolidation 方法。先上图:

Expand Down Expand Up @@ -177,7 +177,7 @@ ExternalAppendOnlyMap 持有一个 AppendOnlyMap,shuffle 来的一个个 (K, V

- Spill 过程

与 shuffle write 一样,在 spill records 到磁盘上的时候,会建立一个 buffer 缓冲区,大小仍为 `spark.shuffle.file.buffer.kb` ,默认是 100KB。另外,由于 serializer 也会分配缓冲区用于序列化和反序列化,所以如果一次 serialize 的 records 过多的话缓冲区会变得很大。Spark 限制每次 serialize 的 records 个数为 `spark.shuffle.spill.batchSize`,默认是 10000。
与 shuffle write 一样,在 spill records 到磁盘上的时候,会建立一个 buffer 缓冲区,大小仍为 `spark.shuffle.file.buffer.kb` ,默认是 32KB。另外,由于 serializer 也会分配缓冲区用于序列化和反序列化,所以如果一次 serialize 的 records 过多的话缓冲区会变得很大。Spark 限制每次 serialize 的 records 个数为 `spark.shuffle.spill.batchSize`,默认是 10000。

## Discussion
通过本章的介绍可以发现,相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加灵活,会根据不同的 transformation() 的语义去设计不同的 shuffle-aggregate 策略,再加上不同的内存数据结构来混搭出合理的执行流程。
Expand Down
1 change: 1 addition & 0 deletions markdown/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Doc Version: 1.0.2.0
| [@zzl0](https://github.com/zzl0) | 前四章 | 很多 typos,比如 “groupByKey 产生了后面三个 RDD”,应该是两个。详见 [pull request](https://github.com/JerryLead/SparkInternals/pull/3/files)| 已经全部修改 |
| [@左手牵右手TEL](http://weibo.com/w397090770) | Cache 和 Broadcast 两章 | 很多 typos | 已经全部修改 |
| [@cloud-fan](https://github.com/cloud-fan) | JobLogicalPlan | Cogroup() 图中的某些剪头应该是红色的 | 已经全部修改 |
| [@CrazyJvm](http://weibo.com/476691290) | Shuffle details | 从 Spark1.1开始spark.shuffle.file.buffer.kb的默认值为32k,而不是100k | 已经全部修改 |

特别感谢 [@明风Andy](http://weibo.com/mingfengandy) 同学给予的大力支持。

Expand Down

0 comments on commit ffdf01a

Please sign in to comment.