Index: src/test/org/apache/hadoop/hbase/regionserver/TestLoadBalancer.java
===================================================================
--- src/test/org/apache/hadoop/hbase/regionserver/TestLoadBalancer.java	(revision 0)
+++ src/test/org/apache/hadoop/hbase/regionserver/TestLoadBalancer.java	(revision 0)
@@ -0,0 +1,167 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Tests for {@link LoadBalancer}.  Expected values assume
+ * hbase.regions.slop = 0.3, which the tests set explicitly.
+ */
+public class TestLoadBalancer extends TestCase {
+  private LoadBalancer balancer;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    this.balancer = new LoadBalancer(createConf(-1));
+  }
+
+  /** A cluster where every server carries the average closes nothing. */
+  public void testBalancedClusterClosesNothing() {
+    int [] counts = {10, 10, 10, 10, 10, 10, 10, 10, 10, 10};
+    SortedMap<HServerLoad, Set<String>> loadToServers =
+      new TreeMap<HServerLoad, Set<String>>();
+    List<HServerInfo> servers = createServers(counts, loadToServers);
+    double avg = average(counts);
+    for (HServerInfo hsi: servers) {
+      assertEquals(0, this.balancer.loadBalancing(hsi, avg, loadToServers));
+    }
+  }
+
+  /** A server above avgLoadPlusSlop sheds everything above the average. */
+  public void testOverloadedServer() {
+    // 30 + 9 * 10 = 120 regions on 10 servers: avg = 12 and
+    // avgLoadPlusSlop = ceil(12 * 1.3) = 16 < 30, so close 30 - 12 = 18.
+    int [] counts = {30, 10, 10, 10, 10, 10, 10, 10, 10, 10};
+    SortedMap<HServerLoad, Set<String>> loadToServers =
+      new TreeMap<HServerLoad, Set<String>>();
+    List<HServerInfo> servers = createServers(counts, loadToServers);
+    double avg = average(counts);
+    assertEquals(18,
+      this.balancer.loadBalancing(servers.get(0), avg, loadToServers));
+    // Servers at or below the average are left alone.
+    for (int i = 1; i < servers.size(); i++) {
+      assertEquals(0,
+        this.balancer.loadBalancing(servers.get(i), avg, loadToServers));
+    }
+    // hbase.regions.close.max caps the number of regions closed per call.
+    LoadBalancer capped = new LoadBalancer(createConf(5));
+    assertEquals(5, capped.loadBalancing(servers.get(0), avg, loadToServers));
+  }
+
+  /**
+   * The most loaded server, though within slop, sheds regions to a server
+   * below avgLoadMinusSlop, limited by what that server can absorb.
+   */
+  public void testMostLoadedServerUnloadsToLowLoaded() {
+    // 15 + 8 * 12 + 5 = 116 regions on 10 servers: avg = 11.6.
+    // avgLoadPlusSlop = ceil(11.6 * 1.3) = 16, so 15 is not overloaded.
+    // avgLoadMinusSlop = floor(11.6 * 0.7) - 1 = 7, so the one server at 5
+    // can take 7 - 5 = 2 regions; the top server would shed 15 - 12 = 3.
+    int [] counts = {15, 12, 12, 12, 12, 12, 12, 12, 12, 5};
+    SortedMap<HServerLoad, Set<String>> loadToServers =
+      new TreeMap<HServerLoad, Set<String>>();
+    List<HServerInfo> servers = createServers(counts, loadToServers);
+    double avg = average(counts);
+    assertEquals(2,
+      this.balancer.loadBalancing(servers.get(0), avg, loadToServers));
+    // An empty load map must not make the balancer throw.
+    assertEquals(0, this.balancer.loadBalancing(servers.get(0), avg,
+      new TreeMap<HServerLoad, Set<String>>()));
+  }
+
+  /*
+   * @param closeMax Value for hbase.regions.close.max.
+   * @return Configuration with hbase.regions.slop pinned to 0.3 so expected
+   * values do not depend on configuration files found on the classpath.
+   */
+  private static HBaseConfiguration createConf(final int closeMax) {
+    HBaseConfiguration conf = new HBaseConfiguration();
+    conf.set("hbase.regions.slop", "0.3");
+    conf.setInt("hbase.regions.close.max", closeMax);
+    return conf;
+  }
+
+  /*
+   * @return Mean of <code>counts</code>, i.e. average regions per server.
+   */
+  private static double average(final int [] counts) {
+    int total = 0;
+    for (int count: counts) {
+      total += count;
+    }
+    return (double)total / counts.length;
+  }
+
+  /*
+   * Build one server per entry in <code>regionCounts</code>, carrying that
+   * many regions, and file its name under its load in
+   * <code>loadToServers</code>.
+   * @return The servers, in the same order as <code>regionCounts</code>.
+   */
+  private static List<HServerInfo> createServers(final int [] regionCounts,
+      final SortedMap<HServerLoad, Set<String>> loadToServers) {
+    HTableDescriptor htd = new HTableDescriptor("testtable");
+    List<HServerInfo> servers = new ArrayList<HServerInfo>();
+    int regionId = 0;
+    for (int i = 0; i < regionCounts.length; i++) {
+      String hostname = "host" + i;
+      // Loopback address with a distinct port per server, so the test does
+      // not depend on DNS resolution of made-up hostnames.
+      HServerAddress hsa = new HServerAddress("127.0.0.1:" + (12345 + i));
+      HServerInfo hsi = new HServerInfo(hsa, -1L, 80, hostname);
+      HServerLoad hsl = new HServerLoad();
+      for (int j = 0; j < regionCounts[i]; j++) {
+        // Zero-padded keys keep each region's start key below its end key.
+        HRegionInfo hri = new HRegionInfo(htd,
+          Bytes.toBytes(String.format("%05d", regionId)),
+          Bytes.toBytes(String.format("%05d", regionId + 1)));
+        regionId++;
+        hsl.addRegionInfo(new HServerLoad.RegionLoad(hri.getRegionName(),
+          0, 0, 0, 0, 0));
+      }
+      hsi.setLoad(hsl);
+      servers.add(hsi);
+      Set<String> names = loadToServers.get(hsl);
+      if (names == null) {
+        names = new HashSet<String>();
+        loadToServers.put(hsl, names);
+      }
+      names.add(hsi.getServerName());
+    }
+    return servers;
+  }
+}
Index: src/java/org/apache/hadoop/hbase/regionserver/LoadBalancer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/LoadBalancer.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/LoadBalancer.java	(revision 0)
@@ -0,0 +1,169 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+
+/**
+ * Class to balance region servers load.
+ * It keeps region server load within the slop range by unassigning regions
+ * from the most loaded servers.
+ *
+ * Equilibrium is reached when the load of every server is in the slop range
+ * [avgLoadMinusSlop, avgLoadPlusSlop], where
+ *  avgLoadPlusSlop = Math.ceil(avgLoad * (1 + this.slop)), and
+ *  avgLoadMinusSlop = Math.floor(avgLoad * (1 - this.slop)) - 1.
+ *
+ * This class only computes how many regions a server should give up;
+ * choosing which regions to close is left to the caller.
+ */
+public class LoadBalancer {
+  private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
+  private float slop;                 // hbase.regions.slop
+  private final int maxRegToClose;    // hbase.regions.close.max
+
+  /**
+   * @param conf Configuration supplying <code>hbase.regions.slop</code>
+   * (default 0.3; a value &lt;= 0 is replaced by 1) and
+   * <code>hbase.regions.close.max</code> (default -1; a value &lt;= 0 means
+   * no limit on the regions closed per call).
+   */
+  public LoadBalancer(HBaseConfiguration conf) {
+    this.slop = conf.getFloat("hbase.regions.slop", (float)0.3);
+    if (this.slop <= 0) this.slop = 1;
+    // maxRegToClose constrains balance closing per one iteration;
+    // -1 to turn off.
+    // TODO: change default in HBASE-862, need a suggestion
+    this.maxRegToClose = conf.getInt("hbase.regions.close.max", -1);
+  }
+
+  /**
+   * Balance server load by computing how many regions the passed server
+   * should unassign.
+   *
+   * @param info Server info; its current load is checked against the average.
+   * @param avg Average regions per server in this cluster.
+   * @param loadToServers Map of load to the names of the servers carrying that
+   * load; <code>firstKey()</code> is treated as the least loaded and
+   * <code>lastKey()</code> as the most loaded.
+   * @return Number of regions to unassign from the server; 0 when it needs
+   * no balancing.  Capped at <code>hbase.regions.close.max</code> when that
+   * is positive.
+   */
+  public int loadBalancing(HServerInfo info, final double avg,
+      final SortedMap<HServerLoad, Set<String>> loadToServers) {
+    HServerLoad servLoad = info.getLoad();
+
+    // Nothing to balance if server load is not more than the average load,
+    // or if the average is 2 regions per server or fewer.
+    if (servLoad.getLoad() <= Math.ceil(avg) || avg <= 2.0) {
+      return 0;
+    }
+
+    // check if current server is overloaded
+    int numRegionsToClose = balanceFromOverloaded(servLoad, avg);
+
+    // check if we can unload server by low loaded servers
+    if (numRegionsToClose <= 0) {
+      numRegionsToClose = balanceToLowloaded(info.getServerName(), servLoad,
+        avg, loadToServers);
+    }
+
+    if (this.maxRegToClose > 0) {
+      numRegionsToClose = Math.min(numRegionsToClose, this.maxRegToClose);
+    }
+    return numRegionsToClose;
+  }
+
+  /*
+   * Check if server is overloaded (with load > avgLoadPlusSlop).
+   * @return number of regions to unassign to bring the server down to
+   * ceil(avgLoad); 0 if it is not overloaded.
+   */
+  private int balanceFromOverloaded(HServerLoad srvLoad, double avgLoad) {
+    int avgLoadPlusSlop = (int)Math.ceil(avgLoad * (1 + this.slop));
+    int numSrvRegs = srvLoad.getNumberOfRegions();
+    if (numSrvRegs > avgLoadPlusSlop) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Server is overloaded: load=" + numSrvRegs +
+          ", avg=" + avgLoad + ", slop=" + this.slop);
+      }
+      return numSrvRegs - (int)Math.ceil(avgLoad);
+    }
+    return 0;
+  }
+
+  /*
+   * Check if server is the most loaded and can be unloaded to
+   * low loaded servers (with load < avgLoadMinusSlop).
+   * @return number of regions to unassign; 0 if loadToServers is empty, the
+   * server is not among the most loaded, or no server is below
+   * avgLoadMinusSlop.
+   */
+  private int balanceToLowloaded(String srvName, HServerLoad srvLoad,
+      double avgLoad,
+      final SortedMap<HServerLoad, Set<String>> loadToServers) {
+    // firstKey()/lastKey() throw NoSuchElementException on an empty map.
+    if (loadToServers == null || loadToServers.isEmpty()) {
+      return 0;
+    }
+    // check if server most loaded
+    Set<String> mostLoaded = loadToServers.get(loadToServers.lastKey());
+    if (mostLoaded == null || !mostLoaded.contains(srvName)) {
+      return 0;
+    }
+
+    // this server is most loaded, we will try to unload it by lowest
+    // loaded servers
+    int avgLoadMinusSlop = (int)Math.floor(avgLoad * (1 - this.slop)) - 1;
+    HServerLoad lowestKey = loadToServers.firstKey();
+    int lowestLoad = lowestKey.getNumberOfRegions();
+    if (lowestLoad >= avgLoadMinusSlop) {
+      return 0; // there are no low loaded servers
+    }
+
+    Set<String> lowestLoaded = loadToServers.get(lowestKey);
+    int lowSrvCount = lowestLoaded == null ? 0 : lowestLoaded.size();
+    int numSrvRegs = srvLoad.getNumberOfRegions();
+    // How many regions the least loaded servers can take: each of the
+    // lowSrvCount servers at lowestLoad can accept
+    // (avgLoadMinusSlop - lowestLoad) regions before reaching
+    // avgLoadMinusSlop.  Servers above lowestLoad but still below
+    // avgLoadMinusSlop are not counted, so this underestimates capacity.
+    int numMoveToLowLoaded = (avgLoadMinusSlop - lowestLoad) * lowSrvCount;
+    // Never close more than needed to bring this server to ceil(avgLoad).
+    int numRegionsToClose =
+      Math.min(numSrvRegs - (int)Math.ceil(avgLoad), numMoveToLowLoaded);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Server " + srvName + " will be unloaded for " +
+        "balance. Server load: " + numSrvRegs + " avg: " +
+        avgLoad + ", regions can be moved: " + numMoveToLowLoaded +
+        ". Regions to close: " + numRegionsToClose);
+    }
+    return numRegionsToClose;
+  }
+}
Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/ServerManager.java	(revision 948520)
+++ src/java/org/apache/hadoop/hbase/master/ServerManager.java	(working copy)
@@ -168,20 +168,8 @@
     LOG.info("Received start message from: " + serverName);
     // Go on to process the regionserver registration.
     HServerLoad load = serversToLoad.remove(serverName);
