Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (revision 1576574)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (working copy)
@@ -38,7 +38,6 @@
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
@@ -144,7 +143,7 @@
       new RandomCandidateGenerator(),
       new LoadCandidateGenerator(),
       localityCandidateGenerator,
-      new RegionReplicaCandidateGenerator(),
+      new RegionReplicaRackCandidateGenerator(),
     };

     regionLoadFunctions = new CostFromRegionLoadFunction[] {
@@ -419,6 +418,14 @@
     return RANDOM.nextInt(cluster.numServers);
   }

+  protected int pickRandomRack(Cluster cluster) {
+    if (cluster.numRacks < 1) {
+      return -1;
+    }
+
+    return RANDOM.nextInt(cluster.numRacks);
+  }
+
   protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
     if (cluster.numServers < 2) {
       return -1;
@@ -431,6 +438,18 @@
     }
   }

+  protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
+    if (cluster.numRacks < 2) {
+      return -1;
+    }
+    while (true) {
+      int otherRackIndex = pickRandomRack(cluster);
+      if (otherRackIndex != rackIndex) {
+        return otherRackIndex;
+      }
+    }
+  }
+
   protected Cluster.Action pickRandomRegions(Cluster cluster,
                                              int thisServer,
                                              int otherServer) {
@@ -484,7 +503,7 @@
     }
   }

-  public static class LoadCandidateGenerator extends CandidateGenerator {
+  static class LoadCandidateGenerator extends CandidateGenerator {

     @Override
     Cluster.Action generate(Cluster cluster) {
@@ -581,39 +600,40 @@
    * Generates candidates which moves the replicas out of the region server for
    * co-hosted region replicas
    */
-  public static class RegionReplicaCandidateGenerator extends CandidateGenerator {
+  static class RegionReplicaCandidateGenerator extends CandidateGenerator {

     RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();

-    @Override
-    Cluster.Action generate(Cluster cluster) {
-
-      int serverIndex = pickRandomServer(cluster);
-
-      if (cluster.numServers <= 1 || serverIndex == -1) {
-        return Cluster.NullAction;
-      }
-
-      // randomly select one primaryIndex out of all region replicas in the same server
-      // we don't know how many region replicas are co-hosted, we will randomly select one
-      // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
+    /**
+     * Randomly select one regionIndex out of all region replicas co-hosted in the same group
+     * (a group is a server, host or rack)
+     * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
+     *   primariesOfRegionsPerHost or primariesOfRegionsPerRack
+     * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
+     * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
+     * @return a regionIndex for the selected primary or -1 if there is no co-location
+     */
+    int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
+        , int[] regionIndexToPrimaryIndex) {
       int currentPrimary = -1;
       int currentPrimaryIndex = -1;
-      int primaryIndex = -1;
+      int selectedPrimaryIndex = -1;
       double currentLargestRandom = -1;
-      // regionsByPrimaryPerServer is a sorted array. Since it contains the primary region
+      // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
       // ids for the regions hosted in server, a consecutive repetition means that replicas
       // are co-hosted
-      for (int j = 0; j <= cluster.primariesOfRegionsPerServer[serverIndex].length; j++) {
-        int primary = j < cluster.primariesOfRegionsPerServer[serverIndex].length
-            ? cluster.primariesOfRegionsPerServer[serverIndex][j] : -1;
+      for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
+        int primary = j < primariesOfRegionsPerGroup.length
+            ? primariesOfRegionsPerGroup[j] : -1;
         if (primary != currentPrimary) { // check for whether we see a new primary
           int numReplicas = j - currentPrimaryIndex;
           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
             // decide to select this primary region id or not
             double currentRandom = RANDOM.nextDouble();
+            // we don't know how many region replicas are co-hosted, we will randomly select one
+            // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
             if (currentRandom > currentLargestRandom) {
-              primaryIndex = currentPrimary; // select this primary
+              selectedPrimaryIndex = currentPrimary;
               currentLargestRandom = currentRandom;
             }
           }
@@ -622,30 +642,73 @@
         }
       }

-      // if there are no pairs of region replicas co-hosted, default to random generator
-      if (primaryIndex == -1) {
-        // default to randompicker
-        return randomGenerator.generate(cluster);
-      }
-
       // we have found the primary id for the region to move. Now find the actual regionIndex
       // with the given primary, prefer to move the secondary region.
-      int regionIndex = -1;
-      for (int k = 0; k < cluster.regionsPerServer[serverIndex].length; k++) {
-        int region = cluster.regionsPerServer[serverIndex][k];
-        if (primaryIndex == cluster.regionIndexToPrimaryIndex[region]) {
+      for (int j = 0; j < regionsPerGroup.length; j++) {
+        int regionIndex = regionsPerGroup[j];
+        if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
           // always move the secondary, not the primary
-          if (!RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) {
-            regionIndex = region;
-            break;
+          if (selectedPrimaryIndex != regionIndex) {
+            return regionIndex;
           }
         }
       }
+      return -1;
+    }

-      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
-
+    @Override
+    Cluster.Action generate(Cluster cluster) {
+      int serverIndex = pickRandomServer(cluster);
+      if (cluster.numServers <= 1 || serverIndex == -1) {
+        return Cluster.NullAction;
+      }
+
+      int regionIndex = selectCoHostedRegionPerGroup(
+        cluster.primariesOfRegionsPerServer[serverIndex],
+        cluster.regionsPerServer[serverIndex],
+        cluster.regionIndexToPrimaryIndex);
+
+      // if there are no pairs of region replicas co-hosted, default to random generator
+      if (regionIndex == -1) {
+        // default to randompicker
+        return randomGenerator.generate(cluster);
+      }
+
+      int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
+      int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
+      return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
+    }
+  }
+
+  /**
+   * Generates candidates which move the replicas out of the rack for
+   * co-hosted region replicas in the same rack
+   */
+  static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
+    @Override
+    Cluster.Action generate(Cluster cluster) {
+      int rackIndex = pickRandomRack(cluster);
+      if (cluster.numRacks <= 1 || rackIndex == -1) {
+        return super.generate(cluster);
+      }
+
+      int regionIndex = selectCoHostedRegionPerGroup(
+        cluster.primariesOfRegionsPerRack[rackIndex],
+        cluster.regionsPerRack[rackIndex],
+        cluster.regionIndexToPrimaryIndex);
+
+      // if there are no pairs of region replicas co-hosted, default to random generator
+      if (regionIndex == -1) {
+        // default to randompicker
+        return randomGenerator.generate(cluster);
+      }
+
+      int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
+      int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
+
+      int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
+      int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
-
       return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
     }
   }
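Note (not part of the patch): selectCoHostedRegionPerGroup() added above walks a sorted array of primary indices and uses reservoir sampling so that, when several primaries have co-hosted replicas in the group, each co-hosted group is chosen with equal probability in a single pass. The standalone sketch below illustrates only that selection step; the class and method names (ReservoirPickDemo, pickCoHostedPrimary) and the sample data are illustrative and not HBase APIs.

    import java.util.Random;

    /**
     * Standalone illustration of the reservoir-sampling walk used by
     * selectCoHostedRegionPerGroup: primaries[] is sorted, so a run of equal values means
     * the replicas of that primary are co-hosted; every run longer than one is selected
     * with equal probability by keeping the run whose random draw is the largest so far.
     */
    public class ReservoirPickDemo {
      private static final Random RANDOM = new Random();

      /** Returns the primary id of one co-hosted group chosen uniformly at random, or -1. */
      static int pickCoHostedPrimary(int[] primaries) {
        int currentPrimary = -1;
        int currentPrimaryIndex = -1;
        int selectedPrimary = -1;
        double currentLargestRandom = -1;
        for (int j = 0; j <= primaries.length; j++) {
          int primary = j < primaries.length ? primaries[j] : -1;
          if (primary != currentPrimary) {        // a new run starts here
            int numReplicas = j - currentPrimaryIndex;
            if (numReplicas > 1) {                // previous run had co-hosted replicas
              double currentRandom = RANDOM.nextDouble();
              if (currentRandom > currentLargestRandom) {
                selectedPrimary = currentPrimary; // keep the run with the largest draw
                currentLargestRandom = currentRandom;
              }
            }
            currentPrimary = primary;
            currentPrimaryIndex = j;
          }
        }
        return selectedPrimary;
      }

      public static void main(String[] args) {
        // Primaries of the regions on one server: primaries 3 and 7 each appear twice,
        // so their replicas are co-hosted; 3 or 7 is returned with equal probability.
        int[] primariesOnServer = {1, 3, 3, 5, 7, 7, 9};
        System.out.println("selected primary: " + pickCoHostedPrimary(primariesOnServer));
      }
    }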
@@ -653,7 +716,7 @@
   /**
    * Base class of StochasticLoadBalancer's Cost Functions.
    */
-  public abstract static class CostFunction {
+  abstract static class CostFunction {

     private float multiplier = 0;
     private Configuration conf;
@@ -764,7 +827,7 @@
    * Given the starting state of the regions and a potential ending state
    * compute cost based upon the number of regions that have moved.
    */
-  public static class MoveCostFunction extends CostFunction {
+  static class MoveCostFunction extends CostFunction {
     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
     private static final String MAX_MOVES_PERCENT_KEY =
         "hbase.master.balancer.stochastic.maxMovePercent";
@@ -813,7 +876,7 @@
    * Compute the cost of a potential cluster state from skew in number of
    * regions on a cluster.
    */
-  public static class RegionCountSkewCostFunction extends CostFunction {
+  static class RegionCountSkewCostFunction extends CostFunction {
     private static final String REGION_COUNT_SKEW_COST_KEY =
         "hbase.master.balancer.stochastic.regionCountCost";
     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
@@ -843,7 +906,7 @@
    * Compute the cost of a potential cluster configuration based upon how evenly
    * distributed tables are.
    */
-  public static class TableSkewCostFunction extends CostFunction {
+  static class TableSkewCostFunction extends CostFunction {

     private static final String TABLE_SKEW_COST_KEY =
         "hbase.master.balancer.stochastic.tableSkewCost";
@@ -873,7 +936,7 @@
    * Compute a cost of a potential cluster configuration based upon where
    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
    */
-  public static class LocalityCostFunction extends CostFunction {
+  static class LocalityCostFunction extends CostFunction {

     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
     private static final float DEFAULT_LOCALITY_COST = 25;
@@ -933,7 +996,7 @@
    * Base class the allows writing costs functions from rolling average of some
    * number from RegionLoad.
    */
-  public abstract static class CostFromRegionLoadFunction extends CostFunction {
+  abstract static class CostFromRegionLoadFunction extends CostFunction {

     private ClusterStatus clusterStatus = null;
     private Map<String, Deque<RegionLoad>> loads = null;
@@ -1007,7 +1070,7 @@
    * computed cost will be. This uses a rolling average of regionload.
    */
-  public static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
+  static class ReadRequestCostFunction extends CostFromRegionLoadFunction {

     private static final String READ_REQUEST_COST_KEY =
         "hbase.master.balancer.stochastic.readRequestCost";
@@ -1029,7 +1092,7 @@
    * Compute the cost of total number of write requests. The more unbalanced the higher the
    * computed cost will be. This uses a rolling average of regionload.
    */
-  public static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
+  static class WriteRequestCostFunction extends CostFromRegionLoadFunction {

     private static final String WRITE_REQUEST_COST_KEY =
         "hbase.master.balancer.stochastic.writeRequestCost";
@@ -1052,7 +1115,7 @@
    * though, since if numReplicas > numRegionServers, we still want to keep the
    * replica open.
    */
-  public static class RegionReplicaHostCostFunction extends CostFunction {
+  static class RegionReplicaHostCostFunction extends CostFunction {
     private static final String REGION_REPLICA_HOST_COST_KEY =
         "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
     private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
@@ -1166,7 +1229,7 @@
    * cost to hosting replicas of the same region in the same rack. We do not prevent the case
    * though.
    */
-  public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
+  static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
     private static final String REGION_REPLICA_RACK_COST_KEY =
         "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
     private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
@@ -1209,7 +1272,7 @@
    * Compute the cost of total memstore size. The more unbalanced the higher the
    * computed cost will be. This uses a rolling average of regionload.
    */
-  public static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
+  static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {

     private static final String MEMSTORE_SIZE_COST_KEY =
         "hbase.master.balancer.stochastic.memstoreSizeCost";
@@ -1229,7 +1292,7 @@
    * Compute the cost of total open storefiles size. The more unbalanced the higher the
    * computed cost will be. This uses a rolling average of regionload.
    */
-  public static class StoreFileCostFunction extends CostFromRegionLoadFunction {
+  static class StoreFileCostFunction extends CostFromRegionLoadFunction {

     private static final String STOREFILE_SIZE_COST_KEY =
         "hbase.master.balancer.stochastic.storefileSizeCost";
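Note (not part of the patch): the two replica cost functions above read their multipliers from the keys shown in the hunks, with defaults of 100000 for same-host placement and 10000 for same-rack placement, so same-host co-location is penalized far more heavily. A minimal sketch of overriding those multipliers programmatically follows; a real deployment would normally set them in hbase-site.xml, and the class name ReplicaCostTuningSketch is illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    /** Illustrative only: keys and default values are taken from the hunks above. */
    public class ReplicaCostTuningSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Default 100000: replicas of the same region on one host are heavily penalized.
        conf.setFloat("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 100000f);
        // Default 10000: same-rack replicas are also penalized, but less severely.
        conf.setFloat("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 10000f);
        System.out.println("host cost multiplier = "
            + conf.getFloat("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 0f));
      }
    }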
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java (revision 1576574)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java (working copy)
@@ -49,8 +49,6 @@
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.NetworkTopology;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -66,8 +64,6 @@
   public static void beforeAllTests() throws Exception {
     conf = HBaseConfiguration.create();
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
-    conf.setClass("hbase.util.ip.to.rack.determiner",
-      MyRackResolver.class, DNSToSwitchMapping.class);
     loadBalancer = new StochasticLoadBalancer();
     loadBalancer.setConf(conf);
   }
@@ -491,7 +487,7 @@
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicasOnSmallCluster() {
     int numNodes = 10;
     int numRegions = 1000;
@@ -501,7 +497,7 @@
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicasOnMidCluster() {
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     loadBalancer.setConf(conf);
@@ -513,7 +509,7 @@
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicasOnLargeCluster() {
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     loadBalancer.setConf(conf);
@@ -525,20 +521,21 @@
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicasOnMidClusterHighReplication() {
-    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     loadBalancer.setConf(conf);
     int numNodes = 100;
-    int numRegions = 6 * 100;
+    int numRegions = 6 * numNodes;
     int replication = 100; // 100 replicas per region, one for each server
     int numRegionsPerServer = 5;
     int numTables = 10;
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicationOnMidClusterSameHosts() {
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
@@ -583,16 +580,16 @@
     }
   }

-  @Test (timeout = 120000)
+  @Test (timeout = 800000)
   public void testRegionReplicationOnMidClusterWithRacks() {
-    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L);
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
-    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 60 * 1000); // 60 sec
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
     loadBalancer.setConf(conf);
-    int numNodes = 50;
+    int numNodes = 30;
     int numRegions = numNodes * 30;
     int replication = 3; // 3 replicas per region
-    int numRegionsPerServer = 25;
+    int numRegionsPerServer = 28;
     int numTables = 10;
     int numRacks = 4; // all replicas should be on a different rack
     Map<ServerName, List<HRegionInfo>> serverMap =
@@ -602,7 +599,7 @@
     testWithCluster(serverMap, rm, true, true);
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 800000)
   public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() {
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
@@ -681,21 +678,4 @@
     return clusterState;
   }
-
-  public static class MyRackResolver implements DNSToSwitchMapping {
-
-    public MyRackResolver(Configuration conf) {}
-
-    @Override
-    public List<String> resolve(List<String> names) {
-      List<String> racks = new ArrayList<String>(names.size());
-      for (int i = 0; i < names.size(); i++) {
-        racks.add(i, NetworkTopology.DEFAULT_RACK);
-      }
-      return racks;
-    }
-
-    @Override
-    public void reloadCachedMappings() {}
-  }
 }
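Note (not part of the patch): the test changes above mostly give the stochastic balancer more room to fully separate replicas across racks: much larger JUnit timeouts, more candidate steps, a longer maximum running time, and a move percent of 1.0. The sketch below shows the same knobs applied outside the test harness; it lives in the balancer package only so it can reuse the package-visible MAX_STEPS_KEY constant the tests use, and the class itself is hypothetical.

    package org.apache.hadoop.hbase.master.balancer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    /** Illustrative only: keys and values mirror the test hunks above. */
    public class BalancerTuningSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // More candidate moves per balance() call (the rack test raises this to 10M).
        conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L);
        // Let a single balancer invocation run for up to 120 seconds.
        conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000);
        // Allow a plan that moves every region if the cost functions call for it.
        conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);

        StochasticLoadBalancer balancer = new StochasticLoadBalancer();
        balancer.setConf(conf);
      }
    }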