diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index dc5bace..5b5868d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -33,6 +33,7 @@ import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; @@ -59,6 +60,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; /** * The base class for load balancers. It provides the the functions used to by @@ -117,6 +119,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { HRegionInfo[] regions; Deque[] regionLoads; private RegionLocationFinder regionFinder; + ArrayList> regionLocationFutures; int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality @@ -238,6 +241,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndexToTableIndex = new int[numRegions]; regionIndexToPrimaryIndex = new int[numRegions]; regionLoads = new Deque[numRegions]; + regionLocationFutures = new ArrayList>( + numRegions); regionLocations = new int[numRegions][]; serverIndicesSortedByRegionCount = new Integer[numServers]; serverIndicesSortedByLocality = new Integer[numServers]; @@ -307,6 +312,24 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndex++; } + for (int index = 0; index < regionLocationFutures.size(); index++) { + ListenableFuture future = regionLocationFutures + .get(index); + HDFSBlocksDistribution blockDistbn = null; + try { + blockDistbn = future.get(); + } catch (InterruptedException | ExecutionException e) { + blockDistbn = new HDFSBlocksDistribution(); + } + List loc = regionFinder.getTopBlockLocations(blockDistbn); + regionLocations[index] = new int[loc.size()]; + for (int i = 0; i < loc.size(); i++) { + regionLocations[index][i] = loc.get(i) == null ? -1 : (serversToIndex + .get(loc.get(i).getHostAndPort()) == null ? -1 : serversToIndex + .get(loc.get(i).getHostAndPort())); + } + } + for (int i = 0; i < serversPerHostList.size(); i++) { serversPerHost[i] = new int[serversPerHostList.get(i).size()]; for (int j = 0; j < serversPerHost[i].length; j++) { @@ -454,15 +477,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } if (regionFinder != null) { - //region location - List loc = regionFinder.getTopBlockLocations(region); - regionLocations[regionIndex] = new int[loc.size()]; - for (int i=0; i < loc.size(); i++) { - regionLocations[regionIndex][i] = - loc.get(i) == null ? -1 : - (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 - : serversToIndex.get(loc.get(i).getHostAndPort())); - } + // region location + regionLocationFutures.add(regionIndex, + regionFinder.asyncGetBlockDistribution(region)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index a6724ee..fc9d160 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -21,10 +21,12 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -165,7 +167,11 @@ class RegionLocationFinder { } protected List getTopBlockLocations(HRegionInfo region) { - HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region); + return getTopBlockLocations(getBlockDistribution(region)); + } + + protected List getTopBlockLocations( + HDFSBlocksDistribution blocksDistribution) { List topHosts = blocksDistribution.getTopHosts(); return mapHostNameToServerName(topHosts); } @@ -295,4 +301,13 @@ class RegionLocationFinder { return blockDistbn; } } + + public ListenableFuture asyncGetBlockDistribution( + HRegionInfo hri) { + try { + return loader.reload(hri, null); + } catch (Exception e) { + return Futures.immediateFuture(new HDFSBlocksDistribution()); + } + } }