diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 6560b41..b39b5a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -164,7 +164,17 @@ extends InputFormat { if (table == null) { throw new IOException("No table was provided."); } - + //Set one region for one mapper as default + int regionPerMapperInt = 1; + // Get the number of regions per mapper, the default value is 1 + String regionPerMapper = context.getConfiguration().get("hbase.mapreduce.scan" + + ".regionspermapper", "1"); + try { + regionPerMapperInt = Integer.parseInt(regionPerMapper); + LOG.debug("Number of regions per mapper: " + regionPerMapperInt); + } catch (NumberFormatException e) { + LOG.error("hbase.mapreduce.scan.regionspermapper must be an integer "); + } RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin); Pair keys = getStartEndKeys(); @@ -182,50 +192,107 @@ extends InputFormat { splits.add(split); return splits; } - List splits = new ArrayList(keys.getFirst().length); - for (int i = 0; i < keys.getFirst().length; i++) { - if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { - continue; - } - HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false); - // The below InetSocketAddress creation does a name resolution. 
- InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); - if (isa.isUnresolved()) { - LOG.warn("Failed resolve " + isa); - } - InetAddress regionAddress = isa.getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e); - regionLocation = location.getHostname(); - } - byte[] startRow = scan.getStartRow(); - byte[] stopRow = scan.getStopRow(); - // determine if the given start an stop key fall into the region - if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || - Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && - (stopRow.length == 0 || - Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { - byte[] splitStart = startRow.length == 0 || - Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? - keys.getFirst()[i] : startRow; - byte[] splitStop = (stopRow.length == 0 || - Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && - keys.getSecond()[i].length > 0 ? 
- keys.getSecond()[i] : stopRow; + //Default: numbers of splits = region numbers + int splitsNumber = keys.getFirst().length; + if (regionPerMapperInt > 1) { + try { + double splitsNumberDouble = (double) keys.getFirst().length / regionPerMapperInt; + splitsNumber = (int) Math.ceil(splitsNumberDouble); + LOG.debug("Number of mappers: " + keys.getFirst().length + " / " + + regionPerMapperInt + " = " + splitsNumber); + } catch (NumberFormatException e) { + LOG.error("Wrong number of mappers: " + keys.getFirst().length + " / " + + regionPerMapperInt); + } + } + List splits = new ArrayList(splitsNumber); + if (regionPerMapperInt <= 1) { + // one region per mapper; a configured value below 1 also takes this default path + for (int i = 0; i < keys.getFirst().length; i++) { + if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + continue; + } + HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false); + String regionLocation=getRegionLocation(location); + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && + (stopRow.length == 0 || + Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { + byte[] splitStart = startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? + keys.getFirst()[i] : startRow; + byte[] splitStop = (stopRow.length == 0 || + Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && + keys.getSecond()[i].length > 0 ? 
+ keys.getSecond()[i] : stopRow; - byte[] regionName = location.getRegionInfo().getRegionName(); - long regionSize = sizeCalculator.getRegionSize(regionName); - TableSplit split = new TableSplit(table.getName(), - splitStart, splitStop, regionLocation, regionSize); - splits.add(split); - if (LOG.isDebugEnabled()) { - LOG.debug("getSplits: split -> " + i + " -> " + split); + byte[] regionName = location.getRegionInfo().getRegionName(); + long regionSize = sizeCalculator.getRegionSize(regionName); + TableSplit split = new TableSplit(table.getName(), + splitStart, splitStop, regionLocation, regionSize); + splits.add(split); + if (LOG.isDebugEnabled()) { + LOG.debug("getSplits: split -> " + i + " -> " + split); + } + } + } + } else { + // one mapper use multiple regions as input. + // In HBASE-2302, there is a feature to exclude some specific regions from the MR + // job. In this multiple regions case, the feature in HBASE-2302 would not be + // supported. Because the mapper deal with only one TableSplit object, + // the multiple regions in one mapper must be continuous. + for (int i = 0; i * regionPerMapperInt < keys.getFirst().length; i++) { + int startRegion=i*regionPerMapperInt; + int stopRegion = (i * regionPerMapperInt + regionPerMapperInt - 1 < keys.getFirst() + .length) ? 
(i * regionPerMapperInt + regionPerMapperInt - 1) : (keys.getFirst() + .length - 1); + + ArrayList regionLocationList =new ArrayList(); + ArrayList regionNameList =new ArrayList(); + for (int j=startRegion;j<=stopRegion;j++) { + // resolve region j itself; indexing by the split counter i would fetch the wrong region + HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[j], + false); + regionLocationList.add(getRegionLocation(location)); + byte[] regionName = location.getRegionInfo().getRegionName(); + regionNameList.add(regionName); + + } + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + // determine if the given start an stop key fall into the regions + if ((startRow.length == 0 || keys.getSecond()[stopRegion].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[stopRegion]) < 0) && + (stopRow.length == 0 || + Bytes.compareTo(stopRow, keys.getFirst()[startRegion]) > 0)) { + byte[] splitStart = startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[startRegion], startRow) >= 0 ? + keys.getFirst()[startRegion] : startRow; + byte[] splitStop = (stopRow.length == 0 || + Bytes.compareTo(keys.getSecond()[stopRegion], stopRow) <= 0) && + keys.getSecond()[stopRegion].length > 0 ? + keys.getSecond()[stopRegion] : stopRow; + + long regionSize = 0; + for (int k=0;k " + i + " -> " + split); + } + } } - } } return splits; } @@ -249,6 +316,23 @@ extends InputFormat { return hostName; } + private String getRegionLocation(HRegionLocation location) throws IOException { + // The below InetSocketAddress creation does a name resolution. 
+ InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); + if (isa.isUnresolved()) { + LOG.warn("Failed resolve " + isa); + } + InetAddress regionAddress = isa.getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e); + regionLocation = location.getHostname(); + } + return regionLocation; + } + /** * * diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java index 47cb834..00fe6b2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java @@ -97,4 +97,33 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase { testScan(null, "opp", "opo"); } + /** + * Tests a MR scan and check the number of mappers. + * The default test table has 25 regions, so we set seven test cases, using 1,2,3,5,24,25,99 + * as hbase.mapreduce.scan.regionspermapper. 
The correct number of splits should be: + * + * hbase.mapreduce.scan.regionspermapper number of splits(mappers) + * 1 25 + * 2 13 + * 3 9 + * 5 5 + * 24 2 + * 25 1 + * 99 1 + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException { + testNumOfSplits(1); + testNumOfSplits(2); + testNumOfSplits(3); + testNumOfSplits(5); + testNumOfSplits(24); + testNumOfSplits(25); + testNumOfSplits(99); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java index 750ea39..4ed776d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -37,10 +38,12 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; /** @@ -240,5 +243,36 @@ public abstract class TestTableInputFormatScanBase { LOG.info("After map/reduce completion - job " + jobName); } + /** + * Tests a MR scan using specific number of mappers. 
+ * This test is for the method getSplits() in TableInputFormat + * @param regionsPerMapper value to set for hbase.mapreduce.scan.regionspermapper + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + public void testNumOfSplits(int regionsPerMapper) throws IOException, InterruptedException, + ClassNotFoundException { + String jobName = "TestJobForNumOfSplits"; + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + c.setInt("hbase.mapreduce.scan.regionspermapper", regionsPerMapper); + c.set(KEY_STARTROW, ""); + c.set(KEY_LASTROW, ""); + Job job = new Job(c, jobName); + TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + TableInputFormat tif = new TableInputFormat(); + tif.setConf(job.getConfiguration()); + Assert.assertEquals(Bytes.toString(TABLE_NAME), Bytes.toString(table.getTableName())); + List splits = tif.getSplits(job); + int expectedNumSplits = (int) Math.ceil((double) table.getRegionLocations().size() / + regionsPerMapper); + Assert.assertEquals(expectedNumSplits, splits.size()); + } + + }