-    if (load != null) {
-      // The startup message was from a known server.
-      // Remove stale information about the server's load.
-      synchronized (loadToServers) {
-        Set<String> servers = loadToServers.get(load);
-        if (servers != null) {
-          servers.remove(serverName);
-          if (servers.size() > 0)
-            loadToServers.put(load, servers);
-          else
-            loadToServers.remove(load);
-        }
-      }
-    }
+    // If this server was known before, drop its stale load information.
+    removeServerFromLoadToServers(load, serverName, this.loadToServers);
     HServerInfo storedInfo = serversToServerInfo.remove(serverName);
     if (storedInfo != null && !master.closed.get()) {
       // The startup message was from a known server with the same name.
@@ -195,8 +183,53 @@
     }
     recordNewServer(info);
   }
-
-
+
+  /**
+   * Add <code>serverName</code> to the set of servers carrying
+   * <code>load</code>, creating that set if it does not exist yet.
+   * Updates are made while holding the lock on <code>loadToServers</code>.
+   * @param load Load to file the server under.
+   * @param serverName Name of the server to add.
+   * @param loadToServers Map of load to the names of servers with that load.
+   */
+  static void addServerToLoadToServers(final HServerLoad load,
+      final String serverName,
+      final SortedMap<HServerLoad, Set<String>> loadToServers) {
+    synchronized (loadToServers) {
+      Set<String> servers = loadToServers.get(load);
+      if (servers == null) {
+        servers = new HashSet<String>();
+      }
+      servers.add(serverName);
+      loadToServers.put(load, servers);
+    }
+  }
+
+  /**
+   * Remove <code>serverName</code> from the set of servers carrying
+   * <code>load</code>; the map entry is dropped once its set is empty.
+   * Does nothing if <code>load</code> is null or not in the map.
+   * Updates are made while holding the lock on <code>loadToServers</code>.
+   * @param load Load last recorded for the server; may be null.
+   * @param serverName Name of the server to remove.
+   * @param loadToServers Map of load to the names of servers with that load.
+   */
+  static void removeServerFromLoadToServers(final HServerLoad load,
+      final String serverName,
+      final SortedMap<HServerLoad, Set<String>> loadToServers) {
+    if (load == null) return;
+    synchronized (loadToServers) {
+      Set<String> servers = loadToServers.get(load);
+      if (servers != null) {
+        servers.remove(serverName);
+        if (servers.size() > 0)
+          loadToServers.put(load, servers);
+        else
+          loadToServers.remove(load);
+      }
+    }
+  }
+
   /**
    * Adds the HSI to the RS list and creates an empty load
    * @param info The region server informations
@@ -222,14 +255,7 @@
     serversToServerInfo.put(serverName, info);
     serverAddressToServerInfo.put(info.getServerAddress(), info);
     serversToLoad.put(serverName, load);
-    synchronized (loadToServers) {
-      Set<String> servers = loadToServers.get(load);
-      if (servers == null) {
-        servers = new HashSet<String>();
-      }
-      servers.add(serverName);
-      loadToServers.put(load, servers);
-    }
+    addServerToLoadToServers(load, serverName, loadToServers);
   }
 
   /**
@@ -404,18 +430,10 @@
     if (load != null) {
       this.master.getMetrics().incrementRequests(load.getNumberOfRequests());
       if (!load.equals(serverInfo.getLoad())) {
         // We have previous information about the load on this server
         // and the load on this server has changed
-        synchronized (loadToServers) {
-          Set<String> servers = loadToServers.get(load);
-          // Note that servers should never be null because loadToServers
-          // and serversToLoad are manipulated in pairs
-          servers.remove(serverInfo.getServerName());
-          if (servers.size() > 0)
-            loadToServers.put(load, servers);
-          else
-            loadToServers.remove(load);
-        }
+        removeServerFromLoadToServers(load, serverInfo.getServerName(),
+          this.loadToServers);
       }
     }
 
@@ -721,18 +739,7 @@
 
       // update load information
       HServerLoad load = serversToLoad.remove(serverName);
-      if (load != null) {
-        synchronized (loadToServers) {
-          Set<String> servers = loadToServers.get(load);
-          if (servers != null) {
-            servers.remove(serverName);
-            if(servers.size() > 0)
-              loadToServers.put(load, servers);
-            else
-              loadToServers.remove(load);
-          }
-        }
-      }
+      removeServerFromLoadToServers(load, serverName, loadToServers);
     }
     return infoUpdated;
   }
@@ -857,18 +864,7 @@
       if (info != null) {
         String serverName = info.getServerName();
         HServerLoad load = serversToLoad.remove(serverName);
-        if (load != null) {
-          synchronized (loadToServers) {
-            Set<String> servers = loadToServers.get(load);
-            if (servers != null) {
-              servers.remove(serverName);
-              if(servers.size() > 0)
-                loadToServers.put(load, servers);
-              else
-                loadToServers.remove(load);
-            }
-          }
-        }
+        removeServerFromLoadToServers(load, serverName, loadToServers);
         deadServers.add(server);
         try {
           master.toDoQueue.put(new ProcessServerShutdown(master, info));
Index: src/java/org/apache/hadoop/hbase/master/RegionManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/RegionManager.java	(revision 948520)
+++ src/java/org/apache/hadoop/hbase/master/RegionManager.java	(working copy)
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.LoadBalancer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -197,7 +198,13 @@
       if (!inSafeMode()) {
         // We only do load balancing once all regions are assigned.
         // This prevents churn while the cluster is starting up.
-        loadBalancer.loadBalancing(info, mostLoadedRegions, returnMsgs);
+        int numRegionsToClose = this.loadBalancer.loadBalancing(info,
+          this.master.serverManager.getAverageLoad(),
+          this.master.serverManager.getLoadToServers());
+        if (numRegionsToClose > 0) {
+          unassignSomeRegions(info, numRegionsToClose, mostLoadedRegions,
+            returnMsgs);
+        }
       }
     } else {
       // if there's only one server, just give it all the regions
@@ -1394,121 +1401,6 @@
   }
 
   /**
-   * Class to balance region servers load.
-   * It keeps Region Servers load in slop range by unassigning Regions
-   * from most loaded servers.
-   *
-   * Equilibrium is reached when load of all serves are in slop range
-   * [avgLoadMinusSlop, avgLoadPlusSlop], where
-   *  avgLoadPlusSlop = Math.ceil(avgLoad * (1 + this.slop)), and
-   *  avgLoadMinusSlop = Math.floor(avgLoad * (1 - this.slop)) - 1.
-   */
-  private class LoadBalancer {
-    private float slop;                 // hbase.regions.slop
-    private final int maxRegToClose;    // hbase.regions.close.max
-
-    LoadBalancer(HBaseConfiguration conf) {
-      this.slop = conf.getFloat("hbase.regions.slop", (float)0.3);
-      if (this.slop <= 0) this.slop = 1;
-      //maxRegToClose to constrain balance closing per one iteration
-      // -1 to turn off
-      // TODO: change default in HBASE-862, need a suggestion
-      this.maxRegToClose = conf.getInt("hbase.regions.close.max", -1);
-    }
-
-    /**
-     * Balance server load by unassigning some regions.
-     *
-     * @param info - server info
-     * @param mostLoadedRegions - array of most loaded regions
-     * @param returnMsgs - array of return massages
-     */
-    void loadBalancing(HServerInfo info, HRegionInfo[] mostLoadedRegions,
-        ArrayList<HMsg> returnMsgs) {
-      HServerLoad servLoad = info.getLoad();
-      double avg = master.serverManager.getAverageLoad();
-
-      // nothing to balance if server load not more then average load
-      if(servLoad.getLoad() <= Math.ceil(avg) || avg <= 2.0) {
-        return;
-      }
-
-      // check if current server is overloaded
-      int numRegionsToClose = balanceFromOverloaded(servLoad, avg);
-
-      // check if we can unload server by low loaded servers
-      if(numRegionsToClose <= 0) {
-        numRegionsToClose = balanceToLowloaded(info.getServerName(), servLoad,
-            avg);
-      }
-
-      if(maxRegToClose > 0) {
-        numRegionsToClose = Math.min(numRegionsToClose, maxRegToClose);
-      }
-
-      if(numRegionsToClose > 0) {
-        unassignSomeRegions(info, numRegionsToClose, mostLoadedRegions,
-            returnMsgs);
-      }
-    }
-
-    /*
-     * Check if server load is not overloaded (with load > avgLoadPlusSlop).
-     * @return number of regions to unassign.
-     */
-    private int balanceFromOverloaded(HServerLoad srvLoad, double avgLoad) {
-      int avgLoadPlusSlop = (int)Math.ceil(avgLoad * (1 + this.slop));
-      int numSrvRegs = srvLoad.getNumberOfRegions();
-      if (numSrvRegs > avgLoadPlusSlop) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Server is overloaded: load=" + numSrvRegs +
-              ", avg=" + avgLoad + ", slop=" + this.slop);
-        }
-        return numSrvRegs - (int)Math.ceil(avgLoad);
-      }
-      return 0;
-    }
-
-    /*
-     * Check if server is most loaded and can be unloaded to
-     * low loaded servers (with load < avgLoadMinusSlop).
-     * @return number of regions to unassign.
-     */
-    private int balanceToLowloaded(String srvName, HServerLoad srvLoad,
-        double avgLoad) {
-
-      SortedMap<HServerLoad, Set<String>> loadToServers =
-        master.serverManager.getLoadToServers();
-      // check if server most loaded
-      if (!loadToServers.get(loadToServers.lastKey()).contains(srvName))
-        return 0;
-
-      // this server is most loaded, we will try to unload it by lowest
-      // loaded servers
-      int avgLoadMinusSlop = (int)Math.floor(avgLoad * (1 - this.slop)) - 1;
-      int lowestLoad = loadToServers.firstKey().getNumberOfRegions();
-
-      if(lowestLoad >= avgLoadMinusSlop)
-        return 0; // there is no low loaded servers
-
-      int lowSrvCount = loadToServers.get(loadToServers.firstKey()).size();
-      int numRegionsToClose = 0;
-
-      int numSrvRegs = srvLoad.getNumberOfRegions();
-      int numMoveToLowLoaded = (avgLoadMinusSlop - lowestLoad) * lowSrvCount;
-      numRegionsToClose = numSrvRegs - (int)Math.ceil(avgLoad);
-      numRegionsToClose = Math.min(numRegionsToClose, numMoveToLowLoaded);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Server " + srvName + " will be unloaded for " +
-          "balance. Server load: " + numSrvRegs + " avg: " +
-          avgLoad + ", regions can be moved: " + numMoveToLowLoaded +
-          ". Regions to close: " + numRegionsToClose);
-      }
-      return numRegionsToClose;
-    }
-  }
-
-  /**
    * @param regionname Name to clear from regions in transistion.
    * @return True if we removed an element for the passed regionname.
    */