Skip to content

Commit

Permalink
Simplify HBaseTemperatureImporter to use TableOutputFormat.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Nov 5, 2014
1 parent 23fd7bf commit 57cfcf0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 28 deletions.
4 changes: 4 additions & 0 deletions ch20-hbase/src/main/java/HBaseTemperatureBulkImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* Uses HBase's bulk load facility ({@link HFileOutputFormat2} and {@link
* LoadIncrementalHFiles}) to efficiently load temperature data into a HBase table.
*/
public class HBaseTemperatureBulkImporter extends Configured implements Tool {

static class HBaseTemperatureMapper extends Mapper<LongWritable, Text,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseTemperatureImporterWithOutputFormat extends Configured implements Tool {
/**
* Uses {@link HTable} to load temperature data into a HBase table directly. Less
* efficient than {@link HBaseTemperatureImporter} (since auto-flush is enabled) and
* {@link HBaseTemperatureBulkImporter} in particular.
*/
public class HBaseTemperatureDirectImporter extends Configured implements Tool {

static class HBaseTemperatureMapper<K> extends Mapper<LongWritable, Text, K, Put> {
static class HBaseTemperatureMapper<K, V> extends Mapper<LongWritable, Text, K, V> {
private NcdcRecordParser parser = new NcdcRecordParser();
private HTable table;

@Override
protected void setup(Context context) throws IOException {
// Create the HBase table client once up-front and keep it around
// rather than create on each map invocation.
this.table = new HTable(HBaseConfiguration.create(context.getConfiguration()),
"observations");
}

@Override
public void map(LongWritable key, Text value, Context context) throws
Expand All @@ -29,30 +45,34 @@ public void map(LongWritable key, Text value, Context context) throws
p.add(HBaseTemperatureQuery.DATA_COLUMNFAMILY,
HBaseTemperatureQuery.AIRTEMP_QUALIFIER,
Bytes.toBytes(parser.getAirTemperature()));
context.write(null, p);
table.put(p);
}
}

@Override
protected void cleanup(Context context) throws IOException {
table.close();
}
}

@Override
public int run(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Usage: HBaseTemperatureImporterWithOutputFormat <input>");
System.err.println("Usage: HBaseTemperatureDirectImporter <input>");
return -1;
}
Job job = new Job(getConf(), getClass().getSimpleName());
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "observations");
job.setMapperClass(HBaseTemperatureMapper.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TableOutputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(HBaseConfiguration.create(),
new HBaseTemperatureImporterWithOutputFormat(), args);
new HBaseTemperatureDirectImporter(), args);
System.exit(exitCode);
}
}
28 changes: 8 additions & 20 deletions ch20-hbase/src/main/java/HBaseTemperatureImporter.java
Original file line number Diff line number Diff line change
@@ -1,33 +1,25 @@
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* Uses HBase's {@link TableOutputFormat} to load temperature data into a HBase table.
*/
public class HBaseTemperatureImporter extends Configured implements Tool {

static class HBaseTemperatureMapper<K, V> extends Mapper<LongWritable, Text, K, V> {
static class HBaseTemperatureMapper<K> extends Mapper<LongWritable, Text, K, Put> {
private NcdcRecordParser parser = new NcdcRecordParser();
private HTable table;

@Override
protected void setup(Context context) throws IOException {
// Create the HBase table client once up-front and keep it around
// rather than create on each map invocation.
this.table = new HTable(HBaseConfiguration.create(context.getConfiguration()),
"observations");
}

@Override
public void map(LongWritable key, Text value, Context context) throws
Expand All @@ -40,14 +32,9 @@ public void map(LongWritable key, Text value, Context context) throws
p.add(HBaseTemperatureQuery.DATA_COLUMNFAMILY,
HBaseTemperatureQuery.AIRTEMP_QUALIFIER,
Bytes.toBytes(parser.getAirTemperature()));
table.put(p);
context.write(null, p);
}
}

@Override
protected void cleanup(Context context) throws IOException {
table.close();
}
}

@Override
Expand All @@ -59,9 +46,10 @@ public int run(String[] args) throws Exception {
Job job = new Job(getConf(), getClass().getSimpleName());
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "observations");
job.setMapperClass(HBaseTemperatureMapper.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(NullOutputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}

Expand Down

0 comments on commit 57cfcf0

Please sign in to comment.