Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java	(revision 1229408)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java	(revision )
@@ -20,6 +20,9 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,8 +32,17 @@
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.util.StringUtils;
 
 /**
  * Convert HBase tabular data into a format that is consumable by Map/Reduce.
  */
@@ -61,7 +73,12 @@
   public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
   /** The number of rows for caching that will be passed to scanners. */
   public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
+  /** The number of splits to create per region. */
+  public static final String SPLITS_PER_REGION = "hbase.mapreduce.splitsPerRegion";
+  /** The number of bytes of key width to use when calculating splits within a region. */
+  public static final String SPLIT_KEY_BYTE_PRECISION = "hbase.mapreduce.splitKeyBytePrecision";
+
   /** The configuration. */
   private Configuration conf = null;
 
@@ -191,4 +208,120 @@
     }
   }
 
+  /**
+   * Calculates the splits that will serve as input for the map tasks. The
+   * number of splits will be n per region, where
+   * n = hbase.mapreduce.splitsPerRegion. If that property is not set, there
+   * will be one split per region.
+   *
+   * @param context  The current job context.
+   * @return The list of input splits.
+   * @throws java.io.IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+   *   org.apache.hadoop.mapreduce.JobContext)
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (getHTable() == null) {
+      throw new IOException("No table was provided.");
+    }
+
+    Pair<byte[][], byte[][]> keys = getHTable().getStartEndKeys();
+    if (keys == null || keys.getFirst() == null ||
+        keys.getFirst().length == 0) {
+      throw new IOException("Expecting at least one region.");
+    }
+
+    Configuration conf = context.getConfiguration();
+
+    int splitsPerRegion = conf.get(SPLITS_PER_REGION) == null ?
+        1 : Integer.parseInt(conf.get(SPLITS_PER_REGION));
+
+    int splitBytePrecision = conf.get(SPLIT_KEY_BYTE_PRECISION) == null ?
+        64 : Integer.parseInt(conf.get(SPLIT_KEY_BYTE_PRECISION));
+
+    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
+
+    for (int i = 0; i < keys.getFirst().length; i++) {
+      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+        continue;
+      }
+      String regionLocation =
+          getHTable().getRegionLocation(keys.getFirst()[i]).getHostname();
+      byte[] startRow = getScan().getStartRow();
+      byte[] stopRow = getScan().getStopRow();
+
+      // determine if the given start and stop keys fall into the region
+      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+          (stopRow.length == 0 ||
+          Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+
+        byte[] regionSplitStart = startRow.length == 0 ||
+            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+                keys.getFirst()[i] : startRow;
+        byte[] regionSplitStop = (stopRow.length == 0 ||
+            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+            keys.getSecond()[i].length > 0 ?
+                keys.getSecond()[i] : stopRow;
+
+        // scan forward to the first actual key if the split start is a
+        // zero-length byte[]
+        if (regionSplitStart.length == 0) {
+          ResultScanner firstKeyScanner = getHTable().getScanner(getScan());
+          Result res = firstKeyScanner.next();
+          regionSplitStart = res.getRow();
+          firstKeyScanner.close();
+        }
+
+        // accumulate this region's splits instead of overwriting the list
+        splits.addAll(generateRegionSplits(getHTable().getTableName(),
+            regionSplitStart, regionSplitStop, regionLocation,
+            splitsPerRegion, splitBytePrecision));
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("table " + Bytes.toString(getHTable().getTableName()) +
+              " / region " + regionLocation + ": " + splits.size() +
+              " total splits so far");
+        }
+      }
+    }
+
+    return splits;
+  }
+
+  public List<InputSplit> generateRegionSplits(byte[] table, byte[] regionStartKey,
+      byte[] regionEndKey, String regionLocation, int splitsPerRegion,
+      int bytePrecision) {
+    byte[] splitStartKey;
+    byte[] splitEndKey;
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    InputSplit split;
+    byte[] calculatedRegionEndKey;
+
+    if (splitsPerRegion <= 1) {
+      splits.add(new TableSplit(table, regionStartKey, regionEndKey, regionLocation));
+      return splits;
+    }
+
+    if (regionEndKey.length == 0) {
+      // if the end key is a zero-length byte[], assume this is the last region
+      calculatedRegionEndKey = getMaxByteArrayValue(bytePrecision);
+    } else {
+      calculatedRegionEndKey = regionEndKey;
+    }
+
+    byte[][] splitBA =
+        Bytes.split(regionStartKey, calculatedRegionEndKey, splitsPerRegion - 1);
+    for (int i = 1; i < splitBA.length; i++) {
+      // the first and last split keep the region's original (possibly empty)
+      // boundary keys; adjacent splits share their boundary key arrays
+      splitStartKey = i == 1 ? regionStartKey : splitBA[i - 1];
+      splitEndKey = i == splitBA.length - 1 ? regionEndKey : splitBA[i];
+      split = new TableSplit(table, splitStartKey, splitEndKey, regionLocation);
+      splits.add(split);
+    }
+
+    return splits;
+  }
+
+  private byte[] getMaxByteArrayValue(int bytePrecision) {
+    // 2^(8 * bytePrecision) - 1, i.e. bytePrecision bytes of 0xFF;
+    // toByteArray() prepends a zero sign byte, which is dropped here
+    byte[] max = BigInteger.valueOf(2).pow(8 * bytePrecision)
+        .subtract(BigInteger.ONE).toByteArray();
+    byte[] result = new byte[bytePrecision];
+    System.arraycopy(max, max.length - bytePrecision, result, 0, bytePrecision);
+    return result;
+  }
+
 }

[...]

+    List<InputSplit> splits = tableInputFormat.generateRegionSplits(
+        "table4-mr".getBytes(), new byte[]{}, new byte[]{},
+        "fakeRegionLocation", 3, 64);
+    assertTrue(splits.size() == 3);
+    assertTrue(((TableSplit) splits.get(0)).getStartRow().length == 0);
+    assertTrue(((TableSplit) splits.get(0)).getEndRow().length > 0);
+    assertTrue(((TableSplit) splits.get(1)).getStartRow().length > 0);
+    assertTrue(((TableSplit) splits.get(1)).getEndRow().length > 0);
+    assertTrue(((TableSplit) splits.get(2)).getStartRow().length > 0);
+    assertTrue(((TableSplit) splits.get(2)).getEndRow().length == 0);
+    // adjacent splits share boundary key arrays, so reference equality
+    // is intentional here
+    assertTrue(((TableSplit) splits.get(0)).getEndRow() ==
+        ((TableSplit) splits.get(1)).getStartRow());
+    assertTrue(((TableSplit) splits.get(1)).getEndRow() ==
+        ((TableSplit) splits.get(2)).getStartRow());
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+      new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
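A note on the arithmetic in generateRegionSplits: Bytes.split(start, end, n) returns the two endpoints plus n intermediate keys, so passing splitsPerRegion - 1 yields splitsPerRegion + 1 boundary keys and therefore splitsPerRegion contiguous sub-ranges. The standalone sketch below illustrates that behavior; the class name and the "aaa".."zzz" key range are made up for illustration.

import org.apache.hadoop.hbase.util.Bytes;

public class SplitBoundariesDemo {
  public static void main(String[] args) {
    // splitsPerRegion = 3 -> ask Bytes.split for 2 intermediate keys,
    // which returns 4 boundaries: start, two midpoints, end
    byte[] start = Bytes.toBytes("aaa");
    byte[] end = Bytes.toBytes("zzz");
    byte[][] boundaries = Bytes.split(start, end, 2);
    for (int i = 1; i < boundaries.length; i++) {
      System.out.println("split " + i + ": [" +
          Bytes.toStringBinary(boundaries[i - 1]) + ", " +
          Bytes.toStringBinary(boundaries[i]) + ")");
    }
  }
}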
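To exercise the new behavior, a job driver sets the two properties on its Configuration before TableMapReduceUtil wires TableInputFormat into the job. A minimal sketch, assuming a hypothetical table "mytable" and a pass-through mapper; only the two conf.setInt calls involve this patch, the rest is standard HBase MapReduce boilerplate.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class SplitsPerRegionJob {

  // hypothetical pass-through mapper, just to make the job complete
  static class PassThroughMapper
      extends TableMapper<ImmutableBytesWritable, Result> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // ask for 4 map splits per region instead of the default of 1
    conf.setInt(TableInputFormat.SPLITS_PER_REGION, 4);
    // width, in bytes, of the synthetic end key used for the last region
    conf.setInt(TableInputFormat.SPLIT_KEY_BYTE_PRECISION, 64);

    Job job = new Job(conf, "splits-per-region-example");
    job.setJarByClass(SplitsPerRegionJob.class);
    TableMapReduceUtil.initTableMapperJob("mytable", new Scan(),
        PassThroughMapper.class, ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}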