Index: src/docbkx/book.xml
===================================================================
--- src/docbkx/book.xml (revision 1163770)
+++ src/docbkx/book.xml (working copy)
@@ -81,38 +81,199 @@
 See the HBase and MapReduce javadocs.
 Start there. Below is some additional help.
- The default HBase MapReduce Splitter
- When TableInputFormat,
- is used to source an HBase table in a MapReduce job,
- its splitter will make a map task for each region of the table.
- Thus, if there are 100 regions in the table, there will be
- 100 map-tasks for the job - regardless of how many column families are selected in the Scan.
+ Map-Task Splitting
+
+ The Default HBase MapReduce Splitter
+ When TableInputFormat
+ is used to source an HBase table in a MapReduce job,
+ its splitter will make a map task for each region of the table.
+ Thus, if there are 100 regions in the table, there will be
+ 100 map-tasks for the job - regardless of how many column families are selected in the Scan.
+
+
+ Custom Splitters
+ For those interested in implementing custom splitters, see the method getSplits in
+ TableInputFormatBase.
+ That is where the logic for map-task assignment resides.
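+ For illustration, the following is a minimal sketch of a custom splitter; the class name is
+ hypothetical, and it simply delegates to the default per-region splits so that real filtering
+ or re-ordering logic can be added where indicated. Validate any real implementation against the
+ TableInputFormatBase source.
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+public class MyTableInputFormat extends TableInputFormat {
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    // start from the default one-split-per-region assignment...
+    List<InputSplit> splits = super.getSplits(context);
+    // ...then inspect, filter, or re-order the splits as required here.
+    return splits;
+  }
+}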
+
+
- HBase Input MapReduce Example
- To use HBase as a MapReduce source,
- the job would be configured via TableMapReduceUtil in the following manner...
- Job job = ...;
+ HBase MapReduce Examples
+
+ HBase MapReduce Read Example
+ The following is an example of using HBase as a MapReduce source in a read-only manner. Specifically,
+ there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper. The job would be defined
+ as follows...
+
+Configuration config = HBaseConfiguration.create();
+Job job = new Job(config, "ExampleRead");
+job.setJarByClass(MyReadJob.class); // class that contains mapper
+
Scan scan = new Scan();
-scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
-scan.setCacheBlocks(false);
-// Now set other scan attrs
+scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false); // don't set to true for MR jobs
+// set other scan attrs
...
TableMapReduceUtil.initTableMapperJob(
- tableName, // input HBase table name
- scan, // Scan instance to control CF and attribute selection
- MyMapper.class, // mapper
- Text.class, // reducer key
- LongWritable.class, // reducer value
- job // job instance
- );
+ tableName, // input HBase table name
+ scan, // Scan instance to control CF and attribute selection
+ MyMapper.class, // mapper
+ null, // mapper output key
+ null, // mapper output value
+ job);
+job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
+
+boolean b = job.waitForCompletion(true);
+if (!b) {
+ throw new IOException("error with job!");
+}
+
...and the mapper instance would extend TableMapper...
-public class MyMapper extends TableMapper<Text, LongWritable> {
+public static class MyMapper extends TableMapper<Text, Text> {
+
public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
- // process data for the row from the Result instance.
-
+ // process data for the row from the Result instance.
+ }
+}
+
+
+
+
+ HBase MapReduce Read/Write Example
+ The following is an example of using HBase both as a source and as a sink with MapReduce.
+ This example will simply copy data from one table to another.
+
+Configuration config = HBaseConfiguration.create();
+Job job = new Job(config,"ExampleReadWrite");
+job.setJarByClass(MyReadWriteJob.class); // class that contains mapper
+
+Scan scan = new Scan();
+scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false); // don't set to true for MR jobs
+// set other scan attrs
+
+TableMapReduceUtil.initTableMapperJob(
+ sourceTable, // input table
+ scan, // Scan instance to control CF and attribute selection
+ MyMapper.class, // mapper class
+ null, // mapper output key
+ null, // mapper output value
+ job);
+TableMapReduceUtil.initTableReducerJob(
+ targetTable, // output table
+ null, // reducer class
+ job);
+job.setNumReduceTasks(0);
+
+boolean b = job.waitForCompletion(true);
+if (!b) {
+ throw new IOException("error with job!");
+}
+
+ It is worth explaining what TableMapReduceUtil is doing, especially with the reducer.
+ TableOutputFormat is being used
+ as the outputFormat class, and several parameters are being set on the config (e.g., TableOutputFormat.OUTPUT_TABLE), as
+ well as setting the reducer output key to ImmutableBytesWritable and the output value to Writable.
+ These could be set by the programmer on the job and conf, but TableMapReduceUtil tries to make things easier.
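+ As a rough sketch (not the exact TableMapReduceUtil source), the reducer-side setup is
+ approximately equivalent to the following hand-written configuration...
+
+// approximately what initTableReducerJob does under the hood (a sketch; verify against the source)
+job.setOutputFormatClass(TableOutputFormat.class);
+job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, targetTable);
+job.setOutputKeyClass(ImmutableBytesWritable.class);
+job.setOutputValueClass(Writable.class);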
+ The following is the example mapper, which will create a Put matching the input Result
+ and emit it. Note: this is what the CopyTable utility does.
+
+
+public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
+
+ public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
+ // this example is just copying the data from the source table...
+ context.write(row, resultToPut(row,value));
+ }
+
+ private static Put resultToPut(ImmutableBytesWritable key, Result result)
+ throws IOException {
+ Put put = new Put(key.get());
+ for (KeyValue kv : result.raw()) {
+ put.add(kv);
+ }
+ return put;
+ }
+}
+
+ Because the job was configured with zero reduce tasks, there isn't actually a reducer step;
+ TableOutputFormat takes care of sending each mapper-emitted Put to the target table.
+
+ This is just an example; developers could choose not to use TableOutputFormat and connect to the
+ target table themselves, as in the sketch below.
+
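+ For illustration, a hedged sketch of such a direct-write mapper follows; it reuses the
+ resultToPut helper from the example above, and the target table name is hypothetical.
+
+public static class MyDirectWriteMapper extends TableMapper<ImmutableBytesWritable, Put> {
+
+  private HTable targetTable;
+
+  @Override
+  public void setup(Context context) throws IOException {
+    targetTable = new HTable(context.getConfiguration(), "targetTable");
+    targetTable.setAutoFlush(false);  // buffer puts client-side for throughput
+  }
+
+  public void map(ImmutableBytesWritable row, Result value, Context context)
+      throws IOException, InterruptedException {
+    targetTable.put(resultToPut(row, value));  // write directly, bypassing TableOutputFormat
+  }
+
+  @Override
+  public void cleanup(Context context) throws IOException {
+    targetTable.flushCommits();  // push any buffered puts
+    targetTable.close();
+  }
+}
+
+ With this approach the job would typically set NullOutputFormat as its output format, since
+ nothing is emitted through the normal output path.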
+
+
+
+ HBase MapReduce Summary Example
+ The following example uses HBase as a MapReduce source and sink with a summarization step. This example will
+ count the number of instances of each distinct value in a table and write those summarized counts to another table.
+
+Configuration config = HBaseConfiguration.create();
+Job job = new Job(config,"ExampleSummary");
+job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer
+
+Scan scan = new Scan();
+scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false); // don't set to true for MR jobs
+// set other scan attrs
+
+TableMapReduceUtil.initTableMapperJob(
+ sourceTable, // input table
+ scan, // Scan instance to control CF and attribute selection
+ MyMapper.class, // mapper class
+ Text.class, // mapper output key
+ IntWritable.class, // mapper output value
+ job);
+TableMapReduceUtil.initTableReducerJob(
+ targetTable, // output table
+ MyReducer.class, // reducer class
+ job);
+job.setNumReduceTasks(1); // at least one, adjust as required
+
+boolean b = job.waitForCompletion(true);
+if (!b) {
+ throw new IOException("error with job!");
+}
+
+ In this example mapper, a column with a String value is chosen as the value to summarize upon.
+ This value is used as the key to emit from the mapper, and an IntWritable represents an instance counter.
+
+public static class MyMapper extends TableMapper<Text, IntWritable> {
+
+ private final IntWritable ONE = new IntWritable(1);
+ private Text text = new Text();
+
+ public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
+ String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1")));
+ text.set(val); // we can only emit Writables...
+
+ context.write(text, ONE);
+ }
+}
+
+ In the reducer, the "ones" are counted (just like in any other MR example that does this), and then a Put is emitted.
+
+public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
+
+ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
+ int i = 0;
+ for (IntWritable val : values) {
+ i += val.get();
+ }
+ Put put = new Put(Bytes.toBytes(key.toString()));
+ put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));
+
+ context.write(null, put); // the key is ignored; TableOutputFormat writes the Put to the table
+ }
+}
+
+
+
+ Accessing Other HBase Tables in a MapReduce Job
@@ -123,10 +284,16 @@
public class MyMapper extends TableMapper<Text, LongWritable> {
private HTable myOtherTable;
- @Override
- public void setup(Context context) {
+ public void setup(Context context) throws IOException {
myOtherTable = new HTable("myOtherTable");
- }
+ }
+
+ public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
+ // process Result...
+ // use 'myOtherTable' for lookups
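+ // e.g., an illustrative (hypothetical) lookup:
+ //   Result lookup = myOtherTable.get(new Get(row.get()));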
+ }
+
+
@@ -381,7 +548,7 @@
 A secondary index could be created in another table which is periodically updated via a MapReduce job. The job could be executed intra-day, but depending on
 the load strategy it could still potentially be out of sync with the main data table.
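+ For illustration, a minimal sketch of such an index-building mapper follows; the column
+ and table names here are hypothetical, and the job setup would mirror the read/write
+ example above, with the index table as the target.
+
+public static class MyIndexMapper extends TableMapper<ImmutableBytesWritable, Put> {
+
+  public void map(ImmutableBytesWritable row, Result value, Context context)
+      throws IOException, InterruptedException {
+    byte[] attr = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1"));
+    if (attr == null) return;  // skip rows without the indexed attribute
+    Put put = new Put(attr);   // the index row key is the attribute value
+    // store the main-table row key so the index points back at the data
+    put.add(Bytes.toBytes("cf"), Bytes.toBytes("mainRow"), row.get());
+    context.write(new ImmutableBytesWritable(attr), put);
+  }
+}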
- See for more information.
+ See for more information.
@@ -396,7 +563,7 @@
 Where time-ranges are very wide (e.g., a year-long report) and where the data is voluminous, summary tables are a common approach.
 These would be generated with MapReduce jobs into another table.
- See for more information.
+ See for more information.