diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 7c6858e..4420d23 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
@@ -144,7 +143,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       new RandomCandidateGenerator(),
       new LoadCandidateGenerator(),
       localityCandidateGenerator,
-      new RegionReplicaCandidateGenerator(),
+      new RegionReplicaRackCandidateGenerator(),
     };

     regionLoadFunctions = new CostFromRegionLoadFunction[] {
@@ -419,6 +418,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     return RANDOM.nextInt(cluster.numServers);
   }

+  protected int pickRandomRack(Cluster cluster) {
+    if (cluster.numRacks < 1) {
+      return -1;
+    }
+
+    return RANDOM.nextInt(cluster.numRacks);
+  }
+
   protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
     if (cluster.numServers < 2) {
       return -1;
@@ -431,6 +438,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
   }

+  protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
+    if (cluster.numRacks < 2) {
+      return -1;
+    }
+    while (true) {
+      int otherRackIndex = pickRandomRack(cluster);
+      if (otherRackIndex != rackIndex) {
+        return otherRackIndex;
+      }
+    }
+  }
+
   protected Cluster.Action pickRandomRegions(Cluster cluster,
                                              int thisServer,
                                              int otherServer) {
@@ -484,7 +503,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
   }

-  public static class LoadCandidateGenerator extends CandidateGenerator {
+  static class LoadCandidateGenerator extends CandidateGenerator {

     @Override
     Cluster.Action generate(Cluster cluster) {
@@ -580,39 +599,40 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * Generates candidates which moves the replicas out of the region server for
    * co-hosted region replicas
    */
-  public static class RegionReplicaCandidateGenerator extends CandidateGenerator {
+  static class RegionReplicaCandidateGenerator extends CandidateGenerator {

     RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();

-    @Override
-    Cluster.Action generate(Cluster cluster) {
-
-      int serverIndex = pickRandomServer(cluster);
-
-      if (cluster.numServers <= 1 || serverIndex == -1) {
-        return Cluster.NullAction;
-      }
-
-      // randomly select one primaryIndex out of all region replicas in the same server
-      // we don't know how many region replicas are co-hosted, we will randomly select one
-      // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
+    /**
+     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
+     * (a group is a server, host or rack)
+     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
+     * primariesOfRegionsPerHost or primariesOfRegionsPerRack
+     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
+     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
+     * @return a regionIndex for the selected primary or -1 if there is no co-locating
+     */
+    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
+        , int[] regionIndexToPrimaryIndex) {
       int currentPrimary = -1;
       int currentPrimaryIndex = -1;
-      int primaryIndex = -1;
+      int selectedPrimaryIndex = -1;
       double currentLargestRandom = -1;
-      // regionsByPrimaryPerServer is a sorted array. Since it contains the primary region
+      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
       // ids for the regions hosted in server, a consecutive repetition means that replicas
       // are co-hosted
-      for (int j = 0; j <= cluster.primariesOfRegionsPerServer[serverIndex].length; j++) {
-        int primary = j < cluster.primariesOfRegionsPerServer[serverIndex].length
-            ? cluster.primariesOfRegionsPerServer[serverIndex][j] : -1;
+      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
+        int primary = j < primariesOfRegionsPerGroup.length
+            ? primariesOfRegionsPerGroup[j] : -1;
         if (primary != currentPrimary) { // check for whether we see a new primary
           int numReplicas = j - currentPrimaryIndex;
           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
             // decide to select this primary region id or not
             double currentRandom = RANDOM.nextDouble();
+            // we don't know how many region replicas are co-hosted, we will randomly select one
+            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
             if (currentRandom > currentLargestRandom) {
-              primaryIndex = currentPrimary; // select this primary
+              selectedPrimaryIndex = currentPrimary;
               currentLargestRandom = currentRandom;
             }
           }
@@ -621,30 +641,73 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
         }
       }

-      // if there are no pairs of region replicas co-hosted, default to random generator
-      if (primaryIndex == -1) {
-        // default to randompicker
-        return randomGenerator.generate(cluster);
-      }
-
       // we have found the primary id for the region to move. Now find the actual regionIndex
       // with the given primary, prefer to move the secondary region.
-      int regionIndex = -1;
-      for (int k = 0; k < cluster.regionsPerServer[serverIndex].length; k++) {
-        int region = cluster.regionsPerServer[serverIndex][k];
-        if (primaryIndex == cluster.regionIndexToPrimaryIndex[region]) {
+      for (int j = 0; j < regionsPerGroup.length; j++) {
+        int regionIndex = regionsPerGroup[j];
+        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
           // always move the secondary, not the primary
-          if (!RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) {
-            regionIndex = region;
-            break;
+          if (selectedPrimaryIndex != regionIndex) {
+            return regionIndex;
           }
         }
       }
+      return -1;
+    }

-      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
+    @Override
+    Cluster.Action generate(Cluster cluster) {
+      int serverIndex = pickRandomServer(cluster);
+      if (cluster.numServers <= 1 || serverIndex == -1) {
+        return Cluster.NullAction;
+      }
+      int regionIndex = selectCoHostedRegionPerGroup(
+        cluster.primariesOfRegionsPerServer[serverIndex],
+        cluster.regionsPerServer[serverIndex],
+        cluster.regionIndexToPrimaryIndex);
+
+      // if there are no pairs of region replicas co-hosted, default to random generator
+      if (regionIndex == -1) {
+        // default to randompicker
+        return randomGenerator.generate(cluster);
+      }
+
+      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
+      return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
+    }
+  }
+
+  /**
+   * Generates candidates which moves the replicas out of the rack for
+   * co-hosted region replicas in the same rack
+   */
+  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
+    @Override
+    Cluster.Action generate(Cluster cluster) {
+      int rackIndex = pickRandomRack(cluster);
+      if (cluster.numRacks <= 1 || rackIndex == -1) {
+        return super.generate(cluster);
+      }
+
+      int regionIndex = selectCoHostedRegionPerGroup(
+        cluster.primariesOfRegionsPerRack[rackIndex],
+        cluster.regionsPerRack[rackIndex],
+        cluster.regionIndexToPrimaryIndex);
+      // if there are no pairs of region replicas co-hosted, default to random generator
+      if (regionIndex == -1) {
+        // default to randompicker
+        return randomGenerator.generate(cluster);
+      }
+
+      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
+      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
+
+      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
+      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
+      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
       return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
     }
   }
@@ -652,7 +715,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   /**
    * Base class of StochasticLoadBalancer's Cost Functions.
    */
-  public abstract static class CostFunction {
+  abstract static class CostFunction {

     private float multiplier = 0;
     private Configuration conf;
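The selection in selectCoHostedRegionPerGroup above is reservoir sampling with a reservoir of size one: every run of repeated primary ids in the sorted per-group array is a candidate, and keeping whichever candidate draws the largest uniform random number selects each one with equal probability. A minimal standalone sketch of just that step (illustrative only; the class and method names below are invented and are not part of the patch):

    import java.util.Random;

    // Scan a sorted array of primary region ids once and pick, uniformly at
    // random, one id that occurs more than once (i.e. has co-located replicas).
    // Ids are assumed non-negative so -1 can serve as a sentinel.
    public class ReservoirPickSketch {
      private static final Random RANDOM = new Random();

      static int pickCoHostedPrimary(int[] sortedPrimaries) {
        int currentPrimary = -1;
        int currentPrimaryIndex = -1;
        int selectedPrimary = -1;
        double largestRandom = -1;
        for (int j = 0; j <= sortedPrimaries.length; j++) {
          int primary = j < sortedPrimaries.length ? sortedPrimaries[j] : -1;
          if (primary != currentPrimary) {        // a run of equal ids just ended
            if (j - currentPrimaryIndex > 1) {    // run longer than one => co-location
              double r = RANDOM.nextDouble();
              if (r > largestRandom) {            // keep the candidate with the largest draw,
                selectedPrimary = currentPrimary; // which makes each candidate equally likely
                largestRandom = r;
              }
            }
            currentPrimary = primary;
            currentPrimaryIndex = j;
          }
        }
        return selectedPrimary;                   // -1 when nothing is co-located
      }

      public static void main(String[] args) {
        // ids 2 and 7 repeat, so either 2 or 7 is printed with probability 1/2
        System.out.println(pickCoHostedPrimary(new int[] {1, 2, 2, 3, 7, 7, 7, 9}));
      }
    }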
@@ -763,7 +826,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * Given the starting state of the regions and a potential ending state
    * compute cost based upon the number of regions that have moved.
    */
-  public static class MoveCostFunction extends CostFunction {
+  static class MoveCostFunction extends CostFunction {
     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
     private static final String MAX_MOVES_PERCENT_KEY =
         "hbase.master.balancer.stochastic.maxMovePercent";
@@ -812,7 +875,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * Compute the cost of a potential cluster state from skew in number of
    * regions on a cluster.
    */
-  public static class RegionCountSkewCostFunction extends CostFunction {
+  static class RegionCountSkewCostFunction extends CostFunction {
     private static final String REGION_COUNT_SKEW_COST_KEY =
         "hbase.master.balancer.stochastic.regionCountCost";
     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
@@ -842,7 +905,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * Compute the cost of a potential cluster configuration based upon how evenly
    * distributed tables are.
    */
-  public static class TableSkewCostFunction extends CostFunction {
+  static class TableSkewCostFunction extends CostFunction {

     private static final String TABLE_SKEW_COST_KEY =
         "hbase.master.balancer.stochastic.tableSkewCost";
@@ -872,7 +935,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * Compute a cost of a potential cluster configuration based upon where
    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
    */
-  public static class LocalityCostFunction extends CostFunction {
+  static class LocalityCostFunction extends CostFunction {

     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
     private static final float DEFAULT_LOCALITY_COST = 25;
@@ -932,7 +995,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * Base class the allows writing costs functions from rolling average of some
    * number from RegionLoad.
    */
-  public abstract static class CostFromRegionLoadFunction extends CostFunction {
+  abstract static class CostFromRegionLoadFunction extends CostFunction {

     private ClusterStatus clusterStatus = null;
     private Map<String, Deque<RegionLoad>> loads = null;
@@ -1006,7 +1069,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * computed cost will be. This uses a rolling average of regionload.
    */
-  public static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
+  static class ReadRequestCostFunction extends CostFromRegionLoadFunction {

     private static final String READ_REQUEST_COST_KEY =
         "hbase.master.balancer.stochastic.readRequestCost";
@@ -1028,7 +1091,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * Compute the cost of total number of write requests. The more unbalanced the higher the
    * computed cost will be. This uses a rolling average of regionload.
    */
-  public static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
+  static class WriteRequestCostFunction extends CostFromRegionLoadFunction {

     private static final String WRITE_REQUEST_COST_KEY =
         "hbase.master.balancer.stochastic.writeRequestCost";
@@ -1051,7 +1114,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * though, since if numReplicas > numRegionServers, we still want to keep the
    * replica open.
    */
-  public static class RegionReplicaHostCostFunction extends CostFunction {
+  static class RegionReplicaHostCostFunction extends CostFunction {
     private static final String REGION_REPLICA_HOST_COST_KEY =
         "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
     private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
@@ -1165,7 +1228,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * cost to hosting replicas of the same region in the same rack. We do not prevent the case
    * though.
    */
-  public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
+  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
     private static final String REGION_REPLICA_RACK_COST_KEY =
         "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
     private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
@@ -1208,7 +1271,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * Compute the cost of total memstore size. The more unbalanced the higher the
    * computed cost will be. This uses a rolling average of regionload.
    */
-  public static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
+  static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {

     private static final String MEMSTORE_SIZE_COST_KEY =
         "hbase.master.balancer.stochastic.memstoreSizeCost";
@@ -1228,7 +1291,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    * Compute the cost of total open storefiles size. The more unbalanced the higher the
    * computed cost will be. This uses a rolling average of regionload.
    */
-  public static class StoreFileCostFunction extends CostFromRegionLoadFunction {
+  static class StoreFileCostFunction extends CostFromRegionLoadFunction {

     private static final String STOREFILE_SIZE_COST_KEY =
         "hbase.master.balancer.stochastic.storefileSizeCost";
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
index e9e9a36..c63987c 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
@@ -49,8 +49,6 @@ import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.NetworkTopology;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -66,8 +64,6 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
   public static void beforeAllTests() throws Exception {
     conf = HBaseConfiguration.create();
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
-    conf.setClass("hbase.util.ip.to.rack.determiner",
-      MyRackResolver.class, DNSToSwitchMapping.class);
     loadBalancer = new StochasticLoadBalancer();
     loadBalancer.setConf(conf);
   }
@@ -491,7 +487,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicasOnSmallCluster() {
     int numNodes = 10;
     int numRegions = 1000;
@@ -501,9 +497,11 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicasOnMidCluster() {
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
     loadBalancer.setConf(conf);
     int numNodes = 200;
     int numRegions = 40 * 200;
@@ -513,34 +511,38 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicasOnLargeCluster() {
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
     loadBalancer.setConf(conf);
     int numNodes = 1000;
-    int numRegions = 40 * numNodes; //40 regions per RS
-    int numRegionsPerServer = 30; //all servers except one
+    int numRegions = 20 * numNodes; // 20 * replication regions per RS
+    int numRegionsPerServer = 19; // all servers except one
     int numTables = 100;
     int replication = 3;
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicasOnMidClusterHighReplication() {
-    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     loadBalancer.setConf(conf);
-    int numNodes = 100;
-    int numRegions = 6 * 100;
-    int replication = 100; // 100 replicas per region, one for each server
+    int numNodes = 80;
+    int numRegions = 6 * numNodes;
+    int replication = 80; // 80 replicas per region, one for each server
     int numRegionsPerServer = 5;
     int numTables = 10;
-    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicationOnMidClusterSameHosts() {
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     loadBalancer.setConf(conf);
     int numHosts = 100;
@@ -583,34 +585,35 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     }
   }

-  @Test (timeout = 120000)
+  @Test (timeout = 800000)
   public void testRegionReplicationOnMidClusterWithRacks() {
-    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L);
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
-    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 60 * 1000); // 60 sec
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
     loadBalancer.setConf(conf);
-    int numNodes = 50;
+    int numNodes = 30;
     int numRegions = numNodes * 30;
     int replication = 3; // 3 replicas per region
-    int numRegionsPerServer = 25;
+    int numRegionsPerServer = 28;
     int numTables = 10;
     int numRacks = 4; // all replicas should be on a different rack
     Map<ServerName, List<HRegionInfo>> serverMap =
         createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
     RackManager rm = new ForTestRackManager(numRacks);
-    testWithCluster(serverMap, rm, true, true);
+    testWithCluster(serverMap, rm, false, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() {
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     loadBalancer.setConf(conf);
-    int numNodes = 80;
-    int numRegions = 6 * 100;
-    int replication = 100; // 100 replicas per region, more than numNodes
-    int numRegionsPerServer = 5;
+    int numNodes = 40;
+    int numRegions = 6 * 50;
+    int replication = 50; // 50 replicas per region, more than numNodes
+    int numRegionsPerServer = 6;
     int numTables = 10;
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, false);
   }
@@ -638,15 +641,19 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     assertNotNull(plans);

     // Check to see that this actually got to a stable place.
-    if (assertFullyBalanced) {
+    if (assertFullyBalanced || assertFullyBalancedForReplicas) {
       // Apply the plan to the mock cluster.
       List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);

       // Print out the cluster loads to make debugging easier.
       LOG.info("Mock Balance : " + printMock(balancedCluster));
-      assertClusterAsBalanced(balancedCluster);
-      List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap);
-      assertNull(secondPlans);
+
+      if (assertFullyBalanced) {
+        assertClusterAsBalanced(balancedCluster);
+        List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap);
+        assertNull(secondPlans);
+      }
+
       if (assertFullyBalancedForReplicas) {
         assertRegionReplicaPlacement(serverMap, rackManager);
       }
@@ -681,21 +688,4 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {

     return clusterState;
   }
-
-  public static class MyRackResolver implements DNSToSwitchMapping {
-
-    public MyRackResolver(Configuration conf) {}
-
-    @Override
-    public List<String> resolve(List<String> names) {
-      List<String> racks = new ArrayList<String>(names.size());
-      for (int i = 0; i < names.size(); i++) {
-        racks.add(i, NetworkTopology.DEFAULT_RACK);
-      }
-      return racks;
-    }
-
-    @Override
-    public void reloadCachedMappings() {}
-  }
 }
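As a companion to the rack-aware tests above, the destination choice made by the new RegionReplicaRackCandidateGenerator boils down to: pick a rack other than the offending one by rejection sampling, then pick a random server inside that rack. The following standalone sketch illustrates only that step, under the assumption that racks are modeled as an int[][] of server ids; the real code works against BaseLoadBalancer.Cluster (serversPerRack, pickOtherRandomRack) and is not reproduced here:

    import java.util.Random;

    // Illustration of the target selection: reject the source rack, then pick a
    // random server on the chosen rack. Not part of the patch.
    public class RackPickSketch {
      private static final Random RANDOM = new Random();

      static int pickOtherRandomRack(int numRacks, int rackIndex) {
        if (numRacks < 2) {
          return -1;                    // no other rack to move to
        }
        while (true) {
          int other = RANDOM.nextInt(numRacks);
          if (other != rackIndex) {     // rejection sampling: retry until a different rack
            return other;
          }
        }
      }

      static int pickServerOnOtherRack(int[][] serversPerRack, int fromRack) {
        int toRack = pickOtherRandomRack(serversPerRack.length, fromRack);
        if (toRack == -1) {
          return -1;
        }
        int[] servers = serversPerRack[toRack];
        return servers[RANDOM.nextInt(servers.length)];
      }

      public static void main(String[] args) {
        // two racks of three servers; moving off rack 0 always lands on server 3, 4 or 5
        int[][] serversPerRack = {{0, 1, 2}, {3, 4, 5}};
        System.out.println(pickServerOnOtherRack(serversPerRack, 0));
      }
    }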