diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 2b13b21..f71f8f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -33,7 +33,6 @@ import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ExecutionException; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; @@ -60,7 +59,6 @@ import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListenableFuture; /** * The base class for load balancers. It provides the the functions used to by @@ -119,7 +117,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { HRegionInfo[] regions; Deque[] regionLoads; private RegionLocationFinder regionFinder; - ArrayList> regionLocationFutures; int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality @@ -169,8 +166,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { Map> loads, RegionLocationFinder regionFinder, RackManager rackManager) { - this(null, clusterState, loads, regionFinder, - rackManager); + this(null, clusterState, loads, regionFinder, rackManager); } @SuppressWarnings("unchecked") @@ -241,13 +237,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndexToTableIndex = new int[numRegions]; regionIndexToPrimaryIndex = new int[numRegions]; regionLoads = new Deque[numRegions]; - regionLocationFutures = new ArrayList>( - numRegions); - if (regionFinder != null) { - for (int i = 0; i < numRegions; i++) { - regionLocationFutures.add(null); - } - } + regionLocations = new int[numRegions][]; serverIndicesSortedByRegionCount = new Integer[numServers]; serverIndicesSortedByLocality = new Integer[numServers]; @@ -307,43 +297,16 @@ public abstract class BaseLoadBalancer implements LoadBalancer { for (HRegionInfo region : entry.getValue()) { registerRegion(region, regionIndex, serverIndex, loads, regionFinder); - regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; regionIndex++; } } + for (HRegionInfo region : unassignedRegions) { registerRegion(region, regionIndex, -1, loads, regionFinder); regionIndex++; } - if (regionFinder != null) { - for (int index = 0; index < regionLocationFutures.size(); index++) { - ListenableFuture future = regionLocationFutures - .get(index); - HDFSBlocksDistribution blockDistbn = null; - try { - blockDistbn = future.get(); - } catch (InterruptedException ite) { - } catch (ExecutionException ee) { - LOG.debug( - "IOException during HDFSBlocksDistribution computation. for region = " - + regions[index].getEncodedName(), ee); - } finally { - if (blockDistbn == null) { - blockDistbn = new HDFSBlocksDistribution(); - } - } - List loc = regionFinder.getTopBlockLocations(blockDistbn); - regionLocations[index] = new int[loc.size()]; - for (int i = 0; i < loc.size(); i++) { - regionLocations[index][i] = loc.get(i) == null ? -1 - : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 - : serversToIndex.get(loc.get(i).getHostAndPort())); - } - } - } - for (int i = 0; i < serversPerHostList.size(); i++) { serversPerHost[i] = new int[serversPerHostList.get(i).size()]; for (int j = 0; j < serversPerHost[i].length; j++) { @@ -464,8 +427,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } /** Helper for Cluster constructor to handle a region */ - private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex, - Map> loads, RegionLocationFinder regionFinder) { + private void registerRegion(HRegionInfo region, int regionIndex, + int serverIndex, Map> loads, + RegionLocationFinder regionFinder) { String tableName = region.getTable().getNameAsString(); if (!tablesToIndex.containsKey(tableName)) { tables.add(tableName); @@ -492,8 +456,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer { if (regionFinder != null) { // region location - regionLocationFutures.set(regionIndex, - regionFinder.asyncGetBlockDistribution(region)); + List loc = regionFinder.getTopBlockLocations(region); + regionLocations[regionIndex] = new int[loc.size()]; + for (int i = 0; i < loc.size(); i++) { + regionLocations[regionIndex][i] = loc.get(i) == null ? -1 + : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 + : serversToIndex.get(loc.get(i).getHostAndPort())); + } } } @@ -1277,7 +1246,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return assignments; } - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); List unassignedRegions = new ArrayList(); roundRobinAssignment(cluster, regions, unassignedRegions, @@ -1324,7 +1293,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } protected Cluster createCluster(List servers, - Collection regions) { + Collection regions, boolean forceRefresh) { + if (forceRefresh == true) { + regionFinder.refreshAndWait(regions); + } // Get the snapshot of the current assignments for the regions in question, and then create // a cluster out of it. Note that we might have replicas already assigned to some servers // earlier. So we want to get the snapshot to see those assignments, but this will only contain @@ -1337,7 +1309,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } return new Cluster(regions, clusterState, null, this.regionFinder, - rackManager); + rackManager); } /** @@ -1365,7 +1337,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } List regions = Lists.newArrayList(regionInfo); - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); return randomAssignment(cluster, regionInfo, servers); } @@ -1440,7 +1412,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int numRandomAssignments = 0; int numRetainedAssigments = 0; - Cluster cluster = createCluster(servers, regions.keySet()); + Cluster cluster = createCluster(servers, regions.keySet(), true); for (Map.Entry entry : regions.entrySet()) { HRegionInfo region = entry.getKey(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index fbe57d0..732cbb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -17,15 +17,17 @@ */ package org.apache.hadoop.hbase.master.balancer; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,17 +45,15 @@ import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * This will find where data for a region is located in HDFS. It ranks @@ -167,9 +167,8 @@ class RegionLocationFinder { return includesUserTables; } - protected List getTopBlockLocations( - HDFSBlocksDistribution blocksDistribution) { - List topHosts = blocksDistribution.getTopHosts(); + protected List getTopBlockLocations(HRegionInfo region) { + List topHosts = getBlockDistribution(region).getTopHosts(); return mapHostNameToServerName(topHosts); } @@ -299,7 +298,7 @@ class RegionLocationFinder { } } - public ListenableFuture asyncGetBlockDistribution( + private ListenableFuture asyncGetBlockDistribution( HRegionInfo hri) { try { return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION); @@ -307,4 +306,34 @@ class RegionLocationFinder { return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION); } } + + public void refreshAndWait(Collection hris) { + ArrayList> regionLocationFutures = + new ArrayList>(hris.size()); + for (HRegionInfo hregionInfo : hris) { + regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo)); + } + int index = 0; + for (HRegionInfo hregionInfo : hris) { + ListenableFuture future = regionLocationFutures + .get(index); + try { + cache.put(hregionInfo, future.get()); + } catch (InterruptedException ite) { + } catch (ExecutionException ee) { + LOG.debug( + "IOException during HDFSBlocksDistribution computation. for region = " + + hregionInfo.getEncodedName(), ee); + } + index++; + } + if (lastFullRefresh == 0) { + lastFullRefresh = EnvironmentEdgeManager.currentTime(); + } + } + + // For test + LoadingCache getCache() { + return cache; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index 37165d3..b0b0a2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -57,8 +56,6 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; @Category({MasterTests.class, MediumTests.class}) public class TestBaseLoadBalancer extends BalancerTestBase { @@ -451,49 +448,17 @@ public class TestBaseLoadBalancer extends BalancerTestBase { // mock block locality for some regions RegionLocationFinder locationFinder = mock(RegionLocationFinder.class); - HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture defaultFuture = Futures - .immediateFuture(emptyBlockDistribution); - for (HRegionInfo regionInfo : regions) { - when(locationFinder.asyncGetBlockDistribution(regionInfo)).thenReturn( - defaultFuture); - } // block locality: region:0 => {server:0} // region:1 => {server:0, server:1} // region:42 => {server:4, server:9, server:5} - HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture future0 = Futures - .immediateFuture(region0BlockDistribution); - when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn( - future0); - when(locationFinder.getTopBlockLocations(region0BlockDistribution)) - .thenReturn(Lists.newArrayList(servers.get(0))); - - HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture future1 = Futures - .immediateFuture(region1BlockDistribution); - when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn( - future1); - when(locationFinder.getTopBlockLocations(region1BlockDistribution)) - .thenReturn(Lists.newArrayList(servers.get(0), servers.get(1))); - - HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture future42 = Futures - .immediateFuture(region42BlockDistribution); - when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn( - future42); - when(locationFinder.getTopBlockLocations(region42BlockDistribution)) - .thenReturn( - Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); - - HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution(); - ListenableFuture future43 = Futures - .immediateFuture(region43BlockDistribution); - when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn( - future43); - // this server does not exists in clusterStatus - when(locationFinder.getTopBlockLocations(region43BlockDistribution)) - .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); + when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn( + Lists.newArrayList(servers.get(0))); + when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn( + Lists.newArrayList(servers.get(0), servers.get(1))); + when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn( + Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); + when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn( + Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java index 039cac1..f18d722 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.ArrayList; @@ -25,6 +26,7 @@ import java.util.List; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -121,8 +123,8 @@ public class TestRegionLocationFinder { for (int i = 0; i < ServerNum; i++) { HRegionServer server = cluster.getRegionServer(i); for (Region region : server.getOnlineRegions(tableName)) { - List servers = finder.getTopBlockLocations(finder - .getBlockDistribution(region.getRegionInfo())); + List servers = finder.getTopBlockLocations(region + .getRegionInfo()); // test table may have empty region if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) { continue; @@ -139,4 +141,24 @@ public class TestRegionLocationFinder { } } } + + @Test + public void testRefreshAndWait() throws Exception { + finder.getCache().invalidateAll(); + for (int i = 0; i < ServerNum; i++) { + HRegionServer server = cluster.getRegionServer(i); + List regions = server.getOnlineRegions(tableName); + if (regions.size() <= 0) { + continue; + } + List regionInfos = new ArrayList(regions.size()); + for (Region region : regions) { + regionInfos.add(region.getRegionInfo()); + } + finder.refreshAndWait(regionInfos); + for (HRegionInfo regionInfo : regionInfos) { + assertNotNull(finder.getCache().getIfPresent(regionInfo)); + } + } + } }