diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index 5cdfad2..7f2fc9e 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.constraint.ConstraintException;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.ServerAndLoad;
 import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -384,6 +385,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
   }
 
   @Override
+  public void setClusterLoad(List<ServerAndLoad> clusterLoad) {
+  }
+
+  @Override
   public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad) {
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 01540b7..e1b3128 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 
 import edu.umd.cs.findbugs.annotations.Nullable;
+import org.apache.hadoop.hbase.master.balancer.ServerAndLoad;
 
 /**
  * Makes decisions about the placement and movement of Regions across
@@ -62,6 +63,12 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
 
   /**
    * Pass RegionStates and allow balancer to set the current cluster load.
+   * @param loads the current load of each region server
+   */
+  void setClusterLoad(List<ServerAndLoad> loads);
+
+  /**
+   * Pass RegionStates and allow balancer to set the current cluster load.
    * @param ClusterLoad
    */
   void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> ClusterLoad);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 196e693..55d4ae7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -1154,6 +1154,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   }
 
   @Override
+  public void setClusterLoad(List<ServerAndLoad> clusterLoad){
+
+  }
+
+  @Override
   public void setMasterServices(MasterServices masterServices) {
     masterServerName = masterServices.getServerName();
     this.services = masterServices;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java
index cceaf87..d371189 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.ServerName;
  * Data structure that holds servername and 'load'.
  */
 @InterfaceAudience.Private
-class ServerAndLoad implements Comparable<ServerAndLoad>, Serializable {
+public class ServerAndLoad implements Comparable<ServerAndLoad>, Serializable {
   private static final long serialVersionUID = 2735470854607296965L;
   private final ServerName sn;
   private final int load;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
index 7e8d696..222f2aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
@@ -108,6 +108,17 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
 
   }
 
+  @Override
+  public void setClusterLoad(List<ServerAndLoad> loads) {
+    serverLoadList = new ArrayList<>(loads);
+    float sum = 0;
+    for(ServerAndLoad load : loads) {
+      sum += load.getLoad();
+    }
+    // An empty list would make the division yield NaN; report an average of 0 instead.
+    avgLoadOverall = serverLoadList.isEmpty() ? 0 : sum / serverLoadList.size();
+  }
+
   public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
     serverLoadList = new ArrayList<>();
     float sum = 0;
@@ -403,23 +414,35 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
 
     // Assign each underloaded up to the min, then if leftovers, assign to max
     // Walk down least loaded, assigning to each to fill up to min
-    for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
-      serversByLoad.entrySet()) {
-      int regionCount = server.getKey().getLoad();
-      if (regionCount >= min) break;
-      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
-      if(balanceInfo != null) {
-        regionCount += balanceInfo.getNumRegionsAdded();
-      }
-      if(regionCount >= min) {
-        continue;
+    /**HBASE-17969
+     * When reaching this point, all regionservers should have at least
+     * min regions. If we still used serversByLoad's order (ordered by load
+     * and then servername, see {@link ServerAndLoad#compareTo}),
+     * servers with smaller servernames would have a bigger chance
+     * to get the remaining regions. So here we need to shuffle the regionservers
+     * to assign the remaining regions.
+     */
+    if(0 < regionsToMove.size()) {
+      List<ServerName> serversWithLessLoad = new ArrayList<ServerName>();
+      for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
+        serversByLoad.entrySet()) {
+        int regionCount = server.getKey().getLoad();
+        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
+        if(balanceInfo != null) {
+          regionCount += balanceInfo.getNumRegionsAdded();
+        }
+        if(regionCount >= max) {
+          break;
+        }
+        serversWithLessLoad.add(server.getKey().getServerName());
       }
-      int numToTake = min - regionCount;
-      int numTaken = 0;
-      while(numTaken < numToTake && 0 < regionsToMove.size()) {
-        addRegionPlan(regionsToMove, fetchFromTail,
-          server.getKey().getServerName(), regionsToReturn);
-        numTaken++;
+      //shuffle the less loaded server
+      Collections.shuffle(serversWithLessLoad, RANDOM);
+      for(ServerName serverName : serversWithLessLoad) {
+        addRegionPlan(regionsToMove, fetchFromTail, serverName, regionsToReturn);
+        if (regionsToMove.isEmpty()) {
+          break;
+        }
       }
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
index f93449c..cfab150 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
@@ -447,9 +447,11 @@ public class BalancerTestBase {
         updateLoad(map, source, -1);
         ServerName destination = plan.getDestination();
         updateLoad(map, destination, +1);
+        if(servers != null) {
+          servers.get(source).remove(plan.getRegionInfo());
+          servers.get(destination).add(plan.getRegionInfo());
+        }
 
-        servers.get(source).remove(plan.getRegionInfo());
-        servers.get(destination).add(plan.getRegionInfo());
       }
     }
     result.clear();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
index 610ecf7..dc7e814 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -192,4 +193,76 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
     List<ServerAndLoad> balancedCluster1 = reconcile(clusterList, clusterplans1, clusterServers);
     assertTrue(assertClusterOverallAsBalanced(balancedCluster1, result1.keySet().size()));
   }
+
+  /**
+   * A test case for HBASE-17969: balance by table using SimpleLoadBalancer could end up imbalanced.
+   * Say we have three RS named s1, s2, s3.
+   * A table named table1 with 5 regions is distributed on these rs like this:
+   * s1 1
+   * s2 1
+   * s3 3
+   * On average, each rs will have min=1, max=2 regions. So the balancer will run.
+   * s1 and s2 already have min=1 regions, so the balancer won't do any operation on them.
+   * But s3 holds 3 regions, which exceeds max=2, so the balancer will remove one region from s3
+   * and choose one rs from s1, s2 to move it to.
+   * But s1 and s2 have the same load. So we need a random here, otherwise s1 will be chosen since
+   * servername s1 < s2 (alphabetical order, sorted by ServerAndLoad's compareTo method).
+   * It is OK for table1 itself. But if every table in the cluster has a similar situation,
+   * then the load in the cluster will always be like s1 > s2 > s3.
+   * @throws Exception
+   */
+  @Test (timeout=600000)
+  public void testACaseOfImbalanceBalanceByTable() throws Exception {
+    int tables = 10;
+    //three servers
+    ServerName s1 = ServerName.valueOf("s1", 60000, 0);
+    ServerName s2 = ServerName.valueOf("s2", 60000, 0);
+    ServerName s3 = ServerName.valueOf("s3", 60000, 0);
+    Map<ServerName, List<HRegionInfo>> RegionDistributeForOneTable = new HashMap<>();
+    //one region for s1
+    List<HRegionInfo> regionForS1 = new ArrayList<>();
+    regionForS1.addAll(randomRegions(1));
+    RegionDistributeForOneTable.put(s1, regionForS1);
+    //one region for s2
+    List<HRegionInfo> regionForS2 = new ArrayList<>();
+    regionForS2.addAll(randomRegions(1));
+    RegionDistributeForOneTable.put(s2, regionForS2);
+    //three regions for s3
+    List<HRegionInfo> regionForS3 = new ArrayList<>();
+    regionForS3.addAll(randomRegions(3));
+    RegionDistributeForOneTable.put(s3, regionForS3);
+
+    List<ServerAndLoad> servers = convertToList(RegionDistributeForOneTable);
+    List<ServerAndLoad> currentServerLoadList = new ArrayList<>();
+    Map<ServerName, ServerAndLoad> serverLoad = new HashMap<>();
+    for(ServerAndLoad serverAndLoad : servers) {
+      ServerAndLoad load = new ServerAndLoad(serverAndLoad.getServerName(), 0);
+      currentServerLoadList.add(load);
+      serverLoad.put(serverAndLoad.getServerName(),load);
+    }
+
+    //mock balance 10 tables in the cluster
+    for(int i = 0; i < tables; i++ ) {
+
+      loadBalancer.setClusterLoad(currentServerLoadList);
+      List<RegionPlan> plans = loadBalancer.balanceCluster(RegionDistributeForOneTable);
+      List<ServerAndLoad> newLoads = reconcile(servers, plans, null);
+      currentServerLoadList = new ArrayList<>();
+      for(ServerAndLoad load : newLoads) {
+        ServerName server = load.getServerName();
+        ServerAndLoad oldLoad = serverLoad.get(load.getServerName());
+        ServerAndLoad newLoad = new ServerAndLoad(server, oldLoad.getLoad() + load.getLoad());
+        serverLoad.put(server, newLoad);
+        currentServerLoadList.add(newLoad);
+      }
+
+    }
+    ServerAndLoad loadForS1 = serverLoad.get(s1);
+    //Like said above, if random is not used, s1 will always have 20 regions of the 10 tables
+    Assert.assertNotEquals(20, loadForS1.getLoad());
+
+
+
+  }
+
 }