diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 49d1e7c..29858b2 100644 --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; -import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; import org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; @@ -101,6 +101,8 @@ public class AssignmentManager extends ZooKeeperListener { private TimeoutMonitor timeoutMonitor; + private LoadBalancer balancer; + /* * Maximum times we recurse an assignment. See below in {@link #assign()}. 
*/ @@ -172,6 +174,7 @@ public class AssignmentManager extends ZooKeeperListener { this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); + this.balancer = LoadBalancerFactory.getLoadBalancer(conf); } /** @@ -1364,7 +1367,7 @@ public class AssignmentManager extends ZooKeeperListener { if (serverToExclude != null) servers.remove(serverToExclude); if (servers.isEmpty()) return null; RegionPlan randomPlan = new RegionPlan(state.getRegion(), null, - LoadBalancer.randomAssignment(servers)); + balancer.randomAssignment(servers)); boolean newPlan = false; RegionPlan existingPlan = null; synchronized (this.regionPlans) { @@ -1564,7 +1567,7 @@ public class AssignmentManager extends ZooKeeperListener { return; Map> bulkPlan = null; // Generate a round-robin bulk assignment plan - bulkPlan = LoadBalancer.roundRobinAssignment(regions, servers); + bulkPlan = balancer.roundRobinAssignment(regions, servers); LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across " + servers.size() + " server(s)"); // Use fixed count thread pool assigning. 
@@ -1598,7 +1601,7 @@ public class AssignmentManager extends ZooKeeperListener { Map> bulkPlan = null; if (retainAssignment) { // Reuse existing assignment info - bulkPlan = LoadBalancer.retainAssignment(allRegions, servers); + bulkPlan = balancer.retainAssignment(allRegions, servers); } else { // assign regions in round-robin fashion assignUserRegions(new ArrayList(allRegions.keySet()), servers); diff --git src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java new file mode 100644 index 0000000..d608327 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java @@ -0,0 +1,723 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Random; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.common.collect.MinMaxPriorityQueue; + +/** + * Makes decisions about the placement and movement of Regions across + * RegionServers. + * + *

Cluster-wide load balancing will occur only when there are no regions in + * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}. + * + *

Inline region placement with {@link #immediateAssignment} can be used when + * the Master needs to handle closed regions that it currently does not have + * a destination set for. This can happen during master failover. + * + *

On cluster startup, bulk assignment can be used to determine + * locations for all Regions in a cluster. + * + *

This classes produces plans for the {@link AssignmentManager} to execute. + */ +public class DefaultLoadBalancer implements LoadBalancer { + private static final Log LOG = LogFactory.getLog(LoadBalancer.class); + private static final Random RANDOM = new Random(System.currentTimeMillis()); + // slop for regions + private float slop; + private Configuration config; + private ClusterStatus status; + private MasterServices services; + + public void setClusterStatus(ClusterStatus st) { + this.status = st; + } + + public void setMasterServices(MasterServices masterServices) { + this.services = masterServices; + } + + @Override + public void setConf(Configuration conf) { + this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2); + if (slop < 0) slop = 0; + else if (slop > 1) slop = 1; + this.config = conf; + } + + @Override + public Configuration getConf() { + return this.config; + } + + /* + * The following comparator assumes that RegionId from HRegionInfo can + * represent the age of the region - larger RegionId means the region + * is younger. + * This comparator is used in balanceCluster() to account for the out-of-band + * regions which were assigned to the server after some other region server + * crashed. 
+ */ + private class RegionInfoComparator implements Comparator { + @Override + public int compare(HRegionInfo l, HRegionInfo r) { + long diff = r.getRegionId() - l.getRegionId(); + if (diff < 0) return -1; + if (diff > 0) return 1; + return 0; + } + } + + + RegionInfoComparator riComparator = new RegionInfoComparator(); + + private class RegionPlanComparator implements Comparator { + @Override + public int compare(RegionPlan l, RegionPlan r) { + long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId(); + if (diff < 0) return -1; + if (diff > 0) return 1; + return 0; + } + } + + RegionPlanComparator rpComparator = new RegionPlanComparator(); + + /** + * Generate a global load balancing plan according to the specified map of + * server information to the most loaded regions of each server. + * + * The load balancing invariant is that all servers are within 1 region of the + * average number of regions per server. If the average is an integer number, + * all servers will be balanced to the average. Otherwise, all servers will + * have either floor(average) or ceiling(average) regions. + * + * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that + * we can fetch from both ends of the queue. + * At the beginning, we check whether there was empty region server + * just discovered by Master. If so, we alternately choose new / old + * regions from head / tail of regionsToMove, respectively. This alternation + * avoids clustering young regions on the newly discovered region server. + * Otherwise, we choose new regions from head of regionsToMove. + * + * Another improvement from HBASE-3609 is that we assign regions from + * regionsToMove to underloaded servers in round-robin fashion. + * Previously one underloaded server would be filled before we move onto + * the next underloaded server, leading to clustering of young regions. 
+ * + * Finally, we randomly shuffle underloaded servers so that they receive + * offloaded regions relatively evenly across calls to balanceCluster(). + * + * The algorithm is currently implemented as such: + * + *

    + *
  1. Determine the two valid numbers of regions each server should have, + * MIN=floor(average) and MAX=ceiling(average). + * + *
  2. Iterate down the most loaded servers, shedding regions from each so + * each server hosts exactly MAX regions. Stop once you reach a + * server that already has <= MAX regions. + *

    + * Order the regions to move from most recent to least. + * + *

  3. Iterate down the least loaded servers, assigning regions so each server + * has exactly MIN regions. Stop once you reach a server that + * already has >= MIN regions. + * + * Regions being assigned to underloaded servers are those that were shed + * in the previous step. It is possible that there were not enough + * regions shed to fill each underloaded server to MIN. If so we + * end up with a number of regions required to do so, neededRegions. + * + * It is also possible that we were able to fill each underloaded but ended + * up with regions that were unassigned from overloaded servers but that + * still do not have assignment. + * + * If neither of these conditions hold (no regions needed to fill the + * underloaded servers, no regions leftover from overloaded servers), + * we are done and return. Otherwise we handle these cases below. + * + *
  4. If neededRegions is non-zero (still have underloaded servers), + * we iterate the most loaded servers again, shedding a single server from + * each (this brings them from having MAX regions to having + * MIN regions). + * + *
  5. We now definitely have more regions that need assignment, either from + * the previous step or from the original shedding from overloaded servers. + * Iterate the least loaded servers filling each to MIN. + * + *
  6. If we still have more regions that need assignment, again iterate the + * least loaded servers, this time giving each one (filling them to + * MAX) until we run out. + * + *
  7. All servers will now either host MIN or MAX regions. + * + * In addition, any server hosting >= MAX regions is guaranteed + * to end up with MAX regions at the end of the balancing. This + * ensures the minimal number of regions possible are moved. + *
+ * + * TODO: We can at-most reassign the number of regions away from a particular + * server to be how many they report as most loaded. + * Should we just keep all assignment in memory? Any objections? + * Does this mean we need HeapSize on HMaster? Or just careful monitor? + * (current thinking is we will hold all assignments in memory) + * + * @param clusterState Map of regionservers and their load/region information to + * a list of their most loaded regions + * @return a list of regions to be moved, including source and destination, + * or null if cluster is already balanced + */ + public List balanceCluster( + Map> clusterState) { + boolean emptyRegionServerPresent = false; + long startTime = System.currentTimeMillis(); + + int numServers = clusterState.size(); + if (numServers == 0) { + LOG.debug("numServers=0 so skipping load balancing"); + return null; + } + NavigableMap> serversByLoad = + new TreeMap>(); + int numRegions = 0; + StringBuilder strBalanceParam = new StringBuilder("Server information: "); + // Iterate so we can count regions as we build the map + for (Map.Entry> server: clusterState.entrySet()) { + List regions = server.getValue(); + int sz = regions.size(); + if (sz == 0) emptyRegionServerPresent = true; + numRegions += sz; + serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions); + strBalanceParam.append(server.getKey().getServerName()).append("="). 
+ append(server.getValue().size()).append(", "); + } + strBalanceParam.delete(strBalanceParam.length() - 2, + strBalanceParam.length()); + LOG.debug(strBalanceParam.toString()); + + // Check if we even need to do any load balancing + float average = (float)numRegions / numServers; // for logging + // HBASE-3681 check sloppiness first + int floor = (int) Math.floor(average * (1 - slop)); + int ceiling = (int) Math.ceil(average * (1 + slop)); + if (serversByLoad.lastKey().getLoad() <= ceiling && + serversByLoad.firstKey().getLoad() >= floor) { + // Skipped because no server outside (min,max) range + LOG.info("Skipping load balancing because balanced cluster; " + + "servers=" + numServers + " " + + "regions=" + numRegions + " average=" + average + " " + + "mostloaded=" + serversByLoad.lastKey().getLoad() + + " leastloaded=" + serversByLoad.firstKey().getLoad()); + return null; + } + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + + // Using to check banance result. 
+ strBalanceParam.delete(0, strBalanceParam.length()); + strBalanceParam.append("Balance parameter: numRegions=").append(numRegions) + .append(", numServers=").append(numServers).append(", max=").append(max) + .append(", min=").append(min); + LOG.debug(strBalanceParam.toString()); + + // Balance the cluster + // TODO: Look at data block locality or a more complex load to do this + MinMaxPriorityQueue regionsToMove = + MinMaxPriorityQueue.orderedBy(rpComparator).create(); + List regionsToReturn = new ArrayList(); + + // Walk down most loaded, pruning each to the max + int serversOverloaded = 0; + // flag used to fetch regions from head and tail of list, alternately + boolean fetchFromTail = false; + Map serverBalanceInfo = + new TreeMap(); + for (Map.Entry> server: + serversByLoad.descendingMap().entrySet()) { + ServerAndLoad sal = server.getKey(); + int regionCount = sal.getLoad(); + if (regionCount <= max) { + serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0)); + break; + } + serversOverloaded++; + List regions = server.getValue(); + int numToOffload = Math.min(regionCount - max, regions.size()); + // account for the out-of-band regions which were assigned to this server + // after some other region server crashed + Collections.sort(regions, riComparator); + int numTaken = 0; + for (int i = 0; i <= numToOffload; ) { + HRegionInfo hri = regions.get(i); // fetch from head + if (fetchFromTail) { + hri = regions.get(regions.size() - 1 - i); + } + i++; + // Don't rebalance meta regions. 
+ if (hri.isMetaRegion()) continue; + regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null)); + numTaken++; + if (numTaken >= numToOffload) break; + // fetch in alternate order if there is new region server + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + } + serverBalanceInfo.put(sal.getServerName(), + new BalanceInfo(numToOffload, (-1)*numTaken)); + } + int totalNumMoved = regionsToMove.size(); + + // Walk down least loaded, filling each to the min + int neededRegions = 0; // number of regions needed to bring all up to min + fetchFromTail = false; + + Map underloadedServers = new HashMap(); + for (Map.Entry> server: + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad(); + if (regionCount >= min) { + break; + } + underloadedServers.put(server.getKey().getServerName(), min - regionCount); + } + // number of servers that get new regions + int serversUnderloaded = underloadedServers.size(); + int incr = 1; + List sns = + Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded])); + Collections.shuffle(sns, RANDOM); + while (regionsToMove.size() > 0) { + int cnt = 0; + int i = incr > 0 ? 
0 : underloadedServers.size()-1; + for (; i >= 0 && i < underloadedServers.size(); i += incr) { + if (regionsToMove.isEmpty()) break; + ServerName si = sns.get(i); + int numToTake = underloadedServers.get(si); + if (numToTake == 0) continue; + + addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn); + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + + underloadedServers.put(si, numToTake-1); + cnt++; + BalanceInfo bi = serverBalanceInfo.get(si); + if (bi == null) { + bi = new BalanceInfo(0, 0); + serverBalanceInfo.put(si, bi); + } + bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1); + } + if (cnt == 0) break; + // iterates underloadedServers in the other direction + incr = -incr; + } + for (Integer i : underloadedServers.values()) { + // If we still want to take some, increment needed + neededRegions += i; + } + + // If none needed to fill all to min and none left to drain all to max, + // we are done + if (neededRegions == 0 && regionsToMove.isEmpty()) { + long endTime = System.currentTimeMillis(); + LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " + + "Moving " + totalNumMoved + " regions off of " + + serversOverloaded + " overloaded servers onto " + + serversUnderloaded + " less loaded servers"); + return regionsToReturn; + } + + // Need to do a second pass. + // Either more regions to assign out or servers that are still underloaded + + // If we need more to fill min, grab one from each most loaded until enough + if (neededRegions != 0) { + // Walk down most loaded, grabbing one from each until we get enough + for (Map.Entry> server : + serversByLoad.descendingMap().entrySet()) { + BalanceInfo balanceInfo = + serverBalanceInfo.get(server.getKey().getServerName()); + int idx = + balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload(); + if (idx >= server.getValue().size()) break; + HRegionInfo region = server.getValue().get(idx); + if (region.isMetaRegion()) continue; // Don't move meta regions. 
+ regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null)); + totalNumMoved++; + if (--neededRegions == 0) { + // No more regions needed, done shedding + break; + } + } + } + + // Now we have a set of regions that must be all assigned out + // Assign each underloaded up to the min, then if leftovers, assign to max + + // Walk down least loaded, assigning to each to fill up to min + for (Map.Entry> server : + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad(); + if (regionCount >= min) break; + BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName()); + if(balanceInfo != null) { + regionCount += balanceInfo.getNumRegionsAdded(); + } + if(regionCount >= min) { + continue; + } + int numToTake = min - regionCount; + int numTaken = 0; + while(numTaken < numToTake && 0 < regionsToMove.size()) { + addRegionPlan(regionsToMove, fetchFromTail, + server.getKey().getServerName(), regionsToReturn); + numTaken++; + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + } + } + + // If we still have regions to dish out, assign underloaded to max + if (0 < regionsToMove.size()) { + for (Map.Entry> server : + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad(); + if(regionCount >= max) { + break; + } + addRegionPlan(regionsToMove, fetchFromTail, + server.getKey().getServerName(), regionsToReturn); + if (emptyRegionServerPresent) { + fetchFromTail = !fetchFromTail; + } + if (regionsToMove.isEmpty()) { + break; + } + } + } + + long endTime = System.currentTimeMillis(); + + if (!regionsToMove.isEmpty() || neededRegions != 0) { + // Emit data so can diagnose how balancer went astray. 
+ LOG.warn("regionsToMove=" + totalNumMoved + + ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded + + ", serversUnderloaded=" + serversUnderloaded); + StringBuilder sb = new StringBuilder(); + for (Map.Entry> e: clusterState.entrySet()) { + if (sb.length() > 0) sb.append(", "); + sb.append(e.getKey().toString()); + sb.append(" "); + sb.append(e.getValue().size()); + } + LOG.warn("Input " + sb.toString()); + } + + // All done! + LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " + + "Moving " + totalNumMoved + " regions off of " + + serversOverloaded + " overloaded servers onto " + + serversUnderloaded + " less loaded servers"); + + return regionsToReturn; + } + + /** + * Add a region from the head or tail to the List of regions to return. + */ + void addRegionPlan(final MinMaxPriorityQueue regionsToMove, + final boolean fetchFromTail, final ServerName sn, List regionsToReturn) { + RegionPlan rp = null; + if (!fetchFromTail) rp = regionsToMove.remove(); + else rp = regionsToMove.removeLast(); + rp.setDestination(sn); + regionsToReturn.add(rp); + } + + /** + * Stores additional per-server information about the regions added/removed + * during the run of the balancing algorithm. + * + * For servers that shed regions, we need to track which regions we have + * already shed. nextRegionForUnload contains the index in the list + * of regions on the server that is the next to be shed. 
+ */ + private static class BalanceInfo { + + private final int nextRegionForUnload; + private int numRegionsAdded; + + public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) { + this.nextRegionForUnload = nextRegionForUnload; + this.numRegionsAdded = numRegionsAdded; + } + + public int getNextRegionForUnload() { + return nextRegionForUnload; + } + + public int getNumRegionsAdded() { + return numRegionsAdded; + } + + public void setNumRegionsAdded(int numAdded) { + this.numRegionsAdded = numAdded; + } + } + + /** + * Generates a bulk assignment plan to be used on cluster startup using a + * simple round-robin assignment. + *

+ * Takes a list of all the regions and all the servers in the cluster and + * returns a map of each server to the regions that it should be assigned. + *

+ * Currently implemented as a round-robin assignment. Same invariant as + * load balancing, all servers holding floor(avg) or ceiling(avg). + * + * TODO: Use block locations from HDFS to place regions with their blocks + * + * @param regions all regions + * @param servers all servers + * @return map of server to the regions it should take, or null if no + * assignment is possible (ie. no regions or no servers) + */ + public Map> roundRobinAssignment( + List regions, List servers) { + if (regions.isEmpty() || servers.isEmpty()) { + return null; + } + Map> assignments = + new TreeMap>(); + int numRegions = regions.size(); + int numServers = servers.size(); + int max = (int)Math.ceil((float)numRegions/numServers); + int serverIdx = 0; + if (numServers > 1) { + serverIdx = RANDOM.nextInt(numServers); + } + int regionIdx = 0; + for (int j = 0; j < numServers; j++) { + ServerName server = servers.get((j + serverIdx) % numServers); + List serverRegions = new ArrayList(max); + for (int i=regionIdx; i + * Takes a map of all regions to their existing assignment from META. Also + * takes a list of online servers for regions to be assigned to. Attempts to + * retain all assignment, so in some instances initial assignment will not be + * completely balanced. + *

+ * Any leftover regions without an existing server to be assigned to will be + * assigned randomly to available servers. + * @param regions regions and existing assignment from meta + * @param servers available servers + * @return map of servers and regions to be assigned to them + */ + public Map> retainAssignment( + Map regions, List servers) { + Map> assignments = + new TreeMap>(); + for (ServerName server : servers) { + assignments.put(server, new ArrayList()); + } + for (Map.Entry region : regions.entrySet()) { + ServerName sn = region.getValue(); + if (sn != null && servers.contains(sn)) { + assignments.get(sn).add(region.getKey()); + } else { + int size = assignments.size(); + assignments.get(servers.get(RANDOM.nextInt(size))).add(region.getKey()); + } + } + return assignments; + } + + /** + * Returns an ordered list of hosts that are hosting the blocks for this + * region. The weight of each host is the sum of the block lengths of all + * files on that host, so the first host in the list is the server which + * holds the most bytes of the given region's HFiles. + * + * @param fs the filesystem + * @param region region + * @return ordered list of hosts holding blocks of the specified region + */ + @SuppressWarnings("unused") + private List getTopBlockLocations(FileSystem fs, + HRegionInfo region) { + List topServerNames = null; + try { + HTableDescriptor tableDescriptor = getTableDescriptor( + region.getTableName()); + if (tableDescriptor != null) { + HDFSBlocksDistribution blocksDistribution = + HRegion.computeHDFSBlocksDistribution(config, tableDescriptor, + region.getEncodedName()); + List topHosts = blocksDistribution.getTopHosts(); + topServerNames = mapHostNameToServerName(topHosts); + } + } catch (IOException ioe) { + LOG.debug("IOException during HDFSBlocksDistribution computation. 
for " + + "region = " + region.getEncodedName() , ioe); + } + + return topServerNames; + } + + /** + * return HTableDescriptor for a given tableName + * @param tableName the table name + * @return HTableDescriptor + * @throws IOException + */ + private HTableDescriptor getTableDescriptor(byte[] tableName) + throws IOException { + HTableDescriptor tableDescriptor = null; + try { + if ( this.services != null) + { + tableDescriptor = this.services.getTableDescriptors(). + get(Bytes.toString(tableName)); + } + } catch (TableExistsException tee) { + LOG.debug("TableExistsException during getTableDescriptors." + + " Current table name = " + tableName , tee); + } catch (FileNotFoundException fnfe) { + LOG.debug("FileNotFoundException during getTableDescriptors." + + " Current table name = " + tableName , fnfe); + } + + return tableDescriptor; + } + + /** + * Map hostname to ServerName, The output ServerName list will have the same + * order as input hosts. + * @param hosts the list of hosts + * @return ServerName list + */ + private List mapHostNameToServerName(List hosts) { + if ( hosts == null || status == null) { + return null; + } + + List topServerNames = new ArrayList(); + Collection regionServers = status.getServers(); + + // create a mapping from hostname to ServerName for fast lookup + HashMap hostToServerName = + new HashMap(); + for (ServerName sn : regionServers) { + hostToServerName.put(sn.getHostname(), sn); + } + + for (String host : hosts ) { + ServerName sn = hostToServerName.get(host); + // it is possible that HDFS is up ( thus host is valid ), + // but RS is down ( thus sn is null ) + if (sn != null) { + topServerNames.add(sn); + } + } + return topServerNames; + } + + + /** + * Generates an immediate assignment plan to be used by a new master for + * regions in transition that do not have an already known destination. + * + * Takes a list of regions that need immediate assignment and a list of + * all available servers. 
Returns a map of regions to the server they + * should be assigned to. + * + * This method will return quickly and does not do any intelligent + * balancing. The goal is to make a fast decision not the best decision + * possible. + * + * Currently this is random. + * + * @param regions + * @param servers + * @return map of regions to the server it should be assigned to + */ + public Map immediateAssignment( + List regions, List servers) { + Map assignments = + new TreeMap(); + for(HRegionInfo region : regions) { + assignments.put(region, servers.get(RANDOM.nextInt(servers.size()))); + } + return assignments; + } + + public ServerName randomAssignment(List servers) { + if (servers == null || servers.isEmpty()) { + LOG.warn("Wanted to do random assignment but no servers to assign to"); + return null; + } + return servers.get(RANDOM.nextInt(servers.size())); + } + +} diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 9784195..92d5dbb 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -63,7 +63,7 @@ import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.handler.DeleteTableHandler; import org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; @@ -354,7 +354,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.assignmentManager = new AssignmentManager(this, serverManager, this.catalogTracker, this.executorService); - this.balancer = new LoadBalancer(conf); + this.balancer = 
LoadBalancerFactory.getLoadBalancer(conf); zooKeeper.registerListenerFirst(assignmentManager); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, diff --git src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java index 05460b6..ba0d422 100644 --- src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java +++ src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java @@ -1,5 +1,5 @@ /** - * Copyright 2010 The Apache Software Foundation + * Copyright 2011 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,34 +19,13 @@ */ package org.apache.hadoop.hbase.master; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Random; -import java.util.TreeMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.collect.MinMaxPriorityQueue; +import java.util.List; +import java.util.Map; /** * Makes decisions about the placement and movement of Regions across @@ -64,764 +43,56 @@ import 
com.google.common.collect.MinMaxPriorityQueue; * *

This classes produces plans for the {@link AssignmentManager} to execute. */ -public class LoadBalancer { - private static final Log LOG = LogFactory.getLog(LoadBalancer.class); - private static final Random RANDOM = new Random(System.currentTimeMillis()); - // slop for regions - private float slop; - private Configuration config; - private ClusterStatus status; - private MasterServices services; - - LoadBalancer(Configuration conf) { - this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2); - if (slop < 0) slop = 0; - else if (slop > 1) slop = 1; - this.config = conf; - } - - public void setClusterStatus(ClusterStatus st) { - this.status = st; - } - - public void setMasterServices(MasterServices masterServices) { - this.services = masterServices; - } - - /* - * The following comparator assumes that RegionId from HRegionInfo can - * represent the age of the region - larger RegionId means the region - * is younger. - * This comparator is used in balanceCluster() to account for the out-of-band - * regions which were assigned to the server after some other region server - * crashed. - */ - static class RegionInfoComparator implements Comparator { - @Override - public int compare(HRegionInfo l, HRegionInfo r) { - long diff = r.getRegionId() - l.getRegionId(); - if (diff < 0) return -1; - if (diff > 0) return 1; - return 0; - } - } - static RegionInfoComparator riComparator = new RegionInfoComparator(); - - static class RegionPlanComparator implements Comparator { - @Override - public int compare(RegionPlan l, RegionPlan r) { - long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId(); - if (diff < 0) return -1; - if (diff > 0) return 1; - return 0; - } - } - static RegionPlanComparator rpComparator = new RegionPlanComparator(); +public interface LoadBalancer extends Configurable { /** - * Data structure that holds servername and 'load'. + * Set the current cluster status. 
This allows a LoadBalancer to map host name to a server + * @param st */ - static class ServerAndLoad implements Comparable { - private final ServerName sn; - private final int load; - ServerAndLoad(final ServerName sn, final int load) { - this.sn = sn; - this.load = load; - } + public void setClusterStatus(ClusterStatus st); - ServerName getServerName() {return this.sn;} - int getLoad() {return this.load;} - - @Override - public int compareTo(ServerAndLoad other) { - int diff = this.load - other.load; - return diff != 0? diff: this.sn.compareTo(other.getServerName()); - } - } /** - * Generate a global load balancing plan according to the specified map of - * server information to the most loaded regions of each server. - * - * The load balancing invariant is that all servers are within 1 region of the - * average number of regions per server. If the average is an integer number, - * all servers will be balanced to the average. Otherwise, all servers will - * have either floor(average) or ceiling(average) regions. - * - * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that - * we can fetch from both ends of the queue. - * At the beginning, we check whether there was empty region server - * just discovered by Master. If so, we alternately choose new / old - * regions from head / tail of regionsToMove, respectively. This alternation - * avoids clustering young regions on the newly discovered region server. - * Otherwise, we choose new regions from head of regionsToMove. - * - * Another improvement from HBASE-3609 is that we assign regions from - * regionsToMove to underloaded servers in round-robin fashion. - * Previously one underloaded server would be filled before we move onto - * the next underloaded server, leading to clustering of young regions. - * - * Finally, we randomly shuffle underloaded servers so that they receive - * offloaded regions relatively evenly across calls to balanceCluster(). 
- * - * The algorithm is currently implemented as such: - * - *

    - *
  1. Determine the two valid numbers of regions each server should have, - * MIN=floor(average) and MAX=ceiling(average). - * - *
  2. Iterate down the most loaded servers, shedding regions from each so - * each server hosts exactly MAX regions. Stop once you reach a - * server that already has <= MAX regions. - *

    - * Order the regions to move from most recent to least. - * - *

  3. Iterate down the least loaded servers, assigning regions so each server - * has exactly MIN regions. Stop once you reach a server that - * already has >= MIN regions. - * - * Regions being assigned to underloaded servers are those that were shed - * in the previous step. It is possible that there were not enough - * regions shed to fill each underloaded server to MIN. If so we - * end up with a number of regions required to do so, neededRegions. - * - * It is also possible that we were able to fill each underloaded but ended - * up with regions that were unassigned from overloaded servers but that - * still do not have assignment. - * - * If neither of these conditions hold (no regions needed to fill the - * underloaded servers, no regions leftover from overloaded servers), - * we are done and return. Otherwise we handle these cases below. - * - *
  4. If neededRegions is non-zero (still have underloaded servers), - * we iterate the most loaded servers again, shedding a single server from - * each (this brings them from having MAX regions to having - * MIN regions). - * - *
  5. We now definitely have more regions that need assignment, either from - * the previous step or from the original shedding from overloaded servers. - * Iterate the least loaded servers filling each to MIN. - * - *
  6. If we still have more regions that need assignment, again iterate the - * least loaded servers, this time giving each one (filling them to - * MAX) until we run out. - * - *
  7. All servers will now either host MIN or MAX regions. - * - * In addition, any server hosting >= MAX regions is guaranteed - * to end up with MAX regions at the end of the balancing. This - * ensures the minimal number of regions possible are moved. - *
- * - * TODO: We can at-most reassign the number of regions away from a particular - * server to be how many they report as most loaded. - * Should we just keep all assignment in memory? Any objections? - * Does this mean we need HeapSize on HMaster? Or just careful monitor? - * (current thinking is we will hold all assignments in memory) - * - * @param clusterState Map of regionservers and their load/region information to - * a list of their most loaded regions - * @return a list of regions to be moved, including source and destination, - * or null if cluster is already balanced + * Set the master service. + * @param masterServices */ - public List balanceCluster( - Map> clusterState) { - boolean emptyRegionServerPresent = false; - long startTime = System.currentTimeMillis(); - - int numServers = clusterState.size(); - if (numServers == 0) { - LOG.debug("numServers=0 so skipping load balancing"); - return null; - } - NavigableMap> serversByLoad = - new TreeMap>(); - int numRegions = 0; - StringBuilder strBalanceParam = new StringBuilder("Server information: "); - // Iterate so we can count regions as we build the map - for (Map.Entry> server: clusterState.entrySet()) { - List regions = server.getValue(); - int sz = regions.size(); - if (sz == 0) emptyRegionServerPresent = true; - numRegions += sz; - serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions); - strBalanceParam.append(server.getKey().getServerName()).append("="). 
- append(server.getValue().size()).append(", "); - } - strBalanceParam.delete(strBalanceParam.length() - 2, - strBalanceParam.length()); - LOG.debug(strBalanceParam.toString()); - - // Check if we even need to do any load balancing - float average = (float)numRegions / numServers; // for logging - // HBASE-3681 check sloppiness first - int floor = (int) Math.floor(average * (1 - slop)); - int ceiling = (int) Math.ceil(average * (1 + slop)); - if (serversByLoad.lastKey().getLoad() <= ceiling && - serversByLoad.firstKey().getLoad() >= floor) { - // Skipped because no server outside (min,max) range - LOG.info("Skipping load balancing because balanced cluster; " + - "servers=" + numServers + " " + - "regions=" + numRegions + " average=" + average + " " + - "mostloaded=" + serversByLoad.lastKey().getLoad() + - " leastloaded=" + serversByLoad.firstKey().getLoad()); - return null; - } - int min = numRegions / numServers; - int max = numRegions % numServers == 0 ? min : min + 1; - - // Using to check banance result. 
- strBalanceParam.delete(0, strBalanceParam.length()); - strBalanceParam.append("Balance parameter: numRegions=").append(numRegions) - .append(", numServers=").append(numServers).append(", max=").append(max) - .append(", min=").append(min); - LOG.debug(strBalanceParam.toString()); - - // Balance the cluster - // TODO: Look at data block locality or a more complex load to do this - MinMaxPriorityQueue regionsToMove = - MinMaxPriorityQueue.orderedBy(rpComparator).create(); - List regionsToReturn = new ArrayList(); - - // Walk down most loaded, pruning each to the max - int serversOverloaded = 0; - // flag used to fetch regions from head and tail of list, alternately - boolean fetchFromTail = false; - Map serverBalanceInfo = - new TreeMap(); - for (Map.Entry> server: - serversByLoad.descendingMap().entrySet()) { - ServerAndLoad sal = server.getKey(); - int regionCount = sal.getLoad(); - if (regionCount <= max) { - serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0)); - break; - } - serversOverloaded++; - List regions = server.getValue(); - int numToOffload = Math.min(regionCount - max, regions.size()); - // account for the out-of-band regions which were assigned to this server - // after some other region server crashed - Collections.sort(regions, riComparator); - int numTaken = 0; - for (int i = 0; i <= numToOffload; ) { - HRegionInfo hri = regions.get(i); // fetch from head - if (fetchFromTail) { - hri = regions.get(regions.size() - 1 - i); - } - i++; - // Don't rebalance meta regions. 
- if (hri.isMetaRegion()) continue; - regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null)); - numTaken++; - if (numTaken >= numToOffload) break; - // fetch in alternate order if there is new region server - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - } - serverBalanceInfo.put(sal.getServerName(), - new BalanceInfo(numToOffload, (-1)*numTaken)); - } - int totalNumMoved = regionsToMove.size(); - - // Walk down least loaded, filling each to the min - int neededRegions = 0; // number of regions needed to bring all up to min - fetchFromTail = false; - - Map underloadedServers = new HashMap(); - for (Map.Entry> server: - serversByLoad.entrySet()) { - int regionCount = server.getKey().getLoad(); - if (regionCount >= min) { - break; - } - underloadedServers.put(server.getKey().getServerName(), min - regionCount); - } - // number of servers that get new regions - int serversUnderloaded = underloadedServers.size(); - int incr = 1; - List sns = - Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded])); - Collections.shuffle(sns, RANDOM); - while (regionsToMove.size() > 0) { - int cnt = 0; - int i = incr > 0 ? 
0 : underloadedServers.size()-1; - for (; i >= 0 && i < underloadedServers.size(); i += incr) { - if (regionsToMove.isEmpty()) break; - ServerName si = sns.get(i); - int numToTake = underloadedServers.get(si); - if (numToTake == 0) continue; - - addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn); - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - - underloadedServers.put(si, numToTake-1); - cnt++; - BalanceInfo bi = serverBalanceInfo.get(si); - if (bi == null) { - bi = new BalanceInfo(0, 0); - serverBalanceInfo.put(si, bi); - } - bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1); - } - if (cnt == 0) break; - // iterates underloadedServers in the other direction - incr = -incr; - } - for (Integer i : underloadedServers.values()) { - // If we still want to take some, increment needed - neededRegions += i; - } - - // If none needed to fill all to min and none left to drain all to max, - // we are done - if (neededRegions == 0 && regionsToMove.isEmpty()) { - long endTime = System.currentTimeMillis(); - LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " + - "Moving " + totalNumMoved + " regions off of " + - serversOverloaded + " overloaded servers onto " + - serversUnderloaded + " less loaded servers"); - return regionsToReturn; - } - - // Need to do a second pass. - // Either more regions to assign out or servers that are still underloaded - - // If we need more to fill min, grab one from each most loaded until enough - if (neededRegions != 0) { - // Walk down most loaded, grabbing one from each until we get enough - for (Map.Entry> server : - serversByLoad.descendingMap().entrySet()) { - BalanceInfo balanceInfo = - serverBalanceInfo.get(server.getKey().getServerName()); - int idx = - balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload(); - if (idx >= server.getValue().size()) break; - HRegionInfo region = server.getValue().get(idx); - if (region.isMetaRegion()) continue; // Don't move meta regions. 
- regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null)); - totalNumMoved++; - if (--neededRegions == 0) { - // No more regions needed, done shedding - break; - } - } - } - - // Now we have a set of regions that must be all assigned out - // Assign each underloaded up to the min, then if leftovers, assign to max - - // Walk down least loaded, assigning to each to fill up to min - for (Map.Entry> server : - serversByLoad.entrySet()) { - int regionCount = server.getKey().getLoad(); - if (regionCount >= min) break; - BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName()); - if(balanceInfo != null) { - regionCount += balanceInfo.getNumRegionsAdded(); - } - if(regionCount >= min) { - continue; - } - int numToTake = min - regionCount; - int numTaken = 0; - while(numTaken < numToTake && 0 < regionsToMove.size()) { - addRegionPlan(regionsToMove, fetchFromTail, - server.getKey().getServerName(), regionsToReturn); - numTaken++; - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - } - } - - // If we still have regions to dish out, assign underloaded to max - if (0 < regionsToMove.size()) { - for (Map.Entry> server : - serversByLoad.entrySet()) { - int regionCount = server.getKey().getLoad(); - if(regionCount >= max) { - break; - } - addRegionPlan(regionsToMove, fetchFromTail, - server.getKey().getServerName(), regionsToReturn); - if (emptyRegionServerPresent) { - fetchFromTail = !fetchFromTail; - } - if (regionsToMove.isEmpty()) { - break; - } - } - } - - long endTime = System.currentTimeMillis(); - - if (!regionsToMove.isEmpty() || neededRegions != 0) { - // Emit data so can diagnose how balancer went astray. 
- LOG.warn("regionsToMove=" + totalNumMoved + - ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded + - ", serversUnderloaded=" + serversUnderloaded); - StringBuilder sb = new StringBuilder(); - for (Map.Entry> e: clusterState.entrySet()) { - if (sb.length() > 0) sb.append(", "); - sb.append(e.getKey().toString()); - sb.append(" "); - sb.append(e.getValue().size()); - } - LOG.warn("Input " + sb.toString()); - } - - // All done! - LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " + - "Moving " + totalNumMoved + " regions off of " + - serversOverloaded + " overloaded servers onto " + - serversUnderloaded + " less loaded servers"); - - return regionsToReturn; - } + public void setMasterServices(MasterServices masterServices); /** - * Add a region from the head or tail to the List of regions to return. + * Perform the major balance operation + * @param clusterState + * @return */ - void addRegionPlan(final MinMaxPriorityQueue regionsToMove, - final boolean fetchFromTail, final ServerName sn, List regionsToReturn) { - RegionPlan rp = null; - if (!fetchFromTail) rp = regionsToMove.remove(); - else rp = regionsToMove.removeLast(); - rp.setDestination(sn); - regionsToReturn.add(rp); - } + public List balanceCluster(Map> clusterState); /** + * Perform a Round Robin assignment of regions. * @param regions - * @return Randomization of passed regions - */ - static List randomize(final List regions) { - Collections.shuffle(regions, RANDOM); - return regions; - } - - /** - * Stores additional per-server information about the regions added/removed - * during the run of the balancing algorithm. - * - * For servers that shed regions, we need to track which regions we have - * already shed. nextRegionForUnload contains the index in the list - * of regions on the server that is the next to be shed. 
- */ - private static class BalanceInfo { - - private final int nextRegionForUnload; - private int numRegionsAdded; - - public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) { - this.nextRegionForUnload = nextRegionForUnload; - this.numRegionsAdded = numRegionsAdded; - } - - public int getNextRegionForUnload() { - return nextRegionForUnload; - } - - public int getNumRegionsAdded() { - return numRegionsAdded; - } - - public void setNumRegionsAdded(int numAdded) { - this.numRegionsAdded = numAdded; - } - } - - /** - * Generates a bulk assignment plan to be used on cluster startup using a - * simple round-robin assignment. - *

- * Takes a list of all the regions and all the servers in the cluster and - * returns a map of each server to the regions that it should be assigned. - *

- * Currently implemented as a round-robin assignment. Same invariant as - * load balancing, all servers holding floor(avg) or ceiling(avg). - * - * TODO: Use block locations from HDFS to place regions with their blocks - * - * @param regions all regions - * @param servers all servers - * @return map of server to the regions it should take, or null if no - * assignment is possible (ie. no regions or no servers) - */ - public static Map> roundRobinAssignment( - List regions, List servers) { - if (regions.isEmpty() || servers.isEmpty()) { - return null; - } - Map> assignments = - new TreeMap>(); - int numRegions = regions.size(); - int numServers = servers.size(); - int max = (int)Math.ceil((float)numRegions/numServers); - int serverIdx = 0; - if (numServers > 1) { - serverIdx = RANDOM.nextInt(numServers); - } - int regionIdx = 0; - for (int j = 0; j < numServers; j++) { - ServerName server = servers.get((j + serverIdx) % numServers); - List serverRegions = new ArrayList(max); - for (int i=regionIdx; i - * Takes a map of all regions to their existing assignment from META. Also - * takes a list of online servers for regions to be assigned to. Attempts to - * retain all assignment, so in some instances initial assignment will not be - * completely balanced. - *

- * Any leftover regions without an existing server to be assigned to will be - * assigned randomly to available servers. - * @param regions regions and existing assignment from meta - * @param servers available servers - * @return map of servers and regions to be assigned to them - */ - public static Map> retainAssignment( - Map regions, List servers) { - Map> assignments = - new TreeMap>(); - for (ServerName server : servers) { - assignments.put(server, new ArrayList()); - } - for (Map.Entry region : regions.entrySet()) { - ServerName sn = region.getValue(); - if (sn != null && servers.contains(sn)) { - assignments.get(sn).add(region.getKey()); - } else { - int size = assignments.size(); - assignments.get(servers.get(RANDOM.nextInt(size))).add(region.getKey()); - } - } - return assignments; - } - - /** - * Returns an ordered list of hosts that are hosting the blocks for this - * region. The weight of each host is the sum of the block lengths of all - * files on that host, so the first host in the list is the server which - * holds the most bytes of the given region's HFiles. - * - * @param fs the filesystem - * @param region region - * @return ordered list of hosts holding blocks of the specified region + * @param servers + * @return */ - @SuppressWarnings("unused") - private List getTopBlockLocations(FileSystem fs, - HRegionInfo region) { - List topServerNames = null; - try { - HTableDescriptor tableDescriptor = getTableDescriptor( - region.getTableName()); - if (tableDescriptor != null) { - HDFSBlocksDistribution blocksDistribution = - HRegion.computeHDFSBlocksDistribution(config, tableDescriptor, - region.getEncodedName()); - List topHosts = blocksDistribution.getTopHosts(); - topServerNames = mapHostNameToServerName(topHosts); - } - } catch (IOException ioe) { - LOG.debug("IOException during HDFSBlocksDistribution computation. 
for " + - "region = " + region.getEncodedName() , ioe); - } - - return topServerNames; - } + public Map> roundRobinAssignment(List regions, List servers); /** - * return HTableDescriptor for a given tableName - * @param tableName the table name - * @return HTableDescriptor - * @throws IOException + * Assign regions to the previously hosting region server + * @param regions + * @param servers + * @return */ - private HTableDescriptor getTableDescriptor(byte[] tableName) - throws IOException { - HTableDescriptor tableDescriptor = null; - try { - if ( this.services != null) - { - tableDescriptor = this.services.getTableDescriptors(). - get(Bytes.toString(tableName)); - } - } catch (TableExistsException tee) { - LOG.debug("TableExistsException during getTableDescriptors." + - " Current table name = " + tableName , tee); - } catch (FileNotFoundException fnfe) { - LOG.debug("FileNotFoundException during getTableDescriptors." + - " Current table name = " + tableName , fnfe); - } - - return tableDescriptor; - } + public Map> retainAssignment(Map regions, List servers); /** - * Map hostname to ServerName, The output ServerName list will have the same - * order as input hosts. 
- * @param hosts the list of hosts - * @return ServerName list - */ - private List mapHostNameToServerName(List hosts) { - if ( hosts == null || status == null) { - return null; - } - - List topServerNames = new ArrayList(); - Collection regionServers = status.getServers(); - - // create a mapping from hostname to ServerName for fast lookup - HashMap hostToServerName = - new HashMap(); - for (ServerName sn : regionServers) { - hostToServerName.put(sn.getHostname(), sn); - } - - for (String host : hosts ) { - ServerName sn = hostToServerName.get(host); - // it is possible that HDFS is up ( thus host is valid ), - // but RS is down ( thus sn is null ) - if (sn != null) { - topServerNames.add(sn); - } - } - return topServerNames; - } - - - /** - * Generates an immediate assignment plan to be used by a new master for - * regions in transition that do not have an already known destination. - * - * Takes a list of regions that need immediate assignment and a list of - * all available servers. Returns a map of regions to the server they - * should be assigned to. - * - * This method will return quickly and does not do any intelligent - * balancing. The goal is to make a fast decision not the best decision - * possible. - * - * Currently this is random. 
- * + * Sync assign a region * @param regions * @param servers - * @return map of regions to the server it should be assigned to + * @return */ - public static Map immediateAssignment( - List regions, List servers) { - Map assignments = - new TreeMap(); - for(HRegionInfo region : regions) { - assignments.put(region, servers.get(RANDOM.nextInt(servers.size()))); - } - return assignments; - } - - public static ServerName randomAssignment(List servers) { - if (servers == null || servers.isEmpty()) { - LOG.warn("Wanted to do random assignment but no servers to assign to"); - return null; - } - return servers.get(RANDOM.nextInt(servers.size())); - } + public Map immediateAssignment(List regions, List servers); /** - * Stores the plan for the move of an individual region. - * - * Contains info for the region being moved, info for the server the region - * should be moved from, and info for the server the region should be moved - * to. - * - * The comparable implementation of this class compares only the region - * information and not the source/dest server info. + * Get a random region server from the list + * @param servers + * @return */ - public static class RegionPlan implements Comparable { - private final HRegionInfo hri; - private final ServerName source; - private ServerName dest; - - /** - * Instantiate a plan for a region move, moving the specified region from - * the specified source server to the specified destination server. - * - * Destination server can be instantiated as null and later set - * with {@link #setDestination(ServerName)}. - * - * @param hri region to be moved - * @param source regionserver region should be moved from - * @param dest regionserver region should be moved to - */ - public RegionPlan(final HRegionInfo hri, ServerName source, ServerName dest) { - this.hri = hri; - this.source = source; - this.dest = dest; - } - - /** - * Set the destination server for the plan for this region. 
- */ - public void setDestination(ServerName dest) { - this.dest = dest; - } - - /** - * Get the source server for the plan for this region. - * @return server info for source - */ - public ServerName getSource() { - return source; - } - - /** - * Get the destination server for the plan for this region. - * @return server info for destination - */ - public ServerName getDestination() { - return dest; - } - - /** - * Get the encoded region name for the region this plan is for. - * @return Encoded region name - */ - public String getRegionName() { - return this.hri.getEncodedName(); - } - - public HRegionInfo getRegionInfo() { - return this.hri; - } - - /** - * Compare the region info. - * @param o region plan you are comparing against - */ - @Override - public int compareTo(RegionPlan o) { - return getRegionName().compareTo(o.getRegionName()); - } - - @Override - public String toString() { - return "hri=" + this.hri.getRegionNameAsString() + ", src=" + - (this.source == null? "": this.source.toString()) + - ", dest=" + (this.dest == null? "": this.dest.toString()); - } - } + public ServerName randomAssignment(List servers); } diff --git src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java new file mode 100644 index 0000000..69f76b3 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java @@ -0,0 +1,43 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * The class that creates a load balancer from a conf. + */ +public class LoadBalancerFactory { + + /** + * Create a loadblanacer from the given conf. + * @param conf + * @return + */ + public static LoadBalancer getLoadBalancer(Configuration conf) { + + // Create the balancer + Class balancerKlass = conf.getClass("hbase.maser.loadBalancer.class",DefaultLoadBalancer.class, LoadBalancer.class); + return ReflectionUtils.newInstance(balancerKlass, conf); + + } +} diff --git src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java new file mode 100644 index 0000000..1561e00 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java @@ -0,0 +1,88 @@ +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; + +/** + * Stores the plan for the move of an individual region. + * + * Contains info for the region being moved, info for the server the region + * should be moved from, and info for the server the region should be moved + * to. + * + * The comparable implementation of this class compares only the region + * information and not the source/dest server info. 
+ */ +public class RegionPlan implements Comparable { + private final HRegionInfo hri; + private final ServerName source; + private ServerName dest; + + /** + * Instantiate a plan for a region move, moving the specified region from + * the specified source server to the specified destination server. + * + * Destination server can be instantiated as null and later set + * with {@link #setDestination(ServerName)}. + * + * @param hri region to be moved + * @param source regionserver region should be moved from + * @param dest regionserver region should be moved to + */ + public RegionPlan(final HRegionInfo hri, ServerName source, ServerName dest) { + this.hri = hri; + this.source = source; + this.dest = dest; + } + + /** + * Set the destination server for the plan for this region. + */ + public void setDestination(ServerName dest) { + this.dest = dest; + } + + /** + * Get the source server for the plan for this region. + * @return server info for source + */ + public ServerName getSource() { + return source; + } + + /** + * Get the destination server for the plan for this region. + * @return server info for destination + */ + public ServerName getDestination() { + return dest; + } + + /** + * Get the encoded region name for the region this plan is for. + * @return Encoded region name + */ + public String getRegionName() { + return this.hri.getEncodedName(); + } + + public HRegionInfo getRegionInfo() { + return this.hri; + } + + /** + * Compare the region info. + * @param o region plan you are comparing against + */ + @Override + public int compareTo(RegionPlan o) { + return getRegionName().compareTo(o.getRegionName()); + } + + @Override + public String toString() { + return "hri=" + this.hri.getRegionNameAsString() + ", src=" + + (this.source == null? "": this.source.toString()) + + ", dest=" + (this.dest == null? 
"": this.dest.toString()); + } +} diff --git src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java new file mode 100644 index 0000000..ac836d9 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java @@ -0,0 +1,31 @@ +package org.apache.hadoop.hbase.master; + + +import org.apache.hadoop.hbase.ServerName; + +/** + * Data structure that holds servername and 'load'. + */ +class ServerAndLoad implements Comparable { + private final ServerName sn; + private final int load; + + ServerAndLoad(final ServerName sn, final int load) { + this.sn = sn; + this.load = load; + } + + ServerName getServerName() { + return this.sn; + } + + int getLoad() { + return this.load; + } + + @Override + public int compareTo(ServerAndLoad other) { + int diff = this.load - other.load; + return diff != 0 ? diff : this.sn.compareTo(other.getServerName()); + } +} \ No newline at end of file diff --git src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java new file mode 100644 index 0000000..b45d3db --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/master/TestDefaultLoadBalancer.java @@ -0,0 +1,534 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + + +/** + * Test the load balancer that is created by default. 
+ */ +public class TestDefaultLoadBalancer { + private static final Log LOG = LogFactory.getLog(TestDefaultLoadBalancer.class); + private static final Random RANDOM = new Random(System.currentTimeMillis()); + + private static LoadBalancer loadBalancer; + + private static Random rand; + + @BeforeClass + public static void beforeAllTests() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.set("hbase.regions.slop", "0"); + loadBalancer = new DefaultLoadBalancer(); + loadBalancer.setConf(conf); + rand = new Random(); + } + + // int[testnum][servernumber] -> numregions + int [][] clusterStateMocks = new int [][] { + // 1 node + new int [] { 0 }, + new int [] { 1 }, + new int [] { 10 }, + // 2 node + new int [] { 0, 0 }, + new int [] { 2, 0 }, + new int [] { 2, 1 }, + new int [] { 2, 2 }, + new int [] { 2, 3 }, + new int [] { 2, 4 }, + new int [] { 1, 1 }, + new int [] { 0, 1 }, + new int [] { 10, 1 }, + new int [] { 14, 1432 }, + new int [] { 47, 53 }, + // 3 node + new int [] { 0, 1, 2 }, + new int [] { 1, 2, 3 }, + new int [] { 0, 2, 2 }, + new int [] { 0, 3, 0 }, + new int [] { 0, 4, 0 }, + new int [] { 20, 20, 0 }, + // 4 node + new int [] { 0, 1, 2, 3 }, + new int [] { 4, 0, 0, 0 }, + new int [] { 5, 0, 0, 0 }, + new int [] { 6, 6, 0, 0 }, + new int [] { 6, 2, 0, 0 }, + new int [] { 6, 1, 0, 0 }, + new int [] { 6, 0, 0, 0 }, + new int [] { 4, 4, 4, 7 }, + new int [] { 4, 4, 4, 8 }, + new int [] { 0, 0, 0, 7 }, + // 5 node + new int [] { 1, 1, 1, 1, 4 }, + // more nodes + new int [] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, + new int [] { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 }, + new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 }, + new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 }, + new int [] { 1, 1, 1, 1, 
1, 1, 1, 1, 1, 10 }, + new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 }, + new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 }, + new int [] { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 }, + new int [] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 }, + new int [] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 } + }; + + int [][] regionsAndServersMocks = new int [][] { + // { num regions, num servers } + new int [] { 0, 0 }, + new int [] { 0, 1 }, + new int [] { 1, 1 }, + new int [] { 2, 1 }, + new int [] { 10, 1 }, + new int [] { 1, 2 }, + new int [] { 2, 2 }, + new int [] { 3, 2 }, + new int [] { 1, 3 }, + new int [] { 2, 3 }, + new int [] { 3, 3 }, + new int [] { 25, 3 }, + new int [] { 2, 10 }, + new int [] { 2, 100 }, + new int [] { 12, 10 }, + new int [] { 12, 100 }, + }; + + @Test + public void testRandomizer() { + for(int [] mockCluster : clusterStateMocks) { + if (mockCluster.length < 5) continue; + Map> servers = + mockClusterServers(mockCluster); + for (Map.Entry> e: servers.entrySet()) { + List original = e.getValue(); + if (original.size() < 5) continue; + // Try ten times in case random chances upon original order more than + // one or two times in a row. + boolean same = true; + for (int i = 0; i < 10 && same; i++) { + List copy = new ArrayList(original); + System.out.println("Randomizing before " + copy.size()); + for (HRegionInfo hri: copy) { + System.out.println(hri.getEncodedName()); + } + List randomized = randomize(copy); + System.out.println("Randomizing after " + randomized.size()); + for (HRegionInfo hri: randomized) { + System.out.println(hri.getEncodedName()); + } + if (original.equals(randomized)) continue; + same = false; + break; + } + assertFalse(same); + } + } + } + + + /** + * @param regions + * @return Randomization of passed regions + */ + public List randomize(final List regions) { + Collections.shuffle(regions, RANDOM); + return regions; + } + + /** + * Test the load balancing algorithm. 
+ * + * Invariant is that all servers should be hosting either + * floor(average) or ceiling(average) + * + * @throws Exception + */ + @Test + public void testBalanceCluster() throws Exception { + + for(int [] mockCluster : clusterStateMocks) { + Map> servers = mockClusterServers(mockCluster); + List list = convertToList(servers); + LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); + List plans = loadBalancer.balanceCluster(servers); + List balancedCluster = reconcile(list, plans); + LOG.info("Mock Balance : " + printMock(balancedCluster)); + assertClusterAsBalanced(balancedCluster); + for(Map.Entry> entry : servers.entrySet()) { + returnRegions(entry.getValue()); + returnServer(entry.getKey()); + } + } + + } + + /** + * Invariant is that all servers have between floor(avg) and ceiling(avg) + * number of regions. + */ + public void assertClusterAsBalanced(List servers) { + int numServers = servers.size(); + int numRegions = 0; + int maxRegions = 0; + int minRegions = Integer.MAX_VALUE; + for(ServerAndLoad server : servers) { + int nr = server.getLoad(); + if(nr > maxRegions) { + maxRegions = nr; + } + if(nr < minRegions) { + minRegions = nr; + } + numRegions += nr; + } + if(maxRegions - minRegions < 2) { + // less than 2 between max and min, can't balance + return; + } + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + + for(ServerAndLoad server : servers) { + assertTrue(server.getLoad() <= max); + assertTrue(server.getLoad() >= min); + } + } + + /** + * Tests immediate assignment. + * + * Invariant is that all regions have an assignment. 
+ * + * @throws Exception + */ + @Test + public void testImmediateAssignment() throws Exception { + for(int [] mock : regionsAndServersMocks) { + LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); + List regions = randomRegions(mock[0]); + List servers = randomServers(mock[1], 0); + List list = getListOfServerNames(servers); + Map assignments = + loadBalancer.immediateAssignment(regions, list); + assertImmediateAssignment(regions, list, assignments); + returnRegions(regions); + returnServers(list); + } + } + + /** + * All regions have an assignment. + * @param regions + * @param servers + * @param assignments + */ + private void assertImmediateAssignment(List regions, + List servers, Map assignments) { + for(HRegionInfo region : regions) { + assertTrue(assignments.containsKey(region)); + } + } + + /** + * Tests the bulk assignment used during cluster startup. + * + * Round-robin. Should yield a balanced cluster so same invariant as the load + * balancer holds, all servers holding either floor(avg) or ceiling(avg). + * + * @throws Exception + */ + @Test + public void testBulkAssignment() throws Exception { + for(int [] mock : regionsAndServersMocks) { + LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); + List regions = randomRegions(mock[0]); + List servers = randomServers(mock[1], 0); + List list = getListOfServerNames(servers); + Map> assignments = + loadBalancer.roundRobinAssignment(regions, list); + float average = (float)regions.size()/servers.size(); + int min = (int)Math.floor(average); + int max = (int)Math.ceil(average); + if(assignments != null && !assignments.isEmpty()) { + for(List regionList : assignments.values()) { + assertTrue(regionList.size() == min || regionList.size() == max); + } + } + returnRegions(regions); + returnServers(list); + } + } + + /** + * Test the cluster startup bulk assignment which attempts to retain + * assignment info. 
+ * @throws Exception + */ + @Test + public void testRetainAssignment() throws Exception { + // Test simple case where all same servers are there + List servers = randomServers(10, 10); + List regions = randomRegions(100); + Map existing = + new TreeMap(); + for (int i = 0; i < regions.size(); i++) { + existing.put(regions.get(i), servers.get(i % servers.size()).getServerName()); + } + List listOfServerNames = getListOfServerNames(servers); + Map> assignment = + loadBalancer.retainAssignment(existing, listOfServerNames); + assertRetainedAssignment(existing, listOfServerNames, assignment); + + // Include two new servers that were not there before + List servers2 = + new ArrayList(servers); + servers2.add(randomServer(10)); + servers2.add(randomServer(10)); + listOfServerNames = getListOfServerNames(servers2); + assignment = loadBalancer.retainAssignment(existing, listOfServerNames); + assertRetainedAssignment(existing, listOfServerNames, assignment); + + // Remove two of the servers that were previously there + List servers3 = + new ArrayList(servers); + servers3.remove(servers3.size()-1); + servers3.remove(servers3.size()-2); + listOfServerNames = getListOfServerNames(servers3); + assignment = loadBalancer.retainAssignment(existing, listOfServerNames); + assertRetainedAssignment(existing, listOfServerNames, assignment); + } + + private List getListOfServerNames(final List sals) { + List list = new ArrayList(); + for (ServerAndLoad e: sals) { + list.add(e.getServerName()); + } + return list; + } + + /** + * Asserts a valid retained assignment plan. + *

+ * Must meet the following conditions: + *

    + *
  • Every input region has an assignment, and to an online server + *
  • If a region had an existing assignment to a server with the same + * address as a currently online server, it will be assigned to it + *
+ * @param existing + * @param servers + * @param assignment + */ + private void assertRetainedAssignment( + Map existing, List servers, + Map> assignment) { + // Verify condition 1, every region assigned, and to online server + Set onlineServerSet = new TreeSet(servers); + Set assignedRegions = new TreeSet(); + for (Map.Entry> a : assignment.entrySet()) { + assertTrue("Region assigned to server that was not listed as online", + onlineServerSet.contains(a.getKey())); + for (HRegionInfo r : a.getValue()) assignedRegions.add(r); + } + assertEquals(existing.size(), assignedRegions.size()); + + // Verify condition 2, if server had existing assignment, must have same + Set onlineAddresses = new TreeSet(); + for (ServerName s : servers) onlineAddresses.add(s); + for (Map.Entry> a : assignment.entrySet()) { + for (HRegionInfo r : a.getValue()) { + ServerName address = existing.get(r); + if (address != null && onlineAddresses.contains(address)) { + assertTrue(a.getKey().equals(address)); + } + } + } + } + + private String printStats(List servers) { + int numServers = servers.size(); + int totalRegions = 0; + for(ServerAndLoad server : servers) { + totalRegions += server.getLoad(); + } + float average = (float)totalRegions / numServers; + int max = (int)Math.ceil(average); + int min = (int)Math.floor(average); + return "[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + average + " max=" + max + " min=" + min + "]"; + } + + private List convertToList(final Map> servers) { + List list = + new ArrayList(servers.size()); + for (Map.Entry> e: servers.entrySet()) { + list.add(new ServerAndLoad(e.getKey(), e.getValue().size())); + } + return list; + } + + private String printMock(Map> servers) { + return printMock(convertToList(servers)); + } + + private String printMock(List balancedCluster) { + SortedSet sorted = + new TreeSet(balancedCluster); + ServerAndLoad [] arr = + sorted.toArray(new ServerAndLoad[sorted.size()]); + StringBuilder sb = new 
StringBuilder(sorted.size() * 4 + 4); + sb.append("{ "); + for(int i = 0; i < arr.length; i++) { + if (i != 0) { + sb.append(" , "); + } + sb.append(arr[i].getLoad()); + } + sb.append(" }"); + return sb.toString(); + } + + /** + * This assumes the RegionPlan HSI instances are the same ones in the map, so + * actually no need to even pass in the map, but I think it's clearer. + * @param list + * @param plans + * @return + */ + private List reconcile(List list, + List plans) { + List result = + new ArrayList(list.size()); + if (plans == null) return result; + Map map = + new HashMap(list.size()); + for (RegionPlan plan : plans) { + ServerName source = plan.getSource(); + updateLoad(map, source, -1); + ServerName destination = plan.getDestination(); + updateLoad(map, destination, +1); + } + result.clear(); + result.addAll(map.values()); + return result; + } + + private void updateLoad(Map map, + final ServerName sn, final int diff) { + ServerAndLoad sal = map.get(sn); + if (sal == null) return; + sal = new ServerAndLoad(sn, sal.getLoad() + diff); + map.put(sn, sal); + } + + private Map> mockClusterServers( + int [] mockCluster) { + int numServers = mockCluster.length; + Map> servers = + new TreeMap>(); + for(int i = 0; i < numServers; i++) { + int numRegions = mockCluster[i]; + ServerAndLoad sal = randomServer(0); + List regions = randomRegions(numRegions); + servers.put(sal.getServerName(), regions); + } + return servers; + } + + private Queue regionQueue = new LinkedList(); + static int regionId = 0; + + private List randomRegions(int numRegions) { + List regions = new ArrayList(numRegions); + byte [] start = new byte[16]; + byte [] end = new byte[16]; + rand.nextBytes(start); + rand.nextBytes(end); + for(int i=0;i regions) { + regionQueue.addAll(regions); + } + + private Queue serverQueue = new LinkedList(); + + private ServerAndLoad randomServer(final int numRegionsPerServer) { + if (!this.serverQueue.isEmpty()) { + ServerName sn = this.serverQueue.poll(); + 
return new ServerAndLoad(sn, numRegionsPerServer); + } + String host = "127.0.0.1"; + int port = rand.nextInt(60000); + long startCode = rand.nextLong(); + ServerName sn = new ServerName(host, port, startCode); + return new ServerAndLoad(sn, numRegionsPerServer); + } + + private List randomServers(int numServers, int numRegionsPerServer) { + List servers = + new ArrayList(numServers); + for (int i = 0; i < numServers; i++) { + servers.add(randomServer(numRegionsPerServer)); + } + return servers; + } + + private void returnServer(ServerName server) { + serverQueue.add(server); + } + + private void returnServers(List servers) { + this.serverQueue.addAll(servers); + } +} diff --git src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java deleted file mode 100644 index 78da7fe..0000000 --- src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java +++ /dev/null @@ -1,530 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.master; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Random; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestLoadBalancer { - private static final Log LOG = LogFactory.getLog(TestLoadBalancer.class); - - private static LoadBalancer loadBalancer; - - private static Random rand; - - @BeforeClass - public static void beforeAllTests() throws Exception { - Configuration conf = HBaseConfiguration.create(); - conf.set("hbase.regions.slop", "0"); - loadBalancer = new LoadBalancer(conf); - rand = new Random(); - } - - // int[testnum][servernumber] -> numregions - int [][] clusterStateMocks = new int [][] { - // 1 node - new int [] { 0 }, - new int [] { 1 }, - new int [] { 10 }, - // 2 node - new int [] { 0, 0 }, - new int [] { 2, 0 }, - new int [] { 2, 1 }, - new int [] { 2, 2 }, - new int [] { 2, 3 }, - new int [] { 2, 4 }, - new int [] { 1, 1 }, - new int [] { 0, 1 }, - new int [] { 10, 1 }, - new int [] { 14, 1432 }, - new int [] { 47, 53 }, - // 3 node - new int [] { 0, 1, 2 }, - new int [] { 1, 2, 3 }, - new int [] { 0, 2, 2 }, - new int [] { 0, 3, 0 }, - new int [] { 0, 4, 0 }, - new int [] { 20, 20, 0 }, - // 
4 node - new int [] { 0, 1, 2, 3 }, - new int [] { 4, 0, 0, 0 }, - new int [] { 5, 0, 0, 0 }, - new int [] { 6, 6, 0, 0 }, - new int [] { 6, 2, 0, 0 }, - new int [] { 6, 1, 0, 0 }, - new int [] { 6, 0, 0, 0 }, - new int [] { 4, 4, 4, 7 }, - new int [] { 4, 4, 4, 8 }, - new int [] { 0, 0, 0, 7 }, - // 5 node - new int [] { 1, 1, 1, 1, 4 }, - // more nodes - new int [] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, - new int [] { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 }, - new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 10 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 }, - new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 }, - new int [] { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 }, - new int [] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 }, - new int [] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 } - }; - - int [][] regionsAndServersMocks = new int [][] { - // { num regions, num servers } - new int [] { 0, 0 }, - new int [] { 0, 1 }, - new int [] { 1, 1 }, - new int [] { 2, 1 }, - new int [] { 10, 1 }, - new int [] { 1, 2 }, - new int [] { 2, 2 }, - new int [] { 3, 2 }, - new int [] { 1, 3 }, - new int [] { 2, 3 }, - new int [] { 3, 3 }, - new int [] { 25, 3 }, - new int [] { 2, 10 }, - new int [] { 2, 100 }, - new int [] { 12, 10 }, - new int [] { 12, 100 }, - }; - - @Test - public void testRandomizer() { - for(int [] mockCluster : clusterStateMocks) { - if (mockCluster.length < 5) continue; - Map> servers = - mockClusterServers(mockCluster); - for (Map.Entry> e: servers.entrySet()) { - List original = e.getValue(); - if (original.size() < 5) continue; - // Try ten times in case random chances upon original order more than - 
// one or two times in a row. - boolean same = true; - for (int i = 0; i < 10 && same; i++) { - List copy = new ArrayList(original); - System.out.println("Randomizing before " + copy.size()); - for (HRegionInfo hri: copy) { - System.out.println(hri.getEncodedName()); - } - List randomized = LoadBalancer.randomize(copy); - System.out.println("Randomizing after " + randomized.size()); - for (HRegionInfo hri: randomized) { - System.out.println(hri.getEncodedName()); - } - if (original.equals(randomized)) continue; - same = false; - break; - } - assertFalse(same); - } - } - } - - /** - * Test the load balancing algorithm. - * - * Invariant is that all servers should be hosting either - * floor(average) or ceiling(average) - * - * @throws Exception - */ - @Test - public void testBalanceCluster() throws Exception { - - for(int [] mockCluster : clusterStateMocks) { - Map> servers = mockClusterServers(mockCluster); - List list = convertToList(servers); - LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); - List plans = loadBalancer.balanceCluster(servers); - List balancedCluster = reconcile(list, plans); - LOG.info("Mock Balance : " + printMock(balancedCluster)); - assertClusterAsBalanced(balancedCluster); - for(Map.Entry> entry : servers.entrySet()) { - returnRegions(entry.getValue()); - returnServer(entry.getKey()); - } - } - - } - - /** - * Invariant is that all servers have between floor(avg) and ceiling(avg) - * number of regions. 
- */ - public void assertClusterAsBalanced(List servers) { - int numServers = servers.size(); - int numRegions = 0; - int maxRegions = 0; - int minRegions = Integer.MAX_VALUE; - for(LoadBalancer.ServerAndLoad server : servers) { - int nr = server.getLoad(); - if(nr > maxRegions) { - maxRegions = nr; - } - if(nr < minRegions) { - minRegions = nr; - } - numRegions += nr; - } - if(maxRegions - minRegions < 2) { - // less than 2 between max and min, can't balance - return; - } - int min = numRegions / numServers; - int max = numRegions % numServers == 0 ? min : min + 1; - - for(LoadBalancer.ServerAndLoad server : servers) { - assertTrue(server.getLoad() <= max); - assertTrue(server.getLoad() >= min); - } - } - - /** - * Tests immediate assignment. - * - * Invariant is that all regions have an assignment. - * - * @throws Exception - */ - @Test - public void testImmediateAssignment() throws Exception { - for(int [] mock : regionsAndServersMocks) { - LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); - List regions = randomRegions(mock[0]); - List servers = randomServers(mock[1], 0); - List list = getListOfServerNames(servers); - Map assignments = - LoadBalancer.immediateAssignment(regions, list); - assertImmediateAssignment(regions, list, assignments); - returnRegions(regions); - returnServers(list); - } - } - - /** - * All regions have an assignment. - * @param regions - * @param servers - * @param assignments - */ - private void assertImmediateAssignment(List regions, - List servers, Map assignments) { - for(HRegionInfo region : regions) { - assertTrue(assignments.containsKey(region)); - } - } - - /** - * Tests the bulk assignment used during cluster startup. - * - * Round-robin. Should yield a balanced cluster so same invariant as the load - * balancer holds, all servers holding either floor(avg) or ceiling(avg). 
- * - * @throws Exception - */ - @Test - public void testBulkAssignment() throws Exception { - for(int [] mock : regionsAndServersMocks) { - LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); - List regions = randomRegions(mock[0]); - List servers = randomServers(mock[1], 0); - List list = getListOfServerNames(servers); - Map> assignments = - LoadBalancer.roundRobinAssignment(regions, list); - float average = (float)regions.size()/servers.size(); - int min = (int)Math.floor(average); - int max = (int)Math.ceil(average); - if(assignments != null && !assignments.isEmpty()) { - for(List regionList : assignments.values()) { - assertTrue(regionList.size() == min || regionList.size() == max); - } - } - returnRegions(regions); - returnServers(list); - } - } - - /** - * Test the cluster startup bulk assignment which attempts to retain - * assignment info. - * @throws Exception - */ - @Test - public void testRetainAssignment() throws Exception { - // Test simple case where all same servers are there - List servers = randomServers(10, 10); - List regions = randomRegions(100); - Map existing = - new TreeMap(); - for (int i = 0; i < regions.size(); i++) { - existing.put(regions.get(i), servers.get(i % servers.size()).getServerName()); - } - List listOfServerNames = getListOfServerNames(servers); - Map> assignment = - LoadBalancer.retainAssignment(existing, listOfServerNames); - assertRetainedAssignment(existing, listOfServerNames, assignment); - - // Include two new servers that were not there before - List servers2 = - new ArrayList(servers); - servers2.add(randomServer(10)); - servers2.add(randomServer(10)); - listOfServerNames = getListOfServerNames(servers2); - assignment = LoadBalancer.retainAssignment(existing, listOfServerNames); - assertRetainedAssignment(existing, listOfServerNames, assignment); - - // Remove two of the servers that were previously there - List servers3 = - new ArrayList(servers); - 
servers3.remove(servers3.size()-1); - servers3.remove(servers3.size()-2); - listOfServerNames = getListOfServerNames(servers2); - assignment = LoadBalancer.retainAssignment(existing, listOfServerNames); - assertRetainedAssignment(existing, listOfServerNames, assignment); - } - - private List getListOfServerNames(final List sals) { - List list = new ArrayList(); - for (LoadBalancer.ServerAndLoad e: sals) { - list.add(e.getServerName()); - } - return list; - } - - /** - * Asserts a valid retained assignment plan. - *

- * Must meet the following conditions: - *

    - *
  • Every input region has an assignment, and to an online server - *
  • If a region had an existing assignment to a server with the same - * address a a currently online server, it will be assigned to it - *
- * @param existing - * @param servers - * @param assignment - */ - private void assertRetainedAssignment( - Map existing, List servers, - Map> assignment) { - // Verify condition 1, every region assigned, and to online server - Set onlineServerSet = new TreeSet(servers); - Set assignedRegions = new TreeSet(); - for (Map.Entry> a : assignment.entrySet()) { - assertTrue("Region assigned to server that was not listed as online", - onlineServerSet.contains(a.getKey())); - for (HRegionInfo r : a.getValue()) assignedRegions.add(r); - } - assertEquals(existing.size(), assignedRegions.size()); - - // Verify condition 2, if server had existing assignment, must have same - Set onlineAddresses = new TreeSet(); - for (ServerName s : servers) onlineAddresses.add(s); - for (Map.Entry> a : assignment.entrySet()) { - for (HRegionInfo r : a.getValue()) { - ServerName address = existing.get(r); - if (address != null && onlineAddresses.contains(address)) { - assertTrue(a.getKey().equals(address)); - } - } - } - } - - private String printStats(List servers) { - int numServers = servers.size(); - int totalRegions = 0; - for(LoadBalancer.ServerAndLoad server : servers) { - totalRegions += server.getLoad(); - } - float average = (float)totalRegions / numServers; - int max = (int)Math.ceil(average); - int min = (int)Math.floor(average); - return "[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + average + " max=" + max + " min=" + min + "]"; - } - - private List convertToList(final Map> servers) { - List list = - new ArrayList(servers.size()); - for (Map.Entry> e: servers.entrySet()) { - list.add(new LoadBalancer.ServerAndLoad(e.getKey(), e.getValue().size())); - } - return list; - } - - private String printMock(Map> servers) { - return printMock(convertToList(servers)); - } - - private String printMock(List balancedCluster) { - SortedSet sorted = - new TreeSet(balancedCluster); - LoadBalancer.ServerAndLoad [] arr = - sorted.toArray(new 
LoadBalancer.ServerAndLoad[sorted.size()]); - StringBuilder sb = new StringBuilder(sorted.size() * 4 + 4); - sb.append("{ "); - for(int i = 0; i < arr.length; i++) { - if (i != 0) { - sb.append(" , "); - } - sb.append(arr[i].getLoad()); - } - sb.append(" }"); - return sb.toString(); - } - - /** - * This assumes the RegionPlan HSI instances are the same ones in the map, so - * actually no need to even pass in the map, but I think it's clearer. - * @param servers - * @param plans - * @return - */ - private List reconcile(List list, - List plans) { - List result = - new ArrayList(list.size()); - if (plans == null) return result; - Map map = - new HashMap(list.size()); - for (RegionPlan plan : plans) { - ServerName source = plan.getSource(); - updateLoad(map, source, -1); - ServerName destination = plan.getDestination(); - updateLoad(map, destination, +1); - } - result.clear(); - result.addAll(map.values()); - return result; - } - - private void updateLoad(Map map, - final ServerName sn, final int diff) { - LoadBalancer.ServerAndLoad sal = map.get(sn); - if (sal == null) return; - sal = new LoadBalancer.ServerAndLoad(sn, sal.getLoad() + diff); - map.put(sn, sal); - } - - private Map> mockClusterServers( - int [] mockCluster) { - int numServers = mockCluster.length; - Map> servers = - new TreeMap>(); - for(int i = 0; i < numServers; i++) { - int numRegions = mockCluster[i]; - LoadBalancer.ServerAndLoad sal = randomServer(0); - List regions = randomRegions(numRegions); - servers.put(sal.getServerName(), regions); - } - return servers; - } - - private Queue regionQueue = new LinkedList(); - static int regionId = 0; - - private List randomRegions(int numRegions) { - List regions = new ArrayList(numRegions); - byte [] start = new byte[16]; - byte [] end = new byte[16]; - rand.nextBytes(start); - rand.nextBytes(end); - for(int i=0;i regions) { - regionQueue.addAll(regions); - } - - private Queue serverQueue = new LinkedList(); - - private LoadBalancer.ServerAndLoad 
randomServer(final int numRegionsPerServer) { - if (!this.serverQueue.isEmpty()) { - ServerName sn = this.serverQueue.poll(); - return new LoadBalancer.ServerAndLoad(sn, numRegionsPerServer); - } - String host = "127.0.0.1"; - int port = rand.nextInt(60000); - long startCode = rand.nextLong(); - ServerName sn = new ServerName(host, port, startCode); - return new LoadBalancer.ServerAndLoad(sn, numRegionsPerServer); - } - - private List randomServers(int numServers, int numRegionsPerServer) { - List servers = - new ArrayList(numServers); - for (int i = 0; i < numServers; i++) { - servers.add(randomServer(numRegionsPerServer)); - } - return servers; - } - - private void returnServer(ServerName server) { - serverQueue.add(server); - } - - private void returnServers(List servers) { - this.serverQueue.addAll(servers); - } -} diff --git src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java index a2ff322..978092f 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java +++ src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; -import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes;