Commit b197899

configure the treasury job in the tool class using MongoConfig

Justin Lee committed Mar 20, 2014
1 parent 38ae668 commit b197899
Showing 4 changed files with 127 additions and 127 deletions.
220 changes: 97 additions & 123 deletions build.gradle
@@ -1,4 +1,5 @@
 apply plugin: 'java'
+apply plugin: 'idea'
 apply plugin: 'download-task'
 
 ext.configDir = new File(rootDir, 'config')
@@ -7,7 +8,7 @@ ext.versionMap = [
     '0.23': '0.23.10',
     '1.0' : '1.0.4',
     '1.1' : '1.1.2',
-    'cdh4': "2.0.0-cdh4.6.0",
+    'cdh4': '2.0.0-cdh4.6.0',
     '2.2' : '2.2.0',
     '2.3' : '2.3.0'
 ].withDefault {
@@ -65,7 +66,7 @@ configure(subprojects) {
         options.encoding = 'ISO-8859-1'
         options.fork = true
         options.debug = true
-        options.compilerArgs = [/*'-Xlint:all', */'-Xlint:-options']
+        options.compilerArgs = [/*'-Xlint:all', */ '-Xlint:-options']
     }
 
     project.ext.buildingWith = { n ->
@@ -119,7 +120,7 @@ configure(subprojects) {
         options.links 'http://hadoop.apache.org/docs/r2.3.0/api'
         options.links 'http://api.mongodb.org/java/2.11.4/'
     }
-
+
     task listDependencies << {
         configurations.compile.each { File file -> println file }
     }
@@ -225,8 +226,8 @@ project(":streaming") {
             from zipTree(it)
         }
     }
-  uploadArchives.onlyIf { hadoop_version != "1.0" }
 
+    uploadArchives.onlyIf { hadoop_version != "1.0" }
 }
 
 project(":flume") {
@@ -265,147 +266,120 @@ project(":examples/sensors") {
     }
 }
 
-task installHadoop() {
-    doLast {
-        new File('../hadoop-binaries/').mkdirs()
-
-        if (!new File("${hadoopHome}").exists()) {
-            def url
-            switch (hadoop_version) {
-                case ("cdh4"):
-                    url = "http://archive.cloudera.com/cdh4/cdh/4/hadoop-${hadoopVersion}.tar.gz"
-                    break
-                default:
-                    url = "http://archive.apache.org/dist/hadoop/common/hadoop-${hadoopVersion}/hadoop-${hadoopVersion}.tar.gz"
-                    break
-            }
-            download {
-                src url
-                dest "../hadoop-binaries/"
-                onlyIfNewer true
-            }
-
-            println "Extracting hadoop ${hadoopVersion} download"
-            copy {
-                from(tarTree("../hadoop-binaries/hadoop-${hadoopVersion}.tar.gz"))
-                into '../hadoop-binaries'
-            }
-        }
-    }
-}
+task installHadoop() << {
+    new File('../hadoop-binaries/').mkdirs()
+
+    if (!new File("${hadoopHome}").exists()) {
+        def url
+        switch (hadoop_version) {
+            case ("cdh4"):
+                url = "http://archive.cloudera.com/cdh4/cdh/4/hadoop-${hadoopVersion}.tar.gz"
+                break
+            default:
+                url = "http://archive.apache.org/dist/hadoop/common/hadoop-${hadoopVersion}/hadoop-${hadoopVersion}.tar.gz"
+                break
+        }
+        download {
+            src url
+            dest "../hadoop-binaries/"
+            onlyIfNewer true
+        }
+
+        println "Extracting hadoop ${hadoopVersion} download"
+        copy {
+            from(tarTree("../hadoop-binaries/hadoop-${hadoopVersion}.tar.gz"))
+            into '../hadoop-binaries'
+        }
+    }
+}
 
-task historicalYield(dependsOn: 'installHadoop') {
-    doLast {
-        exec() {
-            commandLine "mongoimport", "-d", "mongo_hadoop", "-c", "yield_historical.in", "--drop",
-                        "examples/treasury_yield/src/main/resources/yield_historical_in.json"
-        }
-
-        hadoop("examples/treasury_yield/build/libs/treasury_yield-${project(':core').version}-hadoop_${hadoop_version}.jar",
-               "com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig",
-
-               ["mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat",
-                "mongo.input.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.in",
-                // "mongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat",
-                // "mapred.input.dir=file:///Users/mike/dump/mongo_hadoop/yield_historical.in.bson",
-
-                "mongo.job.mapper=com.mongodb.hadoop.examples.treasury.TreasuryYieldMapper",
-                "mongo.job.reducer=com.mongodb.hadoop.examples.treasury.TreasuryYieldReducer",
-
-                "mongo.job.mapper.output.key=org.apache.hadoop.io.IntWritable",
-                "mongo.job.mapper.output.value=org.apache.hadoop.io.DoubleWritable",
-                "mongo.job.output.key=org.apache.hadoop.io.IntWritable",
-                "mongo.job.output.value=com.mongodb.hadoop.io.BSONWritable",
-
-                "mongo.output.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.out",
-                "mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat",
-                // "mapred.output.dir=file:///tmp/yield_historical_out.bson",
-                // "mongo.job.output.format=com.mongodb.hadoop.BSONFileOutputFormat"
-               ])
-    }
-}
+task historicalYield(dependsOn: 'installHadoop') << {
+    exec() {
+        commandLine "mongoimport", "-d", "mongo_hadoop", "-c", "yield_historical.in", "--drop",
+                    "examples/treasury_yield/src/main/resources/yield_historical_in.json"
+    }
+
+    hadoop("examples/treasury_yield/build/libs/treasury_yield-${project(':core').version}-hadoop_${hadoop_version}.jar",
+           "com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig", [])
+}
 
-task sensorData(dependsOn: 'installHadoop') {
-    doLast {
-        hadoop("examples/sensors/build/libs/sensors-${project(':core').version}-hadoop_${hadoop_version}.jar",
-               "com.mongodb.hadoop.examples.sensors.Sensors",
-
-               ["mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat",
-                "mongo.input.uri=mongodb://localhost:27017/mongo_hadoop.devices",
-
-                "mongo.job.mapper=com.mongodb.hadoop.examples.sensors.DeviceMapper",
-                "mongo.job.reducer=com.mongodb.hadoop.examples.sensors.DeviceReducer",
-
-                "mongo.job.mapper.output.key=org.apache.hadoop.io.Text",
-                "mongo.job.mapper.output.value=org.apache.hadoop.io.Text",
-                "mongo.job.output.key=org.apache.hadoop.io.IntWritable",
-                "mongo.job.output.value=com.mongodb.hadoop.io.BSONWritable",
-
-                "mongo.output.uri=mongodb://localhost:27017/mongo_hadoop.logs_aggregate",
-                "mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat"
-               ])
-
-        hadoop("examples/sensors/build/libs/sensors-${project(':core').version}-hadoop_${hadoop_version}.jar",
-               "com.mongodb.hadoop.examples.sensors.Sensors",
-
-               ["mongo.input.uri=mongodb://localhost:27017/mongo_hadoop.logs",
-                "mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat",
-                "mongo.job.mapper=com.mongodb.hadoop.examples.sensors.LogMapper",
-                "mongo.job.reducer=com.mongodb.hadoop.examples.sensors.LogReducer",
-                "mapreduce.combiner.class=com.mongodb.hadoop.examples.sensors.LogCombiner",
-                "mongo.job.combiner=com.mongodb.hadoop.examples.sensors.LogCombiner",
-                "io.sort.mb=100",
-
-                "mongo.job.output.key=org.apache.hadoop.io.Text",
-                "mongo.job.output.value=org.apache.hadoop.io.IntWritable",
-
-                "mongo.output.uri=mongodb://localhost:27017/mongo_hadoop.logs_aggregate",
-                "mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat"
-               ])
-    }
-}
+task sensorData(dependsOn: [installHadoop]) << {
+    hadoop("examples/sensors/build/libs/sensors-${project(':core').version}-hadoop_${hadoop_version}.jar",
+           "com.mongodb.hadoop.examples.sensors.Sensors",
+
+           ["mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat",
+            "mongo.input.uri=mongodb://localhost:27017/mongo_hadoop.devices",
+
+            "mongo.job.mapper=com.mongodb.hadoop.examples.sensors.DeviceMapper",
+            "mongo.job.reducer=com.mongodb.hadoop.examples.sensors.DeviceReducer",
+
+            "mongo.job.mapper.output.key=org.apache.hadoop.io.Text",
+            "mongo.job.mapper.output.value=org.apache.hadoop.io.Text",
+            "mongo.job.output.key=org.apache.hadoop.io.IntWritable",
+            "mongo.job.output.value=com.mongodb.hadoop.io.BSONWritable",
+
+            "mongo.output.uri=mongodb://localhost:27017/mongo_hadoop.logs_aggregate",
+            "mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat"
+           ])
+
+    hadoop("examples/sensors/build/libs/sensors-${project(':core').version}-hadoop_${hadoop_version}.jar",
+           "com.mongodb.hadoop.examples.sensors.Sensors",
+
+           ["mongo.input.uri=mongodb://localhost:27017/mongo_hadoop.logs",
+            "mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat",
+            "mongo.job.mapper=com.mongodb.hadoop.examples.sensors.LogMapper",
+            "mongo.job.reducer=com.mongodb.hadoop.examples.sensors.LogReducer",
+            "mapreduce.combiner.class=com.mongodb.hadoop.examples.sensors.LogCombiner",
+            "mongo.job.combiner=com.mongodb.hadoop.examples.sensors.LogCombiner",
+            "io.sort.mb=100",
+
+            "mongo.job.output.key=org.apache.hadoop.io.Text",
+            "mongo.job.output.value=org.apache.hadoop.io.IntWritable",
+
+            "mongo.output.uri=mongodb://localhost:27017/mongo_hadoop.logs_aggregate",
+            "mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat"
+           ])
+}
 
-task enronEmails(dependsOn: 'installHadoop') {
-    doLast {
-        download {
-            src 'https://s3.amazonaws.com/mongodb-enron-email/enron_mongo.tar.bz2'
-            dest 'examples/data/'
-            onlyIfNewer true
-        }
-        if (!new File('examples/data/dump').exists()) {
-            println "extracting email data"
-            copy {
-                from(tarTree('examples/data/enron_mongo.tar.bz2'))
-                into 'examples/data'
-            }
-        }
-
-        exec() {
-            commandLine "mongorestore", "-v", "-d", "mongo_hadoop", "--drop", "examples/data/dump/enron_mail"
-        }
-
-        hadoop("examples/enron/build/libs/enron-${project(':core').version}-hadoop_${hadoop_version}.jar",
-               "com.mongodb.hadoop.examples.enron.EnronMail",
-
-               ["mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat",
-                "mongo.input.uri=mongodb://localhost:27017/mongo_hadoop.messages",
-
-                "mongo.input.split_size=64",
-
-                "mongo.job.mapper=com.mongodb.hadoop.examples.enron.EnronMailMapper",
-                "mongo.job.reducer=com.mongodb.hadoop.examples.enron.EnronMailReducer",
-                //"mongo.job.combiner=com.mongodb.hadoop.examples.enron.EnronMailReducer",
-
-                "mongo.job.output.key=com.mongodb.hadoop.examples.enron.MailPair",
-                "mongo.job.output.value=org.apache.hadoop.io.IntWritable",
-
-                "mongo.job.mapper.output.key=com.mongodb.hadoop.examples.enron.MailPair",
-                "mongo.job.mapper.output.value=org.apache.hadoop.io.IntWritable",
-
-                "mongo.output.uri=mongodb://localhost:27017/mongo_hadoop.message_pairs",
-                "mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat"
-               ])
-    }
-}
+task enronEmails(dependsOn: 'installHadoop') << {
+    download {
+        src 'https://s3.amazonaws.com/mongodb-enron-email/enron_mongo.tar.bz2'
+        dest 'examples/data/'
+        onlyIfNewer true
+    }
+    if (!new File('examples/data/dump').exists()) {
+        println "extracting email data"
+        copy {
+            from(tarTree('examples/data/enron_mongo.tar.bz2'))
+            into 'examples/data'
+        }
+    }
+
+    exec() {
+        commandLine "mongorestore", "-v", "-d", "mongo_hadoop", "--drop", "examples/data/dump/enron_mail"
+    }
+
+    hadoop("examples/enron/build/libs/enron-${project(':core').version}-hadoop_${hadoop_version}.jar",
+           "com.mongodb.hadoop.examples.enron.EnronMail",
+
+           ["mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat",
+            "mongo.input.uri=mongodb://localhost:27017/mongo_hadoop.messages",
+
+            "mongo.input.split_size=64",
+
+            "mongo.job.mapper=com.mongodb.hadoop.examples.enron.EnronMailMapper",
+            "mongo.job.reducer=com.mongodb.hadoop.examples.enron.EnronMailReducer",
+            //"mongo.job.combiner=com.mongodb.hadoop.examples.enron.EnronMailReducer",
+
+            "mongo.job.output.key=com.mongodb.hadoop.examples.enron.MailPair",
+            "mongo.job.output.value=org.apache.hadoop.io.IntWritable",
+
+            "mongo.job.mapper.output.key=com.mongodb.hadoop.examples.enron.MailPair",
+            "mongo.job.mapper.output.value=org.apache.hadoop.io.IntWritable",
+
+            "mongo.output.uri=mongodb://localhost:27017/mongo_hadoop.message_pairs",
+            "mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat"
+           ])
+}
 
 def hadoop(jar, className, args) {
@@ -415,7 +389,7 @@ def hadoop(jar, className, args) {
    } else {
        hadoopLib = "../hadoop-binaries/hadoop-${hadoopVersion}/share/hadoop/common"
    }
-
+
    copy {
        from "core/build/libs/mongo-hadoop-core-${project(':core').version}-hadoop_${hadoop_version}.jar"
        into hadoopLib
@@ -432,7 +406,6 @@ def hadoop(jar, className, args) {
                //Split settings
                "-Dmongo.input.split_size=8",
                "-Dmongo.job.verbose=true",
-                // "-Dmapreduce.framework.name=yarn",
    ]
    args.each {
        line << "-D${it}"
@@ -444,7 +417,7 @@ def hadoop(jar, className, args) {
    }
    exec() {
        environment << hadoopEnv
-
+
        commandLine line
    }
}
@@ -458,3 +431,4 @@ def mapReduceDeps(it, version) {
 }
 
 apply from: 'gradle/maven-deployment.gradle'
+//apply from: 'testing/treasury-tests.gradle'
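
Note on the change above: historicalYield now passes an empty property list because the treasury job's settings moved into the TreasuryYieldXMLConfig constructor (diffed below). Properties a task does still pass travel as -D flags built by the hadoop() helper (line << "-D${it}") and are absorbed into the job's Configuration by Hadoop's generic option parsing. A minimal sketch of that mechanism — the DumpDProperty class and its sample flags are hypothetical, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class DumpDProperty {
        public static void main(final String[] args) throws Exception {
            // The hadoop() helper emits each list entry as "-D<key>=<value>".
            String[] cli = {"-Dmongo.input.split_size=8", "-Dmongo.job.verbose=true"};

            Configuration conf = new Configuration();
            // ToolRunner.run() performs this same parse before calling Tool.run():
            // every -D pair is copied into the Configuration.
            new GenericOptionsParser(conf, cli);

            System.out.println(conf.get("mongo.input.split_size"));   // prints: 8
        }
    }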
6 changes: 4 additions & 2 deletions core/src/main/java/com/mongodb/hadoop/util/MongoTool.java
@@ -68,8 +68,10 @@ public int run(final String[] args) throws Exception {
 
         LOG.info(String.format("Created a conf: '%s' on {%s} as job named '%s'", conf, this.getClass(), getJobName()));
 
-        for (final Entry<String, String> entry : conf) {
-            LOG.trace(String.format("%s=%s\n", entry.getKey(), entry.getValue()));
+        if (LOG.isTraceEnabled()) {
+            for (final Entry<String, String> entry : conf) {
+                LOG.trace(String.format("%s=%s\n", entry.getKey(), entry.getValue()));
+            }
         }
 
         final Job job = new Job(conf, getJobName());
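
The isTraceEnabled() guard above keeps run() from calling String.format on every Configuration entry when trace logging is off. For orientation, a rough sketch of what MongoTool does next with the mongo.job.* keys; this is not the verbatim source, and the MongoConfig getters are assumed to mirror the setters shown in the TreasuryYieldXMLConfig diff below:

    import com.mongodb.hadoop.MongoConfig;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    final class JobWiringSketch {
        // Hypothetical helper approximating MongoTool.run()'s job setup.
        static Job wire(final Configuration conf, final String name) throws Exception {
            MongoConfig config = new MongoConfig(conf);          // reads the same conf
            Job job = new Job(conf, name);
            job.setMapperClass(config.getMapper());              // mongo.job.mapper
            job.setReducerClass(config.getReducer());            // mongo.job.reducer
            job.setOutputKeyClass(config.getOutputKey());        // mongo.job.output.key
            job.setOutputValueClass(config.getOutputValue());    // mongo.job.output.value
            job.setInputFormatClass(config.getInputFormat());    // mongo.job.input.format
            job.setOutputFormatClass(config.getOutputFormat());  // mongo.job.output.format
            return job;
        }
    }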
4 changes: 2 additions & 2 deletions examples/sensors/src/main/java/com/mongodb/hadoop/examples/sensors/Sensors.java
@@ -25,8 +25,8 @@ public class Sensors extends MongoTool {
 
     private static final Log LOG = LogFactory.getLog(Sensors.class);
 
-    private static final int NUM_DEVICES = 1000;
-    private static final int NUM_LOGS = NUM_DEVICES * 1000;
+    private static final int NUM_DEVICES = 100;
+    private static final int NUM_LOGS = NUM_DEVICES * 100;
     private static final List<String> TYPES = Arrays.asList("temp", "humidity", "pressure", "sound", "light");
 
     double getRandomInRange(final int from, final int to, final int fixed) {
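
With these constants the generated sensor fixture drops from 1,000 devices and 1,000,000 log entries (NUM_DEVICES * 1000) to 100 devices and 10,000 log entries, presumably to keep the example tasks quick to run.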
24 changes: 24 additions & 0 deletions examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldXMLConfig.java
@@ -15,8 +15,13 @@
  */
 package com.mongodb.hadoop.examples.treasury;
 
+import com.mongodb.hadoop.MongoConfig;
+import com.mongodb.hadoop.MongoOutputFormat;
+import com.mongodb.hadoop.io.BSONWritable;
 import com.mongodb.hadoop.util.MongoTool;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -29,6 +34,25 @@ public class TreasuryYieldXMLConfig extends MongoTool {
         Configuration.addDefaultResource("src/examples/mongo-defaults.xml");
     }
 
+    public TreasuryYieldXMLConfig() {
+        Configuration conf = new Configuration();
+        MongoConfig config = new MongoConfig(conf);
+        config.setInputFormat(com.mongodb.hadoop.MongoInputFormat.class);
+        config.setInputURI("mongodb://localhost:27017/mongo_hadoop.yield_historical.in");
+
+        config.setMapper(TreasuryYieldMapper.class);
+        config.setMapperOutputKey(IntWritable.class);
+        config.setMapperOutputValue(DoubleWritable.class);
+
+        config.setReducer(TreasuryYieldReducer.class);
+        config.setOutputKey(IntWritable.class);
+        config.setOutputValue(BSONWritable.class);
+        config.setOutputURI("mongodb://localhost:27017/mongo_hadoop.yield_historical.out");
+        config.setOutputFormat(MongoOutputFormat.class);
+
+        setConf(conf);
+    }
+
     public static void main(final String[] pArgs) throws Exception {
         System.exit(ToolRunner.run(new TreasuryYieldXMLConfig(), pArgs));
     }
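
Because the constructor seeds the tool's Configuration and ToolRunner parses generic options into that same Configuration before run() executes, a -D flag on the command line still overrides any default set above. A minimal sketch of such an override — the RunTreasury class and the port-27018 URI are hypothetical:

    import com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig;
    import org.apache.hadoop.util.ToolRunner;

    public class RunTreasury {
        public static void main(final String[] args) throws Exception {
            // setInputURI(...) in the constructor supplies the default; this -D
            // value is parsed into the same Configuration afterwards, so it wins.
            System.exit(ToolRunner.run(new TreasuryYieldXMLConfig(), new String[]{
                "-Dmongo.input.uri=mongodb://localhost:27018/mongo_hadoop.yield_historical.in"
            }));
        }
    }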
