diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index 6560b41..fcb196e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -30,6 +30,7 @@ import javax.naming.NamingException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
@@ -227,7 +228,14 @@ extends InputFormat<ImmutableBytesWritable, Result> {
         }
       }
     }
-    return splits;
+    // The default value of "hbase.mapreduce.split.autobalance" is false, i.e. not enabled.
+    String enableAutoBalance = context.getConfiguration().get("hbase.mapreduce.split.autobalance",
+        "false");
+    if (enableAutoBalance.equalsIgnoreCase("true")) {
+      return getAutoBalanceSplits(splits, context);
+    } else {
+      return splits;
+    }
   }
 
   public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
@@ -250,6 +258,114 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   }
 
   /**
+   * Calculates the splits that will serve as input for the map tasks. The
+   * number of splits depends on the target size the user sets in the configuration.
+   *
+   * @param list The list of input splits before balancing.
+   * @param context The current job context.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+   *   org.apache.hadoop.mapreduce.JobContext)
+   */
+  public List<InputSplit> getAutoBalanceSplits(List<InputSplit> list,
+      JobContext context) throws IOException {
+    List<InputSplit> resultList = new ArrayList<InputSplit>();
+    Configuration conf = context.getConfiguration();
+    // The default target size is 1 GB.
+    long splitTargetSize = Long.parseLong(conf.get("hbase.mapreduce.split.targetsize",
+        "1073741824"));
+    int count = 0;
+    while (count < list.size()) {
+      TableSplit ts = (TableSplit) list.get(count);
+      String regionLocation = ts.getRegionLocation();
+      long regionSize = ts.getLength();
+      // If the current split is larger than the target size, cut it into two splits.
+      if (regionSize > splitTargetSize) {
+        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow());
+        TableSplit t1 = new TableSplit(table.getName(), ts.getStartRow(), splitKey, regionLocation,
+            regionSize / 2);
+        TableSplit t2 = new TableSplit(table.getName(), splitKey, ts.getEndRow(), regionLocation,
+            regionSize - regionSize / 2);
+        resultList.add(t1);
+        resultList.add(t2);
+        count++;
+      } else {
+        // If the sum of several small consecutive splits is less than the target size,
+        // combine them into one split.
+        long totalSize = regionSize;
+        byte[] splitStartKey = ts.getStartRow();
+        byte[] splitEndKey = ts.getEndRow();
+        count++;
+        for (; count < list.size(); count++) {
+          TableSplit nextRegion = (TableSplit) list.get(count);
+          long nextRegionSize = nextRegion.getLength();
+          if (totalSize + nextRegionSize <= splitTargetSize) {
+            totalSize = totalSize + nextRegionSize;
+            splitEndKey = nextRegion.getEndRow();
+          } else {
+            break;
+          }
+        }
+        TableSplit t = new TableSplit(table.getName(), splitStartKey, splitEndKey,
+            regionLocation, totalSize);
+        resultList.add(t);
+      }
+    }
+    return resultList;
+  }
+
+  /**
+   * Select a split point in the region. Here are some examples of the selection:
+   *   startKey: aaabcdefg  endKey: aaafff    split point: aaad
+   *   startKey: 111000     endKey: 1125790   split point: 112
+   *   startKey: 1110       endKey: 112       split point: 111W
+   *
+   * @param startKey Start key of the region
+   * @param endKey End key of the region
+   * @return The split point in the region.
+   * @throws IOException When calculating the split key fails.
+   */
+  private byte[] getSplitKey(byte[] startKey, byte[] endKey) throws IOException {
+    String startRow = new String(startKey) + " ";
+    String endRow = new String(endKey) + " ";
+    int keyLength = (startRow.length() > endRow.length()) ? startRow.length() : endRow.length();
+    char[] charArray = new char[keyLength];
+    int resultLength = 0;
+    for (int i = 0; i < keyLength; i++) {
+      if (startRow.charAt(i) != endRow.charAt(i)) {
+        char tempChar1 = startRow.charAt(i);
+        char tempChar2 = endRow.charAt(i);
+        if (tempChar2 - tempChar1 == 1) {
+          char nextChar1 = startRow.charAt(i + 1);
+          char nextChar2 = endRow.charAt(i + 1);
+          if (nextChar2 != ' ') {
+            charArray[i] = tempChar2;
+            charArray[i + 1] = ' ';
+          } else {
+            charArray[i] = tempChar1;
+            int def = '~' - nextChar1;
+            charArray[i + 1] = (char) (nextChar1 + def / 2);
+          }
+          resultLength = i + 2;
+        } else {
+          charArray[i] = (char) (tempChar1 + (tempChar2 - tempChar1) / 2);
+          resultLength = i + 1;
+        }
+        break;
+      } else {
+        charArray[i] = endRow.charAt(i);
+      }
+    }
+    char[] resultChar = new char[resultLength];
+    for (int i = 0; i < resultLength; i++) {
+      resultChar[i] = charArray[i];
+    }
+    String result = new String(resultChar);
+    return Bytes.toBytes(result);
+  }
+
+  /**
    *
    *
    * Test if the given region is to be included in the InputSplit while splitting
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
index 47cb834..ce2e3c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
@@ -97,4 +97,21 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
     testScan(null, "opp", "opo");
   }
 
+  /**
+   * Tests an MR scan using a specific number of mappers. The test table has 25 regions by
+   * default. When splitTargetSize is set to -1, every region is cut into two splits, so the
+   * number of splits should be 50. When splitTargetSize is set to 1 GB, the sum of all
+   * region sizes is less than the target size, so all regions are combined into one
+   * split.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
+    testNumOfSplits("-1", 50);
+    testNumOfSplits("10022122547", 1);
+  }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
index 750ea39..b4ca885 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 
@@ -37,12 +38,15 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 
+
 /**
  * <p>
  * Tests various scan start and stop row scenarios. This is set in a scan and
@@ -240,5 +244,34 @@ public abstract class TestTableInputFormatScanBase {
     LOG.info("After map/reduce completion - job " + jobName);
   }
 
+
+  /**
+   * Tests an MR scan using a specific splitTargetSize.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  public void testNumOfSplits(String targetSize, int expectedNumOfSplits) throws IOException,
+      InterruptedException,
+      ClassNotFoundException {
+    String jobName = "TestJobForNumOfSplits";
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    c.set("hbase.mapreduce.split.autobalance", "true");
+    c.set("hbase.mapreduce.split.targetsize", targetSize);
+    c.set(KEY_STARTROW, "");
+    c.set(KEY_LASTROW, "");
+    Job job = new Job(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    TableInputFormat tif = new TableInputFormat();
+    tif.setConf(job.getConfiguration());
+    Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
+    List<InputSplit> splits = tif.getSplits(job);
+    Assert.assertEquals(expectedNumOfSplits, splits.size());
+  }
 }
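
Usage note (illustration only, not part of the patch): a minimal sketch of a job driver that enables the auto-balance behaviour added above. Only the two configuration keys, "hbase.mapreduce.split.autobalance" and "hbase.mapreduce.split.targetsize", come from this patch; the table name "myTable", the MyMapper class, and the 512 MB target are placeholder assumptions.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class AutoBalanceScanDriver {

  // Placeholder mapper that only counts rows; any TableMapper would do.
  static class MyMapper extends TableMapper<ImmutableBytesWritable, Result> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.getCounter("example", "rows").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Enable the split auto-balancing introduced by the patch (default is false).
    conf.set("hbase.mapreduce.split.autobalance", "true");
    // Target split size in bytes: regions larger than this are cut into two splits,
    // consecutive smaller regions are combined up to this size (default is 1 GB).
    conf.set("hbase.mapreduce.split.targetsize", String.valueOf(512L * 1024 * 1024));

    Job job = new Job(conf, "auto-balance-scan");
    job.setJarByClass(AutoBalanceScanDriver.class);

    Scan scan = new Scan();
    scan.setCaching(500);
    scan.setCacheBlocks(false);

    TableMapReduceUtil.initTableMapperJob("myTable", scan, MyMapper.class,
        ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}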