diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index c2529a8..2df4fbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -1231,7 +1231,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return assignments; } - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); List unassignedRegions = new ArrayList(); roundRobinAssignment(cluster, regions, unassignedRegions, @@ -1278,7 +1278,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } protected Cluster createCluster(List servers, - Collection regions) { + Collection regions, boolean forceRefresh) { + if (forceRefresh) { + regionFinder.refreshAndWait(regions); + } // Get the snapshot of the current assignments for the regions in question, and then create // a cluster out of it. Note that we might have replicas already assigned to some servers // earlier. So we want to get the snapshot to see those assignments, but this will only contain @@ -1352,7 +1355,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } List regions = Lists.newArrayList(regionInfo); - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); return randomAssignment(cluster, regionInfo, servers); } @@ -1427,7 +1430,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int numRandomAssignments = 0; int numRetainedAssigments = 0; - Cluster cluster = createCluster(servers, regions.keySet()); + Cluster cluster = createCluster(servers, regions.keySet(), true); for (Map.Entry entry : regions.entrySet()) { HRegionInfo region = entry.getKey(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index a6724ee..6c5cb19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -21,6 +21,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -63,11 +64,13 @@ import java.util.concurrent.TimeUnit; class RegionLocationFinder { private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class); private static final long CACHE_TIME = 240 * 60 * 1000; + private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution(); private Configuration conf; private volatile ClusterStatus status; private MasterServices services; private final ListeningExecutorService executor; - private long lastFullRefresh = 0; + // Do not scheduleFullRefresh at master startup + private long lastFullRefresh = EnvironmentEdgeManager.currentTime(); private CacheLoader loader = new CacheLoader() { @@ -165,8 +168,7 @@ class RegionLocationFinder { } protected List getTopBlockLocations(HRegionInfo region) { - HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region); - List topHosts = blocksDistribution.getTopHosts(); + List topHosts = getBlockDistribution(region).getTopHosts(); return mapHostNameToServerName(topHosts); } @@ -208,7 +210,7 @@ class RegionLocationFinder { + region.getEncodedName(), ioe); } - return new HDFSBlocksDistribution(); + return EMPTY_BLOCK_DISTRIBUTION; } /** @@ -295,4 +297,41 @@ class RegionLocationFinder { return blockDistbn; } } + + private ListenableFuture asyncGetBlockDistribution( + HRegionInfo hri) { + try { + return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION); + } catch (Exception e) { + return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION); + } + } + + public void refreshAndWait(Collection hris) { + ArrayList> regionLocationFutures = + new ArrayList>(hris.size()); + for (HRegionInfo hregionInfo : hris) { + regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo)); + } + int index = 0; + for (HRegionInfo hregionInfo : hris) { + ListenableFuture future = regionLocationFutures + .get(index); + try { + cache.put(hregionInfo, future.get()); + } catch (InterruptedException ite) { + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + LOG.debug( + "ExecutionException during HDFSBlocksDistribution computation. for region = " + + hregionInfo.getEncodedName(), ee); + } + index++; + } + } + + // For test + LoadingCache getCache() { + return cache; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java index bdbdc9f..2585a87 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.ArrayList; @@ -141,4 +142,24 @@ public class TestRegionLocationFinder { } } } + + @Test + public void testRefreshAndWait() throws Exception { + finder.getCache().invalidateAll(); + for (int i = 0; i < ServerNum; i++) { + HRegionServer server = cluster.getRegionServer(i); + List regions = server.getOnlineRegions(tableName); + if (regions.size() <= 0) { + continue; + } + List regionInfos = new ArrayList(regions.size()); + for (Region region : regions) { + regionInfos.add(region.getRegionInfo()); + } + finder.refreshAndWait(regionInfos); + for (HRegionInfo regionInfo : regionInfos) { + assertNotNull(finder.getCache().getIfPresent(regionInfo)); + } + } + } }