diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 2b13b21..1c5001f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -169,8 +169,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { Map> loads, RegionLocationFinder regionFinder, RackManager rackManager) { - this(null, clusterState, loads, regionFinder, - rackManager); + this(null, clusterState, loads, regionFinder, rackManager, false); } @SuppressWarnings("unchecked") @@ -179,7 +178,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { Map> clusterState, Map> loads, RegionLocationFinder regionFinder, - RackManager rackManager) { + RackManager rackManager, + boolean startup) { if (unassignedRegions == null) { unassignedRegions = EMPTY_REGION_LIST; @@ -241,9 +241,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndexToTableIndex = new int[numRegions]; regionIndexToPrimaryIndex = new int[numRegions]; regionLoads = new Deque[numRegions]; - regionLocationFutures = new ArrayList>( - numRegions); - if (regionFinder != null) { + if (startup && regionFinder != null) { + regionLocationFutures = new ArrayList>( + numRegions); for (int i = 0; i < numRegions; i++) { regionLocationFutures.add(null); } @@ -306,18 +306,19 @@ public abstract class BaseLoadBalancer implements LoadBalancer { serverIndexToRackIndex[serverIndex] = rackIndex; for (HRegionInfo region : entry.getValue()) { - registerRegion(region, regionIndex, serverIndex, loads, regionFinder); + registerRegion(region, regionIndex, serverIndex, loads, regionFinder, + startup); regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; regionIndex++; } } for (HRegionInfo region : unassignedRegions) { - registerRegion(region, regionIndex, -1, loads, regionFinder); + registerRegion(region, regionIndex, -1, loads, regionFinder, startup); regionIndex++; } - if (regionFinder != null) { + if (startup && regionFinder != null) { for (int index = 0; index < regionLocationFutures.size(); index++) { ListenableFuture future = regionLocationFutures .get(index); @@ -464,8 +465,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } /** Helper for Cluster constructor to handle a region */ - private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex, - Map> loads, RegionLocationFinder regionFinder) { + private void registerRegion(HRegionInfo region, int regionIndex, + int serverIndex, Map> loads, + RegionLocationFinder regionFinder, boolean startup) { String tableName = region.getTable().getNameAsString(); if (!tablesToIndex.containsKey(tableName)) { tables.add(tableName); @@ -492,8 +494,18 @@ public abstract class BaseLoadBalancer implements LoadBalancer { if (regionFinder != null) { // region location - regionLocationFutures.set(regionIndex, - regionFinder.asyncGetBlockDistribution(region)); + if (startup) { + regionLocationFutures.set(regionIndex, + regionFinder.asyncGetBlockDistribution(region)); + } else { + List loc = regionFinder.getTopBlockLocations(region); + regionLocations[regionIndex] = new int[loc.size()]; + for (int i = 0; i < loc.size(); i++) { + regionLocations[regionIndex][i] = loc.get(i) == null ? -1 + : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 + : serversToIndex.get(loc.get(i).getHostAndPort())); + } + } } } @@ -1277,7 +1289,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return assignments; } - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); List unassignedRegions = new ArrayList(); roundRobinAssignment(cluster, regions, unassignedRegions, @@ -1324,7 +1336,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } protected Cluster createCluster(List servers, - Collection regions) { + Collection regions, boolean startup) { // Get the snapshot of the current assignments for the regions in question, and then create // a cluster out of it. Note that we might have replicas already assigned to some servers // earlier. So we want to get the snapshot to see those assignments, but this will only contain @@ -1337,7 +1349,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } return new Cluster(regions, clusterState, null, this.regionFinder, - rackManager); + rackManager, startup); } /** @@ -1365,7 +1377,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } List regions = Lists.newArrayList(regionInfo); - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); return randomAssignment(cluster, regionInfo, servers); } @@ -1440,7 +1452,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int numRandomAssignments = 0; int numRetainedAssigments = 0; - Cluster cluster = createCluster(servers, regions.keySet()); + Cluster cluster = createCluster(servers, regions.keySet(), true); for (Map.Entry entry : regions.entrySet()) { HRegionInfo region = entry.getKey(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index fbe57d0..288de09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -167,6 +167,10 @@ class RegionLocationFinder { return includesUserTables; } + protected List getTopBlockLocations(HRegionInfo region) { + return getTopBlockLocations(getBlockDistribution(region)); + } + protected List getTopBlockLocations( HDFSBlocksDistribution blocksDistribution) { List topHosts = blocksDistribution.getTopHosts(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index 37165d3..fa76ef9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -440,7 +440,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase { } } - @Test (timeout=180000) + @Test(timeout = 180000) public void testClusterRegionLocations() { // tests whether region locations are handled correctly in Cluster List servers = getListOfServerNames(randomServers(10, 10)); @@ -451,6 +451,78 @@ public class TestBaseLoadBalancer extends BalancerTestBase { // mock block locality for some regions RegionLocationFinder locationFinder = mock(RegionLocationFinder.class); + // block locality: region:0 => {server:0} + // region:1 => {server:0, server:1} + // region:42 => {server:4, server:9, server:5} + when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn( + Lists.newArrayList(servers.get(0))); + when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn( + Lists.newArrayList(servers.get(0), servers.get(1))); + when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn( + Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); + when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn( + Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus + + HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution(); + ListenableFuture future43 = Futures + .immediateFuture(region43BlockDistribution); + when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn( + future43); + // this server does not exists in clusterStatus + when(locationFinder.getTopBlockLocations(region43BlockDistribution)) + .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); + + BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, + locationFinder, null); + + int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); // this is ok, + // it is just + // a test + int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1)); + int r10 = ArrayUtils.indexOf(cluster.regions, regions.get(10)); + int r42 = ArrayUtils.indexOf(cluster.regions, regions.get(42)); + int r43 = ArrayUtils.indexOf(cluster.regions, regions.get(43)); + + int s0 = cluster.serversToIndex.get(servers.get(0).getHostAndPort()); + int s1 = cluster.serversToIndex.get(servers.get(1).getHostAndPort()); + int s4 = cluster.serversToIndex.get(servers.get(4).getHostAndPort()); + int s5 = cluster.serversToIndex.get(servers.get(5).getHostAndPort()); + int s9 = cluster.serversToIndex.get(servers.get(9).getHostAndPort()); + + // region 0 locations + assertEquals(1, cluster.regionLocations[r0].length); + assertEquals(s0, cluster.regionLocations[r0][0]); + + // region 1 locations + assertEquals(2, cluster.regionLocations[r1].length); + assertEquals(s0, cluster.regionLocations[r1][0]); + assertEquals(s1, cluster.regionLocations[r1][1]); + + // region 10 locations + assertEquals(0, cluster.regionLocations[r10].length); + + // region 42 locations + assertEquals(3, cluster.regionLocations[r42].length); + assertEquals(s4, cluster.regionLocations[r42][0]); + assertEquals(s9, cluster.regionLocations[r42][1]); + assertEquals(s5, cluster.regionLocations[r42][2]); + + // region 43 locations + assertEquals(1, cluster.regionLocations[r43].length); + assertEquals(-1, cluster.regionLocations[r43][0]); + } + + @Test (timeout=180000) + public void testClusterRegionLocationsStartup() { + // tests whether region locations are handled correctly in Cluster + List servers = getListOfServerNames(randomServers(10, 10)); + List regions = randomRegions(101); + Map> clusterState = new HashMap>(); + + assignRegions(regions, servers, clusterState); + + // mock block locality for some regions + RegionLocationFinder locationFinder = mock(RegionLocationFinder.class); HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution(); ListenableFuture defaultFuture = Futures .immediateFuture(emptyBlockDistribution); @@ -495,7 +567,8 @@ public class TestBaseLoadBalancer extends BalancerTestBase { when(locationFinder.getTopBlockLocations(region43BlockDistribution)) .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); - BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null); + BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, + locationFinder, null, true); int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); // this is ok, it is just a test int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1));