Skip to content

Commit

Permalink
Support document replacement (HADOOP-263).
Browse files Browse the repository at this point in the history
  • Loading branch information
Luke Lovett committed May 4, 2016
1 parent e99542e commit 6a662c5
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 42 deletions.
27 changes: 20 additions & 7 deletions core/src/main/java/com/mongodb/hadoop/io/MongoUpdateWritable.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,23 @@ public class MongoUpdateWritable implements Writable {
private BasicBSONObject modifiers;
private boolean upsert;
private boolean multiUpdate;
private boolean replace;

/**
 * Creates a writable with a {@code null} query and {@code null} modifiers
 * by delegating to the two-argument constructor. An instance created this
 * way can be populated afterwards, e.g. via {@code readFields}.
 */
public MongoUpdateWritable() {
this(null, null);
}

/**
 * Creates a writable using the default flags: upsert enabled, multi-update
 * disabled and replace disabled.
 *
 * @param query     the query object stored on this writable
 * @param modifiers the modifiers (or replacement document) stored on this
 *                  writable
 */
public MongoUpdateWritable(final BasicBSONObject query, final BasicBSONObject modifiers) {
// Argument order of the delegate: upsert, multiUpdate, replace.
this(query, modifiers, true, false, false);
}

public MongoUpdateWritable(final BasicBSONObject query, final BasicBSONObject modifiers, final boolean upsert,
final boolean multiUpdate) {
final boolean multiUpdate, final boolean replace) {
this.query = query;
this.modifiers = modifiers;
this.upsert = upsert;
this.multiUpdate = multiUpdate;
}

public MongoUpdateWritable(final BasicBSONObject query, final BasicBSONObject modifiers) {
this(query, modifiers, true, false);
this.replace = replace;
}

public BasicBSONObject getQuery() {
Expand All @@ -73,11 +75,18 @@ public boolean isUpsert() {
return upsert;
}


/**
 * Returns the multi-update flag.
 *
 * @return the current value of the {@code multiUpdate} field
 */
public boolean isMultiUpdate() {
return multiUpdate;
}

/**
 * Returns the replace flag.
 *
 * @return the current value of the {@code replace} field
 */
// NOTE(review): the output committer appears to issue replaceOne(modifiers)
// when this is true, without consulting the upsert/multi flags — confirm.
public boolean isReplace() {
return replace;
}

/**
 * Sets the replace flag.
 *
 * @param replace the new value of the {@code replace} field
 */
public void setReplace(final boolean replace) {
this.replace = replace;
}

/**
 * Sets the query object held by this writable.
 *
 * @param query the new query; stored as-is (no copy is made)
 */
public void setQuery(final BasicBSONObject query) {
this.query = query;
}
Expand Down Expand Up @@ -110,6 +119,7 @@ public void write(final DataOutput out) throws IOException {
buf.pipe(new DataOutputOutputStreamAdapter(out));
out.writeBoolean(upsert);
out.writeBoolean(multiUpdate);
out.writeBoolean(replace);
}

/**
Expand Down Expand Up @@ -139,6 +149,7 @@ public void readFields(final DataInput in) throws IOException {
modifiers = (BasicBSONObject) cb.get();
upsert = in.readBoolean();
multiUpdate = in.readBoolean();
replace = in.readBoolean();
} catch (Exception e) {
/* If we can't read another length it's not an error, just return quietly. */
// TODO - Figure out how to gracefully mark this as an empty
Expand All @@ -155,7 +166,8 @@ public boolean equals(final Object obj) {
return false;
}
final MongoUpdateWritable other = (MongoUpdateWritable) obj;
if (upsert != other.upsert || multiUpdate != other.multiUpdate) {
if (upsert != other.upsert || multiUpdate != other.multiUpdate
|| replace != other.replace) {
return false;
}
if ((query == null && other.query != null)
Expand All @@ -177,6 +189,7 @@ public int hashCode() {
hashCode ^= modifiers.hashCode();
hashCode ^= (upsert ? 1 : 0) << 1;
hashCode ^= (multiUpdate ? 1 : 0) << 2;
hashCode ^= (replace ? 1 : 0) << 3;
return hashCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package com.mongodb.hadoop.output;

import com.mongodb.BasicDBObject;
import com.mongodb.BulkUpdateRequestBuilder;
import com.mongodb.BulkWriteOperation;
import com.mongodb.BulkWriteRequestBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;
Expand Down Expand Up @@ -152,17 +154,23 @@ public void commitTask(final TaskAttemptContext taskContext)
DBObject query = new BasicDBObject(muw.getQuery().toMap());
DBObject modifiers =
new BasicDBObject(muw.getModifiers().toMap());
if (muw.isMultiUpdate()) {
if (muw.isUpsert()) {
bulkOp.find(query).upsert().update(modifiers);
BulkWriteRequestBuilder writeBuilder = bulkOp.find(query);
if (muw.isReplace()) {
writeBuilder.replaceOne(modifiers);
} else if (muw.isUpsert()) {
BulkUpdateRequestBuilder updateBuilder =
writeBuilder.upsert();
if (muw.isMultiUpdate()) {
updateBuilder.update(modifiers);
} else {
bulkOp.find(query).update(modifiers);
updateBuilder.updateOne(modifiers);
}
} else {
if (muw.isUpsert()) {
bulkOp.find(query).upsert().updateOne(modifiers);
// No-upsert update.
if (muw.isMultiUpdate()) {
writeBuilder.update(modifiers);
} else {
bulkOp.find(query).updateOne(modifiers);
writeBuilder.updateOne(modifiers);
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@
import java.io.DataOutputStream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotEquals;

public class MongoUpdateWritableTest {
@Test
public void testSerialization() throws Exception {
BasicDBObject query = new BasicDBObject("_id", new ObjectId());
BasicDBObject modifiers = new BasicDBObject("$set", new BasicDBObject("foo", "bar"));
MongoUpdateWritable writable = new MongoUpdateWritable(query, modifiers, true, false);
MongoUpdateWritable writable =
new MongoUpdateWritable(query, modifiers, true, false, true);


ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand All @@ -33,14 +32,9 @@ public void testSerialization() throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream(serializedBytes);
DataInputStream in = new DataInputStream(bais);

writable = new MongoUpdateWritable(null, null, false, true);

writable.readFields(in);

assertEquals(query, writable.getQuery());
assertEquals(modifiers, writable.getModifiers());
assertTrue(writable.isUpsert());
assertFalse(writable.isMultiUpdate());
MongoUpdateWritable blank = new MongoUpdateWritable();
blank.readFields(in);
assertEquals(writable, blank);
}

@Test
Expand All @@ -52,30 +46,40 @@ public void testEquals() {
BasicDBObject modifiers2 = new BasicDBObject("$set", new
BasicDBObject("bar", "baz"));
MongoUpdateWritable writable = new MongoUpdateWritable(query1,
modifiers1, true, false);
modifiers1, true, false, false);

// Not equal because queries differ.
MongoUpdateWritable diffQuery = new MongoUpdateWritable(query2,
modifiers1, true, false);
modifiers1, true, false, false);
assertNotEquals(writable, diffQuery);

// Not equal because modifiers differ.
MongoUpdateWritable diffModifier = new MongoUpdateWritable(query1,
modifiers2, true, false);
modifiers2, true, false, false);
assertNotEquals(writable, diffModifier);

// Not equal because upsert flag differs.
MongoUpdateWritable diffUpsert = new MongoUpdateWritable(query1,
modifiers1, false, false);
modifiers1, false, false, false);
assertNotEquals(writable, diffUpsert);

// Not equal because multi flag differs.
MongoUpdateWritable diffMulti = new MongoUpdateWritable(query1,
modifiers1, true, true);
modifiers1, true, true, false);
assertNotEquals(writable, diffMulti);

// Not equal because replace flag differs.
MongoUpdateWritable diffReplace = new MongoUpdateWritable(query1,
modifiers1, true, false, true);
assertNotEquals(writable, diffReplace);

MongoUpdateWritable same = new MongoUpdateWritable(query1, modifiers1,
true, false);
true, false, false);
assertEquals(writable, same);

// Test defaults for simple constructor.
MongoUpdateWritable simpleConstructor =
new MongoUpdateWritable(query1, modifiers1);
assertEquals(writable, simpleConstructor);
}
}
6 changes: 4 additions & 2 deletions pig/src/main/java/com/mongodb/hadoop/pig/JSONPigReplace.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,13 @@ public BasicBSONObject[] substitute(final Tuple t,
} else if (s instanceof ResourceSchema) {
schema = (ResourceSchema) s;
} else {
throw new IllegalArgumentException("Schema must be represented either by a string or a Schema object");
throw new IllegalArgumentException(
"Schema must be represented either by a string or a Schema "
+ "object, not " + s);
}
fields = schema.getFields();
} catch (Exception e) {
throw new IllegalArgumentException("Invalid Schema Format");
throw new IllegalArgumentException("Invalid Schema Format", e);
}

// Make Tuple t into BSONObject using schema provided and store result in pObj
Expand Down
12 changes: 7 additions & 5 deletions pig/src/main/java/com/mongodb/hadoop/pig/MongoUpdateStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,20 +144,22 @@ public void putNext(final Tuple tuple) throws IOException {
BasicBSONObject q = toUpdate[0];
// 'update' JSON
BasicBSONObject u = toUpdate[1];

// multi and upsert 'options' JSON
// update options
BasicBSONObject mu = toUpdate.length > 2 ? toUpdate[2] : null;
boolean isUpsert = true;
boolean isMulti = false;
BasicBSONObject mu = toUpdate.length > 2 ? toUpdate[2] : null;
boolean isReplace = false;
if (mu != null) {
isUpsert = !mu.containsField("upsert") || mu.getBoolean("upsert");
isMulti = mu.containsField("multi") && mu.getBoolean("multi");
isUpsert = mu.getBoolean("upsert", true);
isMulti = mu.getBoolean("multi", false);
isReplace = mu.getBoolean("replace", false);
}

muw.setQuery(q);
muw.setModifiers(u);
muw.setUpsert(isUpsert);
muw.setMultiUpdate(isMulti);
muw.setReplace(isReplace);
recordWriter.write(null, muw);
} catch (Exception e) {
throw new IOException("Couldn't convert tuple to bson: ", e);
Expand Down
25 changes: 21 additions & 4 deletions pig/src/test/java/com/mongodb/hadoop/pig/PigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.ListIndexesIterable;
Expand Down Expand Up @@ -32,16 +34,18 @@ public class PigTest extends BaseHadoopTest {
private static final MongoClientURI URI =
new MongoClientURI("mongodb://localhost:27017/mongo_hadoop.pigtests");
private MongoClient mongoClient;
private DB db;

@Before
public void setup() throws UnknownHostException {
mongoClient = new MongoClient(URI);
mongoClient.getDB("mongo_hadoop").dropDatabase();
db = mongoClient.getDB("mongo_hadoop");
db.dropDatabase();
}

@After
public void tearDown() {
mongoClient.getDB("mongo_hadoop").dropDatabase();
db.dropDatabase();
mongoClient.close();
}

Expand Down Expand Up @@ -111,8 +115,7 @@ public void mongoUpdateStorageMulti() throws IOException, ParseException {
public void testPigUUID() throws IOException, ParseException {
UUID uuid = UUID.randomUUID();
BasicDBObject doc = new BasicDBObject("uuid", uuid);
mongoClient.getDB("mongo_hadoop")
.getCollection("uuid_test").insert(doc);
db.getCollection("uuid_test").insert(doc);

org.apache.pig.pigunit.PigTest test =
new org.apache.pig.pigunit.PigTest(
Expand Down Expand Up @@ -229,4 +232,18 @@ public void testMongoStorageEnsureIndex()
assertFalse("Should not have the index \"last_1\"",
indexExists(coll, "last_1"));
}

/**
 * Checks document replacement through the Pig update storage.
 *
 * Seeds the "replace_test" collection with ten documents whose "i" field
 * runs from 0 to 9, runs the replace_mus.pig script, and then expects the
 * collection to contain exactly ten documents whose "i" values, sorted
 * ascending, run from 1 to 10.
 */
@Test
public void testPigUpdateReplace() throws IOException, ParseException {
    DBCollection replaceCollection = db.getCollection("replace_test");
    for (int i = 0; i < 10; ++i) {
        replaceCollection.insert(new BasicDBObject("i", i));
    }
    runScript("/pig/replace_mus.pig");
    DBCursor cursor =
        replaceCollection.find().sort(new BasicDBObject("i", 1));
    // Close the cursor even when an assertion fails so the server-side
    // cursor is not leaked into later tests.
    try {
        for (int i = 1; i <= 10; ++i) {
            assertEquals(i, cursor.next().get("i"));
        }
        // A replacement must not add documents: anything left in the
        // cursor means the script inserted instead of replacing.
        assertFalse("Replacement should not create additional documents",
                    cursor.hasNext());
    } finally {
        cursor.close();
    }
}
}
18 changes: 18 additions & 0 deletions pig/src/test/resources/pig/replace_mus.pig
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Test script for MongoUpdateStorage with the {replace:true} option.
-- Reads every document from mongo_hadoop.replace_test, adds one to its
-- "i" field, and stores the result back into the same collection.

-- Load the documents with fields "id" and "i".
-- NOTE(review): the second MongoLoader argument ('id') presumably names the
-- field that carries the document _id — confirm against MongoLoader.
documents =
LOAD 'mongodb://localhost:27017/mongo_hadoop.replace_test'
USING com.mongodb.hadoop.pig.MongoLoader('id:chararray,i:int', 'id');

-- Convert the id back with the ToObjectId UDF and increment "i" by one.
increment_number =
FOREACH documents
GENERATE com.mongodb.hadoop.pig.udf.ToObjectId(id) AS id,
i + 1 AS i;

-- Write back to the same collection: each tuple is matched by _id and the
-- {i: ...} document is supplied as the replacement.
STORE increment_number
INTO 'mongodb://localhost:27017/mongo_hadoop.replace_test'
USING com.mongodb.hadoop.pig.MongoUpdateStorage(
'{_id:"\$id"}', -- query
'{i:"\$i"}', -- replacement
'id:bytearray,i:int', -- schema
'', -- toIgnore (none)
'{replace:true}' -- update options
);

0 comments on commit 6a662c5

Please sign in to comment.