Index: src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java =================================================================== --- src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (revision 819224) +++ src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (working copy) @@ -20,19 +20,18 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; +import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; @@ -271,29 +270,33 @@ */ @Override public List getSplits(JobContext context) throws IOException { + Pair keys = table.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || + keys.getFirst().length == 0) { + throw new IOException("Expecting at least one region."); + } if (table == null) { throw new IOException("No table was provided."); } - byte [][] startKeys = table.getStartKeys(); - if (startKeys == null || startKeys.length == 0) { - throw new IOException("Expecting at least one region."); + int count = 0; + List splits = new ArrayList(keys.getFirst().length); + for (int i = 0; i < keys.getFirst().length; i++) { + String regionLocation = table.getRegionLocation(keys.getFirst()[i]). + getServerAddress().getHostname(); + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + if ((startRow.length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[i]) <= 0) && + (stopRow.length == 0 || + Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { + InputSplit split = new TableSplit(table.getTableName(), + keys.getFirst()[i], keys.getSecond()[i], regionLocation); + splits.add(split); + if (LOG.isDebugEnabled()) + LOG.debug("split: " + (count++) + " -> " + split); + } } - int realNumSplits = startKeys.length; - InputSplit[] splits = new InputSplit[realNumSplits]; - int middle = startKeys.length / realNumSplits; - int startPos = 0; - for (int i = 0; i < realNumSplits; i++) { - int lastPos = startPos + middle; - lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos; - String regionLocation = table.getRegionLocation(startKeys[startPos]). - getServerAddress().getHostname(); - splits[i] = new TableSplit(this.table.getTableName(), - startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]: - HConstants.EMPTY_START_ROW, regionLocation); - LOG.info("split: " + i + "->" + splits[i]); - startPos = lastPos; - } - return Arrays.asList(splits); + return splits; } /**