Index: src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
===================================================================
--- src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java (revision 0)
@@ -0,0 +1,104 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * Partitions map output by region of the output table, so that all keys
+ * that fall in the same region are sent to the same reducer.
+ *
+ * @param <K2> not referenced by this class
+ * @param <V2> type of the value passed to getPartition
+ */
+public class HRegionPartitioner<K2, V2>
+    implements Partitioner<ImmutableBytesWritable, V2> {
+
+  private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
+
+  /** Output table; null when configure() could not open it. */
+  private HTable table;
+
+  /** Region start keys read in configure(); null when that read failed. */
+  private byte[][] startKeys;
+
+  /**
+   * Opens the table named by TableOutputFormat.OUTPUT_TABLE and reads its
+   * region start keys. A failure is logged instead of thrown; getPartition
+   * then returns 0 for every key.
+   *
+   * @param job job configuration that names the output table
+   */
+  public void configure(JobConf job) {
+    String tableName = job.get(TableOutputFormat.OUTPUT_TABLE);
+    try {
+      this.table = new HTable(new HBaseConfiguration(job), tableName);
+      this.startKeys = this.table.getStartKeys();
+    } catch (IOException e) {
+      LOG.error("Could not read regions of table " + tableName, e);
+      this.table = null;
+      this.startKeys = null;
+    }
+  }
+
+  /**
+   * Picks the reducer for a key from the index of the region that holds it.
+   *
+   * @param key key whose bytes are used as the row for the region lookup
+   * @param value record value; not consulted
+   * @param numPartitions number of reducers
+   * @return partition in the range 0 to numPartitions - 1
+   */
+  @Override
+  public int getPartition(ImmutableBytesWritable key, V2 value,
+      int numPartitions) {
+    // Nothing to choose when configure() failed or there is one region.
+    if (this.startKeys == null || this.startKeys.length <= 1) {
+      return 0;
+    }
+    byte[] regionStart;
+    try {
+      // NOTE: it is unclear whether this lookup is refreshed after a split,
+      // so a region splitting while the job runs may go unmatched below.
+      regionStart =
+        this.table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
+    } catch (IOException e) {
+      LOG.error("Could not locate region for key", e);
+      return 0;
+    }
+    for (int i = 0; i < this.startKeys.length; i++) {
+      if (Bytes.compareTo(regionStart, this.startKeys[i]) == 0) {
+        // With fewer reducers than regions the surplus regions wrap around.
+        return i % numPartitions;
+      }
+    }
+    // No start key matched; a valid partition still has to be returned.
+    return 0;
+  }
+}
Index: src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
===================================================================
--- src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (revision 712616)
+++ src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (working copy)
@@ -1,5 +1,28 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.mapred;
 
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Writable;
@@ -49,4 +72,37 @@
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(BatchUpdate.class);
   }
+
+  /**
+   * Use this before submitting a TableReduce job. It will appropriately set
+   * up the JobConf and install a partitioner for the reduce input.
+   *
+   * @param table name of the output table
+   * @param reducer reducer class to run
+   * @param job job configuration to update
+   * @param partitioner partitioner class to install; when null,
+   * HRegionPartitioner is used
+   * @throws IOException if the regions of the output table cannot be read
+   */
+  public static void initTableReduceJob(String table,
+      Class reducer, JobConf job, Class partitioner) throws IOException {
+    job.setOutputFormat(TableOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+    Class chosen =
+      partitioner == null ? HRegionPartitioner.class : partitioner;
+    job.setPartitionerClass(chosen);
+    if (chosen == HRegionPartitioner.class) {
+      // HRegionPartitioner derives the partition from the region index, so
+      // reducers beyond the region count would never receive any keys.
+      HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+      int regions = outputTable.getRegionsInfo().size();
+      if (job.getNumReduceTasks() > regions) {
+        job.setNumReduceTasks(regions);
+      }
+    }
+  }
+
 }