Restore support for Hadoop 1.2.X (HADOOP-246).
Luke Lovett committed Jun 10, 2016
1 parent 72626ab commit eabb01d
Showing 30 changed files with 448 additions and 326 deletions.
README.md (2 additions, 1 deletion)
@@ -46,7 +46,8 @@ New releases are announced on the [releases](https://github.com/mongodb/mongo-ha
These are the minimum versions tested with the Hadoop connector. Earlier
versions may work, but haven't been tested.

- *Hadoop*: 2.4
- *Hadoop 1.X*: 1.2
- *Hadoop 2.X*: 2.4
- *Hive*: 1.1
- *Pig*: 0.11
- *Spark*: 1.4
build.gradle (36 additions, 22 deletions)
@@ -8,15 +8,16 @@ apply plugin: 'com.github.ben-manes.versions'
ext.configDir = new File(rootDir, 'config')
ext.hadoopBinaries = "${rootDir}/hadoop-binaries".toString()
ext.javaDriverVersion = '3.2.1'
ext.hiveVersion = '1.2.1'
ext.pigVersion = '0.15.0'
ext.hadoopVersion = '2.7.2'
ext.sparkVersion = '1.6.0'
ext.scalaVersion = '2.11'

ext.hadoopHome = "${hadoopBinaries}/hadoop-${hadoopVersion}".toString()
ext.hiveHome = "${hadoopBinaries}/apache-hive-${hiveVersion}-bin".toString()
ext.pigHome = "${hadoopBinaries}/pig-${pigVersion}".toString()
ext.hiveVersion = System.getenv("HIVE_VERSION") ?: '1.2.1'
ext.pigVersion = System.getenv("PIG_VERSION") ?: '0.15.0'
ext.hadoopVersion = System.getenv("HADOOP_VERSION") ?: '2.7.2'
ext.sparkVersion = System.getenv("SPARK_VERSION") ?: '1.6.0'
ext.scalaVersion = System.getenv("SCALA_VERSION") ?: '2.11'
ext.isHadoopV1 = "$hadoopVersion".startsWith("1.")

ext.hadoopHome = System.getenv("HADOOP_HOME") ?: "${hadoopBinaries}/hadoop-${hadoopVersion}".toString()
ext.hiveHome = System.getenv("HIVE_HOME") ?: "${hadoopBinaries}/apache-hive-${hiveVersion}-bin".toString()
ext.pigHome = System.getenv("PIG_HOME") ?: "${hadoopBinaries}/pig-${pigVersion}".toString()
ext.dataHome = "${hadoopBinaries}/examples/data".toString()
ext.docsHome = "${rootDir}/docs"
ext.mongoimport = new File('/mnt/jenkins/mongodb/32/32-release/bin/mongoimport').exists() ?
@@ -106,11 +107,15 @@ configure(subprojects) {
testCompile 'com.jayway.awaitility:awaitility:1.6.0'
testCompile 'commons-daemon:commons-daemon:1.0.15'

testCompile "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}"
testCompile "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}:tests"
testCompile "org.apache.hadoop:hadoop-common:${hadoopVersion}:tests"
testCompile "org.apache.hadoop:hadoop-yarn-server-tests:${hadoopVersion}:tests"
testCompile "org.apache.hadoop:hadoop-mapreduce-client-jobclient:${hadoopVersion}:tests"
if (isHadoopV1) {
testCompile "org.apache.hadoop:hadoop-test:${hadoopVersion}"
} else {
testCompile "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}"
testCompile "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}:tests"
testCompile "org.apache.hadoop:hadoop-common:${hadoopVersion}:tests"
testCompile "org.apache.hadoop:hadoop-yarn-server-tests:${hadoopVersion}:tests"
testCompile "org.apache.hadoop:hadoop-mapreduce-client-jobclient:${hadoopVersion}:tests"
}
}

/* Compiling */
@@ -294,13 +299,18 @@ project(":core") {
archivesBaseName = "mongo-hadoop-core"

dependencies {
compile "org.apache.hadoop:hadoop-common:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-core:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-common:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-shuffle:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-app:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-jobclient:${hadoopVersion}"

if (isHadoopV1) {
compile "org.apache.hadoop:hadoop-core:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-client:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-tools:${hadoopVersion}"
} else {
compile "org.apache.hadoop:hadoop-common:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-core:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-common:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-shuffle:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-app:${hadoopVersion}"
compile "org.apache.hadoop:hadoop-mapreduce-client-jobclient:${hadoopVersion}"
}
testCompile "org.mockito:mockito-core:1.10.19"
}
}
@@ -370,7 +380,11 @@ project(":pig") {

dependencies {
compile project(":core")
compile "org.apache.pig:pig:${pigVersion}:h2"
if (isHadoopV1) {
compile "org.apache.pig:pig:${pigVersion}"
} else {
compile "org.apache.pig:pig:${pigVersion}:h2"
}
compile 'joda-time:joda-time:2.7'

testCompile files(project(':core').sourceSets.main.output)
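
The Gradle changes above let the toolchain be picked through environment variables (HADOOP_VERSION, HIVE_VERSION, PIG_VERSION, SPARK_VERSION, SCALA_VERSION and the corresponding *_HOME paths) and derive a single isHadoopV1 flag that switches between the monolithic Hadoop 1 artifacts (hadoop-core, hadoop-client, hadoop-tools) and the modular Hadoop 2 dependencies. Code occasionally needs the same major-version branch at runtime; a minimal sketch of such a check, using Hadoop's VersionInfo (this helper is not part of the commit, just an illustration of the same startsWith("1.") test), could look like:

```java
import org.apache.hadoop.util.VersionInfo;

// Illustrative only: a runtime analogue of the build script's isHadoopV1 flag.
public final class HadoopVersionCheck {
    private HadoopVersionCheck() {
    }

    /** True when the Hadoop jars on the classpath are a 1.x release, e.g. "1.2.1". */
    public static boolean isHadoopV1() {
        return VersionInfo.getVersion().startsWith("1.");
    }

    public static void main(final String[] args) {
        System.out.println("Hadoop " + VersionInfo.getVersion()
                + (isHadoopV1() ? " (1.x line)" : " (2.x or later)"));
    }
}
```
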
@@ -43,7 +43,7 @@ public OutputCommitter getOutputCommitter(final TaskAttemptContext context) {
*/
public RecordWriter<K, V> getRecordWriter(final TaskAttemptContext context) {
return new MongoRecordWriter<K, V>(
MongoConfigUtil.getOutputCollections(context.getConfiguration()),
MongoConfigUtil.getOutputCollection(context.getConfiguration()),
context);
}

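
The functional change here is that the record writer is now handed the single output collection resolved by MongoConfigUtil.getOutputCollection from the job configuration, rather than a list of collections. From a job author's point of view the output is still configured through the output URI; a minimal Hadoop 2-style setup sketch (the URI and job name are placeholders, not taken from this commit) might be:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoOutputJobSketch {
    public static void main(final String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical target: database "test", collection "example" on a local mongod.
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/test.example");

        Job job = Job.getInstance(conf, "mongo-output-example");
        job.setOutputFormatClass(MongoOutputFormat.class);
        // ... configure mapper, reducer and input format, then submit:
        // job.waitForCompletion(true);
    }
}
```
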
@@ -92,10 +92,10 @@ public BSONFileRecordReader(final long startingPosition) {
this.startingPosition = startingPosition;
}

@Override
public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
public void init(final InputSplit inputSplit, final Configuration configuration)
throws IOException, InterruptedException {
this.configuration = configuration;
fileSplit = (FileSplit) inputSplit;
configuration = context.getConfiguration();
if (LOG.isDebugEnabled()) {
LOG.debug("reading split " + fileSplit);
}
@@ -124,6 +124,11 @@ public void initialize(final InputSplit inputSplit, final TaskAttemptContext con

}

@Override
public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
init(inputSplit, context.getConfiguration());
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
try {
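
After this refactoring the reader's setup logic lives in init(InputSplit, Configuration), and the Hadoop 2 initialize(InputSplit, TaskAttemptContext) override simply forwards to it, so callers that only have a bare Configuration (such as the old-API wrapper shown further down) no longer need to build a TaskAttemptContext. A rough usage sketch under that assumption (file path, split length and import locations are illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import com.mongodb.hadoop.input.BSONFileRecordReader;

public class BsonScanSketch {
    public static void main(final String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical .bson file; 1024 bytes and the empty host list are placeholders.
        FileSplit split = new FileSplit(new Path("/tmp/example.bson"), 0L, 1024L, new String[0]);

        BSONFileRecordReader reader = new BSONFileRecordReader(0L);
        reader.init(split, conf); // the new entry point; no TaskAttemptContext required
        while (reader.nextKeyValue()) {
            Object document = reader.getCurrentValue(); // one decoded BSON document per call
            System.out.println(document);
        }
        reader.close();
    }
}
```
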
@@ -19,7 +19,7 @@ public BSONFileSplit(final Path file, final long start, final long length,
super(file, start, length, hosts);
}

public BSONFileSplit() { super(); }
public BSONFileSplit() { this(null, 0, 0, null); }

public String getKeyField() {
return keyField;
@@ -16,15 +16,12 @@

package com.mongodb.hadoop.mapred;

import com.mongodb.hadoop.mapred.output.MongoOutputCommitter;
import com.mongodb.hadoop.mapred.output.MongoRecordWriter;
import com.mongodb.hadoop.util.MongoConfigUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;

import java.io.IOException;
@@ -41,16 +38,6 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
}
}

/**
* @deprecated This method is unused.
* @param context the current task's context.
* @return an instance of {@link MongoOutputCommitter}
*/
@Deprecated
public OutputCommitter getOutputCommitter(final TaskAttemptContext context) {
return new MongoOutputCommitter();
}

@Override
public RecordWriter<K, V> getRecordWriter(
final FileSystem ignored, final JobConf job, final String name,
@@ -23,8 +23,6 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import java.io.IOException;

@@ -57,11 +55,10 @@ public void initialize(final InputSplit inputSplit, final Configuration conf)
throws IOException {
FileSplit fileSplit = (FileSplit) inputSplit;
try {
delegate.initialize(
delegate.init(
new BSONFileSplit(
fileSplit.getPath(), fileSplit.getStart(),
fileSplit.getLength(), fileSplit.getLocations()),
new TaskAttemptContextImpl(conf, new TaskAttemptID()));
fileSplit.getLength(), fileSplit.getLocations()), conf);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -20,7 +20,7 @@ public BSONFileSplit(final Path file, final long start, final long
super(file, start, length, hosts);
}

public BSONFileSplit() { super(); }
public BSONFileSplit() { this(null, 0, 0, null); }

public String getKeyField() { return keyField; }

@@ -17,13 +17,11 @@

package com.mongodb.hadoop.mapred.output;

import com.mongodb.DBCollection;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;

import java.io.IOException;
import java.util.List;

public class MongoOutputCommitter extends OutputCommitter {
private final com.mongodb.hadoop.output.MongoOutputCommitter delegate;
@@ -32,15 +30,6 @@ public MongoOutputCommitter() {
delegate = new com.mongodb.hadoop.output.MongoOutputCommitter();
}

/**
* @deprecated Use the zero-args constructor instead.
* @param collections the MongoDB output collections.
*/
@Deprecated
public MongoOutputCommitter(final List<DBCollection> collections) {
this();
}

@Override
public void abortTask(final TaskAttemptContext taskContext)
throws IOException {
@@ -17,51 +17,29 @@

package com.mongodb.hadoop.mapred.output;

import com.mongodb.DBCollection;
import com.mongodb.hadoop.util.CompatUtils;
import com.mongodb.hadoop.util.MongoConfigUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;

import java.util.Collections;
import java.util.List;

public class MongoRecordWriter<K, V>
extends com.mongodb.hadoop.output.MongoRecordWriter<K, V>
implements RecordWriter<K, V> {
private final JobConf configuration;

/**
* Create a new MongoRecordWriter.
* @param conf the job configuration
*/
public MongoRecordWriter(final JobConf conf) {
super(
Collections.<DBCollection>emptyList(),
new TaskAttemptContextImpl(
conf, TaskAttemptID.forName(conf.get("mapred.task.id"))));
configuration = conf;
}

/**
* @deprecated MongoRecordWriter doesn't use DBCollections directly.
* Please use {@link #MongoRecordWriter(JobConf)} instead.
* @param c the DBCollection
* @param conf the job configuration
*/
@Deprecated
public MongoRecordWriter(final List<DBCollection> c, final JobConf conf) {
this(conf);
MongoConfigUtil.getOutputCollection(conf),
CompatUtils.getTaskAttemptContext(conf, conf.get("mapred.task.id")));
}

@Override
public void close(final Reporter reporter) {
super.close(getContext());
}

public JobConf getConf() {
return configuration;
super.close(null);
}

}
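
The rewritten old-API writer above no longer instantiates org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl directly: that class only exists under that name on Hadoop 2, whereas Hadoop 1 ships TaskAttemptContext as a concrete class. The bridging now happens in CompatUtils.getTaskAttemptContext, whose source is not part of this excerpt. One plausible shape for such a helper, sketched here purely as an assumption about how the two APIs can be reconciled, is to pick whichever class is on the classpath via reflection:

```java
import java.lang.reflect.Constructor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

// Hypothetical helper, not the repository's CompatUtils: builds a TaskAttemptContext
// against either Hadoop 1.x or Hadoop 2.x without a compile-time reference to
// TaskAttemptContextImpl.
final class TaskAttemptContextBridge {
    private TaskAttemptContextBridge() {
    }

    static TaskAttemptContext create(final Configuration conf, final String taskAttemptId)
        throws ReflectiveOperationException {
        Class<?> contextClass;
        try {
            // Hadoop 2.x: TaskAttemptContext is an interface; this is its standard implementation.
            contextClass = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
        } catch (ClassNotFoundException e) {
            // Hadoop 1.x: TaskAttemptContext is itself a concrete class.
            contextClass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
        }
        Constructor<?> ctor = contextClass.getConstructor(Configuration.class, TaskAttemptID.class);
        return (TaskAttemptContext) ctor.newInstance(conf, TaskAttemptID.forName(taskAttemptId));
    }
}
```
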
