Index: src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
===================================================================
--- src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java	(revision 0)
@@ -0,0 +1,105 @@
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * Partitions map output keys so that all keys belonging to the same region
+ * of the output table are sent to the same reducer.
+ *
+ * <p>The output table is the one named by
+ * {@link TableOutputFormat#OUTPUT_TABLE} in the job configuration. When there
+ * are more regions than reducers, regions are assigned to reducers
+ * round-robin (region {@code i} goes to reducer {@code i % numPartitions}),
+ * so several regions may share a reducer but no region is split across
+ * reducers.
+ *
+ * <p>If the table or its regions cannot be looked up, the error is logged
+ * and the affected keys are sent to partition 0.
+ *
+ * @param <K2> not used by this partitioner
+ * @param <V2> the map output value type
+ */
+public class HRegionPartitioner<K2, V2>
+    implements Partitioner<ImmutableBytesWritable, V2> {
+
+  private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
+
+  private HTable table;
+  private byte[][] startKeys = new byte[0][];
+
+  /**
+   * Opens the output table and caches its region start keys.
+   *
+   * <p>Failures are logged rather than thrown; afterwards every key is sent
+   * to partition 0.
+   *
+   * @param job the job configuration, expected to name the output table
+   *     under {@link TableOutputFormat#OUTPUT_TABLE}
+   */
+  @Override
+  public void configure(JobConf job) {
+    String tableName = job.get(TableOutputFormat.OUTPUT_TABLE);
+    if (tableName == null) {
+      LOG.error(TableOutputFormat.OUTPUT_TABLE
+          + " is not set; all keys will be sent to partition 0");
+      return;
+    }
+    try {
+      this.table = new HTable(new HBaseConfiguration(job), tableName);
+      this.startKeys = this.table.getStartKeys();
+    } catch (IOException e) {
+      LOG.error("Unable to load regions of table " + tableName
+          + "; all keys will be sent to partition 0", e);
+      this.table = null;
+      this.startKeys = new byte[0][];
+    }
+  }
+
+  /**
+   * Returns the reducer for {@code key}: the index of the region holding the
+   * key, wrapped round-robin when there are more regions than reducers.
+   *
+   * @param key the row key being partitioned
+   * @param value the map output value; ignored
+   * @param numPartitions the number of reducers
+   * @return a partition in {@code [0, numPartitions)}; 0 when the table has
+   *     a single region or the key's region cannot be determined
+   */
+  @Override
+  public int getPartition(ImmutableBytesWritable key, V2 value,
+      int numPartitions) {
+    // A single region (or no region information at all) means partition 0.
+    if (this.table == null || this.startKeys.length <= 1) {
+      return 0;
+    }
+    byte[] regionStartKey;
+    try {
+      // NOTE(review): HTable may cache region locations; if a region splits
+      // while the job runs, the start key found here may not be one of those
+      // loaded in configure(), in which case the key falls back to 0 below.
+      regionStartKey =
+          this.table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
+    } catch (IOException e) {
+      LOG.error("Unable to locate region for key; using partition 0", e);
+      return 0;
+    }
+    for (int i = 0; i < this.startKeys.length; i++) {
+      if (Bytes.compareTo(regionStartKey, this.startKeys[i]) == 0) {
+        // Round-robin over the reducers; this is just i when there are at
+        // least as many reducers as regions.
+        return i % numPartitions;
+      }
+    }
+    // No start key matched; fall back to partition 0.
+    return 0;
+  }
+}
Index: src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
===================================================================
--- src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java	(revision 712616)
+++ src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java	(working copy)
@@ -46,7 +46,8 @@
     job.setOutputFormat(TableOutputFormat.class);
     job.setReducerClass(reducer);
     job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.setPartitionerClass(HRegionPartitioner.class);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(BatchUpdate.class);
   }
 }