# Review notes. This is free text placed before the "Index:" header; it is not part of any hunk.
#
# What the patch does to getSplits(JobContext) in TableInputFormatBase.java:
#   * Region boundaries are read with table.getStartEndKeys() (a Pair whose getFirst() is used as
#     the start keys and getSecond() as the end keys) instead of table.getStartKeys().
#   * One TableSplit is created per region from (table name, start key, end key, host name of the
#     region location), but only for regions that pass a comparator check of the scan's start row
#     against the region end key and of the scan's stop row against the region start key.
#   * Splits are accumulated in an ArrayList and returned directly, replacing the fixed-size
#     InputSplit[] that was wrapped with Arrays.asList.
#   * The per-split log line changes from LOG.info to LOG.debug behind LOG.isDebugEnabled().
#   * Imports: java.util.Arrays and ...hbase.util.Writables are removed; java.util.ArrayList,
#     ...hbase.KeyValue and ...hbase.util.Pair are added.
#
# NOTE(review): table.getStartEndKeys() is called before the "table == null" check shown in the
#   context lines, so that check can never fire; a null table would fail on the first statement.
#   The removed code had the same order (table.getStartKeys() came first), so this is pre-existing.
# NOTE(review): startRow/stopRow fall back to HConstants.EMPTY_START_ROW / EMPTY_END_ROW and are
#   then passed straight to kvc.compare(...). The removed code used HConstants.EMPTY_START_ROW as
#   the end row of the last split, which suggests an empty key marks an open-ended boundary. If an
#   empty key sorts before every non-empty key, an open-ended stop row or an empty region end key
#   would fail these comparisons and the region would be skipped -- TODO confirm the comparator's
#   handling of empty keys and add explicit length == 0 checks if needed.
# NOTE(review): table.getRegionLocation(...) is invoked for every region, before the range filter
#   decides whether the region is used; it could move inside the "if".
# NOTE(review): count++ is evaluated only inside the LOG.isDebugEnabled() branch, so the counter
#   advances only when debug logging is on. It is used solely in the log message here.
# NOTE(review): this copy of the patch appears to have lost its line breaks and its generic type
#   arguments (e.g. "List getSplits", "Pair keys", "new ArrayList(...)") -- re-export it from the
#   working copy before attempting to apply it.
#
Index: TableInputFormatBase.java =================================================================== --- TableInputFormatBase.java (revision 804158) +++ TableInputFormatBase.java (working copy) @@ -20,18 +20,19 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; @@ -270,8 +271,9 @@ */ @Override public List getSplits(JobContext context) throws IOException { - byte [][] startKeys = table.getStartKeys(); - if (startKeys == null || startKeys.length == 0) { + Pair keys = table.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || + keys.getFirst().length == 0) { throw new IOException("Expecting at least one region."); } if (table == null) { @@ -280,22 +282,27 @@ if (!scan.hasFamilies()) { throw new IOException("Expecting at least one column."); } - int realNumSplits = startKeys.length; - InputSplit[] splits = new InputSplit[realNumSplits]; - int middle = startKeys.length / realNumSplits; - int startPos = 0; - for (int i = 0; i < realNumSplits; i++) { - int lastPos = startPos + middle; - lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos; - String regionLocation = table.getRegionLocation(startKeys[startPos]). 
- getServerAddress().getHostname(); - splits[i] = new TableSplit(this.table.getTableName(), - startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]: - HConstants.EMPTY_START_ROW, regionLocation); - LOG.info("split: " + i + "->" + splits[i]); - startPos = lastPos; + KeyValue.KeyComparator kvc = KeyValue.getRowComparator( + table.getTableName()); + int count = 0; + List splits = new ArrayList(keys.getFirst().length); + for (int i = 0; i < keys.getFirst().length; i++) { + String regionLocation = table.getRegionLocation(keys.getFirst()[i]). + getServerAddress().getHostname(); + byte[] startRow = scan.getStartRow() != null ? scan.getStartRow() : + HConstants.EMPTY_START_ROW; + byte[] stopRow = scan.getStopRow() != null ? scan.getStopRow() : + HConstants.EMPTY_END_ROW; + if (kvc.compare(startRow, keys.getSecond()[i]) <= 0 && + kvc.compare(stopRow, keys.getFirst()[i]) >= 0) { + InputSplit split = new TableSplit(table.getTableName(), + keys.getFirst()[i], keys.getSecond()[i], regionLocation); + splits.add(split); + if (LOG.isDebugEnabled()) LOG.debug("split: " + (count++) + + " -> " + split); + } } - return Arrays.asList(splits); + return splits; } /**