Skip to content

Commit

Permalink
HADOOP-104 Merge pull request mongodb#89 from relateiq/master
Browse files Browse the repository at this point in the history
  • Loading branch information
visualzhou committed Dec 13, 2013
2 parents 9c9f1e3 + 0692ef4 commit bdde66e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public abstract class MongoCollectionSplitter extends MongoSplitter{

private static final Log log = LogFactory.getLog( MongoCollectionSplitter.class );

public static final MinKey MIN_KEY_TYPE = new MinKey();
public static final MaxKey MAX_KEY_TYPE = new MaxKey();
protected Mongo mongo;
Expand All @@ -54,63 +54,63 @@ public MongoCollectionSplitter(){ }
public MongoCollectionSplitter(Configuration conf){
super(conf);
}
//
//
// public void setInputURI(MongoURI inputURI){
// this.inputURI = inputURI;
// }
//
//
// public void setQuery(DBObject query){
// this.query = query;
// }
//
//
// public BSONObject getQuery(){
// return this.query;
// }
//
//
// public boolean getUseRangeQuery(){
// return this.useRangeQuery;
// }
//
//
// public void setUseRangeQuery(boolean useRangeQuery){
// this.useRangeQuery = useRangeQuery;
// }
//
//
// public void setAuthURI(MongoURI authURI){
// this.authURI = authURI;
// }
//
//
// public boolean getNoTimeout(){
// return this.noTimeout;
// }
//
//
// public void setNoTimeout(boolean noTimeout){
// this.noTimeout = noTimeout;
// }
//
//
// public MongoURI getAuthURI(){
// return this.authURI;
// }
//
//
// public DBObject getFields(){
// return this.fields;
// }
//
//
// public void setFields(DBObject fields){
// this.fields = fields;
// }
//
//
// public DBObject getSort(){
// return this.sort;
// }
//
//
// public void setSort(DBObject sort){
// this.sort = sort;
// }

protected void init(){
MongoURI inputURI = MongoConfigUtil.getInputURI(conf);
this.inputCollection = MongoConfigUtil.getCollection(inputURI);
DB db = this.inputCollection.getDB();
DB db = this.inputCollection.getDB();
this.mongo = db.getMongo();
MongoURI authURI = MongoConfigUtil.getAuthURI(conf);
if( authURI != null ){
Expand Down Expand Up @@ -156,7 +156,7 @@ protected Map<String, String> getShardsMap(){

/**
* Takes an existing {@link MongoURI} and returns a new modified URI which
* replaces the original's server host + port with a supplied new
* replaces the original's server host + port with a supplied new
* server host + port, but maintaining all the same original options.
* This is useful for generating distinct URIs for each mongos instance
* so that large batch reads can all target them separately, distributing the
Expand Down Expand Up @@ -196,13 +196,14 @@ protected static MongoURI rewriteURI( MongoURI originalUri, String newServerUri)
* clauses in the query construct to create the split, otherwise it will use
* min/max index boundaries (default behavior)
*
* @param lowerBound the lower bound of the collection
* @param lowerBound the lower bound of the collection
* @param newServerUri the new host(s) to target, e.g. server1:port1[,server2:port2,...]
*
*/
public MongoInputSplit createSplitFromBounds(BasicDBObject lowerBound, BasicDBObject upperBound)
throws SplitFailedException
{
log.info("Created split: min=" + (lowerBound != null ? lowerBound.toString() : "null") + ", max= " + (upperBound != null ? upperBound.toString() : "null"));
//Objects to contain upper/lower bounds for each split
DBObject splitMin = new BasicDBObject();
DBObject splitMax = new BasicDBObject();
Expand All @@ -213,11 +214,16 @@ public MongoInputSplit createSplitFromBounds(BasicDBObject lowerBound, BasicDBOb

if(!val.equals(MIN_KEY_TYPE))
splitMin.put(key, val);
if(upperBound != null){
Object maxVal = upperBound.get(key);
if(!val.equals(MAX_KEY_TYPE))
splitMax.put(key, maxVal);
}
}
}

if(upperBound != null){
for( Map.Entry<String,Object> entry : upperBound.entrySet() ){
String key = entry.getKey();
Object val = entry.getValue();

if(!val.equals(MAX_KEY_TYPE))
splitMax.put(key, val);
}
}

Expand Down Expand Up @@ -253,7 +259,7 @@ public MongoInputSplit createSplitFromBounds(BasicDBObject lowerBound, BasicDBOb

/**
* Creates an instance of {@link MongoInputSplit} whose upper and lower
* bounds are restricted by adding $gt/$lte clauses to the query filter.
* bounds are restricted by adding $gt/$lte clauses to the query filter.
* This requires that the boundaries are not compound keys, and that
* the query does not contain any keys used in the split key.
*
Expand Down Expand Up @@ -297,20 +303,25 @@ public MongoInputSplit createRangeQuerySplit(BasicDBObject chunkLowerBound, Basi
(maxKey != null && query.containsKey(maxKey.getKey())) ){
throw new IllegalArgumentException("Range query is enabled but split key conflicts with query filter:\n" +
"min: " + chunkLowerBound + "\n" +
"max: " + chunkUpperBound + "\n" +
"max: " + chunkUpperBound + "\n" +
"query: " + query);
}

String key = null;
BasicDBObject rangeObj = new BasicDBObject();
if( minKey!=null )
if( minKey!=null ) {
key = minKey.getKey();
rangeObj.put("$gte", minKey.getValue());
}

if( maxKey!=null )
if( maxKey!=null ) {
key = maxKey.getKey();
rangeObj.put("$lt", maxKey.getValue());
}

DBObject splitQuery = new BasicDBObject();
splitQuery.putAll(query);
splitQuery.put(minKey.getKey(), rangeObj);
splitQuery.put(key, rangeObj);
MongoInputSplit split = new MongoInputSplit();
split.setQuery(splitQuery);
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public List<InputSplit> calculateSplits() throws SplitFailedException{
for (int i = 1; i < splitData.size(); i++ ) {
final BasicDBObject _tKey = (BasicDBObject)splitData.get(i);
MongoInputSplit split = createSplitFromBounds(lastKey, _tKey);
returnVal.add(split);
lastKey = _tKey;
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/com/mongodb/hadoop/util/MongoTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,5 @@ public int run( String[] args ) throws Exception{
/**
* SET ME Defines the name of the job on the cluster. Left non-final to allow tweaking with serial #s, etc
*/
String _jobName = "<unnamed MongoTool job>";
protected String _jobName = "<unnamed MongoTool job>";
}

0 comments on commit bdde66e

Please sign in to comment.