Index.java
import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* Index generator for Hadoop MapReduce.
*
* Adapted from the word count example on the
* <a href="http://wiki.apache.org/hadoop/WordCount">Hadoop wiki</a>.
*/
public class Index {
/** Mapper for Index.
*
* The base class Mapper is parameterized by
* {@code <in key type, in value type, out key type, out value type>}.
*
* Thus, this mapper takes (Text key, Text value) pairs and outputs
* (Text key, LongWritable value) pairs. The input keys are assumed
* to be identifiers for documents, which are ignored, and the values
* to be the content of documents. The output keys are words found
* within each document, and each output value is a partial count: a 1
* for each occurrence of the word, to be summed by the reducer.
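*
* For example, a document whose text is "to be or not to be" produces
* the pairs (to,1), (be,1), (or,1), (not,1), (to,1), (be,1).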
*
* To support efficient serialization (conversion of data to and from
* formats suitable for transport), Hadoop typically does not use the
* built-in Java classes like "String" and "Long" as key or value types. The
* wrappers Text and LongWritable implement Hadoop's serialization
* interface (called Writable) and, unlike Java's String and Long, are
* mutable.
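*
* (The map method below shows the reuse this mutability enables: the
* single Text instance "word" is re-filled via word.set(...) for each
* emitted pair, rather than allocating a new object per word.)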
*/
public static class IndexMap extends Mapper<Text, Text, Text, LongWritable> {
/** Regex pattern to find words (alphanumeric + _). */
final static Pattern WORD_PATTERN = Pattern.compile("\\w+");
/** Constant 1 as a LongWritable value. */
private final static LongWritable ONE = new LongWritable(1L);
/** Text object to store a word to write to output. */
private Text word = new Text();
/** Actual map function. Takes one document's text and emits key-value
* pairs for each word found in the document.
*
* @param key Document identifier (ignored).
* @param value Text of the current document.
* @param context MapperContext object for accessing output,
* configuration information, etc.
*/
@Override
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
// ORIGINAL WORDCOUNT CODE, REPLACE
Matcher matcher = WORD_PATTERN.matcher(value.toString());
while (matcher.find()) {
word.set(matcher.group());
context.write(word, ONE);
}
}
}
/** Reducer for Index.
*
* Like the Mapper base class, the base class Reducer is parameterized by
* {@code <in key type, in value type, out key type, out value type>}.
*
* For each Text key, which represents a word, this reducer gets a list of
* LongWritable values, computes the sum of those values, and emits the
* key-value pair (word, sum).
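*
* For example, given the mapper above, the key "be" might arrive with
* values [1, 1], and the completed reducer would emit (be, 2).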
*/
public static class IndexReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
/** Actual reduce function.
*
* @param key Word.
* @param values Values for this word (partial counts).
* @param context ReducerContext object for accessing output,
* configuration information, etc.
*/
@Override
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
/* ---YOUR CODE HERE--- */
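/* A minimal sketch of one possible body (an illustration, not the
* assigned solution): sum the partial counts and emit (word, sum).
*
*   long sum = 0;
*   for (LongWritable value : values) {
*     sum += value.get();
*   }
*   context.write(key, new LongWritable(sum));
*/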
}
}
/** Entry point for our program. Constructs a Job object representing a single
* MapReduce job and asks Hadoop to run it. When running on a cluster, the
* final "waitForCompletion" call will distribute the code for this job across
* the cluster.
*
* @param rawArgs command-line arguments
*/
public static void main(String[] rawArgs) throws Exception {
/* Use Hadoop's GenericOptionsParser, so our MapReduce program can accept
* common Hadoop options.
*/
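/* For example, an invocation like
*   hadoop jar index.jar Index -D mapreduce.job.reduces=4 in/ out/
* (jar name, property, and paths are illustrative) would have the -D
* option absorbed into the Configuration, leaving {in/, out/} as the
* remaining program-specific arguments.
*/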
GenericOptionsParser parser = new GenericOptionsParser(rawArgs);
Configuration conf = parser.getConfiguration();
String[] args = parser.getRemainingArgs();
/* Create an object to represent a Job. */
Job job = Job.getInstance(conf, "index");
/* Tell Hadoop where to locate the code that must be shipped if this
* job is to be run across a cluster. Unless the location of code
* is specified in some other way (e.g. the -libjars command line
* option), all non-Hadoop code required to run this job must be
* contained in the JAR containing the specified class (IndexMap
* in this case).
*/
job.setJarByClass(IndexMap.class);
/* Set the data types of the keys and values emitted by the mappers and
* reducers. These must agree with the types used by the Mapper and Reducer
* classes above; mismatches will not be caught until runtime.
*/
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
/* Set the mapper and reducer to use. These reference the classes defined above. */
job.setMapperClass(IndexMap.class);
job.setReducerClass(IndexReduce.class);
/* Set the format to expect input in and write output in. The input files we have
* provided are in Hadoop's "sequence file" format, which allows for keys and
* values of arbitrary Hadoop-supported types and supports compression.
*
* The output format TextOutputFormat outputs each key-value pair as a line
* "key<tab>value".
*/
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
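/* (A sequence file's contents can be inspected from the shell with
* "hadoop fs -text <path>", which decodes the keys and values to text.)
*/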
/* Specify the input and output locations to use for this job. */
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
/* Submit the job and wait for it to finish. The boolean argument specifies
* whether to print progress information to the console (true means do so).
*/
job.waitForCompletion(true);
}
}
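/* A hypothetical build-and-run sequence (jar name and input/output paths
* are placeholders, not part of the original assignment):
*
*   javac -classpath $(hadoop classpath) Index.java
*   jar cf index.jar Index*.class
*   hadoop jar index.jar Index indexInput/ indexOutput/
*/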