diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 73c5478a72..94a48d19cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -55,6 +55,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Callable;
 
 /**
  * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
@@ -356,10 +363,29 @@ public class TableSnapshotInputFormatImpl {
 
     Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
 
+    // Kick off the per-region HDFS block-distribution computations in
+    // parallel up front, instead of computing them serially inside the loop.
+    Map<String, Future<HDFSBlocksDistribution>> blockDistributions =
+        computeHDFSBlockDistributionForAllRegions(conf, htd, tableDir, regionManifests);
+    HDFSBlocksDistribution blockDistribution;
+
     List<InputSplit> splits = new ArrayList<>();
     for (HRegionInfo hri : regionManifests) {
       // load region descriptor
 
+      try {
+        blockDistribution = blockDistributions.get(hri.getEncodedName()).get();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status and fall back to an empty distribution.
+        Thread.currentThread().interrupt();
+        blockDistribution = new HDFSBlocksDistribution();
+      } catch (ExecutionException e) {
+        blockDistribution = new HDFSBlocksDistribution();
+      }
+      if (blockDistribution == null) {
+        blockDistribution = new HDFSBlocksDistribution();
+      }
+
       if (numSplits > 1) {
         byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
         for (int i = 0; i < sp.length - 1; i++) {
@@ -367,8 +393,7 @@ public class TableSnapshotInputFormatImpl {
             sp[i + 1])) {
           // compute HDFS locations from snapshot files (which will get the locations for
           // referred hfiles)
-          List<String> hosts = getBestLocations(conf,
-              HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+          List<String> hosts = getBestLocations(conf, blockDistribution);
           int len = Math.min(3, hosts.size());
           hosts = hosts.subList(0, len);
@@ -394,8 +419,7 @@ public class TableSnapshotInputFormatImpl {
             hri.getEndKey())) {
           // compute HDFS locations from snapshot files (which will get the locations for
           // referred hfiles)
-          List<String> hosts = getBestLocations(conf,
-              HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+          List<String> hosts = getBestLocations(conf, blockDistribution);
           int len = Math.min(3, hosts.size());
           hosts = hosts.subList(0, len);
@@ -408,6 +432,34 @@ public class TableSnapshotInputFormatImpl {
 
   }
 
+  /**
+   * Submits one task per region that computes the region's HDFS block
+   * distribution so that the HDFS-heavy work runs in parallel. Pool size
+   * is bounded by the "hbase.snapshot.thread.pool.max" setting (default 8).
+   *
+   * @return a map from region encoded name to its pending block distribution
+   */
+  private static Map<String, Future<HDFSBlocksDistribution>> computeHDFSBlockDistributionForAllRegions(
+      final Configuration conf, final HTableDescriptor htd, final Path tableDir,
+      List<HRegionInfo> regionManifests) {
+
+    Map<String, Future<HDFSBlocksDistribution>> distribution = new HashMap<>();
+    int nrThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
+    ExecutorService executor = Executors.newFixedThreadPool(nrThreads);
+    for (final HRegionInfo hri : regionManifests) {
+      distribution.put(hri.getEncodedName(),
+          executor.submit(new Callable<HDFSBlocksDistribution>() {
+            @Override
+            public HDFSBlocksDistribution call() throws Exception {
+              return HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir);
+            }
+          }));
+    }
+    // All tasks are submitted; let worker threads exit once the queue drains.
+    executor.shutdown();
+    return distribution;
+  }
+
   /**
    * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take
    * weights into account, thus will treat every location passed from the input split as equal. We