diff --git a/core/src/main/java/com/mongodb/hadoop/splitter/MongoCollectionSplitter.java b/core/src/main/java/com/mongodb/hadoop/splitter/MongoCollectionSplitter.java index bb2dcdb6..a31d6136 100644 --- a/core/src/main/java/com/mongodb/hadoop/splitter/MongoCollectionSplitter.java +++ b/core/src/main/java/com/mongodb/hadoop/splitter/MongoCollectionSplitter.java @@ -30,7 +30,7 @@ public abstract class MongoCollectionSplitter extends MongoSplitter{ private static final Log log = LogFactory.getLog( MongoCollectionSplitter.class ); - + public static final MinKey MIN_KEY_TYPE = new MinKey(); public static final MaxKey MAX_KEY_TYPE = new MaxKey(); protected Mongo mongo; @@ -54,55 +54,55 @@ public MongoCollectionSplitter(){ } public MongoCollectionSplitter(Configuration conf){ super(conf); } -// +// // public void setInputURI(MongoURI inputURI){ // this.inputURI = inputURI; // } -// +// // public void setQuery(DBObject query){ // this.query = query; // } -// +// // public BSONObject getQuery(){ // return this.query; // } -// +// // public boolean getUseRangeQuery(){ // return this.useRangeQuery; // } -// +// // public void setUseRangeQuery(boolean useRangeQuery){ // this.useRangeQuery = useRangeQuery; // } -// +// // public void setAuthURI(MongoURI authURI){ // this.authURI = authURI; // } -// +// // public boolean getNoTimeout(){ // return this.noTimeout; // } -// +// // public void setNoTimeout(boolean noTimeout){ // this.noTimeout = noTimeout; // } -// +// // public MongoURI getAuthURI(){ // return this.authURI; // } -// +// // public DBObject getFields(){ // return this.fields; // } -// +// // public void setFields(DBObject fields){ // this.fields = fields; // } -// +// // public DBObject getSort(){ // return this.sort; // } -// +// // public void setSort(DBObject sort){ // this.sort = sort; // } @@ -110,7 +110,7 @@ public MongoCollectionSplitter(Configuration conf){ protected void init(){ MongoURI inputURI = MongoConfigUtil.getInputURI(conf); this.inputCollection = MongoConfigUtil.getCollection(inputURI); - DB db = this.inputCollection.getDB(); + DB db = this.inputCollection.getDB(); this.mongo = db.getMongo(); MongoURI authURI = MongoConfigUtil.getAuthURI(conf); if( authURI != null ){ @@ -156,7 +156,7 @@ protected Map getShardsMap(){ /** * Takes an existing {@link MongoURI} and returns a new modified URI which - * replaces the original's server host + port with a supplied new + * replaces the original's server host + port with a supplied new * server host + port, but maintaining all the same original options. * This is useful for generating distinct URIs for each mongos instance * so that large batch reads can all target them separately, distributing the @@ -196,13 +196,14 @@ protected static MongoURI rewriteURI( MongoURI originalUri, String newServerUri) * clauses in the query construct to create the split, otherwise it will use * min/max index boundaries (default behavior) * - * @param lowerBound the lower bound of the collection + * @param lowerBound the lower bound of the collection * @param newServerUri the new host(s) to target, e.g. server1:port1[,server2:port2,...] * */ public MongoInputSplit createSplitFromBounds(BasicDBObject lowerBound, BasicDBObject upperBound) throws SplitFailedException { + log.info("Created split: min=" + (lowerBound != null ? lowerBound.toString() : "null") + ", max= " + (upperBound != null ? upperBound.toString() : "null")); //Objects to contain upper/lower bounds for each split DBObject splitMin = new BasicDBObject(); DBObject splitMax = new BasicDBObject(); @@ -213,11 +214,16 @@ public MongoInputSplit createSplitFromBounds(BasicDBObject lowerBound, BasicDBOb if(!val.equals(MIN_KEY_TYPE)) splitMin.put(key, val); - if(upperBound != null){ - Object maxVal = upperBound.get(key); - if(!val.equals(MAX_KEY_TYPE)) - splitMax.put(key, maxVal); - } + } + } + + if(upperBound != null){ + for( Map.Entry entry : upperBound.entrySet() ){ + String key = entry.getKey(); + Object val = entry.getValue(); + + if(!val.equals(MAX_KEY_TYPE)) + splitMax.put(key, val); } } @@ -253,7 +259,7 @@ public MongoInputSplit createSplitFromBounds(BasicDBObject lowerBound, BasicDBOb /** * Creates an instance of {@link MongoInputSplit} whose upper and lower - * bounds are restricted by adding $gt/$lte clauses to the query filter. + * bounds are restricted by adding $gt/$lte clauses to the query filter. * This requires that the boundaries are not compound keys, and that * the query does not contain any keys used in the split key. * @@ -297,20 +303,25 @@ public MongoInputSplit createRangeQuerySplit(BasicDBObject chunkLowerBound, Basi (maxKey != null && query.containsKey(maxKey.getKey())) ){ throw new IllegalArgumentException("Range query is enabled but split key conflicts with query filter:\n" + "min: " + chunkLowerBound + "\n" + - "max: " + chunkUpperBound + "\n" + + "max: " + chunkUpperBound + "\n" + "query: " + query); } + String key = null; BasicDBObject rangeObj = new BasicDBObject(); - if( minKey!=null ) + if( minKey!=null ) { + key = minKey.getKey(); rangeObj.put("$gte", minKey.getValue()); + } - if( maxKey!=null ) + if( maxKey!=null ) { + key = maxKey.getKey(); rangeObj.put("$lt", maxKey.getValue()); + } DBObject splitQuery = new BasicDBObject(); splitQuery.putAll(query); - splitQuery.put(minKey.getKey(), rangeObj); + splitQuery.put(key, rangeObj); MongoInputSplit split = new MongoInputSplit(); split.setQuery(splitQuery); return split; diff --git a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java index cfb88139..d2c41854 100644 --- a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java +++ b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java @@ -100,6 +100,7 @@ public List calculateSplits() throws SplitFailedException{ for (int i = 1; i < splitData.size(); i++ ) { final BasicDBObject _tKey = (BasicDBObject)splitData.get(i); MongoInputSplit split = createSplitFromBounds(lastKey, _tKey); + returnVal.add(split); lastKey = _tKey; } diff --git a/core/src/main/java/com/mongodb/hadoop/util/MongoTool.java b/core/src/main/java/com/mongodb/hadoop/util/MongoTool.java index d9b8c969..add30e0f 100644 --- a/core/src/main/java/com/mongodb/hadoop/util/MongoTool.java +++ b/core/src/main/java/com/mongodb/hadoop/util/MongoTool.java @@ -121,5 +121,5 @@ public int run( String[] args ) throws Exception{ /** * SET ME Defines the name of the job on the cluster. Left non-final to allow tweaking with serial #s, etc */ - String _jobName = ""; + protected String _jobName = ""; }