diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index dc5bace..6e8b292 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -33,6 +33,7 @@ import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; @@ -59,6 +60,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; /** * The base class for load balancers. It provides the the functions used to by @@ -117,6 +119,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { HRegionInfo[] regions; Deque[] regionLoads; private RegionLocationFinder regionFinder; + ArrayList> regionLocationFutures; int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality @@ -238,6 +241,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndexToTableIndex = new int[numRegions]; regionIndexToPrimaryIndex = new int[numRegions]; regionLoads = new Deque[numRegions]; + regionLocationFutures = new ArrayList>( + numRegions); + if (regionFinder != null) { + for (int i = 0; i < numRegions; i++) { + regionLocationFutures.add(null); + } + } regionLocations = new int[numRegions][]; serverIndicesSortedByRegionCount = new Integer[numServers]; serverIndicesSortedByLocality = new Integer[numServers]; @@ -307,6 +317,30 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndex++; } + if (regionFinder != null) { + for (int index = 0; index < regionLocationFutures.size(); index++) { + ListenableFuture future = regionLocationFutures + .get(index); + HDFSBlocksDistribution blockDistbn = null; + try { + if (future != null) { + blockDistbn = future.get(); + } else { + blockDistbn = new HDFSBlocksDistribution(); + } + } catch (InterruptedException | ExecutionException e) { + blockDistbn = new HDFSBlocksDistribution(); + } + List loc = regionFinder.getTopBlockLocations(blockDistbn); + regionLocations[index] = new int[loc.size()]; + for (int i = 0; i < loc.size(); i++) { + regionLocations[index][i] = loc.get(i) == null ? -1 + : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 + : serversToIndex.get(loc.get(i).getHostAndPort())); + } + } + } + for (int i = 0; i < serversPerHostList.size(); i++) { serversPerHost[i] = new int[serversPerHostList.get(i).size()]; for (int j = 0; j < serversPerHost[i].length; j++) { @@ -454,15 +488,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } if (regionFinder != null) { - //region location - List loc = regionFinder.getTopBlockLocations(region); - regionLocations[regionIndex] = new int[loc.size()]; - for (int i=0; i < loc.size(); i++) { - regionLocations[regionIndex][i] = - loc.get(i) == null ? -1 : - (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 - : serversToIndex.get(loc.get(i).getHostAndPort())); - } + // region location + regionLocationFutures.set(regionIndex, + regionFinder.asyncGetBlockDistribution(region)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index a6724ee..cb93cd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -21,10 +21,12 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit; class RegionLocationFinder { private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class); private static final long CACHE_TIME = 240 * 60 * 1000; + private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution(); private Configuration conf; private volatile ClusterStatus status; private MasterServices services; @@ -165,7 +168,11 @@ class RegionLocationFinder { } protected List getTopBlockLocations(HRegionInfo region) { - HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region); + return getTopBlockLocations(getBlockDistribution(region)); + } + + protected List getTopBlockLocations( + HDFSBlocksDistribution blocksDistribution) { List topHosts = blocksDistribution.getTopHosts(); return mapHostNameToServerName(topHosts); } @@ -295,4 +302,13 @@ class RegionLocationFinder { return blockDistbn; } } + + public ListenableFuture asyncGetBlockDistribution( + HRegionInfo hri) { + try { + return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION); + } catch (Exception e) { + return Futures.immediateFuture(new HDFSBlocksDistribution()); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index d8c0a3d..606a481 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -56,6 +57,8 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; @Category({MasterTests.class, MediumTests.class}) public class TestBaseLoadBalancer extends BalancerTestBase { @@ -451,14 +454,39 @@ public class TestBaseLoadBalancer extends BalancerTestBase { // block locality: region:0 => {server:0} // region:1 => {server:0, server:1} // region:42 => {server:4, server:9, server:5} - when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn( - Lists.newArrayList(servers.get(0))); - when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn( - Lists.newArrayList(servers.get(0), servers.get(1))); - when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn( - Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); - when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn( - Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus + HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture future0 = Futures + .immediateFuture(region0BlockDistribution); + when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn( + future0); + when(locationFinder.getTopBlockLocations(region0BlockDistribution)) + .thenReturn(Lists.newArrayList(servers.get(0))); + + HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture future1 = Futures + .immediateFuture(region1BlockDistribution); + when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn( + future1); + when(locationFinder.getTopBlockLocations(region1BlockDistribution)) + .thenReturn(Lists.newArrayList(servers.get(0), servers.get(1))); + + HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture future42 = Futures + .immediateFuture(region42BlockDistribution); + when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn( + future42); + when(locationFinder.getTopBlockLocations(region42BlockDistribution)) + .thenReturn( + Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); + + HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture future43 = Futures + .immediateFuture(region43BlockDistribution); + when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn( + future43); + // this server does not exists in clusterStatus + when(locationFinder.getTopBlockLocations(region43BlockDistribution)) + .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);