
Commit c692662

add topk mapreduce

1 parent 9175ced commit c692662

File tree: 3 files changed, +192 −0 lines

zh-hans/SUMMARY.md (2 additions, 0 deletions)

@@ -218,6 +218,8 @@
 * [Longest Words](data_structure/longest_words.md)
 * [Heapify](data_structure/heapify.md)
 * [Kth Smallest Number in Sorted Matrix](data_structure/kth_smallest_number_in_sorted_matrix.md)
+* [Big Data](bigdata/README.md)
+* [Top K Frequent Words (Map Reduce)](bigdata/top_k_frequent_words_map_reduce.md)
 * [Problem Misc](problem_misc/README.md)
 * [Nuts and Bolts Problem](problem_misc/nuts_and_bolts_problem.md)
 * [String to Integer](problem_misc/string_to_integer.md)

zh-hans/bigdata/README.md (1 addition, 0 deletions)

This section summarizes bigdata-related problems, covering classics such as MapReduce and Top K.
zh-hans/bigdata/top_k_frequent_words_map_reduce.md (new file, 189 additions)
---
difficulty: Medium
tags:
- Big Data
- Map Reduce
- EditorsChoice
title: Top K Frequent Words (Map Reduce)
---

# Top K Frequent Words (Map Reduce)

## Problem

### Metadata

- tags: Big Data, Map Reduce, EditorsChoice
- difficulty: Medium
- source(lintcode): <https://www.lintcode.com/problem/top-k-frequent-words-map-reduce/>

### Description

Find the top k frequent words with the map reduce framework.

The mapper's key is the document id, its value is the content of the document, and words in a document are separated by spaces.

For the reducer, the output should be at most k key-value pairs: the top k words and their frequencies in this reducer. The judge takes care of merging the results of different reducers to get the global top k frequent words, so you don't need to handle that part.

The *k* is given in the constructor of the TopK class.

#### Notice

For words with the same frequency, rank them alphabetically.

#### Example

Given document A =

```
lintcode is the best online judge
I love lintcode
```

and document B =

```
lintcode is an online judge for coding interview
you can test your code online at lintcode
```

The top 2 words and their frequencies should be

```
lintcode, 4
online, 3
```
## Solution

Solving Top K with Map Reduce adds the Map and Reduce stages on top of the classic Top K approach. The MapReduce model is an abstraction distilled from distributed data processing and consists of two phases:

- Map phase: the input data is split into shards, each processed by one map task; without this split there is nothing to distribute.
- Reduce phase: the intermediate results of the previous phase are aggregated in parallel to produce the final result.

A practical MapReduce program can be decomposed into the following five distributed steps:

1. Parse the input data into `<key, value>` pairs.
2. Map each input `<key, value>` pair to another kind of `<key, value>` pair.
3. Group the mapped data by key.
4. Reduce each group of the previous step into new `<key, value>` pairs.
5. Post-process the reduced data and persist it.
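The five steps above can be sketched in a single process with plain collections. The toy word counter below (the class name `MiniMapReduce` and its input format are my own illustration, not part of any framework) follows the same data flow: emit `<word, 1>` pairs, group them by key, then sum each group.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process sketch of the five MapReduce steps, counting words.
public class MiniMapReduce {
    public static Map<String, Integer> wordCount(List<String> docs) {
        // Steps 1-2: parse each document and map it to <word, 1> pairs.
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String doc : docs) {
            for (String w : doc.split(" ")) {
                if (!w.isEmpty()) {
                    mapped.add(new AbstractMap.SimpleEntry<>(w, 1));
                }
            }
        }
        // Step 3: group (shuffle) the emitted pairs by key.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapped) {
            grouped.computeIfAbsent(e.getKey(), x -> new ArrayList<>()).add(e.getValue());
        }
        // Steps 4-5: reduce each group by summing its values.
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            counts.put(e.getKey(), sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("a b a", "b a"))); // {a=3, b=2}
    }
}
```

In a real framework the map, shuffle, and reduce stages run on different machines; here they are just three consecutive loops.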
Per the problem statement we only need to implement the Map and Reduce steps, outputting the K most frequent words and breaking frequency ties alphabetically. We could maintain a max-heap over all words and pop the root K times to produce the output; that works, but wastes space on words that can never make the answer. Ideally we maintain a heap of only K elements. The remaining question is how to maintain such a size-K heap so that, when a new element arrives, the one evicted is the worst (smallest) element.
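The standard answer is a min-heap capped at K: its root is always the worst kept element, and a newcomer that beats the root replaces it. A minimal sketch over plain integers (the class and method names here are made up for illustration):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Keeps only the k largest integers using a size-bounded min-heap.
public class BoundedHeapDemo {
    public static List<Integer> topK(int[] nums, int k) {
        PriorityQueue<Integer> pq = new PriorityQueue<>(); // min-heap: root is the worst kept element
        for (int x : nums) {
            if (pq.size() < k) {
                pq.offer(x);
            } else if (pq.peek() < x) {
                pq.poll();   // evict the current minimum ...
                pq.offer(x); // ... and keep the better newcomer
            }
        }
        List<Integer> result = new ArrayList<>(pq);
        result.sort(Collections.reverseOrder()); // heap iteration order is unspecified
        return result;
    }

    public static void main(String[] args) {
        System.out.println(topK(new int[]{5, 1, 9, 3, 7}, 2)); // [9, 7]
    }
}
```

The solution below applies exactly this pattern, with the comparison extended to (frequency, word) pairs.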
### Java
```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Definition of OutputCollector:
 * class OutputCollector<K, V> {
 *     public void collect(K key, V value);
 *         // Adds a key/value pair to the output buffer
 * }
 * Definition of Document:
 * class Document {
 *     public int id;
 *     public String content;
 * }
 */

// A word and its frequency. The ordering puts the "worst" entry (lowest
// frequency, then alphabetically largest word) at the min-heap root.
class KeyFreq implements Comparable<KeyFreq> {
    public String key = null;
    public int freq = 0;

    public KeyFreq(String key, int freq) {
        this.key = key;
        this.freq = freq;
    }

    @Override
    public int compareTo(KeyFreq kf) {
        if (kf.freq != this.freq) {
            return Integer.compare(this.freq, kf.freq);
        }

        // On equal frequency, the alphabetically larger word compares as
        // smaller, so it is evicted first and the smaller word is kept.
        return kf.key.compareTo(this.key);
    }
}

public class TopKFrequentWords {

    public static class Map {
        public void map(String key, Document value,
                        OutputCollector<String, Integer> output) {
            // Emit <word, 1> for every word in the document.
            // Ps. output.collect(String key, int value);
            if (value == null || value.content == null) return;

            String[] splits = value.content.split(" ");
            for (String split : splits) {
                if (split.length() > 0) {
                    output.collect(split, 1);
                }
            }
        }
    }

    public static class Reduce {

        private int k = 0;
        private PriorityQueue<KeyFreq> pq = null;

        public void setup(int k) {
            // Initialize the size-K min-heap.
            this.k = k;
            pq = new PriorityQueue<KeyFreq>(k);
        }

        public void reduce(String key, Iterator<Integer> values) {
            // Sum the partial counts for this word.
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next();
            }

            KeyFreq kf = new KeyFreq(key, sum);

            if (pq.size() < k) {
                pq.offer(kf);
            } else {
                KeyFreq peekKf = pq.peek();
                if (peekKf.compareTo(kf) <= 0) {
                    // The newcomer beats the current worst entry: swap them.
                    pq.poll();
                    pq.offer(kf);
                }
            }
        }

        public void cleanup(OutputCollector<String, Integer> output) {
            // Output the top k pairs <word, times> into the output buffer.
            // Ps. output.collect(String key, Integer value);

            List<KeyFreq> kfList = new ArrayList<KeyFreq>(k);
            while (!pq.isEmpty()) {
                kfList.add(pq.poll());
            }

            // The min-heap polls in ascending order; reverse for the answer.
            int kfLen = kfList.size();
            for (int i = 0; i < kfLen; i++) {
                KeyFreq kf = kfList.get(kfLen - i - 1);
                output.collect(kf.key, kf.freq);
            }
        }
    }
}
```
### Source Code Analysis

The heap uses Java's built-in `PriorityQueue`. Since a custom ordering is needed, the helper class implements `Comparable` and overrides `compareTo`. Note that `PriorityQueue` is a min-heap by default, so the string comparison inside `compareTo` must run opposite to the frequency comparison: among words of equal frequency we keep the alphabetically smaller one, which means the alphabetically larger word must compare as smaller and surface at the heap root for eviction. Because the heap is a min-heap, the polled results must also be reversed once before output. Finally, `PriorityQueue` is not thread-safe; in production code, consider whether the thread-safe `PriorityBlockingQueue` is needed instead.
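The inverted tie-break is easy to verify in isolation. The standalone comparison below mirrors `KeyFreq.compareTo` (the demo class name is mine, for illustration): lower frequency compares smaller, and on equal frequency the alphabetically larger word compares smaller, so a min-heap evicts it first.

```java
import java.util.PriorityQueue;

// Demonstrates the KeyFreq ordering: frequency ascending, then key descending,
// so the min-heap root is always the entry that should be evicted first.
public class TieBreakDemo {
    static int compare(String key1, int freq1, String key2, int freq2) {
        if (freq1 != freq2) {
            return Integer.compare(freq1, freq2); // lower frequency is "smaller"
        }
        return key2.compareTo(key1); // alphabetically larger key is "smaller"
    }

    public static void main(String[] args) {
        // Equal frequency: "zebra" compares smaller than "apple", so a min-heap
        // evicts "zebra" first and keeps the alphabetically smaller "apple".
        System.out.println(compare("zebra", 2, "apple", 2) < 0); // true
        // Different frequency: the rarer word compares smaller.
        System.out.println(compare("apple", 1, "zebra", 3) < 0); // true
    }
}
```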
Although the Java standard library has no fixed-capacity `PriorityQueue`, the widely used Google Guava library already ships one, [MinMaxPriorityQueue](https://google.github.io/guava/releases/snapshot/api/docs/com/google/common/collect/MinMaxPriorityQueue.html), so there is no need to reinvent the wheel.
### Complexity Analysis

Each of the n words triggers at most one insert/remove on a heap of fixed size K, so the time complexity is roughly $$O(n \log K)$$; the heap and the output buffer hold at most K entries, so the space complexity is $$O(K)$$.
## Reference

- 《大数据技术体系详解》, Dong Xicheng (董西成) — the MapReduce programming model chapter
- [九章算法 - topk-mapreduce](https://www.jiuzhang.com/solution/top-k-frequent-words-map-reduce/)
