diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index f22f9fb..03128dd 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1050,9 +1050,13 @@ possible configurations would overwhelm and obscure the important.
<name>hbase.master.loadbalancer.class</name>
- <value>org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer</value>
+ <value>org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer</value>
<description>Class used to execute the regions balancing when the period occurs.
+ See the class comment for more on how it works
+ http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.html
+ It replaces the DefaultLoadBalancer as the default (since renamed
+ as the SimpleLoadBalancer).
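Note: the key above accepts any LoadBalancer implementation, so sites that prefer the old behaviour can still select the renamed SimpleLoadBalancer explicitly. A minimal sketch (the class PinSimpleBalancer is hypothetical; the conf.setClass idiom is the same one the test changes below use):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer;

public class PinSimpleBalancer {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Equivalent to setting hbase.master.loadbalancer.class in hbase-site.xml;
    // overrides the new StochasticLoadBalancer default shown above.
    conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
        SimpleLoadBalancer.class, LoadBalancer.class);
  }
}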
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java
deleted file mode 100644
index 9b89927..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.balancer;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.RegionPlan;
-
-import com.google.common.collect.MinMaxPriorityQueue;
-
-/**
- * Makes decisions about the placement and movement of Regions across
- * RegionServers.
- *
- * <p>Cluster-wide load balancing will occur only when there are no regions in
- * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}.
- *
- * <p>Inline region placement with {@link #immediateAssignment} can be used when
- * the Master needs to handle closed regions that it currently does not have
- * a destination set for. This can happen during master failover.
- *
- * <p>On cluster startup, bulk assignment can be used to determine
- * locations for all Regions in a cluster.
- *
- * <p>This classes produces plans for the {@link AssignmentManager} to execute.
- */
-@InterfaceAudience.Private
-public class DefaultLoadBalancer extends BaseLoadBalancer {
- private static final Log LOG = LogFactory.getLog(DefaultLoadBalancer.class);
- private static final Random RANDOM = new Random(System.currentTimeMillis());
-
- private RegionInfoComparator riComparator = new RegionInfoComparator();
- private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
-
-
- /**
- * Stores additional per-server information about the regions added/removed
- * during the run of the balancing algorithm.
- *
- * For servers that shed regions, we need to track which regions we have already
- * shed. nextRegionForUnload contains the index in the list of regions on
- * the server that is the next to be shed.
- */
- static class BalanceInfo {
-
- private final int nextRegionForUnload;
- private int numRegionsAdded;
-
- public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
- this.nextRegionForUnload = nextRegionForUnload;
- this.numRegionsAdded = numRegionsAdded;
- }
-
- int getNextRegionForUnload() {
- return nextRegionForUnload;
- }
-
- int getNumRegionsAdded() {
- return numRegionsAdded;
- }
-
- void setNumRegionsAdded(int numAdded) {
- this.numRegionsAdded = numAdded;
- }
- }
-
- /**
- * Generate a global load balancing plan according to the specified map of
- * server information to the most loaded regions of each server.
- *
- * The load balancing invariant is that all servers are within 1 region of the
- * average number of regions per server. If the average is an integer number,
- * all servers will be balanced to the average. Otherwise, all servers will
- * have either floor(average) or ceiling(average) regions.
- *
- * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
- * we can fetch from both ends of the queue.
- * At the beginning, we check whether there was empty region server
- * just discovered by Master. If so, we alternately choose new / old
- * regions from head / tail of regionsToMove, respectively. This alternation
- * avoids clustering young regions on the newly discovered region server.
- * Otherwise, we choose new regions from head of regionsToMove.
- *
- * Another improvement from HBASE-3609 is that we assign regions from
- * regionsToMove to underloaded servers in round-robin fashion.
- * Previously one underloaded server would be filled before we move onto
- * the next underloaded server, leading to clustering of young regions.
- *
- * Finally, we randomly shuffle underloaded servers so that they receive
- * offloaded regions relatively evenly across calls to balanceCluster().
- *
- * The algorithm is currently implemented as such:
- *
- * <ol>
- * <li>Determine the two valid numbers of regions each server should have,
- * MIN=floor(average) and MAX=ceiling(average).
- *
- * <li>Iterate down the most loaded servers, shedding regions from each so
- * each server hosts exactly MAX regions. Stop once you reach a
- * server that already has <= MAX regions.
- *
- * Order the regions to move from most recent to least.
- *
- * <li>Iterate down the least loaded servers, assigning regions so each server
- * has exactly MIN regions. Stop once you reach a server that
- * already has >= MIN regions.
- *
- * Regions being assigned to underloaded servers are those that were shed
- * in the previous step. It is possible that there were not enough
- * regions shed to fill each underloaded server to MIN. If so we
- * end up with a number of regions required to do so, neededRegions.
- *
- * It is also possible that we were able to fill each underloaded but ended
- * up with regions that were unassigned from overloaded servers but that
- * still do not have assignment.
- *
- * If neither of these conditions hold (no regions needed to fill the
- * underloaded servers, no regions leftover from overloaded servers),
- * we are done and return. Otherwise we handle these cases below.
- *
- * <li>If neededRegions is non-zero (still have underloaded servers),
- * we iterate the most loaded servers again, shedding a single server from
- * each (this brings them from having MAX regions to having
- * MIN regions).
- *
- * <li>We now definitely have more regions that need assignment, either from
- * the previous step or from the original shedding from overloaded servers.
- * Iterate the least loaded servers filling each to MIN.
- *
- * <li>If we still have more regions that need assignment, again iterate the
- * least loaded servers, this time giving each one (filling them to
- * MAX) until we run out.
- *
- * <li>All servers will now either host MIN or MAX regions.
- *
- * In addition, any server hosting >= MAX regions is guaranteed
- * to end up with MAX regions at the end of the balancing. This
- * ensures the minimal number of regions possible are moved.
- * </ol>
- *
- * TODO: We can at-most reassign the number of regions away from a particular
- * server to be how many they report as most loaded.
- * Should we just keep all assignment in memory? Any objections?
- * Does this mean we need HeapSize on HMaster? Or just careful monitor?
- * (current thinking is we will hold all assignments in memory)
- *
- * @param clusterMap Map of regionservers and their load/region information to
- * a list of their most loaded regions
- * @return a list of regions to be moved, including source and destination,
- * or null if cluster is already balanced
- */
- public List<RegionPlan> balanceCluster(
- Map<ServerName, List<HRegionInfo>> clusterMap) {
- boolean emptyRegionServerPresent = false;
- long startTime = System.currentTimeMillis();
-
- ClusterLoadState cs = new ClusterLoadState(clusterMap);
-
- if (!this.needsBalance(cs)) return null;
-
- int numServers = cs.getNumServers();
- NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
- int numRegions = cs.getNumRegions();
- int min = numRegions / numServers;
- int max = numRegions % numServers == 0 ? min : min + 1;
-
- // Using to check balance result.
- StringBuilder strBalanceParam = new StringBuilder();
- strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
- .append(", numServers=").append(numServers).append(", max=").append(max)
- .append(", min=").append(min);
- LOG.debug(strBalanceParam.toString());
-
- // Balance the cluster
- // TODO: Look at data block locality or a more complex load to do this
- MinMaxPriorityQueue<RegionPlan> regionsToMove =
- MinMaxPriorityQueue.orderedBy(rpComparator).create();
- List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
-
- // Walk down most loaded, pruning each to the max
- int serversOverloaded = 0;
- // flag used to fetch regions from head and tail of list, alternately
- boolean fetchFromTail = false;
- Map<ServerName, BalanceInfo> serverBalanceInfo =
- new TreeMap<ServerName, BalanceInfo>();
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
- serversByLoad.descendingMap().entrySet()) {
- ServerAndLoad sal = server.getKey();
- int regionCount = sal.getLoad();
- if (regionCount <= max) {
- serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
- break;
- }
- serversOverloaded++;
- List<HRegionInfo> regions = server.getValue();
- int numToOffload = Math.min(regionCount - max, regions.size());
- // account for the out-of-band regions which were assigned to this server
- // after some other region server crashed
- Collections.sort(regions, riComparator);
- int numTaken = 0;
- for (int i = 0; i <= numToOffload; ) {
- HRegionInfo hri = regions.get(i); // fetch from head
- if (fetchFromTail) {
- hri = regions.get(regions.size() - 1 - i);
- }
- i++;
- // Don't rebalance meta regions.
- if (hri.isMetaRegion()) continue;
- regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
- numTaken++;
- if (numTaken >= numToOffload) break;
- // fetch in alternate order if there is new region server
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
- }
- serverBalanceInfo.put(sal.getServerName(),
- new BalanceInfo(numToOffload, (-1)*numTaken));
- }
- int totalNumMoved = regionsToMove.size();
-
- // Walk down least loaded, filling each to the min
- int neededRegions = 0; // number of regions needed to bring all up to min
- fetchFromTail = false;
-
- Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
- float average = (float)numRegions / numServers; // for logging
- int maxToTake = numRegions - (int)average;
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
- serversByLoad.entrySet()) {
- if (maxToTake == 0) break; // no more to take
- int regionCount = server.getKey().getLoad();
- if (regionCount >= min && regionCount > 0) {
- continue; // look for other servers which haven't reached min
- }
- int regionsToPut = min - regionCount;
- if (regionsToPut == 0)
- {
- regionsToPut = 1;
- }
- maxToTake -= regionsToPut;
- underloadedServers.put(server.getKey().getServerName(), regionsToPut);
- }
- // number of servers that get new regions
- int serversUnderloaded = underloadedServers.size();
- int incr = 1;
- List<ServerName> sns =
- Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
- Collections.shuffle(sns, RANDOM);
- while (regionsToMove.size() > 0) {
- int cnt = 0;
- int i = incr > 0 ? 0 : underloadedServers.size()-1;
- for (; i >= 0 && i < underloadedServers.size(); i += incr) {
- if (regionsToMove.isEmpty()) break;
- ServerName si = sns.get(i);
- int numToTake = underloadedServers.get(si);
- if (numToTake == 0) continue;
-
- addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
-
- underloadedServers.put(si, numToTake-1);
- cnt++;
- BalanceInfo bi = serverBalanceInfo.get(si);
- if (bi == null) {
- bi = new BalanceInfo(0, 0);
- serverBalanceInfo.put(si, bi);
- }
- bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
- }
- if (cnt == 0) break;
- // iterates underloadedServers in the other direction
- incr = -incr;
- }
- for (Integer i : underloadedServers.values()) {
- // If we still want to take some, increment needed
- neededRegions += i;
- }
-
- // If none needed to fill all to min and none left to drain all to max,
- // we are done
- if (neededRegions == 0 && regionsToMove.isEmpty()) {
- long endTime = System.currentTimeMillis();
- LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
- "Moving " + totalNumMoved + " regions off of " +
- serversOverloaded + " overloaded servers onto " +
- serversUnderloaded + " less loaded servers");
- return regionsToReturn;
- }
-
- // Need to do a second pass.
- // Either more regions to assign out or servers that are still underloaded
-
- // If we need more to fill min, grab one from each most loaded until enough
- if (neededRegions != 0) {
- // Walk down most loaded, grabbing one from each until we get enough
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
- serversByLoad.descendingMap().entrySet()) {
- BalanceInfo balanceInfo =
- serverBalanceInfo.get(server.getKey().getServerName());
- int idx =
- balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
- if (idx >= server.getValue().size()) break;
- HRegionInfo region = server.getValue().get(idx);
- if (region.isMetaRegion()) continue; // Don't move meta regions.
- regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
- totalNumMoved++;
- if (--neededRegions == 0) {
- // No more regions needed, done shedding
- break;
- }
- }
- }
-
- // Now we have a set of regions that must be all assigned out
- // Assign each underloaded up to the min, then if leftovers, assign to max
-
- // Walk down least loaded, assigning to each to fill up to min
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
- serversByLoad.entrySet()) {
- int regionCount = server.getKey().getLoad();
- if (regionCount >= min) break;
- BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
- if(balanceInfo != null) {
- regionCount += balanceInfo.getNumRegionsAdded();
- }
- if(regionCount >= min) {
- continue;
- }
- int numToTake = min - regionCount;
- int numTaken = 0;
- while(numTaken < numToTake && 0 < regionsToMove.size()) {
- addRegionPlan(regionsToMove, fetchFromTail,
- server.getKey().getServerName(), regionsToReturn);
- numTaken++;
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
- }
- }
-
- // If we still have regions to dish out, assign underloaded to max
- if (0 < regionsToMove.size()) {
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
- serversByLoad.entrySet()) {
- int regionCount = server.getKey().getLoad();
- BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
- if(balanceInfo != null) {
- regionCount += balanceInfo.getNumRegionsAdded();
- }
- if(regionCount >= max) {
- break;
- }
- addRegionPlan(regionsToMove, fetchFromTail,
- server.getKey().getServerName(), regionsToReturn);
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
- if (regionsToMove.isEmpty()) {
- break;
- }
- }
- }
-
- long endTime = System.currentTimeMillis();
-
- if (!regionsToMove.isEmpty() || neededRegions != 0) {
- // Emit data so can diagnose how balancer went astray.
- LOG.warn("regionsToMove=" + totalNumMoved +
- ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
- ", serversUnderloaded=" + serversUnderloaded);
- StringBuilder sb = new StringBuilder();
- for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
- if (sb.length() > 0) sb.append(", ");
- sb.append(e.getKey().toString());
- sb.append(" ");
- sb.append(e.getValue().size());
- }
- LOG.warn("Input " + sb.toString());
- }
-
- // All done!
- LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
- "Moving " + totalNumMoved + " regions off of " +
- serversOverloaded + " overloaded servers onto " +
- serversUnderloaded + " less loaded servers");
-
- return regionsToReturn;
- }
-
- /**
- * Add a region from the head or tail to the List of regions to return.
- */
- private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
- final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
- RegionPlan rp = null;
- if (!fetchFromTail) rp = regionsToMove.remove();
- else rp = regionsToMove.removeLast();
- rp.setDestination(sn);
- regionsToReturn.add(rp);
- }
-}
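The head/tail alternation described in the HBASE-3609 javadoc above rests on Guava's MinMaxPriorityQueue, which allows removal at both extremes of the comparator order. A self-contained sketch, with plain integers standing in for RegionPlans ordered by RegionPlan.RegionPlanComparator:

import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;

public class MinMaxQueueDemo {
  public static void main(String[] args) {
    MinMaxPriorityQueue<Integer> q =
        MinMaxPriorityQueue.<Integer>orderedBy(Ordering.natural()).create();
    for (int v : new int[] {5, 1, 4, 2, 3}) {
      q.add(v);
    }
    // remove() takes the head (least element), as addRegionPlan() does when
    // fetchFromTail is false; removeLast() takes the other extreme, which the
    // balancer uses to alternate ends when an empty region server is present.
    System.out.println(q.remove());     // prints 1
    System.out.println(q.removeLast()); // prints 5
  }
}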
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
new file mode 100644
index 0000000..da6b443
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.RegionPlan;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+
+/**
+ * Makes decisions about the placement and movement of Regions across
+ * RegionServers.
+ *
+ * <p>Cluster-wide load balancing will occur only when there are no regions in
+ * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}.
+ *
+ * <p>Inline region placement with {@link #immediateAssignment} can be used when
+ * the Master needs to handle closed regions that it currently does not have
+ * a destination set for. This can happen during master failover.
+ *
+ * <p>On cluster startup, bulk assignment can be used to determine
+ * locations for all Regions in a cluster.
+ *
+ * <p>This classes produces plans for the {@link AssignmentManager} to execute.
+ */
+@InterfaceAudience.Private
+public class SimpleLoadBalancer extends BaseLoadBalancer {
+ private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class);
+ private static final Random RANDOM = new Random(System.currentTimeMillis());
+
+ private RegionInfoComparator riComparator = new RegionInfoComparator();
+ private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
+
+
+ /**
+ * Stores additional per-server information about the regions added/removed
+ * during the run of the balancing algorithm.
+ *
+ * For servers that shed regions, we need to track which regions we have already
+ * shed. nextRegionForUnload contains the index in the list of regions on
+ * the server that is the next to be shed.
+ */
+ static class BalanceInfo {
+
+ private final int nextRegionForUnload;
+ private int numRegionsAdded;
+
+ public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
+ this.nextRegionForUnload = nextRegionForUnload;
+ this.numRegionsAdded = numRegionsAdded;
+ }
+
+ int getNextRegionForUnload() {
+ return nextRegionForUnload;
+ }
+
+ int getNumRegionsAdded() {
+ return numRegionsAdded;
+ }
+
+ void setNumRegionsAdded(int numAdded) {
+ this.numRegionsAdded = numAdded;
+ }
+ }
+
+ /**
+ * Generate a global load balancing plan according to the specified map of
+ * server information to the most loaded regions of each server.
+ *
+ * The load balancing invariant is that all servers are within 1 region of the
+ * average number of regions per server. If the average is an integer number,
+ * all servers will be balanced to the average. Otherwise, all servers will
+ * have either floor(average) or ceiling(average) regions.
+ *
+ * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
+ * we can fetch from both ends of the queue.
+ * At the beginning, we check whether there was empty region server
+ * just discovered by Master. If so, we alternately choose new / old
+ * regions from head / tail of regionsToMove, respectively. This alternation
+ * avoids clustering young regions on the newly discovered region server.
+ * Otherwise, we choose new regions from head of regionsToMove.
+ *
+ * Another improvement from HBASE-3609 is that we assign regions from
+ * regionsToMove to underloaded servers in round-robin fashion.
+ * Previously one underloaded server would be filled before we move onto
+ * the next underloaded server, leading to clustering of young regions.
+ *
+ * Finally, we randomly shuffle underloaded servers so that they receive
+ * offloaded regions relatively evenly across calls to balanceCluster().
+ *
+ * The algorithm is currently implemented as such:
+ *
+ * <ol>
+ * <li>Determine the two valid numbers of regions each server should have,
+ * MIN=floor(average) and MAX=ceiling(average).
+ *
+ * <li>Iterate down the most loaded servers, shedding regions from each so
+ * each server hosts exactly MAX regions. Stop once you reach a
+ * server that already has <= MAX regions.
+ *
+ * Order the regions to move from most recent to least.
+ *
+ * <li>Iterate down the least loaded servers, assigning regions so each server
+ * has exactly MIN regions. Stop once you reach a server that
+ * already has >= MIN regions.
+ *
+ * Regions being assigned to underloaded servers are those that were shed
+ * in the previous step. It is possible that there were not enough
+ * regions shed to fill each underloaded server to MIN. If so we
+ * end up with a number of regions required to do so, neededRegions.
+ *
+ * It is also possible that we were able to fill each underloaded but ended
+ * up with regions that were unassigned from overloaded servers but that
+ * still do not have assignment.
+ *
+ * If neither of these conditions hold (no regions needed to fill the
+ * underloaded servers, no regions leftover from overloaded servers),
+ * we are done and return. Otherwise we handle these cases below.
+ *
+ * <li>If neededRegions is non-zero (still have underloaded servers),
+ * we iterate the most loaded servers again, shedding a single server from
+ * each (this brings them from having MAX regions to having
+ * MIN regions).
+ *
+ * <li>We now definitely have more regions that need assignment, either from
+ * the previous step or from the original shedding from overloaded servers.
+ * Iterate the least loaded servers filling each to MIN.
+ *
+ * <li>If we still have more regions that need assignment, again iterate the
+ * least loaded servers, this time giving each one (filling them to
+ * MAX) until we run out.
+ *
+ * <li>All servers will now either host MIN or MAX regions.
+ *
+ * In addition, any server hosting >= MAX regions is guaranteed
+ * to end up with MAX regions at the end of the balancing. This
+ * ensures the minimal number of regions possible are moved.
+ * </ol>
+ *
+ * TODO: We can at-most reassign the number of regions away from a particular
+ * server to be how many they report as most loaded.
+ * Should we just keep all assignment in memory? Any objections?
+ * Does this mean we need HeapSize on HMaster? Or just careful monitor?
+ * (current thinking is we will hold all assignments in memory)
+ *
+ * @param clusterMap Map of regionservers and their load/region information to
+ * a list of their most loaded regions
+ * @return a list of regions to be moved, including source and destination,
+ * or null if cluster is already balanced
+ */
+ public List<RegionPlan> balanceCluster(
+ Map<ServerName, List<HRegionInfo>> clusterMap) {
+ boolean emptyRegionServerPresent = false;
+ long startTime = System.currentTimeMillis();
+
+ ClusterLoadState cs = new ClusterLoadState(clusterMap);
+
+ if (!this.needsBalance(cs)) return null;
+
+ int numServers = cs.getNumServers();
+ NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
+ int numRegions = cs.getNumRegions();
+ int min = numRegions / numServers;
+ int max = numRegions % numServers == 0 ? min : min + 1;
+
+ // Using to check balance result.
+ StringBuilder strBalanceParam = new StringBuilder();
+ strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
+ .append(", numServers=").append(numServers).append(", max=").append(max)
+ .append(", min=").append(min);
+ LOG.debug(strBalanceParam.toString());
+
+ // Balance the cluster
+ // TODO: Look at data block locality or a more complex load to do this
+ MinMaxPriorityQueue<RegionPlan> regionsToMove =
+ MinMaxPriorityQueue.orderedBy(rpComparator).create();
+ List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
+
+ // Walk down most loaded, pruning each to the max
+ int serversOverloaded = 0;
+ // flag used to fetch regions from head and tail of list, alternately
+ boolean fetchFromTail = false;
+ Map<ServerName, BalanceInfo> serverBalanceInfo =
+ new TreeMap<ServerName, BalanceInfo>();
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
+ serversByLoad.descendingMap().entrySet()) {
+ ServerAndLoad sal = server.getKey();
+ int regionCount = sal.getLoad();
+ if (regionCount <= max) {
+ serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
+ break;
+ }
+ serversOverloaded++;
+ List<HRegionInfo> regions = server.getValue();
+ int numToOffload = Math.min(regionCount - max, regions.size());
+ // account for the out-of-band regions which were assigned to this server
+ // after some other region server crashed
+ Collections.sort(regions, riComparator);
+ int numTaken = 0;
+ for (int i = 0; i <= numToOffload; ) {
+ HRegionInfo hri = regions.get(i); // fetch from head
+ if (fetchFromTail) {
+ hri = regions.get(regions.size() - 1 - i);
+ }
+ i++;
+ // Don't rebalance meta regions.
+ if (hri.isMetaRegion()) continue;
+ regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
+ numTaken++;
+ if (numTaken >= numToOffload) break;
+ // fetch in alternate order if there is new region server
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+ }
+ serverBalanceInfo.put(sal.getServerName(),
+ new BalanceInfo(numToOffload, (-1)*numTaken));
+ }
+ int totalNumMoved = regionsToMove.size();
+
+ // Walk down least loaded, filling each to the min
+ int neededRegions = 0; // number of regions needed to bring all up to min
+ fetchFromTail = false;
+
+ Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
+ float average = (float)numRegions / numServers; // for logging
+ int maxToTake = numRegions - (int)average;
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
+ serversByLoad.entrySet()) {
+ if (maxToTake == 0) break; // no more to take
+ int regionCount = server.getKey().getLoad();
+ if (regionCount >= min && regionCount > 0) {
+ continue; // look for other servers which haven't reached min
+ }
+ int regionsToPut = min - regionCount;
+ if (regionsToPut == 0)
+ {
+ regionsToPut = 1;
+ }
+ maxToTake -= regionsToPut;
+ underloadedServers.put(server.getKey().getServerName(), regionsToPut);
+ }
+ // number of servers that get new regions
+ int serversUnderloaded = underloadedServers.size();
+ int incr = 1;
+ List<ServerName> sns =
+ Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
+ Collections.shuffle(sns, RANDOM);
+ while (regionsToMove.size() > 0) {
+ int cnt = 0;
+ int i = incr > 0 ? 0 : underloadedServers.size()-1;
+ for (; i >= 0 && i < underloadedServers.size(); i += incr) {
+ if (regionsToMove.isEmpty()) break;
+ ServerName si = sns.get(i);
+ int numToTake = underloadedServers.get(si);
+ if (numToTake == 0) continue;
+
+ addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+
+ underloadedServers.put(si, numToTake-1);
+ cnt++;
+ BalanceInfo bi = serverBalanceInfo.get(si);
+ if (bi == null) {
+ bi = new BalanceInfo(0, 0);
+ serverBalanceInfo.put(si, bi);
+ }
+ bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
+ }
+ if (cnt == 0) break;
+ // iterates underloadedServers in the other direction
+ incr = -incr;
+ }
+ for (Integer i : underloadedServers.values()) {
+ // If we still want to take some, increment needed
+ neededRegions += i;
+ }
+
+ // If none needed to fill all to min and none left to drain all to max,
+ // we are done
+ if (neededRegions == 0 && regionsToMove.isEmpty()) {
+ long endTime = System.currentTimeMillis();
+ LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
+ "Moving " + totalNumMoved + " regions off of " +
+ serversOverloaded + " overloaded servers onto " +
+ serversUnderloaded + " less loaded servers");
+ return regionsToReturn;
+ }
+
+ // Need to do a second pass.
+ // Either more regions to assign out or servers that are still underloaded
+
+ // If we need more to fill min, grab one from each most loaded until enough
+ if (neededRegions != 0) {
+ // Walk down most loaded, grabbing one from each until we get enough
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
+ serversByLoad.descendingMap().entrySet()) {
+ BalanceInfo balanceInfo =
+ serverBalanceInfo.get(server.getKey().getServerName());
+ int idx =
+ balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
+ if (idx >= server.getValue().size()) break;
+ HRegionInfo region = server.getValue().get(idx);
+ if (region.isMetaRegion()) continue; // Don't move meta regions.
+ regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
+ totalNumMoved++;
+ if (--neededRegions == 0) {
+ // No more regions needed, done shedding
+ break;
+ }
+ }
+ }
+
+ // Now we have a set of regions that must be all assigned out
+ // Assign each underloaded up to the min, then if leftovers, assign to max
+
+ // Walk down least loaded, assigning to each to fill up to min
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
+ serversByLoad.entrySet()) {
+ int regionCount = server.getKey().getLoad();
+ if (regionCount >= min) break;
+ BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
+ if(balanceInfo != null) {
+ regionCount += balanceInfo.getNumRegionsAdded();
+ }
+ if(regionCount >= min) {
+ continue;
+ }
+ int numToTake = min - regionCount;
+ int numTaken = 0;
+ while(numTaken < numToTake && 0 < regionsToMove.size()) {
+ addRegionPlan(regionsToMove, fetchFromTail,
+ server.getKey().getServerName(), regionsToReturn);
+ numTaken++;
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+ }
+ }
+
+ // If we still have regions to dish out, assign underloaded to max
+ if (0 < regionsToMove.size()) {
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
+ serversByLoad.entrySet()) {
+ int regionCount = server.getKey().getLoad();
+ BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
+ if(balanceInfo != null) {
+ regionCount += balanceInfo.getNumRegionsAdded();
+ }
+ if(regionCount >= max) {
+ break;
+ }
+ addRegionPlan(regionsToMove, fetchFromTail,
+ server.getKey().getServerName(), regionsToReturn);
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+ if (regionsToMove.isEmpty()) {
+ break;
+ }
+ }
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ if (!regionsToMove.isEmpty() || neededRegions != 0) {
+ // Emit data so can diagnose how balancer went astray.
+ LOG.warn("regionsToMove=" + totalNumMoved +
+ ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
+ ", serversUnderloaded=" + serversUnderloaded);
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
+ if (sb.length() > 0) sb.append(", ");
+ sb.append(e.getKey().toString());
+ sb.append(" ");
+ sb.append(e.getValue().size());
+ }
+ LOG.warn("Input " + sb.toString());
+ }
+
+ // All done!
+ LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
+ "Moving " + totalNumMoved + " regions off of " +
+ serversOverloaded + " overloaded servers onto " +
+ serversUnderloaded + " less loaded servers");
+
+ return regionsToReturn;
+ }
+
+ /**
+ * Add a region from the head or tail to the List of regions to return.
+ */
+ private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
+ final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
+ RegionPlan rp = null;
+ if (!fetchFromTail) rp = regionsToMove.remove();
+ else rp = regionsToMove.removeLast();
+ rp.setDestination(sn);
+ regionsToReturn.add(rp);
+ }
+}
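To make the MIN/MAX invariant in the javadoc concrete, here is the arithmetic from the top of balanceCluster() applied to illustrative numbers (not taken from the patch):

public class MinMaxInvariantDemo {
  public static void main(String[] args) {
    int numRegions = 14;
    int numServers = 4;
    int min = numRegions / numServers;                      // floor(3.5)   = 3
    int max = numRegions % numServers == 0 ? min : min + 1; // ceiling(3.5) = 4
    System.out.println("min=" + min + ", max=" + max);
    // A balanced outcome is 4 + 4 + 3 + 3 = 14: every server ends up holding
    // either floor(average) or ceiling(average) regions, i.e. within one
    // region of the 3.5 average.
  }
}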
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
index 482a9b3..f2d0bc7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
+import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
@@ -556,7 +556,7 @@ public class TestZooKeeper {
}
}
- static class MockLoadBalancer extends DefaultLoadBalancer {
+ static class MockLoadBalancer extends SimpleLoadBalancer {
static boolean retainAssignCalled = false;
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 98fd2f4..b681db6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
-import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
+import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
@@ -797,7 +797,7 @@ public class TestAssignmentManager {
Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
} finally {
this.server.getConfiguration().setClass(
- HConstants.HBASE_MASTER_LOADBALANCER_CLASS, DefaultLoadBalancer.class,
+ HConstants.HBASE_MASTER_LOADBALANCER_CLASS, SimpleLoadBalancer.class,
LoadBalancer.class);
am.getExecutorService().shutdown();
am.shutdown();
@@ -808,7 +808,7 @@ public class TestAssignmentManager {
* Mocked load balancer class used in the testcase to make sure that the testcase waits until
* random assignment is called and the gate variable is set to true.
*/
- public static class MockedLoadBalancer extends DefaultLoadBalancer {
+ public static class MockedLoadBalancer extends SimpleLoadBalancer {
private AtomicBoolean gate;
public void setGateVariable(AtomicBoolean gate) {
@@ -899,7 +899,7 @@ public class TestAssignmentManager {
am.getZKTable().isDisabledTable(REGIONINFO.getTable()));
} finally {
this.server.getConfiguration().setClass(
- HConstants.HBASE_MASTER_LOADBALANCER_CLASS, DefaultLoadBalancer.class,
+ HConstants.HBASE_MASTER_LOADBALANCER_CLASS, SimpleLoadBalancer.class,
LoadBalancer.class);
am.getZKTable().setEnabledTable(REGIONINFO.getTable());
am.shutdown();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
index 8c00abd..e93507f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
@@ -46,7 +46,7 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
public static void beforeAllTests() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.regions.slop", "0");
- loadBalancer = new DefaultLoadBalancer();
+ loadBalancer = new SimpleLoadBalancer();
loadBalancer.setConf(conf);
}