Merge pull request mongodb#75 from alabid/issues/pig/mongo-update-storage

-- Added functionality + junit tests + pig script tests to enable MongoU...
mpobrien committed Aug 21, 2013
2 parents 55a10a7 + 5619404 commit 2bf4a15
Showing 19 changed files with 1,113 additions and 161 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -38,3 +38,5 @@ testing/*.pyc
*.jar
streaming/language_support/python/dist/
streaming/language_support/python/pymongo_hadoop.egg-info/
*.splits
*.crc
222 changes: 112 additions & 110 deletions examples/README.md
@@ -51,11 +51,11 @@ public class TreasuryYieldReducer
        }

        final double avg = sum / count;
        BasicBSONObject output = new BasicBSONObject();
        output.put("avg", avg);
        pContext.write( pKey, new BSONWritable( output ) );
    }
}
```

@@ -64,30 +64,30 @@ public class TreasuryYieldReducer
We can also easily accomplish the same task with just a few lines of Pig script. We also use some external UDFs provided by the Amazon Piggybank jar: http://aws.amazon.com/code/Elastic-MapReduce/2730

    -- UDFs used for date parsing
    REGISTER /tmp/piggybank-0.3-amzn.jar;
    -- MongoDB Java driver
    REGISTER /tmp/mongo-2.10.1.jar;
    -- Core Mongo-Hadoop Library
    REGISTER ../core/target/mongo-hadoop-core_1.0.3-1.1.0-SNAPSHOT.jar;
    -- mongo-hadoop pig support
    REGISTER ../pig/target/mongo-hadoop-pig_1.0.3-1.1.0-SNAPSHOT.jar;

    raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' using com.mongodb.hadoop.pig.MongoLoader;

    DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
    DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();

    date_tenyear = foreach raw generate UnixToISO($0#'_id'), $0#'bc10Year';
    parsed_year = foreach date_tenyear generate
            FLATTEN(EXTRACT($0, '(\\d{4})')) AS year, (double)$1 as bc;

    by_year = GROUP parsed_year BY (chararray)year;
    year_10yearavg = FOREACH by_year GENERATE group, AVG(parsed_year.bc) as tenyear_avg;

    -- Args to MongoInsertStorage are: schema for the output doc, field to use as '_id'.
    STORE year_10yearavg
        INTO 'mongodb://localhost:27017/demo.asfkjabfa'
        USING
        com.mongodb.hadoop.pig.MongoInsertStorage('group:chararray,tenyear_avg:float', 'group');
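With `group` used as the `_id` field, each document written to the output collection should take roughly this form (the values below are placeholders, not actual results):

    { "_id" : "<year>", "tenyear_avg" : <average> }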



@@ -101,26 +101,26 @@ Abbreviated code snippets shown below - to see the full source for this example,

#### Map/Reduce with Java

The mapper class will get the `headers` field from each document, parse out the sender from the `From` field and the recipients from the `To` field, and construct a `MailPair` object containing each pair which will act as the key. Then we emit the value `1` for each key. `MailPair` is just a simple "POJO" that contains Strings for the `from` and `to` values, and implements `WritableComparable` so that it can be serialized across Hadoop nodes and sorted.

```java
@Override
public void map(NullWritable key, BSONObject val, final Context context)
        throws IOException, InterruptedException {
    if (val.containsKey("headers")) {
        BSONObject headers = (BSONObject) val.get("headers");
        if (headers.containsKey("From") && headers.containsKey("To")) {
            String from = (String) headers.get("From");
            String to = (String) headers.get("To");
            String[] recips = to.split(",");
            for (int i = 0; i < recips.length; i++) {
                String recip = recips[i].trim();
                if (recip.length() > 0) {
                    context.write(new MailPair(from, recip), new IntWritable(1));
                }
            }
        }
    }
}
```
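
For reference, here is a minimal sketch of what such a `MailPair` key class could look like. The field names, comparison order, and hash logic below are illustrative assumptions rather than the project's actual source:

```java
// Hypothetical sketch of a MailPair key class (not the repository's actual source).
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class MailPair implements WritableComparable<MailPair> {

    private String from;
    private String to;

    public MailPair() { }                        // no-arg constructor required by Hadoop for deserialization

    public MailPair(String from, String to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(from);                      // serialize both strings for the shuffle/sort phase
        out.writeUTF(to);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        from = in.readUTF();
        to = in.readUTF();
    }

    @Override
    public int compareTo(MailPair other) {       // sort by sender, then recipient
        int cmp = from.compareTo(other.from);
        return cmp != 0 ? cmp : to.compareTo(other.to);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MailPair)) return false;
        MailPair p = (MailPair) o;
        return from.equals(p.from) && to.equals(p.to);
    }

    @Override
    public int hashCode() {
        return from.hashCode() * 31 + to.hashCode();
    }
}
```

Implementing `WritableComparable` this way lets Hadoop serialize the key during the shuffle and group all of the emitted counts for a given sender/recipient pair into a single reduce call.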

@@ -146,18 +146,18 @@ The reduce class will take the collected values for each key, sum them together,

To accomplish the same thing with Pig, but with much less work:

    REGISTER ../mongo-2.10.1.jar;
    REGISTER ../core/target/mongo-hadoop-core_cdh4.3.0-1.1.0.jar;
    REGISTER ../pig/target/mongo-hadoop-pig_cdh4.3.0-1.1.0.jar;

    raw = LOAD 'file:///Users/mike/dump/enron_mail/messages.bson' using com.mongodb.hadoop.pig.BSONLoader('','headers:[]');
    send_recip = FOREACH raw GENERATE $0#'From' as from, $0#'To' as to;
    send_recip_filtered = FILTER send_recip BY to IS NOT NULL;
    send_recip_split = FOREACH send_recip_filtered GENERATE from as from, FLATTEN(TOKENIZE(to)) as to;
    send_recip_split_trimmed = FOREACH send_recip_split GENERATE from as from, TRIM(to) as to;
    send_recip_grouped = GROUP send_recip_split_trimmed BY (from, to);
    send_recip_counted = FOREACH send_recip_grouped GENERATE group, COUNT($1) as count;

    STORE send_recip_counted INTO 'file:///tmp/enron_emailcounts.bson' using com.mongodb.hadoop.pig.BSONStorage;

## Example 3 - Sensor Logs

@@ -201,34 +201,36 @@ We will solve this by doing two passes of Map/Reduce. The first will operate ove

The `Mapper` code in phase one just produces the pair `<owner,_id>` for each device. The `Reducer` then takes the list of all `_id`s for each owner and creates a new document containing them.

    public class DeviceMapper extends Mapper<Object, BSONObject, Text, Text> {

        @Override
        public void map(Object key, BSONObject val, final Context context)
            throws IOException, InterruptedException {
            String keyOut = (String) val.get("owner") + " " + (String) val.get("type");
            context.write(new Text(keyOut), new Text(val.get("_id").toString()));
        }

    }

    public class DeviceReducer extends Reducer<Text, Text, NullWritable, MongoUpdateWritable> {

        @Override
        public void reduce(final Text pKey, final Iterable<Text> pValues,
                           final Context pContext)
            throws IOException, InterruptedException {
            BasicBSONObject query = new BasicBSONObject("_id", pKey.toString());
            ArrayList<ObjectId> devices = new ArrayList<ObjectId>();
            for (Text val : pValues) {
                devices.add(new ObjectId(val.toString()));
            }

            BasicBSONObject devices_list = new BasicBSONObject("devices", devices);
            BasicBSONObject update = new BasicBSONObject("$pushAll", devices_list);

            pContext.write(null, new MongoUpdateWritable(query, update, true, false));
        }
    }



@@ -250,35 +252,35 @@ After phase one, the output collection documents each look like this:
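Based on the query and `$pushAll` update in the reducer above, each such document has roughly this shape (the values here are placeholders rather than actual data):

    { "_id" : "<owner> <type>", "devices" : [ ObjectId("..."), ObjectId("..."), ... ] }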

In phase two, we map over the large collection `logs` and compute the totals for each device owner/type. The mapper emits the device id from each log item, and the reducer uses `MongoUpdateWritable` to increment the count in the output collection by querying for the record whose `devices` array (populated in phase one) contains that device's `_id`. Between phase one and phase two, we create an index on `devices` to make sure this lookup is fast (see the script in `examples/sensors/run_job.sh` for details).
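
As a rough sketch of that index step, a command along these lines in the mongo shell would do it (the collection name used here is only an illustrative assumption; the actual command is in `run_job.sh`):

    // Assumption: "phase_one_output" stands in for the actual phase-one output collection name.
    db.phase_one_output.ensureIndex({ "devices" : 1 })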

    public class LogMapper extends Mapper<Object, BSONObject, Text, IntWritable> {

        @Override
        public void map(Object key, BSONObject val, final Context context)
            throws IOException, InterruptedException {
            context.write(new Text(((ObjectId) val.get("d_id")).toString()),
                          new IntWritable(1));
        }
    }

    public class LogReducer extends Reducer<Text, IntWritable, NullWritable, MongoUpdateWritable> {

        @Override
        public void reduce(final Text pKey, final Iterable<IntWritable> pValues,
                           final Context pContext)
            throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable val : pValues) {
                count += val.get();
            }

            BasicBSONObject query = new BasicBSONObject("devices", new ObjectId(pKey.toString()));
            BasicBSONObject update = new BasicBSONObject("$inc", new BasicBSONObject("logs_count", count));

            pContext.write(null, new MongoUpdateWritable(query, update, true, false));
        }
    }

After phase two is finished, the result documents look like this (the `logs_count` field is now populated with the result):
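
Roughly, each result document has this shape (again with placeholder values rather than actual output):

    { "_id" : "<owner> <type>", "devices" : [ ObjectId("..."), ... ], "logs_count" : <count> }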

88 changes: 88 additions & 0 deletions pig/README.md
@@ -87,3 +87,91 @@ To make each output record be used as an insert into a MongoDB collection, use t
The `MongoInsertStorage` class also takes two args: an `idAlias` and a `schema` as described above. If `schema` is left blank, it will attempt to infer the output schema from the data using the strategy described above. If `idAlias` is left blank, an `ObjectId` will be generated for the value of the `_id` field in each output document.
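
For instance, the treasury yield example in `examples/README.md` stores its per-year averages with a statement of this shape:
```
STORE year_10yearavg
    INTO 'mongodb://localhost:27017/demo.asfkjabfa'
    USING
    com.mongodb.hadoop.pig.MongoInsertStorage('group:chararray,tenyear_avg:float', 'group');
```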


### Updating a MongoDB collection

Just like in the MongoDB JavaScript shell, you can now update documents in a MongoDB collection from within a Pig script, via `com.mongodb.hadoop.pig.MongoUpdateStorage`. Use:
```
STORE <aliasname> INTO 'mongodb://localhost:27017/<db>.<collection>'
USING com.mongodb.hadoop.pig.MongoUpdateStorage(
'<query>',
'<update>',
'<schema>', '<fieldtoignore>',
'<updateOptions>');
```
where
* `<aliasname>` is the name of the Pig alias whose records are used to update documents in your collection
* `<db>` is the name of the database to update and `<collection>` is the name of the collection to update
* `<query>` is valid JSON representing the query used to find the document(s) to update
* `<update>` is valid JSON representing the update to perform on the matched document(s)
* Optional: `<schema>` is the Pig schema of `<aliasname>`. Providing it is **strongly** advised.
* Optional: `<fieldtoignore>` is the field name to ignore in `schema` when constructing BSON objects.
  This is particularly useful for writing an array to a document.
* Optional: `<updateOptions>` provides other update options, just as in the MongoDB JS shell.
  For example, `{upsert : true, multi : true}`. Only `upsert` and `multi` are supported for now.

Consider the following examples:

Assume we have an alias `data` that is a bag of tuples.
```
data =
{
("Bab", "Alabi", "male", 19, {("a"), ("b"), ("c")}),
("Dad", "Alabi", "male", 21, {("d"), ("e"))}),
("Tins", "Dada", "female", 50, {})
}
```
with schema `f:chararray, l:chararray, g:chararray, age:int, cars:{t:(car:chararray)}`.
**Note**: Every Pig data structure in a Pig schema has to be named.

To insert the gender and the first and last names of each person in `data` into a `test.persons_info` collection,
making sure that we update any existing documents with the same `first` and `last` fields, use
```
STORE data INTO 'mongodb://localhost:27017/test.persons_info'
USING com.mongodb.hadoop.pig.MongoUpdateStorage(
'{first:"\$f", last:"\$l"}',
'{\$set:{gender:"\$g"}}',
'f:chararray, l:chararray, g:chararray, age:int, cars:{t:(car:chararray)}'
);
```
The resulting collection looks like this:
```
{ "_id" : ObjectId("..."), "first":"Bab", "last":"Alabi", "gender":"male"},
{ "_id" : ObjectId("..."), "first":"Dad", "last":"Alabi", "gender":"male"},
{ "_id" : ObjectId("..."), "first":"Tins", "last":"Dada", "gender":"female"}
```
Next, say we also want to include the `age` and `cars` for each person in the collection. Use:
```
STORE data INTO 'mongodb://localhost:27017/test.persons_info'
USING com.mongodb.hadoop.pig.MongoUpdateStorage(
'{first:"\$f", last:"\$l"}',
'{\$set:{age:"\$age"}, \$pushAll:{cars:"\$cars"}}',
'f:chararray, l:chararray, g:chararray, age:int, cars:{t:(car:chararray)}'
);
```
The resulting collection looks like this:
```
{ "_id" : ObjectId("..."), "gender":"male", "age" : 19, "cars" : [{"car": "a"}, {"car":"b"}, {"car":"c"}], "first" : "Daniel", "last" : "Alabi" }
{ "_id" : ObjectId("..."), "gender":"male", "age" : 21, "cars" : [{"car":"d"}, {"car":"e"}], "first" : "Tolu", "last" : "Alabi" }
{ "_id" : ObjectId("..."), "gender":"female", "age" : 50, "cars" : [], "first" : "Tinuke", "last" : "Dada" }
```
Notice that every element in `cars` is a document with a single key, `car`. In most cases, such an update is unwanted or unnecessary. To instead make
`cars` an array of plain strings, we can use:
```
STORE data INTO 'mongodb://localhost:27017/test.persons_info'
USING com.mongodb.hadoop.pig.MongoUpdateStorage(
'{first:"\$f", last:"\$l"}',
'{\$set:{age:"\$age"}, \$pushAll:{cars:"\$cars"}}',
'f:chararray, l:chararray, g:chararray, age:int, cars:{t:(car:chararray)}',
'car'
);
```
This time we specify which field (`car`) to ignore in the schema when constructing BSON objects. The resulting collection looks like this:
```
{ "_id" : ObjectId("..."), "gender":"male", "age" : 19, "cars" : ["a", "b", "c"], "first" : "Daniel", "last" : "Alabi" }
{ "_id" : ObjectId("..."), "gender":"male", "age" : 21, "cars" : ["d", "e"], "first" : "Tolu", "last" : "Alabi" }
{ "_id" : ObjectId("..."), "gender":"female", "age" : 50, "cars" : [], "first" : "Tinuke", "last" : "Dada" }
```
That's more like it.


