diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 3da6266..fa024aa 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action; @@ -142,7 +141,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { new RandomCandidateGenerator(), new LoadCandidateGenerator(), localityCandidateGenerator, - new RegionReplicaCandidateGenerator(), + new RegionReplicaRackCandidateGenerator(), }; regionLoadFunctions = new CostFromRegionLoadFunction[] { @@ -405,6 +404,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return RANDOM.nextInt(cluster.numServers); } + protected int pickRandomRack(Cluster cluster) { + if (cluster.numRacks < 1) { + return -1; + } + + return RANDOM.nextInt(cluster.numRacks); + } + protected int pickOtherRandomServer(Cluster cluster, int serverIndex) { if (cluster.numServers < 2) { return -1; @@ -417,6 +424,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } + protected int pickOtherRandomRack(Cluster cluster, int rackIndex) { + if (cluster.numRacks < 2) { + return -1; + } + while (true) { + int otherRackIndex = pickRandomRack(cluster); + if (otherRackIndex != rackIndex) { + return otherRackIndex; + } + } + } + protected Cluster.Action pickRandomRegions(Cluster cluster, int thisServer, int otherServer) { @@ -470,7 +489,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } - public static class LoadCandidateGenerator extends CandidateGenerator { + static class LoadCandidateGenerator extends CandidateGenerator { @Override Cluster.Action generate(Cluster cluster) { @@ -567,39 +586,40 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * Generates candidates which moves the replicas out of the region server for * co-hosted region replicas */ - public static class RegionReplicaCandidateGenerator extends CandidateGenerator { + static class RegionReplicaCandidateGenerator extends CandidateGenerator { RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator(); - @Override - Cluster.Action generate(Cluster cluster) { - - int serverIndex = pickRandomServer(cluster); - - if (cluster.numServers <= 1 || serverIndex == -1) { - return Cluster.NullAction; - } - - // randomly select one primaryIndex out of all region replicas in the same server - // we don't know how many region replicas are co-hosted, we will randomly select one - // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html) + /** + * Randomly select one regionIndex out of all region replicas co-hosted in the same group + * (a group is a server, host or rack) + * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer, + * primariesOfRegionsPerHost or primariesOfRegionsPerRack + * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack + * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex + * @return a regionIndex for the selected primary or -1 if there is no co-locating + */ + int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup + , int[] regionIndexToPrimaryIndex) { int currentPrimary = -1; int currentPrimaryIndex = -1; - int primaryIndex = -1; + int selectedPrimaryIndex = -1; double currentLargestRandom = -1; - // regionsByPrimaryPerServer is a sorted array. Since it contains the primary region + // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region // ids for the regions hosted in server, a consecutive repetition means that replicas // are co-hosted - for (int j = 0; j <= cluster.primariesOfRegionsPerServer[serverIndex].length; j++) { - int primary = j < cluster.primariesOfRegionsPerServer[serverIndex].length - ? cluster.primariesOfRegionsPerServer[serverIndex][j] : -1; + for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) { + int primary = j < primariesOfRegionsPerGroup.length + ? primariesOfRegionsPerGroup[j] : -1; if (primary != currentPrimary) { // check for whether we see a new primary int numReplicas = j - currentPrimaryIndex; if (numReplicas > 1) { // means consecutive primaries, indicating co-location // decide to select this primary region id or not double currentRandom = RANDOM.nextDouble(); + // we don't know how many region replicas are co-hosted, we will randomly select one + // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html) if (currentRandom > currentLargestRandom) { - primaryIndex = currentPrimary; // select this primary + selectedPrimaryIndex = currentPrimary; currentLargestRandom = currentRandom; } } @@ -608,30 +628,73 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } - // if there are no pairs of region replicas co-hosted, default to random generator - if (primaryIndex == -1) { - // default to randompicker - return randomGenerator.generate(cluster); - } - // we have found the primary id for the region to move. Now find the actual regionIndex // with the given primary, prefer to move the secondary region. - int regionIndex = -1; - for (int k = 0; k < cluster.regionsPerServer[serverIndex].length; k++) { - int region = cluster.regionsPerServer[serverIndex][k]; - if (primaryIndex == cluster.regionIndexToPrimaryIndex[region]) { + for (int j = 0; j < regionsPerGroup.length; j++) { + int regionIndex = regionsPerGroup[j]; + if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) { // always move the secondary, not the primary - if (!RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) { - regionIndex = region; - break; + if (selectedPrimaryIndex != regionIndex) { + return regionIndex; } } } + return -1; + } - int toServerIndex = pickOtherRandomServer(cluster, serverIndex); + @Override + Cluster.Action generate(Cluster cluster) { + int serverIndex = pickRandomServer(cluster); + if (cluster.numServers <= 1 || serverIndex == -1) { + return Cluster.NullAction; + } + int regionIndex = selectCoHostedRegionPerGroup( + cluster.primariesOfRegionsPerServer[serverIndex], + cluster.regionsPerServer[serverIndex], + cluster.regionIndexToPrimaryIndex); + + // if there are no pairs of region replicas co-hosted, default to random generator + if (regionIndex == -1) { + // default to randompicker + return randomGenerator.generate(cluster); + } + + int toServerIndex = pickOtherRandomServer(cluster, serverIndex); int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); + return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex); + } + } + + /** + * Generates candidates which moves the replicas out of the rack for + * co-hosted region replicas in the same rack + */ + static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator { + @Override + Cluster.Action generate(Cluster cluster) { + int rackIndex = pickRandomRack(cluster); + if (cluster.numRacks <= 1 || rackIndex == -1) { + return super.generate(cluster); + } + + int regionIndex = selectCoHostedRegionPerGroup( + cluster.primariesOfRegionsPerRack[rackIndex], + cluster.regionsPerRack[rackIndex], + cluster.regionIndexToPrimaryIndex); + // if there are no pairs of region replicas co-hosted, default to random generator + if (regionIndex == -1) { + // default to randompicker + return randomGenerator.generate(cluster); + } + + int serverIndex = cluster.regionIndexToServerIndex[regionIndex]; + int toRackIndex = pickOtherRandomRack(cluster, rackIndex); + + int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length); + int toServerIndex = cluster.serversPerRack[toRackIndex][rand]; + int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex); } } @@ -639,7 +702,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { /** * Base class of StochasticLoadBalancer's Cost Functions. */ - public abstract static class CostFunction { + abstract static class CostFunction { private float multiplier = 0; private Configuration conf; @@ -750,7 +813,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * Given the starting state of the regions and a potential ending state * compute cost based upon the number of regions that have moved. */ - public static class MoveCostFunction extends CostFunction { + static class MoveCostFunction extends CostFunction { private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost"; private static final String MAX_MOVES_PERCENT_KEY = "hbase.master.balancer.stochastic.maxMovePercent"; @@ -799,7 +862,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * Compute the cost of a potential cluster state from skew in number of * regions on a cluster. */ - public static class RegionCountSkewCostFunction extends CostFunction { + static class RegionCountSkewCostFunction extends CostFunction { private static final String REGION_COUNT_SKEW_COST_KEY = "hbase.master.balancer.stochastic.regionCountCost"; private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500; @@ -829,7 +892,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * Compute the cost of a potential cluster configuration based upon how evenly * distributed tables are. */ - public static class TableSkewCostFunction extends CostFunction { + static class TableSkewCostFunction extends CostFunction { private static final String TABLE_SKEW_COST_KEY = "hbase.master.balancer.stochastic.tableSkewCost"; @@ -859,7 +922,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * Compute a cost of a potential cluster configuration based upon where * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located. */ - public static class LocalityCostFunction extends CostFunction { + static class LocalityCostFunction extends CostFunction { private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost"; private static final float DEFAULT_LOCALITY_COST = 25; @@ -919,7 +982,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * Base class the allows writing costs functions from rolling average of some * number from RegionLoad. */ - public abstract static class CostFromRegionLoadFunction extends CostFunction { + abstract static class CostFromRegionLoadFunction extends CostFunction { private ClusterStatus clusterStatus = null; private Map> loads = null; @@ -993,7 +1056,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * computed cost will be. This uses a rolling average of regionload. */ - public static class ReadRequestCostFunction extends CostFromRegionLoadFunction { + static class ReadRequestCostFunction extends CostFromRegionLoadFunction { private static final String READ_REQUEST_COST_KEY = "hbase.master.balancer.stochastic.readRequestCost"; @@ -1015,7 +1078,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * Compute the cost of total number of write requests. The more unbalanced the higher the * computed cost will be. This uses a rolling average of regionload. */ - public static class WriteRequestCostFunction extends CostFromRegionLoadFunction { + static class WriteRequestCostFunction extends CostFromRegionLoadFunction { private static final String WRITE_REQUEST_COST_KEY = "hbase.master.balancer.stochastic.writeRequestCost"; @@ -1038,7 +1101,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * though, since if numReplicas > numRegionServers, we still want to keep the * replica open. */ - public static class RegionReplicaHostCostFunction extends CostFunction { + static class RegionReplicaHostCostFunction extends CostFunction { private static final String REGION_REPLICA_HOST_COST_KEY = "hbase.master.balancer.stochastic.regionReplicaHostCostKey"; private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000; @@ -1152,7 +1215,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * cost to hosting replicas of the same region in the same rack. We do not prevent the case * though. */ - public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction { + static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction { private static final String REGION_REPLICA_RACK_COST_KEY = "hbase.master.balancer.stochastic.regionReplicaRackCostKey"; private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000; @@ -1195,7 +1258,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * Compute the cost of total memstore size. The more unbalanced the higher the * computed cost will be. This uses a rolling average of regionload. */ - public static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction { + static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction { private static final String MEMSTORE_SIZE_COST_KEY = "hbase.master.balancer.stochastic.memstoreSizeCost"; @@ -1215,7 +1278,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * Compute the cost of total open storefiles size. The more unbalanced the higher the * computed cost will be. This uses a rolling average of regionload. */ - public static class StoreFileCostFunction extends CostFromRegionLoadFunction { + static class StoreFileCostFunction extends CostFromRegionLoadFunction { private static final String STOREFILE_SIZE_COST_KEY = "hbase.master.balancer.stochastic.storefileSizeCost"; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 7ffb7d5..631572e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -495,13 +495,14 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } - @Test (timeout = 60000) + @Test (timeout = 180000) public void testRegionReplicasOnMidClusterHighReplication() { - conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L); + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); loadBalancer.setConf(conf); int numNodes = 100; - int numRegions = 6 * 100; + int numRegions = 6 * numNodes; int replication = 100; // 100 replicas per region, one for each server int numRegionsPerServer = 5; int numTables = 10; @@ -546,16 +547,16 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { } } - @Test (timeout = 120000) + @Test (timeout = 180000) public void testRegionReplicationOnMidClusterWithRacks() { - conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L); + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L); conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); - conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 60 * 1000); // 60 sec + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec loadBalancer.setConf(conf); - int numNodes = 50; + int numNodes = 30; int numRegions = numNodes * 30; int replication = 3; // 3 replicas per region - int numRegionsPerServer = 25; + int numRegionsPerServer = 28; int numTables = 10; int numRacks = 4; // all replicas should be on a different rack Map> serverMap =