Index: src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
===================================================================
--- src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java	(revision 0)
@@ -0,0 +1,71 @@
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+
+/**
+ * This is used to partition the output keys into groups: all keys that belong to the same region go to the same reducer.
+ * @param <K2>
+ * @param <V2>
+ */
+public class HRegionPartitioner<K2, V2>
+implements Partitioner<ImmutableBytesWritable, V2> {
+
+  private final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
+
+  private HTable table;
+  private byte[][] startKeys;
+
+  public void configure(JobConf job) {
+    try {
+      this.table = new HTable(new HBaseConfiguration(job),
+        job.get(TableOutputFormat.OUTPUT_TABLE));
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+
+    try {
+      this.startKeys = this.table.getStartKeys();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  @Override
+  public int getPartition(ImmutableBytesWritable key, V2 value,
+      int numPartitions) {
+    byte[] region = null;
+
+    // only one region: everything goes to partition 0
+    if (this.startKeys.length == 1){
+      return 0;
+    }
+    try {
+      // not sure if this is cached after a split, so we could have problems here if a region splits while mapping
+      region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    for (int i = 0; i < this.startKeys.length; i++){
+      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
+        if (i >= numPartitions-1){
+          // cover the case where we have fewer reduces than regions
+          return (Integer.toString(i).hashCode() & Integer.MAX_VALUE) % numPartitions;
+        } else {
+          return i;
+        }
+      }
+    }
+    // if the above fails to find a matching start key, we still need to return something
+    return 0;
+  }
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
===================================================================
--- src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java	(revision 712616)
+++ src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java	(working copy)
@@ -1,5 +1,9 @@
 package org.apache.hadoop.hbase.mapred;
 
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Writable;
@@ -40,13 +44,21 @@
    * @param table
    * @param reducer
    * @param job
+   * @throws IOException
    */
   public static void initTableReduceJob(String table,
-    Class<? extends TableReduce> reducer, JobConf job) {
+    Class<? extends TableReduce> reducer, JobConf job) throws IOException {
     job.setOutputFormat(TableOutputFormat.class);
     job.setReducerClass(reducer);
     job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.setPartitionerClass(HRegionPartitioner.class);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(BatchUpdate.class);
+    // cap the number of reduce tasks at the number of regions in the output table
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    if (job.getNumReduceTasks() > regions){
+      job.setNumReduceTasks(regions);
+    }
   }
 }
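For context, a minimal sketch of a job driver that uses the updated initTableReduceJob. The driver class name ExampleTableReduceDriver and the table name "test_table" are illustrative assumptions, not part of this patch, and the IdentityTableReduce reducer is assumed to be available in the same package. The partitioner and the reduce-task cap are wired up inside initTableReduceJob, so the driver does not configure them itself.

package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ExampleTableReduceDriver {
  public static void main(String[] args) throws IOException {
    // hypothetical driver, shown only to illustrate the new initTableReduceJob behaviour
    JobConf job = new JobConf(ExampleTableReduceDriver.class);
    job.setJobName("example table reduce");

    // ... map-side setup omitted ...

    // sets TableOutputFormat, the reducer, HRegionPartitioner, and caps the
    // number of reduce tasks at the number of regions in "test_table"
    TableMapReduceUtil.initTableReduceJob("test_table",
      IdentityTableReduce.class, job);

    JobClient.runJob(job);
  }
}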