diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index f22f9fb..03128dd 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1050,9 +1050,13 @@ possible configurations would overwhelm and obscure the important. hbase.master.loadbalancer.class - org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer + org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer Class used to execute the regions balancing when the period occurs. + See the class comment for more on how it works + http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.html + It replaces the DefaultLoadBalancer as the default (since renamed + as the SimpleLoadBalancer). diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java deleted file mode 100644 index 9b89927..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java +++ /dev/null @@ -1,433 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.balancer; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Random; -import java.util.TreeMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.RegionPlan; - -import com.google.common.collect.MinMaxPriorityQueue; - -/** - * Makes decisions about the placement and movement of Regions across - * RegionServers. - * - *

Cluster-wide load balancing will occur only when there are no regions in - * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}. - * - *

Inline region placement with {@link #immediateAssignment} can be used when - * the Master needs to handle closed regions that it currently does not have - * a destination set for. This can happen during master failover. - * - *

On cluster startup, bulk assignment can be used to determine - * locations for all Regions in a cluster. - * - *

This classes produces plans for the {@link AssignmentManager} to execute. - */ -@InterfaceAudience.Private -public class DefaultLoadBalancer extends BaseLoadBalancer { - private static final Log LOG = LogFactory.getLog(DefaultLoadBalancer.class); - private static final Random RANDOM = new Random(System.currentTimeMillis()); - - private RegionInfoComparator riComparator = new RegionInfoComparator(); - private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator(); - - - /** - * Stores additional per-server information about the regions added/removed - * during the run of the balancing algorithm. - * - * For servers that shed regions, we need to track which regions we have already - * shed. nextRegionForUnload contains the index in the list of regions on - * the server that is the next to be shed. - */ - static class BalanceInfo { - - private final int nextRegionForUnload; - private int numRegionsAdded; - - public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) { - this.nextRegionForUnload = nextRegionForUnload; - this.numRegionsAdded = numRegionsAdded; - } - - int getNextRegionForUnload() { - return nextRegionForUnload; - } - - int getNumRegionsAdded() { - return numRegionsAdded; - } - - void setNumRegionsAdded(int numAdded) { - this.numRegionsAdded = numAdded; - } - } - - /** - * Generate a global load balancing plan according to the specified map of - * server information to the most loaded regions of each server. - * - * The load balancing invariant is that all servers are within 1 region of the - * average number of regions per server. If the average is an integer number, - * all servers will be balanced to the average. Otherwise, all servers will - * have either floor(average) or ceiling(average) regions. - * - * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that - * we can fetch from both ends of the queue. - * At the beginning, we check whether there was empty region server - * just discovered by Master. If so, we alternately choose new / old - * regions from head / tail of regionsToMove, respectively. This alternation - * avoids clustering young regions on the newly discovered region server. - * Otherwise, we choose new regions from head of regionsToMove. - * - * Another improvement from HBASE-3609 is that we assign regions from - * regionsToMove to underloaded servers in round-robin fashion. - * Previously one underloaded server would be filled before we move onto - * the next underloaded server, leading to clustering of young regions. - * - * Finally, we randomly shuffle underloaded servers so that they receive - * offloaded regions relatively evenly across calls to balanceCluster(). - * - * The algorithm is currently implemented as such: - * - *

    - *
  1. Determine the two valid numbers of regions each server should have, - * MIN=floor(average) and MAX=ceiling(average). - * - *
  2. Iterate down the most loaded servers, shedding regions from each so - * each server hosts exactly MAX regions. Stop once you reach a - * server that already has <= MAX regions. - *

    - * Order the regions to move from most recent to least. - * - *

  3. Iterate down the least loaded servers, assigning regions so each server - * has exactly MIN regions. Stop once you reach a server that - * already has >= MIN regions. - * - * Regions being assigned to underloaded servers are those that were shed - * in the previous step. It is possible that there were not enough - * regions shed to fill each underloaded server to MIN. If so we - * end up with a number of regions required to do so, neededRegions. - * - * It is also possible that we were able to fill each underloaded but ended - * up with regions that were unassigned from overloaded servers but that - * still do not have assignment. - * - * If neither of these conditions hold (no regions needed to fill the - * underloaded servers, no regions leftover from overloaded servers), - * we are done and return. Otherwise we handle these cases below. - * - *
  4. If neededRegions is non-zero (still have underloaded servers), - * we iterate the most loaded servers again, shedding a single server from - * each (this brings them from having MAX regions to having - * MIN regions). - * - *
  5. We now definitely have more regions that need assignment, either from - * the previous step or from the original shedding from overloaded servers. - * Iterate the least loaded servers filling each to MIN. - * - *
  6. If we still have more regions that need assignment, again iterate the - * least loaded servers, this time giving each one (filling them to - * MAX) until we run out. - * - *
  7. All servers will now either host MIN or MAX regions. - * - * In addition, any server hosting >= MAX regions is guaranteed - * to end up with MAX regions at the end of the balancing. This - * ensures the minimal number of regions possible are moved. - *
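Editor's note: the MIN/MAX arithmetic that the seven steps above rely on is easy to sanity-check in isolation. A minimal, self-contained sketch — not part of this patch, class and variable names are illustrative — mirroring the bounds computed at the top of balanceCluster():

```java
// Sketch of the floor/ceiling bounds behind the balancer invariant:
// every server ends up hosting either MIN or MAX regions.
public class MinMaxDemo {
  public static void main(String[] args) {
    int numRegions = 7;   // e.g. 7 regions spread over 3 servers
    int numServers = 3;
    int min = numRegions / numServers;                       // floor(average) = 2
    int max = numRegions % numServers == 0 ? min : min + 1;  // ceiling(average) = 3
    System.out.println("min=" + min + ", max=" + max);       // prints: min=2, max=3
  }
}
```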
- * - * TODO: We can at-most reassign the number of regions away from a particular - * server to be how many they report as most loaded. - * Should we just keep all assignment in memory? Any objections? - * Does this mean we need HeapSize on HMaster? Or just careful monitor? - * (current thinking is we will hold all assignments in memory) - * - * @param clusterMap Map of regionservers and their load/region information to - * a list of their most loaded regions - * @return a list of regions to be moved, including source and destination, - * or null if cluster is already balanced - */ - public List balanceCluster( - Map> clusterMap) { - boolean emptyRegionServerPresent = false; - long startTime = System.currentTimeMillis(); - - ClusterLoadState cs = new ClusterLoadState(clusterMap); - - if (!this.needsBalance(cs)) return null; - - int numServers = cs.getNumServers(); - NavigableMap> serversByLoad = cs.getServersByLoad(); - int numRegions = cs.getNumRegions(); - int min = numRegions / numServers; - int max = numRegions % numServers == 0 ? min : min + 1; - - // Using to check balance result. - StringBuilder strBalanceParam = new StringBuilder(); - strBalanceParam.append("Balance parameter: numRegions=").append(numRegions) - .append(", numServers=").append(numServers).append(", max=").append(max) - .append(", min=").append(min); - LOG.debug(strBalanceParam.toString()); - - // Balance the cluster - // TODO: Look at data block locality or a more complex load to do this - MinMaxPriorityQueue regionsToMove = - MinMaxPriorityQueue.orderedBy(rpComparator).create(); - List regionsToReturn = new ArrayList(); - - // Walk down most loaded, pruning each to the max - int serversOverloaded = 0; - // flag used to fetch regions from head and tail of list, alternately - boolean fetchFromTail = false; - Map serverBalanceInfo = - new TreeMap(); - for (Map.Entry> server: - serversByLoad.descendingMap().entrySet()) { - ServerAndLoad sal = server.getKey(); - int regionCount = sal.getLoad(); - if (regionCount <= max) { - serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0)); - break; - } - serversOverloaded++; - List regions = server.getValue(); - int numToOffload = Math.min(regionCount - max, regions.size()); - // account for the out-of-band regions which were assigned to this server - // after some other region server crashed - Collections.sort(regions, riComparator); - int numTaken = 0; - for (int i = 0; i <= numToOffload; ) { - HRegionInfo hri = regions.get(i); // fetch from head - if (fetchFromTail) { - hri = regions.get(regions.size() - 1 - i); - } - i++; - // Don't rebalance meta regions. 
- if (hri.isMetaRegion()) continue; - regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null)); - numTaken++; - if (numTaken >= numToOffload) break; - // fetch in alternate order if there is new region server - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - } - serverBalanceInfo.put(sal.getServerName(), - new BalanceInfo(numToOffload, (-1)*numTaken)); - } - int totalNumMoved = regionsToMove.size(); - - // Walk down least loaded, filling each to the min - int neededRegions = 0; // number of regions needed to bring all up to min - fetchFromTail = false; - - Map underloadedServers = new HashMap(); - float average = (float)numRegions / numServers; // for logging - int maxToTake = numRegions - (int)average; - for (Map.Entry> server: - serversByLoad.entrySet()) { - if (maxToTake == 0) break; // no more to take - int regionCount = server.getKey().getLoad(); - if (regionCount >= min && regionCount > 0) { - continue; // look for other servers which haven't reached min - } - int regionsToPut = min - regionCount; - if (regionsToPut == 0) - { - regionsToPut = 1; - } - maxToTake -= regionsToPut; - underloadedServers.put(server.getKey().getServerName(), regionsToPut); - } - // number of servers that get new regions - int serversUnderloaded = underloadedServers.size(); - int incr = 1; - List sns = - Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded])); - Collections.shuffle(sns, RANDOM); - while (regionsToMove.size() > 0) { - int cnt = 0; - int i = incr > 0 ? 0 : underloadedServers.size()-1; - for (; i >= 0 && i < underloadedServers.size(); i += incr) { - if (regionsToMove.isEmpty()) break; - ServerName si = sns.get(i); - int numToTake = underloadedServers.get(si); - if (numToTake == 0) continue; - - addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn); - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - - underloadedServers.put(si, numToTake-1); - cnt++; - BalanceInfo bi = serverBalanceInfo.get(si); - if (bi == null) { - bi = new BalanceInfo(0, 0); - serverBalanceInfo.put(si, bi); - } - bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1); - } - if (cnt == 0) break; - // iterates underloadedServers in the other direction - incr = -incr; - } - for (Integer i : underloadedServers.values()) { - // If we still want to take some, increment needed - neededRegions += i; - } - - // If none needed to fill all to min and none left to drain all to max, - // we are done - if (neededRegions == 0 && regionsToMove.isEmpty()) { - long endTime = System.currentTimeMillis(); - LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " + - "Moving " + totalNumMoved + " regions off of " + - serversOverloaded + " overloaded servers onto " + - serversUnderloaded + " less loaded servers"); - return regionsToReturn; - } - - // Need to do a second pass. - // Either more regions to assign out or servers that are still underloaded - - // If we need more to fill min, grab one from each most loaded until enough - if (neededRegions != 0) { - // Walk down most loaded, grabbing one from each until we get enough - for (Map.Entry> server : - serversByLoad.descendingMap().entrySet()) { - BalanceInfo balanceInfo = - serverBalanceInfo.get(server.getKey().getServerName()); - int idx = - balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload(); - if (idx >= server.getValue().size()) break; - HRegionInfo region = server.getValue().get(idx); - if (region.isMetaRegion()) continue; // Don't move meta regions. 
- regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null)); - totalNumMoved++; - if (--neededRegions == 0) { - // No more regions needed, done shedding - break; - } - } - } - - // Now we have a set of regions that must be all assigned out - // Assign each underloaded up to the min, then if leftovers, assign to max - - // Walk down least loaded, assigning to each to fill up to min - for (Map.Entry> server : - serversByLoad.entrySet()) { - int regionCount = server.getKey().getLoad(); - if (regionCount >= min) break; - BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName()); - if(balanceInfo != null) { - regionCount += balanceInfo.getNumRegionsAdded(); - } - if(regionCount >= min) { - continue; - } - int numToTake = min - regionCount; - int numTaken = 0; - while(numTaken < numToTake && 0 < regionsToMove.size()) { - addRegionPlan(regionsToMove, fetchFromTail, - server.getKey().getServerName(), regionsToReturn); - numTaken++; - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - } - } - - // If we still have regions to dish out, assign underloaded to max - if (0 < regionsToMove.size()) { - for (Map.Entry> server : - serversByLoad.entrySet()) { - int regionCount = server.getKey().getLoad(); - BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName()); - if(balanceInfo != null) { - regionCount += balanceInfo.getNumRegionsAdded(); - } - if(regionCount >= max) { - break; - } - addRegionPlan(regionsToMove, fetchFromTail, - server.getKey().getServerName(), regionsToReturn); - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - if (regionsToMove.isEmpty()) { - break; - } - } - } - - long endTime = System.currentTimeMillis(); - - if (!regionsToMove.isEmpty() || neededRegions != 0) { - // Emit data so can diagnose how balancer went astray. - LOG.warn("regionsToMove=" + totalNumMoved + - ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded + - ", serversUnderloaded=" + serversUnderloaded); - StringBuilder sb = new StringBuilder(); - for (Map.Entry> e: clusterMap.entrySet()) { - if (sb.length() > 0) sb.append(", "); - sb.append(e.getKey().toString()); - sb.append(" "); - sb.append(e.getValue().size()); - } - LOG.warn("Input " + sb.toString()); - } - - // All done! - LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " + - "Moving " + totalNumMoved + " regions off of " + - serversOverloaded + " overloaded servers onto " + - serversUnderloaded + " less loaded servers"); - - return regionsToReturn; - } - - /** - * Add a region from the head or tail to the List of regions to return. - */ - private void addRegionPlan(final MinMaxPriorityQueue regionsToMove, - final boolean fetchFromTail, final ServerName sn, List regionsToReturn) { - RegionPlan rp = null; - if (!fetchFromTail) rp = regionsToMove.remove(); - else rp = regionsToMove.removeLast(); - rp.setDestination(sn); - regionsToReturn.add(rp); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java new file mode 100644 index 0000000..da6b443 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Random; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.RegionPlan; + +import com.google.common.collect.MinMaxPriorityQueue; + +/** + * Makes decisions about the placement and movement of Regions across + * RegionServers. + * + *

Cluster-wide load balancing will occur only when there are no regions in + * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}. + * + *

Inline region placement with {@link #immediateAssignment} can be used when + * the Master needs to handle closed regions that it currently does not have + * a destination set for. This can happen during master failover. + * + *

On cluster startup, bulk assignment can be used to determine + * locations for all Regions in a cluster. + * + *

This classes produces plans for the {@link AssignmentManager} to execute. + */ +@InterfaceAudience.Private +public class SimpleLoadBalancer extends BaseLoadBalancer { + private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class); + private static final Random RANDOM = new Random(System.currentTimeMillis()); + + private RegionInfoComparator riComparator = new RegionInfoComparator(); + private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator(); + + + /** + * Stores additional per-server information about the regions added/removed + * during the run of the balancing algorithm. + * + * For servers that shed regions, we need to track which regions we have already + * shed. nextRegionForUnload contains the index in the list of regions on + * the server that is the next to be shed. + */ + static class BalanceInfo { + + private final int nextRegionForUnload; + private int numRegionsAdded; + + public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) { + this.nextRegionForUnload = nextRegionForUnload; + this.numRegionsAdded = numRegionsAdded; + } + + int getNextRegionForUnload() { + return nextRegionForUnload; + } + + int getNumRegionsAdded() { + return numRegionsAdded; + } + + void setNumRegionsAdded(int numAdded) { + this.numRegionsAdded = numAdded; + } + } + + /** + * Generate a global load balancing plan according to the specified map of + * server information to the most loaded regions of each server. + * + * The load balancing invariant is that all servers are within 1 region of the + * average number of regions per server. If the average is an integer number, + * all servers will be balanced to the average. Otherwise, all servers will + * have either floor(average) or ceiling(average) regions. + * + * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that + * we can fetch from both ends of the queue. + * At the beginning, we check whether there was empty region server + * just discovered by Master. If so, we alternately choose new / old + * regions from head / tail of regionsToMove, respectively. This alternation + * avoids clustering young regions on the newly discovered region server. + * Otherwise, we choose new regions from head of regionsToMove. + * + * Another improvement from HBASE-3609 is that we assign regions from + * regionsToMove to underloaded servers in round-robin fashion. + * Previously one underloaded server would be filled before we move onto + * the next underloaded server, leading to clustering of young regions. + * + * Finally, we randomly shuffle underloaded servers so that they receive + * offloaded regions relatively evenly across calls to balanceCluster(). + * + * The algorithm is currently implemented as such: + * + *

    + *
  1. Determine the two valid numbers of regions each server should have, + * MIN=floor(average) and MAX=ceiling(average). + * + *
  2. Iterate down the most loaded servers, shedding regions from each so + * each server hosts exactly MAX regions. Stop once you reach a + * server that already has <= MAX regions. + *

    + * Order the regions to move from most recent to least. + * + *

  3. Iterate down the least loaded servers, assigning regions so each server + * has exactly MIN regions. Stop once you reach a server that + * already has >= MIN regions. + * + * Regions being assigned to underloaded servers are those that were shed + * in the previous step. It is possible that there were not enough + * regions shed to fill each underloaded server to MIN. If so we + * end up with a number of regions required to do so, neededRegions. + * + * It is also possible that we were able to fill each underloaded but ended + * up with regions that were unassigned from overloaded servers but that + * still do not have assignment. + * + * If neither of these conditions hold (no regions needed to fill the + * underloaded servers, no regions leftover from overloaded servers), + * we are done and return. Otherwise we handle these cases below. + * + *
  4. If neededRegions is non-zero (still have underloaded servers), + * we iterate the most loaded servers again, shedding a single server from + * each (this brings them from having MAX regions to having + * MIN regions). + * + *
  5. We now definitely have more regions that need assignment, either from + * the previous step or from the original shedding from overloaded servers. + * Iterate the least loaded servers filling each to MIN. + * + *
  6. If we still have more regions that need assignment, again iterate the + * least loaded servers, this time giving each one (filling them to + * MAX) until we run out. + * + *
  7. All servers will now either host MIN or MAX regions. + * + * In addition, any server hosting >= MAX regions is guaranteed + * to end up with MAX regions at the end of the balancing. This + * ensures the minimal number of regions possible are moved. + *
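Editor's note: the HBASE-3609 head/tail alternation described above is the reason regionsToMove is a Guava MinMaxPriorityQueue rather than a plain PriorityQueue. A standalone sketch of that fetch pattern — illustrative only; Integers stand in for RegionPlans ordered youngest-to-oldest, and the class name is hypothetical:

```java
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;

public class HeadTailFetchDemo {
  public static void main(String[] args) {
    // Smallest value first, mimicking regions sorted youngest-to-oldest.
    MinMaxPriorityQueue<Integer> regionsToMove =
        MinMaxPriorityQueue.orderedBy(Ordering.<Integer>natural()).create();
    for (int age : new int[] { 5, 1, 9, 3, 7 }) {
      regionsToMove.add(age);
    }
    boolean fetchFromTail = false;
    while (!regionsToMove.isEmpty()) {
      // Alternate between the newest (head) and oldest (tail) entries so a
      // freshly discovered, empty server is not filled with only young regions.
      int next = fetchFromTail ? regionsToMove.removeLast() : regionsToMove.removeFirst();
      System.out.println((fetchFromTail ? "tail: " : "head: ") + next);
      fetchFromTail = !fetchFromTail;
    }
  }
}
```

Prints head: 1, tail: 9, head: 3, tail: 7, head: 5 — the same interleaving addRegionPlan() produces when emptyRegionServerPresent is true.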
+ * + * TODO: We can at-most reassign the number of regions away from a particular + * server to be how many they report as most loaded. + * Should we just keep all assignment in memory? Any objections? + * Does this mean we need HeapSize on HMaster? Or just careful monitor? + * (current thinking is we will hold all assignments in memory) + * + * @param clusterMap Map of regionservers and their load/region information to + * a list of their most loaded regions + * @return a list of regions to be moved, including source and destination, + * or null if cluster is already balanced + */ + public List balanceCluster( + Map> clusterMap) { + boolean emptyRegionServerPresent = false; + long startTime = System.currentTimeMillis(); + + ClusterLoadState cs = new ClusterLoadState(clusterMap); + + if (!this.needsBalance(cs)) return null; + + int numServers = cs.getNumServers(); + NavigableMap> serversByLoad = cs.getServersByLoad(); + int numRegions = cs.getNumRegions(); + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + + // Using to check balance result. + StringBuilder strBalanceParam = new StringBuilder(); + strBalanceParam.append("Balance parameter: numRegions=").append(numRegions) + .append(", numServers=").append(numServers).append(", max=").append(max) + .append(", min=").append(min); + LOG.debug(strBalanceParam.toString()); + + // Balance the cluster + // TODO: Look at data block locality or a more complex load to do this + MinMaxPriorityQueue regionsToMove = + MinMaxPriorityQueue.orderedBy(rpComparator).create(); + List regionsToReturn = new ArrayList(); + + // Walk down most loaded, pruning each to the max + int serversOverloaded = 0; + // flag used to fetch regions from head and tail of list, alternately + boolean fetchFromTail = false; + Map serverBalanceInfo = + new TreeMap(); + for (Map.Entry> server: + serversByLoad.descendingMap().entrySet()) { + ServerAndLoad sal = server.getKey(); + int regionCount = sal.getLoad(); + if (regionCount <= max) { + serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0)); + break; + } + serversOverloaded++; + List regions = server.getValue(); + int numToOffload = Math.min(regionCount - max, regions.size()); + // account for the out-of-band regions which were assigned to this server + // after some other region server crashed + Collections.sort(regions, riComparator); + int numTaken = 0; + for (int i = 0; i <= numToOffload; ) { + HRegionInfo hri = regions.get(i); // fetch from head + if (fetchFromTail) { + hri = regions.get(regions.size() - 1 - i); + } + i++; + // Don't rebalance meta regions. 
+ if (hri.isMetaRegion()) continue; + regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null)); + numTaken++; + if (numTaken >= numToOffload) break; + // fetch in alternate order if there is new region server + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + } + serverBalanceInfo.put(sal.getServerName(), + new BalanceInfo(numToOffload, (-1)*numTaken)); + } + int totalNumMoved = regionsToMove.size(); + + // Walk down least loaded, filling each to the min + int neededRegions = 0; // number of regions needed to bring all up to min + fetchFromTail = false; + + Map underloadedServers = new HashMap(); + float average = (float)numRegions / numServers; // for logging + int maxToTake = numRegions - (int)average; + for (Map.Entry> server: + serversByLoad.entrySet()) { + if (maxToTake == 0) break; // no more to take + int regionCount = server.getKey().getLoad(); + if (regionCount >= min && regionCount > 0) { + continue; // look for other servers which haven't reached min + } + int regionsToPut = min - regionCount; + if (regionsToPut == 0) + { + regionsToPut = 1; + } + maxToTake -= regionsToPut; + underloadedServers.put(server.getKey().getServerName(), regionsToPut); + } + // number of servers that get new regions + int serversUnderloaded = underloadedServers.size(); + int incr = 1; + List sns = + Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded])); + Collections.shuffle(sns, RANDOM); + while (regionsToMove.size() > 0) { + int cnt = 0; + int i = incr > 0 ? 0 : underloadedServers.size()-1; + for (; i >= 0 && i < underloadedServers.size(); i += incr) { + if (regionsToMove.isEmpty()) break; + ServerName si = sns.get(i); + int numToTake = underloadedServers.get(si); + if (numToTake == 0) continue; + + addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn); + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + + underloadedServers.put(si, numToTake-1); + cnt++; + BalanceInfo bi = serverBalanceInfo.get(si); + if (bi == null) { + bi = new BalanceInfo(0, 0); + serverBalanceInfo.put(si, bi); + } + bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1); + } + if (cnt == 0) break; + // iterates underloadedServers in the other direction + incr = -incr; + } + for (Integer i : underloadedServers.values()) { + // If we still want to take some, increment needed + neededRegions += i; + } + + // If none needed to fill all to min and none left to drain all to max, + // we are done + if (neededRegions == 0 && regionsToMove.isEmpty()) { + long endTime = System.currentTimeMillis(); + LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " + + "Moving " + totalNumMoved + " regions off of " + + serversOverloaded + " overloaded servers onto " + + serversUnderloaded + " less loaded servers"); + return regionsToReturn; + } + + // Need to do a second pass. + // Either more regions to assign out or servers that are still underloaded + + // If we need more to fill min, grab one from each most loaded until enough + if (neededRegions != 0) { + // Walk down most loaded, grabbing one from each until we get enough + for (Map.Entry> server : + serversByLoad.descendingMap().entrySet()) { + BalanceInfo balanceInfo = + serverBalanceInfo.get(server.getKey().getServerName()); + int idx = + balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload(); + if (idx >= server.getValue().size()) break; + HRegionInfo region = server.getValue().get(idx); + if (region.isMetaRegion()) continue; // Don't move meta regions. 
+ regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null)); + totalNumMoved++; + if (--neededRegions == 0) { + // No more regions needed, done shedding + break; + } + } + } + + // Now we have a set of regions that must be all assigned out + // Assign each underloaded up to the min, then if leftovers, assign to max + + // Walk down least loaded, assigning to each to fill up to min + for (Map.Entry> server : + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad(); + if (regionCount >= min) break; + BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName()); + if(balanceInfo != null) { + regionCount += balanceInfo.getNumRegionsAdded(); + } + if(regionCount >= min) { + continue; + } + int numToTake = min - regionCount; + int numTaken = 0; + while(numTaken < numToTake && 0 < regionsToMove.size()) { + addRegionPlan(regionsToMove, fetchFromTail, + server.getKey().getServerName(), regionsToReturn); + numTaken++; + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + } + } + + // If we still have regions to dish out, assign underloaded to max + if (0 < regionsToMove.size()) { + for (Map.Entry> server : + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad(); + BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName()); + if(balanceInfo != null) { + regionCount += balanceInfo.getNumRegionsAdded(); + } + if(regionCount >= max) { + break; + } + addRegionPlan(regionsToMove, fetchFromTail, + server.getKey().getServerName(), regionsToReturn); + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + if (regionsToMove.isEmpty()) { + break; + } + } + } + + long endTime = System.currentTimeMillis(); + + if (!regionsToMove.isEmpty() || neededRegions != 0) { + // Emit data so can diagnose how balancer went astray. + LOG.warn("regionsToMove=" + totalNumMoved + + ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded + + ", serversUnderloaded=" + serversUnderloaded); + StringBuilder sb = new StringBuilder(); + for (Map.Entry> e: clusterMap.entrySet()) { + if (sb.length() > 0) sb.append(", "); + sb.append(e.getKey().toString()); + sb.append(" "); + sb.append(e.getValue().size()); + } + LOG.warn("Input " + sb.toString()); + } + + // All done! + LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " + + "Moving " + totalNumMoved + " regions off of " + + serversOverloaded + " overloaded servers onto " + + serversUnderloaded + " less loaded servers"); + + return regionsToReturn; + } + + /** + * Add a region from the head or tail to the List of regions to return. 
+ */ + private void addRegionPlan(final MinMaxPriorityQueue regionsToMove, + final boolean fetchFromTail, final ServerName sn, List regionsToReturn) { + RegionPlan rp = null; + if (!fetchFromTail) rp = regionsToMove.remove(); + else rp = regionsToMove.removeLast(); + rp.setDestination(sn); + regionsToReturn.add(rp); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index 482a9b3..f2d0bc7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; -import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer; +import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; @@ -556,7 +556,7 @@ public class TestZooKeeper { } } - static class MockLoadBalancer extends DefaultLoadBalancer { + static class MockLoadBalancer extends SimpleLoadBalancer { static boolean retainAssignCalled = false; @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index 98fd2f4..b681db6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager; -import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer; +import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; @@ -797,7 +797,7 @@ public class TestAssignmentManager { Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName()); } finally { this.server.getConfiguration().setClass( - HConstants.HBASE_MASTER_LOADBALANCER_CLASS, DefaultLoadBalancer.class, + HConstants.HBASE_MASTER_LOADBALANCER_CLASS, SimpleLoadBalancer.class, LoadBalancer.class); am.getExecutorService().shutdown(); am.shutdown(); @@ -808,7 +808,7 @@ public class TestAssignmentManager { * Mocked load balancer class used in the testcase to make sure that the testcase waits until * random assignment is called and the gate variable is set to true. 
*/ - public static class MockedLoadBalancer extends DefaultLoadBalancer { + public static class MockedLoadBalancer extends SimpleLoadBalancer { private AtomicBoolean gate; public void setGateVariable(AtomicBoolean gate) { @@ -899,7 +899,7 @@ public class TestAssignmentManager { am.getZKTable().isDisabledTable(REGIONINFO.getTable())); } finally { this.server.getConfiguration().setClass( - HConstants.HBASE_MASTER_LOADBALANCER_CLASS, DefaultLoadBalancer.class, + HConstants.HBASE_MASTER_LOADBALANCER_CLASS, SimpleLoadBalancer.class, LoadBalancer.class); am.getZKTable().setEnabledTable(REGIONINFO.getTable()); am.shutdown(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java index 8c00abd..e93507f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java @@ -46,7 +46,7 @@ public class TestDefaultLoadBalancer extends BalancerTestBase { public static void beforeAllTests() throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.regions.slop", "0"); - loadBalancer = new DefaultLoadBalancer(); + loadBalancer = new SimpleLoadBalancer(); loadBalancer.setConf(conf); }
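Editor's note: since this patch flips the shipped default in hbase-default.xml to the StochasticLoadBalancer, a deployment that wants to keep the previous behavior would pin the renamed class in its own hbase-site.xml, along these lines (property name and class taken from the diff above):

```xml
<property>
  <name>hbase.master.loadbalancer.class</name>
  <value>org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer</value>
</property>
```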