diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 6560b41..0efd6f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -164,7 +164,17 @@ extends InputFormat { if (table == null) { throw new IOException("No table was provided."); } - + //Set one region for one mapper as default + int regionPerMapperInt = 1; + // Get the number of regions per mapper, the default value is 1 + String regionPerMapper = context.getConfiguration().get("hbase.mapreduce.scan" + + ".regionspermapper", "1"); + try { + regionPerMapperInt = Integer.parseInt(regionPerMapper); + LOG.debug("Number of regions per mapper: " + regionPerMapperInt); + } catch (NumberFormatException e) { + LOG.error("hbase.mapreduce.scan.regionspermapper must be an integer "); + } RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin); Pair keys = getStartEndKeys(); @@ -182,50 +192,133 @@ extends InputFormat { splits.add(split); return splits; } - List splits = new ArrayList(keys.getFirst().length); - for (int i = 0; i < keys.getFirst().length; i++) { - if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { - continue; - } - HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false); - // The below InetSocketAddress creation does a name resolution. 
- InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); - if (isa.isUnresolved()) { - LOG.warn("Failed resolve " + isa); - } - InetAddress regionAddress = isa.getAddress(); - String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e); - regionLocation = location.getHostname(); - } - byte[] startRow = scan.getStartRow(); - byte[] stopRow = scan.getStopRow(); - // determine if the given start an stop key fall into the region - if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || - Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && - (stopRow.length == 0 || - Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { - byte[] splitStart = startRow.length == 0 || - Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? - keys.getFirst()[i] : startRow; - byte[] splitStop = (stopRow.length == 0 || - Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && - keys.getSecond()[i].length > 0 ? 
- keys.getSecond()[i] : stopRow; + //Default: numbers of splits = region numbers + int splitsNumber = keys.getFirst().length; + if (regionPerMapperInt > 1) { + try { + double splitsNumberDouble = (double) keys.getFirst().length / regionPerMapperInt; + splitsNumber = (int) Math.ceil(splitsNumberDouble); + LOG.debug("Number of mappers: " + keys.getFirst().length + " / " + + regionPerMapperInt + " = " + splitsNumber); + } catch (NumberFormatException e) { + LOG.error("Wrong number of mappers: " + keys.getFirst().length + " / " + + regionPerMapperInt); + } + } + List splits = new ArrayList(splitsNumber); + if (regionPerMapperInt <= 1) { + // in this case, one mapper use one region as input + for (int i = 0; i < keys.getFirst().length; i++) { + if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + continue; + } + HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false); + // The below InetSocketAddress creation does a name resolution. + InetSocketAddress isa = new InetSocketAddress(location.getHostname(), + location.getPort()); + if (isa.isUnresolved()) { + LOG.warn("Failed resolve " + isa); + } + InetAddress regionAddress = isa.getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e); + regionLocation = location.getHostname(); + } + + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && + (stopRow.length == 0 || + Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { + byte[] splitStart = startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? 
+ keys.getFirst()[i] : startRow; + byte[] splitStop = (stopRow.length == 0 || + Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && + keys.getSecond()[i].length > 0 ? + keys.getSecond()[i] : stopRow; - byte[] regionName = location.getRegionInfo().getRegionName(); - long regionSize = sizeCalculator.getRegionSize(regionName); - TableSplit split = new TableSplit(table.getName(), - splitStart, splitStop, regionLocation, regionSize); - splits.add(split); - if (LOG.isDebugEnabled()) { - LOG.debug("getSplits: split -> " + i + " -> " + split); + byte[] regionName = location.getRegionInfo().getRegionName(); + long regionSize = sizeCalculator.getRegionSize(regionName); + TableSplit split = new TableSplit(table.getName(), + splitStart, splitStop, regionLocation, regionSize); + splits.add(split); + if (LOG.isDebugEnabled()) { + LOG.debug("getSplits: split -> " + i + " -> " + split); + } + } + } + } else { + // one mapper use multiple regions as input + // in this case, the feature in HBASE-2302 would not be supported + for (int i = 0; i * regionPerMapperInt < keys.getFirst().length; i++) { + int startRegion=i*regionPerMapperInt; + int stopRegion = (i * regionPerMapperInt + regionPerMapperInt - 1 < keys.getFirst() + .length) ? (i * regionPerMapperInt + regionPerMapperInt - 1) : (keys.getFirst() + .length - 1); + + ArrayList regionLocationList =new ArrayList(); + ArrayList regionNameList =new ArrayList(); + for (int j=startRegion;j<=stopRegion;j++) { + + HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[j], + false); + // The below InetSocketAddress creation does a name resolution. 
+ InetSocketAddress isa = new InetSocketAddress(location.getHostname(), + location.getPort()); + if (isa.isUnresolved()) { + LOG.warn("Failed resolve " + isa); + } + InetAddress regionAddress = isa.getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + + e); + regionLocation = location.getHostname(); + } + regionLocationList.add(regionLocation); + byte[] regionName = location.getRegionInfo().getRegionName(); + regionNameList.add(regionName); + + } + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + // determine if the given start an stop key fall into the regions + if ((startRow.length == 0 || keys.getSecond()[stopRegion].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[stopRegion]) < 0) && + (stopRow.length == 0 || + Bytes.compareTo(stopRow, keys.getFirst()[startRegion]) > 0)) { + byte[] splitStart = startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[startRegion], startRow) >= 0 ? + keys.getFirst()[startRegion] : startRow; + byte[] splitStop = (stopRow.length == 0 || + Bytes.compareTo(keys.getSecond()[stopRegion], stopRow) <= 0) && + keys.getSecond()[stopRegion].length > 0 ? + keys.getSecond()[stopRegion] : stopRow; + + long regionSize = 0; + for (int k=0;k " + i + " -> " + split); + } + } } - } } return splits; }