From fcf12858693e67904cc27eeae8e5c62bc8d62f01 Mon Sep 17 00:00:00 2001 From: pengyuanbo <932333121@qq.com> Date: Thu, 18 Dec 2014 18:31:06 +0800 Subject: [PATCH] MultiTableInputFormatBase.getSplits is too slow --- .../hbase/mapreduce/MultiTableInputFormatBase.java | 97 +++++++++++++--------- 1 file changed, 59 insertions(+), 38 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java index 76a1632..7160eac 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java @@ -37,6 +37,9 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; /** * A base for {@link MultiTableInputFormat}s. Receives a list of * {@link Scan} instances that define the input tables and @@ -105,59 +108,77 @@ public abstract class MultiTableInputFormatBase extends * @throws IOException When creating the list of splits fails. * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext) */ + + @Override public List getSplits(JobContext context) throws IOException { if (scans.isEmpty()) { throw new IOException("No scans were provided."); } - List splits = new ArrayList(); + Map> tableMaps = new HashMap>(); for (Scan scan : scans) { byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); - if (tableName == null) + if (tableName == null) throw new IOException("A scan object did not have a table name"); - HTable table = new HTable(context.getConfiguration(), tableName); - Pair keys = table.getStartEndKeys(); - if (keys == null || keys.getFirst() == null || - keys.getFirst().length == 0) { - throw new IOException("Expecting at least one region for table : " - + Bytes.toString(tableName)); + String tableNameStr = Bytes.toString(tableName); + + List scanList = tableMaps.get(tableNameStr); + + if (scanList == null) { + scanList = new ArrayList(); + tableMaps.put(tableNameStr, scanList); } - int count = 0; - - byte[] startRow = scan.getStartRow(); - byte[] stopRow = scan.getStopRow(); - - for (int i = 0; i < keys.getFirst().length; i++) { - if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { - continue; + scanList.add(scan); + } + + List splits = new ArrayList(); + Iterator iter = tableMaps.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry> entry = (Map.Entry>) iter.next(); + String tableNameStr = entry.getKey(); + List scanList = entry.getValue(); + HTable table = new HTable(context.getConfiguration(), tableNameStr); + Pair keys = table.getStartEndKeys(); + for (Scan scan : scanList) { + if (keys == null || keys.getFirst() == null || + keys.getFirst().length == 0) { + throw new IOException("Expecting at least one region for table : " + + tableNameStr); } - String regionLocation = - table.getRegionLocation(keys.getFirst()[i], false).getHostname(); - - // determine if the given start and stop keys fall into the range - if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || - Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && - (stopRow.length == 0 || - Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { - byte[] splitStart = - startRow.length == 0 || - Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys - .getFirst()[i] : startRow; - byte[] splitStop = - (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], - stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys - .getSecond()[i] : stopRow; - InputSplit split = - new TableSplit(tableName, scan, splitStart, - splitStop, regionLocation); - splits.add(split); - if (LOG.isDebugEnabled()) - LOG.debug("getSplits: split -> " + (count++) + " -> " + split); + int count = 0; + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + for (int i = 0; i < keys.getFirst().length; i++) { + if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + continue; + } + + // determine if the given start and stop keys fall into the range + if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && + (stopRow.length == 0 || Bytes.compareTo(stopRow, + keys.getFirst()[i]) > 0)) { + byte[] splitStart = startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[i], + startRow) >= 0 ? keys.getFirst()[i] : startRow; + byte[] splitStop = (stopRow.length == 0 || + Bytes.compareTo(keys.getSecond()[i], + stopRow) <= 0) && keys.getSecond()[i].length > 0 ? + keys.getSecond()[i] : stopRow; + String regionLocation = table.getRegionLocation( + keys.getFirst()[i], false).getHostname(); + InputSplit split = new TableSplit(Bytes.toBytes(tableNameStr), scan, + splitStart, splitStop, regionLocation); + splits.add(split); + if (LOG.isDebugEnabled()) + LOG.debug("getSplits: split -> " + (count++) + " -> " + split); + } } } table.close(); } + return splits; } -- 1.8.5.2 (Apple Git-48)