diff --git hbase-server/pom.xml hbase-server/pom.xml
index bd6aa91..3cd96bb 100644
--- hbase-server/pom.xml
+++ hbase-server/pom.xml
@@ -334,6 +334,10 @@
commons-logging
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math</artifactId>
+    </dependency>
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java
deleted file mode 100644
index 7071b4d..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/DefaultLoadBalancer.java
+++ /dev/null
@@ -1,771 +0,0 @@
-/**
- * Copyright 2011 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.MinMaxPriorityQueue;
-import com.google.common.collect.Sets;
-
-/**
- * Makes decisions about the placement and movement of Regions across
- * RegionServers.
- *
- * <p>Cluster-wide load balancing will occur only when there are no regions in
- * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}.
- *
- * <p>Inline region placement with {@link #immediateAssignment} can be used when
- * the Master needs to handle closed regions that it currently does not have
- * a destination set for. This can happen during master failover.
- *
- * <p>On cluster startup, bulk assignment can be used to determine
- * locations for all Regions in a cluster.
- *
- * <p>This classes produces plans for the {@link AssignmentManager} to execute.
- */
-@InterfaceAudience.Private
-public class DefaultLoadBalancer implements LoadBalancer {
- private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
- private static final Random RANDOM = new Random(System.currentTimeMillis());
- // slop for regions
- private float slop;
- private Configuration config;
- private ClusterStatus status;
- private MasterServices services;
-
- public void setClusterStatus(ClusterStatus st) {
- this.status = st;
- }
-
- public void setMasterServices(MasterServices masterServices) {
- this.services = masterServices;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
- if (slop < 0) slop = 0;
- else if (slop > 1) slop = 1;
- this.config = conf;
- }
-
- @Override
- public Configuration getConf() {
- return this.config;
- }
-
- /*
- * The following comparator assumes that RegionId from HRegionInfo can
- * represent the age of the region - larger RegionId means the region
- * is younger.
- * This comparator is used in balanceCluster() to account for the out-of-band
- * regions which were assigned to the server after some other region server
- * crashed.
- */
- private static class RegionInfoComparator implements Comparator<HRegionInfo> {
- @Override
- public int compare(HRegionInfo l, HRegionInfo r) {
- long diff = r.getRegionId() - l.getRegionId();
- if (diff < 0) return -1;
- if (diff > 0) return 1;
- return 0;
- }
- }
-
-
- RegionInfoComparator riComparator = new RegionInfoComparator();
-
- private static class RegionPlanComparator implements Comparator<RegionPlan> {
- @Override
- public int compare(RegionPlan l, RegionPlan r) {
- long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId();
- if (diff < 0) return -1;
- if (diff > 0) return 1;
- return 0;
- }
- }
-
- RegionPlanComparator rpComparator = new RegionPlanComparator();
-
- /**
- * Generate a global load balancing plan according to the specified map of
- * server information to the most loaded regions of each server.
- *
- * The load balancing invariant is that all servers are within 1 region of the
- * average number of regions per server. If the average is an integer number,
- * all servers will be balanced to the average. Otherwise, all servers will
- * have either floor(average) or ceiling(average) regions.
- *
- * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
- * we can fetch from both ends of the queue.
- * At the beginning, we check whether there was empty region server
- * just discovered by Master. If so, we alternately choose new / old
- * regions from head / tail of regionsToMove, respectively. This alternation
- * avoids clustering young regions on the newly discovered region server.
- * Otherwise, we choose new regions from head of regionsToMove.
- *
- * Another improvement from HBASE-3609 is that we assign regions from
- * regionsToMove to underloaded servers in round-robin fashion.
- * Previously one underloaded server would be filled before we move onto
- * the next underloaded server, leading to clustering of young regions.
- *
- * Finally, we randomly shuffle underloaded servers so that they receive
- * offloaded regions relatively evenly across calls to balanceCluster().
- *
- * The algorithm is currently implemented as such:
- *
- *
- *
Determine the two valid numbers of regions each server should have,
- * MIN=floor(average) and MAX=ceiling(average).
- *
- *
Iterate down the most loaded servers, shedding regions from each so
- * each server hosts exactly MAX regions. Stop once you reach a
- * server that already has <= MAX regions.
- *
- * Order the regions to move from most recent to least.
- *
- *
Iterate down the least loaded servers, assigning regions so each server
- * has exactly MIN regions. Stop once you reach a server that
- * already has >= MIN regions.
- *
- * Regions being assigned to underloaded servers are those that were shed
- * in the previous step. It is possible that there were not enough
- * regions shed to fill each underloaded server to MIN. If so we
- * end up with a number of regions required to do so, neededRegions.
- *
- * It is also possible that we were able to fill each underloaded but ended
- * up with regions that were unassigned from overloaded servers but that
- * still do not have assignment.
- *
- * If neither of these conditions hold (no regions needed to fill the
- * underloaded servers, no regions leftover from overloaded servers),
- * we are done and return. Otherwise we handle these cases below.
- *
- *
If neededRegions is non-zero (still have underloaded servers),
- * we iterate the most loaded servers again, shedding a single server from
- * each (this brings them from having MAX regions to having
- * MIN regions).
- *
- *
We now definitely have more regions that need assignment, either from
- * the previous step or from the original shedding from overloaded servers.
- * Iterate the least loaded servers filling each to MIN.
- *
- *
If we still have more regions that need assignment, again iterate the
- * least loaded servers, this time giving each one (filling them to
- * MAX) until we run out.
- *
- *
All servers will now either host MIN or MAX regions.
- *
- * In addition, any server hosting >= MAX regions is guaranteed
- * to end up with MAX regions at the end of the balancing. This
- * ensures the minimal number of regions possible are moved.
- *
- *
- * TODO: We can at-most reassign the number of regions away from a particular
- * server to be how many they report as most loaded.
- * Should we just keep all assignment in memory? Any objections?
- * Does this mean we need HeapSize on HMaster? Or just careful monitor?
- * (current thinking is we will hold all assignments in memory)
- *
- * @param clusterState Map of regionservers and their load/region information to
- * a list of their most loaded regions
- * @return a list of regions to be moved, including source and destination,
- * or null if cluster is already balanced
- */
- public List<RegionPlan> balanceCluster(
- Map<ServerName, List<HRegionInfo>> clusterState) {
- boolean emptyRegionServerPresent = false;
- long startTime = System.currentTimeMillis();
-
- int numServers = clusterState.size();
- if (numServers == 0) {
- LOG.debug("numServers=0 so skipping load balancing");
- return null;
- }
- NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad =
- new TreeMap<ServerAndLoad, List<HRegionInfo>>();
- int numRegions = 0;
- // Iterate so we can count regions as we build the map
- for (Map.Entry<ServerName, List<HRegionInfo>> server: clusterState.entrySet()) {
- List<HRegionInfo> regions = server.getValue();
- int sz = regions.size();
- if (sz == 0) emptyRegionServerPresent = true;
- numRegions += sz;
- serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions);
- }
- // Check if we even need to do any load balancing
- float average = (float)numRegions / numServers; // for logging
- // HBASE-3681 check sloppiness first
- int floor = (int) Math.floor(average * (1 - slop));
- int ceiling = (int) Math.ceil(average * (1 + slop));
- if (serversByLoad.lastKey().getLoad() <= ceiling &&
- serversByLoad.firstKey().getLoad() >= floor) {
- // Skipped because no server outside (min,max) range
- LOG.info("Skipping load balancing because balanced cluster; " +
- "servers=" + numServers + " " +
- "regions=" + numRegions + " average=" + average + " " +
- "mostloaded=" + serversByLoad.lastKey().getLoad() +
- " leastloaded=" + serversByLoad.firstKey().getLoad());
- return null;
- }
- int min = numRegions / numServers;
- int max = numRegions % numServers == 0 ? min : min + 1;
-
- // Using to check balance result.
- StringBuilder strBalanceParam = new StringBuilder();
- strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
- .append(", numServers=").append(numServers).append(", max=").append(max)
- .append(", min=").append(min);
- LOG.debug(strBalanceParam.toString());
-
- // Balance the cluster
- // TODO: Look at data block locality or a more complex load to do this
- MinMaxPriorityQueue<RegionPlan> regionsToMove =
- MinMaxPriorityQueue.orderedBy(rpComparator).create();
- List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
-
- // Walk down most loaded, pruning each to the max
- int serversOverloaded = 0;
- // flag used to fetch regions from head and tail of list, alternately
- boolean fetchFromTail = false;
- Map<ServerName, BalanceInfo> serverBalanceInfo =
- new TreeMap<ServerName, BalanceInfo>();
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
- serversByLoad.descendingMap().entrySet()) {
- ServerAndLoad sal = server.getKey();
- int regionCount = sal.getLoad();
- if (regionCount <= max) {
- serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
- break;
- }
- serversOverloaded++;
- List<HRegionInfo> regions = server.getValue();
- int numToOffload = Math.min(regionCount - max, regions.size());
- // account for the out-of-band regions which were assigned to this server
- // after some other region server crashed
- Collections.sort(regions, riComparator);
- int numTaken = 0;
- for (int i = 0; i <= numToOffload; ) {
- HRegionInfo hri = regions.get(i); // fetch from head
- if (fetchFromTail) {
- hri = regions.get(regions.size() - 1 - i);
- }
- i++;
- // Don't rebalance meta regions.
- if (hri.isMetaRegion()) continue;
- regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
- numTaken++;
- if (numTaken >= numToOffload) break;
- // fetch in alternate order if there is new region server
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
- }
- serverBalanceInfo.put(sal.getServerName(),
- new BalanceInfo(numToOffload, (-1)*numTaken));
- }
- int totalNumMoved = regionsToMove.size();
-
- // Walk down least loaded, filling each to the min
- int neededRegions = 0; // number of regions needed to bring all up to min
- fetchFromTail = false;
-
- Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
- serversByLoad.entrySet()) {
- int regionCount = server.getKey().getLoad();
- if (regionCount >= min) {
- break;
- }
- underloadedServers.put(server.getKey().getServerName(), min - regionCount);
- }
- // number of servers that get new regions
- int serversUnderloaded = underloadedServers.size();
- int incr = 1;
- List<ServerName> sns =
- Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
- Collections.shuffle(sns, RANDOM);
- while (regionsToMove.size() > 0) {
- int cnt = 0;
- int i = incr > 0 ? 0 : underloadedServers.size()-1;
- for (; i >= 0 && i < underloadedServers.size(); i += incr) {
- if (regionsToMove.isEmpty()) break;
- ServerName si = sns.get(i);
- int numToTake = underloadedServers.get(si);
- if (numToTake == 0) continue;
-
- addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
-
- underloadedServers.put(si, numToTake-1);
- cnt++;
- BalanceInfo bi = serverBalanceInfo.get(si);
- if (bi == null) {
- bi = new BalanceInfo(0, 0);
- serverBalanceInfo.put(si, bi);
- }
- bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
- }
- if (cnt == 0) break;
- // iterates underloadedServers in the other direction
- incr = -incr;
- }
- for (Integer i : underloadedServers.values()) {
- // If we still want to take some, increment needed
- neededRegions += i;
- }
-
- // If none needed to fill all to min and none left to drain all to max,
- // we are done
- if (neededRegions == 0 && regionsToMove.isEmpty()) {
- long endTime = System.currentTimeMillis();
- LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
- "Moving " + totalNumMoved + " regions off of " +
- serversOverloaded + " overloaded servers onto " +
- serversUnderloaded + " less loaded servers");
- return regionsToReturn;
- }
-
- // Need to do a second pass.
- // Either more regions to assign out or servers that are still underloaded
-
- // If we need more to fill min, grab one from each most loaded until enough
- if (neededRegions != 0) {
- // Walk down most loaded, grabbing one from each until we get enough
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
- serversByLoad.descendingMap().entrySet()) {
- BalanceInfo balanceInfo =
- serverBalanceInfo.get(server.getKey().getServerName());
- int idx =
- balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
- if (idx >= server.getValue().size()) break;
- HRegionInfo region = server.getValue().get(idx);
- if (region.isMetaRegion()) continue; // Don't move meta regions.
- regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
- totalNumMoved++;
- if (--neededRegions == 0) {
- // No more regions needed, done shedding
- break;
- }
- }
- }
-
- // Now we have a set of regions that must be all assigned out
- // Assign each underloaded up to the min, then if leftovers, assign to max
-
- // Walk down least loaded, assigning to each to fill up to min
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
- serversByLoad.entrySet()) {
- int regionCount = server.getKey().getLoad();
- if (regionCount >= min) break;
- BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
- if(balanceInfo != null) {
- regionCount += balanceInfo.getNumRegionsAdded();
- }
- if(regionCount >= min) {
- continue;
- }
- int numToTake = min - regionCount;
- int numTaken = 0;
- while(numTaken < numToTake && 0 < regionsToMove.size()) {
- addRegionPlan(regionsToMove, fetchFromTail,
- server.getKey().getServerName(), regionsToReturn);
- numTaken++;
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
- }
- }
-
- // If we still have regions to dish out, assign underloaded to max
- if (0 < regionsToMove.size()) {
- for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
- serversByLoad.entrySet()) {
- int regionCount = server.getKey().getLoad();
- if(regionCount >= max) {
- break;
- }
- addRegionPlan(regionsToMove, fetchFromTail,
- server.getKey().getServerName(), regionsToReturn);
- if (emptyRegionServerPresent) {
- fetchFromTail = !fetchFromTail;
- }
- if (regionsToMove.isEmpty()) {
- break;
- }
- }
- }
-
- long endTime = System.currentTimeMillis();
-
- if (!regionsToMove.isEmpty() || neededRegions != 0) {
- // Emit data so can diagnose how balancer went astray.
- LOG.warn("regionsToMove=" + totalNumMoved +
- ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
- ", serversUnderloaded=" + serversUnderloaded);
- StringBuilder sb = new StringBuilder();
- for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterState.entrySet()) {
- if (sb.length() > 0) sb.append(", ");
- sb.append(e.getKey().toString());
- sb.append(" ");
- sb.append(e.getValue().size());
- }
- LOG.warn("Input " + sb.toString());
- }
-
- // All done!
- LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
- "Moving " + totalNumMoved + " regions off of " +
- serversOverloaded + " overloaded servers onto " +
- serversUnderloaded + " less loaded servers");
-
- return regionsToReturn;
- }
-
- /**
- * Add a region from the head or tail to the List of regions to return.
- */
- void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
- final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
- RegionPlan rp = null;
- if (!fetchFromTail) rp = regionsToMove.remove();
- else rp = regionsToMove.removeLast();
- rp.setDestination(sn);
- regionsToReturn.add(rp);
- }
-
- /**
- * Stores additional per-server information about the regions added/removed
- * during the run of the balancing algorithm.
- *
- * For servers that shed regions, we need to track which regions we have
- * already shed. nextRegionForUnload contains the index in the list
- * of regions on the server that is the next to be shed.
- */
- private static class BalanceInfo {
-
- private final int nextRegionForUnload;
- private int numRegionsAdded;
-
- public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
- this.nextRegionForUnload = nextRegionForUnload;
- this.numRegionsAdded = numRegionsAdded;
- }
-
- public int getNextRegionForUnload() {
- return nextRegionForUnload;
- }
-
- public int getNumRegionsAdded() {
- return numRegionsAdded;
- }
-
- public void setNumRegionsAdded(int numAdded) {
- this.numRegionsAdded = numAdded;
- }
- }
-
- /**
- * Generates a bulk assignment plan to be used on cluster startup using a
- * simple round-robin assignment.
- *
- * Takes a list of all the regions and all the servers in the cluster and
- * returns a map of each server to the regions that it should be assigned.
- *
- * Currently implemented as a round-robin assignment. Same invariant as
- * load balancing, all servers holding floor(avg) or ceiling(avg).
- *
- * TODO: Use block locations from HDFS to place regions with their blocks
- *
- * @param regions all regions
- * @param servers all servers
- * @return map of server to the regions it should take, or null if no
- * assignment is possible (ie. no regions or no servers)
- */
- public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
- List<HRegionInfo> regions, List<ServerName> servers) {
- if (regions.isEmpty() || servers.isEmpty()) {
- return null;
- }
- Map<ServerName, List<HRegionInfo>> assignments =
- new TreeMap<ServerName, List<HRegionInfo>>();
- int numRegions = regions.size();
- int numServers = servers.size();
- int max = (int)Math.ceil((float)numRegions/numServers);
- int serverIdx = 0;
- if (numServers > 1) {
- serverIdx = RANDOM.nextInt(numServers);
- }
- int regionIdx = 0;
- for (int j = 0; j < numServers; j++) {
- ServerName server = servers.get((j + serverIdx) % numServers);
- List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
- for (int i=regionIdx; i<numRegions; i += numServers) {
- serverRegions.add(regions.get(i % numRegions));
- }
- assignments.put(server, serverRegions);
- regionIdx++;
- }
- return assignments;
- }
-
- /**
- * Generates a bulk assignment startup plan, attempting to reuse the existing
- * assignment information from META, but adjusting for the specified list of
- * available/online servers available for assignment.
- * <p>
- * Takes a map of all regions to their existing assignment from META. Also
- * takes a list of online servers for regions to be assigned to. Attempts to
- * retain all assignment, so in some instances initial assignment will not be
- * completely balanced.
- *
- * Any leftover regions without an existing server to be assigned to will be
- * assigned randomly to available servers.
- * @param regions regions and existing assignment from meta
- * @param servers available servers
- * @return map of servers and regions to be assigned to them
- */
- public Map<ServerName, List<HRegionInfo>> retainAssignment(
- Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
- // Group all of the old assignments by their hostname.
- // We can't group directly by ServerName since the servers all have
- // new start-codes.
-
- // Group the servers by their hostname. It's possible we have multiple
- // servers on the same host on different ports.
- ArrayListMultimap<String, ServerName> serversByHostname =
- ArrayListMultimap.create();
- for (ServerName server : servers) {
- serversByHostname.put(server.getHostname(), server);
- }
-
- // Now come up with new assignments
- Map<ServerName, List<HRegionInfo>> assignments =
- new TreeMap<ServerName, List<HRegionInfo>>();
-
- for (ServerName server : servers) {
- assignments.put(server, new ArrayList<HRegionInfo>());
- }
-
- // Collection of the hostnames that used to have regions
- // assigned, but for which we no longer have any RS running
- // after the cluster restart.
- Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
-
- int numRandomAssignments = 0;
- int numRetainedAssigments = 0;
- for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
- HRegionInfo region = entry.getKey();
- ServerName oldServerName = entry.getValue();
- List<ServerName> localServers = new ArrayList<ServerName>();
- if (oldServerName != null) {
- localServers = serversByHostname.get(oldServerName.getHostname());
- }
- if (localServers.isEmpty()) {
- // No servers on the new cluster match up with this hostname,
- // assign randomly.
- ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
- assignments.get(randomServer).add(region);
- numRandomAssignments++;
- if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
- } else if (localServers.size() == 1) {
- // the usual case - one new server on same host
- assignments.get(localServers.get(0)).add(region);
- numRetainedAssigments++;
- } else {
- // multiple new servers in the cluster on this same host
- int size = localServers.size();
- ServerName target = localServers.get(RANDOM.nextInt(size));
- assignments.get(target).add(region);
- numRetainedAssigments++;
- }
- }
-
- String randomAssignMsg = "";
- if (numRandomAssignments > 0) {
- randomAssignMsg = numRandomAssignments + " regions were assigned " +
- "to random hosts, since the old hosts for these regions are no " +
- "longer present in the cluster. These hosts were:\n " +
- Joiner.on("\n ").join(oldHostsNoLongerPresent);
- }
-
- LOG.info("Reassigned " + regions.size() + " regions. " +
- numRetainedAssigments + " retained the pre-restart assignment. " +
- randomAssignMsg);
- return assignments;
- }
-
- /**
- * Returns an ordered list of hosts that are hosting the blocks for this
- * region. The weight of each host is the sum of the block lengths of all
- * files on that host, so the first host in the list is the server which
- * holds the most bytes of the given region's HFiles.
- *
- * @param fs the filesystem
- * @param region region
- * @return ordered list of hosts holding blocks of the specified region
- */
- @SuppressWarnings("unused")
- private List<ServerName> getTopBlockLocations(FileSystem fs,
- HRegionInfo region) {
- List<ServerName> topServerNames = null;
- try {
- HTableDescriptor tableDescriptor = getTableDescriptor(
- region.getTableName());
- if (tableDescriptor != null) {
- HDFSBlocksDistribution blocksDistribution =
- HRegion.computeHDFSBlocksDistribution(config, tableDescriptor,
- region.getEncodedName());
- List<String> topHosts = blocksDistribution.getTopHosts();
- topServerNames = mapHostNameToServerName(topHosts);
- }
- } catch (IOException ioe) {
- LOG.debug("IOException during HDFSBlocksDistribution computation. for " +
- "region = " + region.getEncodedName() , ioe);
- }
-
- return topServerNames;
- }
-
- /**
- * return HTableDescriptor for a given tableName
- * @param tableName the table name
- * @return HTableDescriptor
- * @throws IOException
- */
- private HTableDescriptor getTableDescriptor(byte[] tableName)
- throws IOException {
- HTableDescriptor tableDescriptor = null;
- try {
- if ( this.services != null)
- {
- tableDescriptor = this.services.getTableDescriptors().
- get(Bytes.toString(tableName));
- }
- } catch (FileNotFoundException fnfe) {
- LOG.debug("FileNotFoundException during getTableDescriptors."
- + " Current table name = " + Bytes.toStringBinary(tableName), fnfe);
- }
-
- return tableDescriptor;
- }
-
- /**
- * Map hostname to ServerName, The output ServerName list will have the same
- * order as input hosts.
- * @param hosts the list of hosts
- * @return ServerName list
- */
- private List<ServerName> mapHostNameToServerName(List<String> hosts) {
- if ( hosts == null || status == null) {
- return null;
- }
-
- List<ServerName> topServerNames = new ArrayList<ServerName>();
- Collection<ServerName> regionServers = status.getServers();
-
- // create a mapping from hostname to ServerName for fast lookup
- HashMap<String, ServerName> hostToServerName =
- new HashMap<String, ServerName>();
- for (ServerName sn : regionServers) {
- hostToServerName.put(sn.getHostname(), sn);
- }
-
- for (String host : hosts ) {
- ServerName sn = hostToServerName.get(host);
- // it is possible that HDFS is up ( thus host is valid ),
- // but RS is down ( thus sn is null )
- if (sn != null) {
- topServerNames.add(sn);
- }
- }
- return topServerNames;
- }
-
-
- /**
- * Generates an immediate assignment plan to be used by a new master for
- * regions in transition that do not have an already known destination.
- *
- * Takes a list of regions that need immediate assignment and a list of
- * all available servers. Returns a map of regions to the server they
- * should be assigned to.
- *
- * This method will return quickly and does not do any intelligent
- * balancing. The goal is to make a fast decision not the best decision
- * possible.
- *
- * Currently this is random.
- *
- * @param regions
- * @param servers
- * @return map of regions to the server it should be assigned to
- */
- public Map<HRegionInfo, ServerName> immediateAssignment(
- List<HRegionInfo> regions, List<ServerName> servers) {
- Map<HRegionInfo, ServerName> assignments =
- new TreeMap<HRegionInfo, ServerName>();
- for(HRegionInfo region : regions) {
- assignments.put(region, servers.get(RANDOM.nextInt(servers.size())));
- }
- return assignments;
- }
-
- public ServerName randomAssignment(HRegionInfo regionInfo,
- List<ServerName> servers) {
- if (servers == null || servers.isEmpty()) {
- LOG.warn("Wanted to do random assignment but no servers to assign to");
- return null;
- }
- return servers.get(RANDOM.nextInt(servers.size()));
- }
-
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index a3af820..f9a003b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
@@ -1135,6 +1136,8 @@ Server {
this.assignmentManager.getAssignmentsByTable();
List plans = new ArrayList();
+ //Give the balancer the current cluster state.
+ this.balancer.setClusterStatus(getClusterStatus());
for (Map> assignments : assignmentsByTable.values()) {
List partialPlans = this.balancer.balanceCluster(assignments);
if (partialPlans != null) plans.addAll(partialPlans);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 135b94d..5ef7df8 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -19,15 +19,15 @@
*/
package org.apache.hadoop.hbase.master;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
-import java.util.List;
-import java.util.Map;
-
/**
* Makes decisions about the placement and movement of Regions across
* RegionServers.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java
deleted file mode 100644
index de6f1fb..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancerFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright 2011 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * The class that creates a load balancer from a conf.
- */
-@InterfaceAudience.Private
-public class LoadBalancerFactory {
-
- /**
- * Create a loadblanacer from the given conf.
- * @param conf
- * @return A {@link LoadBalancer}
- */
- public static LoadBalancer getLoadBalancer(Configuration conf) {
-
- // Create the balancer
- Class<? extends LoadBalancer> balancerKlass = conf.getClass(
- HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
- DefaultLoadBalancer.class, LoadBalancer.class);
- return ReflectionUtils.newInstance(balancerKlass, conf);
-
- }
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
index 6fbf9ab..6d85dc8 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master;
+import java.io.Serializable;
+import java.util.Comparator;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
@@ -37,6 +40,19 @@ public class RegionPlan implements Comparable {
private final ServerName source;
private ServerName dest;
+ public static class RegionPlanComparator implements Comparator, Serializable {
+
+ private static final long serialVersionUID = 4213207330485734853L;
+
+ @Override
+ public int compare(RegionPlan l, RegionPlan r) {
+ long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId();
+ if (diff < 0) return -1;
+ if (diff > 0) return 1;
+ return 0;
+ }
+ }
+
/**
* Instantiate a plan for a region move, moving the specified region from
* the specified source server to the specified destination server.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java
deleted file mode 100644
index 8c7dadf..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerAndLoad.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ServerName;
-
-/**
- * Data structure that holds servername and 'load'.
- */
-@InterfaceAudience.Private
-class ServerAndLoad implements Comparable {
- private final ServerName sn;
- private final int load;
-
- ServerAndLoad(final ServerName sn, final int load) {
- this.sn = sn;
- this.load = load;
- }
-
- ServerName getServerName() {
- return this.sn;
- }
-
- int getLoad() {
- return this.load;
- }
-
- @Override
- public int compareTo(ServerAndLoad other) {
- int diff = this.load - other.load;
- return diff != 0 ? diff : this.sn.compareTo(other.getServerName());
- }
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
new file mode 100644
index 0000000..e40322f
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.MasterServices;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Sets;
+
+/**
+ * The base class for load balancers. It provides the functions used by
+ * {@link AssignmentManager} to assign regions in the edge cases. It doesn't
+ * provide an implementation of the actual balancing algorithm.
+ *
+ */
+public abstract class BaseLoadBalancer implements LoadBalancer {
+
+ // slop for regions
+ private float slop;
+ private Configuration config;
+ private static final Random RANDOM = new Random(System.currentTimeMillis());
+ private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
+
+ protected MasterServices services;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
+ if (slop < 0) slop = 0;
+ else if (slop > 1) slop = 1;
+ this.config = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.config;
+ }
+
+ public void setClusterStatus(ClusterStatus st) {
+ // Not used except for the StochasticBalancer
+ }
+
+ public void setMasterServices(MasterServices masterServices) {
+ this.services = masterServices;
+ }
+
+ protected boolean needsBalance(ClusterLoadState cs) {
+ // Check if we even need to do any load balancing
+ float average = cs.getLoadAverage(); // for logging
+ // HBASE-3681 check sloppiness first
+ int floor = (int) Math.floor(average * (1 - slop));
+ int ceiling = (int) Math.ceil(average * (1 + slop));
+
+ return cs.getMinLoad() > ceiling || cs.getMaxLoad() < floor;
+ }
+
+ /**
+ * Generates a bulk assignment plan to be used on cluster startup using a
+ * simple round-robin assignment.
+ *
+ * Takes a list of all the regions and all the servers in the cluster and
+ * returns a map of each server to the regions that it should be assigned.
+ *
+ * Currently implemented as a round-robin assignment. Same invariant as load
+ * balancing, all servers holding floor(avg) or ceiling(avg).
+ *
+ * TODO: Use block locations from HDFS to place regions with their blocks
+ *
+ * @param regions all regions
+ * @param servers all servers
+ * @return map of server to the regions it should take, or null if no
+ * assignment is possible (ie. no regions or no servers)
+ */
+ public Map> roundRobinAssignment(List regions,
+ List servers) {
+ if (regions.isEmpty() || servers.isEmpty()) {
+ return null;
+ }
+ Map> assignments = new TreeMap>();
+ int numRegions = regions.size();
+ int numServers = servers.size();
+ int max = (int) Math.ceil((float) numRegions / numServers);
+ int serverIdx = 0;
+ if (numServers > 1) {
+ serverIdx = RANDOM.nextInt(numServers);
+ }
+ int regionIdx = 0;
+ for (int j = 0; j < numServers; j++) {
+ ServerName server = servers.get((j + serverIdx) % numServers);
+ List serverRegions = new ArrayList(max);
+ for (int i = regionIdx; i < numRegions; i += numServers) {
+ serverRegions.add(regions.get(i % numRegions));
+ }
+ assignments.put(server, serverRegions);
+ regionIdx++;
+ }
+ return assignments;
+ }
+
+ /**
+ * Generates an immediate assignment plan to be used by a new master for
+ * regions in transition that do not have an already known destination.
+ *
+ * Takes a list of regions that need immediate assignment and a list of all
+ * available servers. Returns a map of regions to the server they should be
+ * assigned to.
+ *
+ * This method will return quickly and does not do any intelligent balancing.
+ * The goal is to make a fast decision not the best decision possible.
+ *
+ * Currently this is random.
+ *
+ * @param regions
+ * @param servers
+ * @return map of regions to the server it should be assigned to
+ */
+ public Map immediateAssignment(List regions,
+ List servers) {
+ Map assignments = new TreeMap();
+ for (HRegionInfo region : regions) {
+ assignments.put(region, randomAssignment(region, servers));
+ }
+ return assignments;
+ }
+
+ /**
+ * Used to assign a single region to a random server.
+ */
+ public ServerName randomAssignment(HRegionInfo regionInfo, List servers) {
+ if (servers == null || servers.isEmpty()) {
+ LOG.warn("Wanted to do random assignment but no servers to assign to");
+ return null;
+ }
+ return servers.get(RANDOM.nextInt(servers.size()));
+ }
+
+ /**
+ * Generates a bulk assignment startup plan, attempting to reuse the existing
+ * assignment information from META, but adjusting for the specified list of
+ * available/online servers available for assignment.
+ *
+ * Takes a map of all regions to their existing assignment from META. Also
+ * takes a list of online servers for regions to be assigned to. Attempts to
+ * retain all assignment, so in some instances initial assignment will not be
+ * completely balanced.
+ *
+ * Any leftover regions without an existing server to be assigned to will be
+ * assigned randomly to available servers.
+ *
+ * @param regions regions and existing assignment from meta
+ * @param servers available servers
+ * @return map of servers and regions to be assigned to them
+ */
+ public Map> retainAssignment(Map regions,
+ List servers) {
+ // Group all of the old assignments by their hostname.
+ // We can't group directly by ServerName since the servers all have
+ // new start-codes.
+
+ // Group the servers by their hostname. It's possible we have multiple
+ // servers on the same host on different ports.
+ ArrayListMultimap serversByHostname = ArrayListMultimap.create();
+ for (ServerName server : servers) {
+ serversByHostname.put(server.getHostname(), server);
+ }
+
+ // Now come up with new assignments
+ Map> assignments = new TreeMap>();
+
+ for (ServerName server : servers) {
+ assignments.put(server, new ArrayList());
+ }
+
+ // Collection of the hostnames that used to have regions
+ // assigned, but for which we no longer have any RS running
+ // after the cluster restart.
+ Set oldHostsNoLongerPresent = Sets.newTreeSet();
+
+ int numRandomAssignments = 0;
+ int numRetainedAssigments = 0;
+ for (Map.Entry entry : regions.entrySet()) {
+ HRegionInfo region = entry.getKey();
+ ServerName oldServerName = entry.getValue();
+ List localServers = new ArrayList();
+ if (oldServerName != null) {
+ localServers = serversByHostname.get(oldServerName.getHostname());
+ }
+ if (localServers.isEmpty()) {
+ // No servers on the new cluster match up with this hostname,
+ // assign randomly.
+ ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
+ assignments.get(randomServer).add(region);
+ numRandomAssignments++;
+ if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
+ } else if (localServers.size() == 1) {
+ // the usual case - one new server on same host
+ assignments.get(localServers.get(0)).add(region);
+ numRetainedAssigments++;
+ } else {
+ // multiple new servers in the cluster on this same host
+ int size = localServers.size();
+ ServerName target = localServers.get(RANDOM.nextInt(size));
+ assignments.get(target).add(region);
+ numRetainedAssigments++;
+ }
+ }
+
+ String randomAssignMsg = "";
+ if (numRandomAssignments > 0) {
+ randomAssignMsg =
+ numRandomAssignments + " regions were assigned "
+ + "to random hosts, since the old hosts for these regions are no "
+ + "longer present in the cluster. These hosts were:\n "
+ + Joiner.on("\n ").join(oldHostsNoLongerPresent);
+ }
+
+ LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
+ + " retained the pre-restart assignment. " + randomAssignMsg);
+ return assignments;
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterLoadState.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterLoadState.java
new file mode 100644
index 0000000..9bf4b59
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterLoadState.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Class used to hold the current state of the cluster and how balanced it is.
+ */
+public class ClusterLoadState {
+ private final Map> clusterState;
+ private final NavigableMap> serversByLoad;
+ private boolean emptyRegionServerPresent = false;
+ private int numRegions = 0;
+ private int numServers = 0;
+
+ public ClusterLoadState(Map> clusterState) {
+ super();
+ this.numRegions = 0;
+ this.numServers = clusterState.size();
+ this.clusterState = clusterState;
+ serversByLoad = new TreeMap>();
+ // Iterate so we can count regions as we build the map
+ for (Map.Entry> server : clusterState.entrySet()) {
+ List regions = server.getValue();
+ int sz = regions.size();
+ if (sz == 0) emptyRegionServerPresent = true;
+ numRegions += sz;
+ serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions);
+ }
+ }
+
+ Map> getClusterState() {
+ return clusterState;
+ }
+
+ NavigableMap> getServersByLoad() {
+ return serversByLoad;
+ }
+
+ boolean isEmptyRegionServerPresent() {
+ return emptyRegionServerPresent;
+ }
+
+ int getNumRegions() {
+ return numRegions;
+ }
+
+ int getNumServers() {
+ return numServers;
+ }
+
+ float getLoadAverage() {
+ return (float) numRegions / numServers;
+ }
+
+ int getMinLoad() {
+ return getServersByLoad().lastKey().getLoad();
+ }
+
+ int getMaxLoad() {
+ return getServersByLoad().firstKey().getLoad();
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java
new file mode 100644
index 0000000..b68901e
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/DefaultLoadBalancer.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.RegionPlan;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+
+/**
+ * Makes decisions about the placement and movement of Regions across
+ * RegionServers.
+ *
+ *
Cluster-wide load balancing will occur only when there are no regions in
+ * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}.
+ *
+ *
Inline region placement with {@link #immediateAssignment} can be used when
+ * the Master needs to handle closed regions that it currently does not have
+ * a destination set for. This can happen during master failover.
+ *
+ *
On cluster startup, bulk assignment can be used to determine
+ * locations for all Regions in a cluster.
+ *
+ *
This classes produces plans for the {@link AssignmentManager} to execute.
+ */
+@InterfaceAudience.Private
+public class DefaultLoadBalancer extends BaseLoadBalancer {
+ private static final Log LOG = LogFactory.getLog(DefaultLoadBalancer.class);
+ private static final Random RANDOM = new Random(System.currentTimeMillis());
+
+ private RegionInfoComparator riComparator = new RegionInfoComparator();
+ private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
+
+
+ /**
+ * Stores additional per-server information about the regions added/removed
+ * during the run of the balancing algorithm.
+ *
+ * For servers that shed regions, we need to track which regions we have already
+ * shed. nextRegionForUnload contains the index in the list of regions on
+ * the server that is the next to be shed.
+ */
+ static class BalanceInfo {
+
+ private final int nextRegionForUnload;
+ private int numRegionsAdded;
+
+ public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
+ this.nextRegionForUnload = nextRegionForUnload;
+ this.numRegionsAdded = numRegionsAdded;
+ }
+
+ int getNextRegionForUnload() {
+ return nextRegionForUnload;
+ }
+
+ int getNumRegionsAdded() {
+ return numRegionsAdded;
+ }
+
+ void setNumRegionsAdded(int numAdded) {
+ this.numRegionsAdded = numAdded;
+ }
+ }
+
+ /**
+ * Generate a global load balancing plan according to the specified map of
+ * server information to the most loaded regions of each server.
+ *
+ * The load balancing invariant is that all servers are within 1 region of the
+ * average number of regions per server. If the average is an integer number,
+ * all servers will be balanced to the average. Otherwise, all servers will
+ * have either floor(average) or ceiling(average) regions.
+ *
+ * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
+ * we can fetch from both ends of the queue.
+ * At the beginning, we check whether there was empty region server
+ * just discovered by Master. If so, we alternately choose new / old
+ * regions from head / tail of regionsToMove, respectively. This alternation
+ * avoids clustering young regions on the newly discovered region server.
+ * Otherwise, we choose new regions from head of regionsToMove.
+ *
+ * Another improvement from HBASE-3609 is that we assign regions from
+ * regionsToMove to underloaded servers in round-robin fashion.
+ * Previously one underloaded server would be filled before we move onto
+ * the next underloaded server, leading to clustering of young regions.
+ *
+ * Finally, we randomly shuffle underloaded servers so that they receive
+ * offloaded regions relatively evenly across calls to balanceCluster().
+ *
+ * The algorithm is currently implemented as such:
+ *
+ *
+ *
Determine the two valid numbers of regions each server should have,
+ * MIN=floor(average) and MAX=ceiling(average).
+ *
+ *
Iterate down the most loaded servers, shedding regions from each so
+ * each server hosts exactly MAX regions. Stop once you reach a
+ * server that already has <= MAX regions.
+ *
+ * Order the regions to move from most recent to least.
+ *
+ *
Iterate down the least loaded servers, assigning regions so each server
+ * has exactly MIN regions. Stop once you reach a server that
+ * already has >= MIN regions.
+ *
+ * Regions being assigned to underloaded servers are those that were shed
+ * in the previous step. It is possible that there were not enough
+ * regions shed to fill each underloaded server to MIN. If so we
+ * end up with a number of regions required to do so, neededRegions.
+ *
+ * It is also possible that we were able to fill each underloaded but ended
+ * up with regions that were unassigned from overloaded servers but that
+ * still do not have assignment.
+ *
+ * If neither of these conditions hold (no regions needed to fill the
+ * underloaded servers, no regions leftover from overloaded servers),
+ * we are done and return. Otherwise we handle these cases below.
+ *
+ *
If neededRegions is non-zero (still have underloaded servers),
+ * we iterate the most loaded servers again, shedding a single server from
+ * each (this brings them from having MAX regions to having
+ * MIN regions).
+ *
+ *
We now definitely have more regions that need assignment, either from
+ * the previous step or from the original shedding from overloaded servers.
+ * Iterate the least loaded servers filling each to MIN.
+ *
+ *
If we still have more regions that need assignment, again iterate the
+ * least loaded servers, this time giving each one (filling them to
+ * MAX) until we run out.
+ *
+ *
All servers will now either host MIN or MAX regions.
+ *
+ * In addition, any server hosting >= MAX regions is guaranteed
+ * to end up with MAX regions at the end of the balancing. This
+ * ensures the minimal number of regions possible are moved.
+ *
+ *
+ * TODO: We can at-most reassign the number of regions away from a particular
+ * server to be how many they report as most loaded.
+ * Should we just keep all assignment in memory? Any objections?
+ * Does this mean we need HeapSize on HMaster? Or just careful monitor?
+ * (current thinking is we will hold all assignments in memory)
+ *
+ * @param clusterState Map of regionservers and their load/region information to
+ * a list of their most loaded regions
+ * @return a list of regions to be moved, including source and destination,
+ * or null if cluster is already balanced
+ */
+ public List balanceCluster(
+ Map> clusterMap) {
+ boolean emptyRegionServerPresent = false;
+ long startTime = System.currentTimeMillis();
+
+
+ ClusterLoadState cs = new ClusterLoadState(clusterMap);
+
+ int numServers = cs.getNumServers();
+ if (numServers == 0) {
+ LOG.debug("numServers=0 so skipping load balancing");
+ return null;
+ }
+ NavigableMap> serversByLoad = cs.getServersByLoad();
+
+ int numRegions = cs.getNumRegions();
+
+ if (!this.needsBalance(cs)) {
+ // Skipped because no server outside (min,max) range
+ float average = cs.getLoadAverage(); // for logging
+ LOG.info("Skipping load balancing because balanced cluster; " +
+ "servers=" + numServers + " " +
+ "regions=" + numRegions + " average=" + average + " " +
+ "mostloaded=" + serversByLoad.lastKey().getLoad() +
+ " leastloaded=" + serversByLoad.firstKey().getLoad());
+ return null;
+ }
+
+ int min = numRegions / numServers;
+ int max = numRegions % numServers == 0 ? min : min + 1;
+
+ // Using to check balance result.
+ StringBuilder strBalanceParam = new StringBuilder();
+ strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
+ .append(", numServers=").append(numServers).append(", max=").append(max)
+ .append(", min=").append(min);
+ LOG.debug(strBalanceParam.toString());
+
+ // Balance the cluster
+ // TODO: Look at data block locality or a more complex load to do this
+ MinMaxPriorityQueue regionsToMove =
+ MinMaxPriorityQueue.orderedBy(rpComparator).create();
+ List regionsToReturn = new ArrayList();
+
+ // Walk down most loaded, pruning each to the max
+ int serversOverloaded = 0;
+ // flag used to fetch regions from head and tail of list, alternately
+ boolean fetchFromTail = false;
+ Map serverBalanceInfo =
+ new TreeMap();
+ for (Map.Entry> server:
+ serversByLoad.descendingMap().entrySet()) {
+ ServerAndLoad sal = server.getKey();
+ int regionCount = sal.getLoad();
+ if (regionCount <= max) {
+ serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
+ break;
+ }
+ serversOverloaded++;
+ List regions = server.getValue();
+ int numToOffload = Math.min(regionCount - max, regions.size());
+ // account for the out-of-band regions which were assigned to this server
+ // after some other region server crashed
+ Collections.sort(regions, riComparator);
+ int numTaken = 0;
+ for (int i = 0; i <= numToOffload; ) {
+ HRegionInfo hri = regions.get(i); // fetch from head
+ if (fetchFromTail) {
+ hri = regions.get(regions.size() - 1 - i);
+ }
+ i++;
+ // Don't rebalance meta regions.
+ if (hri.isMetaRegion()) continue;
+ regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
+ numTaken++;
+ if (numTaken >= numToOffload) break;
+ // fetch in alternate order if there is new region server
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+ }
+ serverBalanceInfo.put(sal.getServerName(),
+ new BalanceInfo(numToOffload, (-1)*numTaken));
+ }
+ int totalNumMoved = regionsToMove.size();
+
+ // Walk down least loaded, filling each to the min
+ int neededRegions = 0; // number of regions needed to bring all up to min
+ fetchFromTail = false;
+
+ Map underloadedServers = new HashMap();
+ for (Map.Entry> server:
+ serversByLoad.entrySet()) {
+ int regionCount = server.getKey().getLoad();
+ if (regionCount >= min) {
+ break;
+ }
+ underloadedServers.put(server.getKey().getServerName(), min - regionCount);
+ }
+ // number of servers that get new regions
+ int serversUnderloaded = underloadedServers.size();
+ int incr = 1;
+ List sns =
+ Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
+ Collections.shuffle(sns, RANDOM);
+ while (regionsToMove.size() > 0) {
+ int cnt = 0;
+ int i = incr > 0 ? 0 : underloadedServers.size()-1;
+ for (; i >= 0 && i < underloadedServers.size(); i += incr) {
+ if (regionsToMove.isEmpty()) break;
+ ServerName si = sns.get(i);
+ int numToTake = underloadedServers.get(si);
+ if (numToTake == 0) continue;
+
+ addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+
+ underloadedServers.put(si, numToTake-1);
+ cnt++;
+ BalanceInfo bi = serverBalanceInfo.get(si);
+ if (bi == null) {
+ bi = new BalanceInfo(0, 0);
+ serverBalanceInfo.put(si, bi);
+ }
+ bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
+ }
+ if (cnt == 0) break;
+ // iterates underloadedServers in the other direction
+ incr = -incr;
+ }
+ for (Integer i : underloadedServers.values()) {
+ // If we still want to take some, increment needed
+ neededRegions += i;
+ }
+
+ // If none needed to fill all to min and none left to drain all to max,
+ // we are done
+ if (neededRegions == 0 && regionsToMove.isEmpty()) {
+ long endTime = System.currentTimeMillis();
+ LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
+ "Moving " + totalNumMoved + " regions off of " +
+ serversOverloaded + " overloaded servers onto " +
+ serversUnderloaded + " less loaded servers");
+ return regionsToReturn;
+ }
+
+ // Need to do a second pass.
+ // Either more regions to assign out or servers that are still underloaded
+
+ // If we need more to fill min, grab one from each most loaded until enough
+ if (neededRegions != 0) {
+ // Walk down most loaded, grabbing one from each until we get enough
+ for (Map.Entry> server :
+ serversByLoad.descendingMap().entrySet()) {
+ BalanceInfo balanceInfo =
+ serverBalanceInfo.get(server.getKey().getServerName());
+ int idx =
+ balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
+ if (idx >= server.getValue().size()) break;
+ HRegionInfo region = server.getValue().get(idx);
+ if (region.isMetaRegion()) continue; // Don't move meta regions.
+ regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
+ totalNumMoved++;
+ if (--neededRegions == 0) {
+ // No more regions needed, done shedding
+ break;
+ }
+ }
+ }
+
+ // Now we have a set of regions that must be all assigned out
+ // Assign each underloaded up to the min, then if leftovers, assign to max
+
+ // Walk down least loaded, assigning to each to fill up to min
+ for (Map.Entry> server :
+ serversByLoad.entrySet()) {
+ int regionCount = server.getKey().getLoad();
+ if (regionCount >= min) break;
+ BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
+ if(balanceInfo != null) {
+ regionCount += balanceInfo.getNumRegionsAdded();
+ }
+ if(regionCount >= min) {
+ continue;
+ }
+ int numToTake = min - regionCount;
+ int numTaken = 0;
+ while(numTaken < numToTake && 0 < regionsToMove.size()) {
+ addRegionPlan(regionsToMove, fetchFromTail,
+ server.getKey().getServerName(), regionsToReturn);
+ numTaken++;
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+ }
+ }
+
+ // If we still have regions to dish out, assign underloaded to max
+ if (0 < regionsToMove.size()) {
+ for (Map.Entry> server :
+ serversByLoad.entrySet()) {
+ int regionCount = server.getKey().getLoad();
+ if(regionCount >= max) {
+ break;
+ }
+ addRegionPlan(regionsToMove, fetchFromTail,
+ server.getKey().getServerName(), regionsToReturn);
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+ if (regionsToMove.isEmpty()) {
+ break;
+ }
+ }
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ if (!regionsToMove.isEmpty() || neededRegions != 0) {
+ // Emit data so can diagnose how balancer went astray.
+ LOG.warn("regionsToMove=" + totalNumMoved +
+ ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
+ ", serversUnderloaded=" + serversUnderloaded);
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry> e: clusterMap.entrySet()) {
+ if (sb.length() > 0) sb.append(", ");
+ sb.append(e.getKey().toString());
+ sb.append(" ");
+ sb.append(e.getValue().size());
+ }
+ LOG.warn("Input " + sb.toString());
+ }
+
+ // All done!
+ LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
+ "Moving " + totalNumMoved + " regions off of " +
+ serversOverloaded + " overloaded servers onto " +
+ serversUnderloaded + " less loaded servers");
+
+ return regionsToReturn;
+ }
+
+ /**
+ * Add a region from the head or tail to the List of regions to return.
+ */
+ private void addRegionPlan(final MinMaxPriorityQueue regionsToMove,
+ final boolean fetchFromTail, final ServerName sn, List regionsToReturn) {
+ RegionPlan rp = null;
+ if (!fetchFromTail) rp = regionsToMove.remove();
+ else rp = regionsToMove.removeLast();
+ rp.setDestination(sn);
+ regionsToReturn.add(rp);
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java
new file mode 100644
index 0000000..68a0887
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The class that creates a load balancer from a conf.
+ */
+@InterfaceAudience.Private
+public class LoadBalancerFactory {
+
+ /**
+ * Create a load balancer from the given conf.
+ * @param conf
+ * @return A {@link LoadBalancer}
+ */
+ public static LoadBalancer getLoadBalancer(Configuration conf) {
+
+ // Create the balancer
+ Class<? extends LoadBalancer> balancerKlass =
+ conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, DefaultLoadBalancer.class,
+ LoadBalancer.class);
+ return ReflectionUtils.newInstance(balancerKlass, conf);
+
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionInfoComparator.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionInfoComparator.java
new file mode 100644
index 0000000..51d8c88
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionInfoComparator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+/**
+ * The following comparator assumes that RegionId from HRegionInfo can represent
+ * the age of the region - larger RegionId means the region is younger. This
+ * comparator is used in balanceCluster() to account for the out-of-band regions
+ * which were assigned to the server after some other region server crashed.
+ */
+class RegionInfoComparator implements Comparator<HRegionInfo> {
+ @Override
+ public int compare(HRegionInfo l, HRegionInfo r) {
+ long diff = r.getRegionId() - l.getRegionId();
+ if (diff < 0) return -1;
+ if (diff > 0) return 1;
+ return 0;
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
new file mode 100644
index 0000000..00b48f9
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * This will find where data for a region is located in HDFS. It ranks
+ * {@link ServerName}'s by the size of the store files they are holding for a
+ * given region.
+ *
+ */
+class RegionLocationFinder {
+
+ private static Log LOG = LogFactory.getLog(RegionLocationFinder.class);
+
+ private Configuration conf;
+ private ClusterStatus status;
+ private MasterServices services;
+
+ private CacheLoader<HRegionInfo, List<ServerName>> loader =
+ new CacheLoader<HRegionInfo, List<ServerName>>() {
+
+ @Override
+ public List<ServerName> load(HRegionInfo key) throws Exception {
+ List<ServerName> servers = internalGetTopBlockLocation(key);
+ if (servers == null) {
+ return new LinkedList<ServerName>();
+ }
+ return servers;
+ }
+ };
+
+ // The cache for where regions are located.
+ private LoadingCache<HRegionInfo, List<ServerName>> cache = null;
+
+ /**
+ * Create a cache for region to list of servers
+ * @param mins Number of mins to cache
+ * @return A new Cache.
+ */
+ private LoadingCache<HRegionInfo, List<ServerName>> createCache(int mins) {
+ return CacheBuilder.newBuilder().expireAfterAccess(mins, TimeUnit.MINUTES).build(loader);
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ cache = createCache(conf.getInt("hbase.master.balancer.regionLocationCacheTime", 30));
+ }
+
+ public void setServices(MasterServices services) {
+ this.services = services;
+ }
+
+ public void setClusterStatus(ClusterStatus status) {
+ this.status = status;
+ }
+
+ protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
+ List<ServerName> servers = null;
+ try {
+ servers = cache.get(region);
+ } catch (ExecutionException ex) {
+ servers = new LinkedList<ServerName>();
+ }
+ return servers;
+
+ }
+
+ /**
+ * Returns an ordered list of hosts that are hosting the blocks for this
+ * region. The weight of each host is the sum of the block lengths of all
+ * files on that host, so the first host in the list is the server which holds
+ * the most bytes of the given region's HFiles.
+ *
+ * @param fs the filesystem
+ * @param region region
+ * @return ordered list of hosts holding blocks of the specified region
+ */
+ protected List<ServerName> internalGetTopBlockLocation(HRegionInfo region) {
+ List<ServerName> topServerNames = null;
+ try {
+ HTableDescriptor tableDescriptor = getTableDescriptor(region.getTableName());
+ if (tableDescriptor != null) {
+ HDFSBlocksDistribution blocksDistribution =
+ HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor,
+ region.getEncodedName());
+ List<String> topHosts = blocksDistribution.getTopHosts();
+ topServerNames = mapHostNameToServerName(topHosts);
+ }
+ } catch (IOException ioe) {
+ LOG.debug("IOException during HDFSBlocksDistribution computation. for " + "region = "
+ + region.getEncodedName(), ioe);
+ }
+
+ return topServerNames;
+ }
+
+ /**
+ * return HTableDescriptor for a given tableName
+ *
+ * @param tableName the table name
+ * @return HTableDescriptor
+ * @throws IOException
+ */
+ protected HTableDescriptor getTableDescriptor(byte[] tableName) throws IOException {
+ HTableDescriptor tableDescriptor = null;
+ try {
+ if (this.services != null) {
+ tableDescriptor = this.services.getTableDescriptors().get(Bytes.toString(tableName));
+ }
+ } catch (FileNotFoundException fnfe) {
+ LOG.debug("FileNotFoundException during getTableDescriptors." + " Current table name = "
+ + Bytes.toStringBinary(tableName), fnfe);
+ }
+
+ return tableDescriptor;
+ }
+
+ /**
+ * Map hostname to ServerName, The output ServerName list will have the same
+ * order as input hosts.
+ *
+ * @param hosts the list of hosts
+ * @return ServerName list
+ */
+ protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
+ if (hosts == null || status == null) {
+ return null;
+ }
+
+ List<ServerName> topServerNames = new ArrayList<ServerName>();
+ Collection<ServerName> regionServers = status.getServers();
+
+ // create a mapping from hostname to ServerName for fast lookup
+ HashMap<String, ServerName> hostToServerName = new HashMap<String, ServerName>();
+ for (ServerName sn : regionServers) {
+ hostToServerName.put(sn.getHostname(), sn);
+ }
+
+ for (String host : hosts) {
+ ServerName sn = hostToServerName.get(host);
+ // it is possible that HDFS is up ( thus host is valid ),
+ // but RS is down ( thus sn is null )
+ if (sn != null) {
+ topServerNames.add(sn);
+ }
+ }
+ return topServerNames;
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java
new file mode 100644
index 0000000..93298d8
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ServerAndLoad.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Data structure that holds servername and 'load'.
+ */
+@InterfaceAudience.Private
+class ServerAndLoad implements Comparable<ServerAndLoad>, Serializable {
+ private static final long serialVersionUID = 2735470854607296965L;
+ private final ServerName sn;
+ private final int load;
+
+ ServerAndLoad(final ServerName sn, final int load) {
+ this.sn = sn;
+ this.load = load;
+ }
+
+ ServerName getServerName() {
+ return this.sn;
+ }
+
+ int getLoad() {
+ return this.load;
+ }
+
+ @Override
+ public int compareTo(ServerAndLoad other) {
+ int diff = this.load - other.load;
+ return diff != 0 ? diff : this.sn.compareTo(other.getServerName());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ServerAndLoad) {
+ ServerAndLoad sl = (ServerAndLoad) o;
+ return this.compareTo(sl) == 0;
+ }
+ return false;
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
new file mode 100644
index 0000000..660cd90
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -0,0 +1,696 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.HServerLoad.RegionLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+/**
+ *
+ * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
+ * randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the
+ * new cluster state becomes the plan. It includes costs functions to compute the cost of:
+ * </p>
+ * <ul>
+ * <li>Region Load</li>
+ * <li>Table Load</li>
+ * <li>Data Locality</li>
+ * <li>Memstore Sizes</li>
+ * <li>Storefile Sizes</li>
+ * </ul>
+ *
+ *
+ *
+ *
+ * <p>
+ * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
+ * best solution, and 1 is the highest possible cost and the worst solution. The computed costs are
+ * scaled by their respective multipliers:
+ * </p>
+ *
+ * <p>
+ * In addition to the above configurations, the balancer can be tuned by the following
+ * configuration values:
+ * </p>
+ * <ul>
+ * <li>hbase.master.balancer.stochastic.maxMoveRegions which
+ * controls what the max number of regions that can be moved in a single invocation of this
+ * balancer.</li>
+ * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
+ * regions is multiplied to try and get the number of times the balancer will
+ * mutate all servers.</li>
+ * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
+ * the balancer will try and mutate all the servers. The balancer will use the minimum of this
+ * value and the above computation.</li>
+ * </ul>
+ *
+ * <p>
+ * This balancer is best used with hbase.master.loadbalance.bytable set to false
+ * so that the balancer gets the full picture of all loads on the cluster.
+ * </p>
+ */
+@InterfaceAudience.Private
+public class StochasticLoadBalancer extends BaseLoadBalancer {
+
+ private static final String STOREFILE_SIZE_COST_KEY =
+ "hbase.master.balancer.stochastic.storefileSizeCost";
+ private static final String MEMSTORE_SIZE_COST_KEY =
+ "hbase.master.balancer.stochastic.memstoreSizeCost";
+ private static final String WRITE_REQUEST_COST_KEY =
+ "hbase.master.balancer.stochastic.writeRequestCost";
+ private static final String READ_REQUEST_COST_KEY =
+ "hbase.master.balancer.stochastic.readRequestCost";
+ private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
+ private static final String TABLE_LOAD_COST_KEY =
+ "hbase.master.balancer.stochastic.tableLoadCost";
+ private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
+ private static final String REGION_LOAD_COST_KEY =
+ "hbase.master.balancer.stochastic.regionLoadCost";
+ private static final String STEPS_PER_REGION_KEY =
+ "hbase.master.balancer.stochastic.stepsPerRegion";
+ private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
+ private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions";
+
+ private static final Random RANDOM = new Random(System.currentTimeMillis());
+ private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
+ private final RegionLocationFinder regionFinder = new RegionLocationFinder();
+ private ClusterStatus clusterStatus = null;
+ private Map<String, RegionLoad> loads = new HashMap<String, RegionLoad>();
+
+ // values are defaults
+ private int maxSteps = 15000;
+ private int stepsPerRegion = 110;
+ private int maxMoves = 600;
+ private float loadMultiplier = 55;
+ private float moveCostMultiplier = 5;
+ private float tableMultiplier = 5;
+ private float localityMultiplier = 5;
+ private float readRequestMultiplier = 0;
+ private float writeRequestMultiplier = 0;
+ private float memStoreSizeMultiplier = 5;
+ private float storeFileSizeMultiplier = 5;
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ regionFinder.setConf(conf);
+
+ maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
+ maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves);
+ stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
+
+ // Load multiplier should be the greatest as it is the most general way to balance data.
+ loadMultiplier = conf.getFloat(REGION_LOAD_COST_KEY, loadMultiplier);
+
+ // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
+ // that two costs must get better to justify a move cost.
+ moveCostMultiplier = conf.getFloat(MOVE_COST_KEY, moveCostMultiplier);
+
+ // These are the added costs so that the stochastic load balancer can get a little bit smarter
+ // about where to move regions.
+ tableMultiplier = conf.getFloat(TABLE_LOAD_COST_KEY, tableMultiplier);
+ localityMultiplier = conf.getFloat(LOCALITY_COST_KEY, localityMultiplier);
+ memStoreSizeMultiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, memStoreSizeMultiplier);
+ storeFileSizeMultiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, storeFileSizeMultiplier);
+
+ // These are not used currently.
+ // TODO: Start using these once rolling averages are implemented for read/write load.
+ readRequestMultiplier = conf.getFloat(READ_REQUEST_COST_KEY, readRequestMultiplier);
+ writeRequestMultiplier = conf.getFloat(WRITE_REQUEST_COST_KEY, writeRequestMultiplier);
+ }
+
+ @Override
+ public void setClusterStatus(ClusterStatus st) {
+ super.setClusterStatus(st);
+ regionFinder.setClusterStatus(st);
+ this.clusterStatus = st;
+ updateRegionLoad();
+ }
+
+ @Override
+ public void setMasterServices(MasterServices masterServices) {
+ super.setMasterServices(masterServices);
+ this.services = masterServices;
+ this.regionFinder.setServices(masterServices);
+ }
+
+ /**
+ * Given the cluster state this will try and approach an optimal balance. This
+ * should always approach the optimal state given enough steps.
+ */
+ @Override
+ public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
+
+ // No need to balance a one node cluster.
+ if (clusterState.size() <= 1) {
+ LOG.debug("Skipping load balance as cluster has only one node.");
+ return null;
+ }
+
+ long startTime = System.currentTimeMillis();
+
+ // Keep track of servers to iterate through them.
+ List<ServerName> servers = new ArrayList<ServerName>(clusterState.keySet());
+ Map<HRegionInfo, ServerName> initialRegionMapping = createRegionMapping(clusterState);
+ double currentCost, newCost, initCost;
+ currentCost = newCost = initCost = computeCost(initialRegionMapping, clusterState);
+
+ int computedMaxSteps =
+ Math.min(this.maxSteps, (initialRegionMapping.size() * this.stepsPerRegion));
+ // Perform a stochastic walk to see if we can get a good fit.
+ for (int step = 0; step < computedMaxSteps; step++) {
+
+ // try and perform a mutation
+ for (ServerName leftServer : servers) {
+
+ // What server are we going to be swapping regions with ?
+ ServerName rightServer = pickOtherServer(leftServer, servers);
+ if (rightServer == null) {
+ continue;
+ }
+
+ // Get the regions.
+ List<HRegionInfo> leftRegionList = clusterState.get(leftServer);
+ List<HRegionInfo> rightRegionList = clusterState.get(rightServer);
+
+ // Pick what regions to swap around.
+ // If we get a null for one then this isn't a swap just a move
+ HRegionInfo lRegion = pickRandomRegion(leftRegionList, 0);
+ HRegionInfo rRegion = pickRandomRegion(rightRegionList, 0.5);
+
+ // We randomly picked to do nothing.
+ if (lRegion == null && rRegion == null) {
+ continue;
+ }
+
+ if (rRegion != null) {
+ leftRegionList.add(rRegion);
+ }
+
+ if (lRegion != null) {
+ rightRegionList.add(lRegion);
+ }
+
+ newCost = computeCost(initialRegionMapping, clusterState);
+
+ // Should this be kept?
+ if (newCost < currentCost) {
+ currentCost = newCost;
+ } else {
+ // Put things back the way they were before.
+ if (rRegion != null) {
+ leftRegionList.remove(rRegion);
+ rightRegionList.add(rRegion);
+ }
+
+ if (lRegion != null) {
+ rightRegionList.remove(lRegion);
+ leftRegionList.add(lRegion);
+ }
+ }
+ }
+
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ if (initCost > currentCost) {
+ List<RegionPlan> plans = createRegionPlans(initialRegionMapping, clusterState);
+
+ LOG.debug("Finished computing new load balance plan. Computation took "
+ + (endTime - startTime) + "ms to try " + computedMaxSteps
+ + " different iterations. Found a solution that moves " + plans.size()
+ + " regions; Going from a computed cost of " + initCost + " to a new cost of "
+ + currentCost);
+ return plans;
+ }
+ LOG.debug("Could not find a better load balance plan. Tried " + computedMaxSteps
+ + " different configurations in " + (endTime - startTime)
+ + "ms, and did not find anything with a computed cost less than " + initCost);
+ return null;
+ }
+
+ /**
+ * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
+ * state.
+ *
+ * @param initialRegionMapping Initial mapping of Region to Server
+ * @param clusterState The desired mapping of ServerName to Regions
+ * @return List of RegionPlan's that represent the moves needed to get to desired final state.
+ */
+ private List<RegionPlan> createRegionPlans(Map<HRegionInfo, ServerName> initialRegionMapping,
+ Map<ServerName, List<HRegionInfo>> clusterState) {
+ List<RegionPlan> plans = new LinkedList<RegionPlan>();
+
+ for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+ ServerName newServer = entry.getKey();
+
+ for (HRegionInfo region : entry.getValue()) {
+ ServerName initialServer = initialRegionMapping.get(region);
+ if (!newServer.equals(initialServer)) {
+ LOG.trace("Moving Region " + region.getEncodedName() + " from server "
+ + initialServer.getHostname() + " to " + newServer.getHostname());
+ RegionPlan rp = new RegionPlan(region, initialServer, newServer);
+ plans.add(rp);
+ }
+ }
+ }
+ return plans;
+ }
+
+ /**
+ * Create a map that will represent the initial location of regions on a
+ * {@link ServerName}
+ *
+ * @param clusterState starting state of the cluster and regions.
+ * @return A map of {@link HRegionInfo} to the {@link ServerName} that is
+ * currently hosting that region
+ */
+ private Map<HRegionInfo, ServerName> createRegionMapping(
+ Map<ServerName, List<HRegionInfo>> clusterState) {
+ Map<HRegionInfo, ServerName> mapping = new HashMap<HRegionInfo, ServerName>();
+
+ for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+ for (HRegionInfo region : entry.getValue()) {
+ mapping.put(region, entry.getKey());
+ }
+ }
+ return mapping;
+ }
+
+ /** Store the current region loads. */
+ private void updateRegionLoad() {
+ loads.clear();
+ for (ServerName sn : clusterStatus.getServers()) {
+ HServerLoad hsl = clusterStatus.getLoad(sn);
+ if (hsl == null) continue;
+ for (Entry<byte[], RegionLoad> entry : hsl.getRegionsLoad().entrySet()) {
+ loads.put(Bytes.toString(entry.getKey()), entry.getValue());
+
+ }
+ }
+ }
+
+ /**
+ * From a list of regions pick a random one. Null can be returned which
+ * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
+ * rather than swap.
+ *
+ * @param regions list of regions.
+ * @param chanceOfNoSwap Chance that this will decide to try a move rather
+ * than a swap.
+ * @return a random {@link HRegionInfo} or null if an asymmetrical move is
+ * suggested.
+ */
+ private HRegionInfo pickRandomRegion(List<HRegionInfo> regions, double chanceOfNoSwap) {
+
+ //Check to see if this is just a move.
+ if (regions.isEmpty() || RANDOM.nextFloat() < chanceOfNoSwap) {
+ //signal a move only.
+ return null;
+ }
+
+ int count = 0;
+ HRegionInfo r = null;
+
+ //We will try and find a region up to 10 times. If we always
+ while (count < 10 && r == null ) {
+ count++;
+ r = regions.get(RANDOM.nextInt(regions.size()));
+
+ // If this is a special region we always try not to move it.
+ // so clear out r. try again
+ if (r.isMetaRegion() || r.isRootRegion() ) {
+ r = null;
+ }
+ }
+ if (r != null) {
+ regions.remove(r);
+ }
+ return r;
+ }
+
+ /**
+ * Given a server we will want to switch regions with another server. This
+ * function picks a random server from the list.
+ *
+ * @param server Current Server. This server will never be the return value.
+ * @param allServers list of all server from which to pick
+ * @return random server. Null if no other servers were found.
+ */
+ private ServerName pickOtherServer(ServerName server, List<ServerName> allServers) {
+ ServerName s = null;
+ int count = 0;
+ while (count < 100 && (s == null || s.equals(server))) {
+ count++;
+ s = allServers.get(RANDOM.nextInt(allServers.size()));
+ }
+
+ // If nothing but the current server was found return null.
+ return (s == null || s.equals(server)) ? null : s;
+ }
+
+ /**
+ * This is the main cost function. It will compute a cost associated with a proposed cluster
+ * state. All different costs will be combined with their multipliers to produce a double cost.
+ *
+ * @param initialRegionMapping Map of where the regions started.
+ * @param clusterState Map of ServerName to list of regions.
+ * @return a double of a cost associated with the proposed
+ */
+ protected double computeCost(Map<HRegionInfo, ServerName> initialRegionMapping,
+ Map<ServerName, List<HRegionInfo>> clusterState) {
+
+ double moveCost = moveCostMultiplier * computeMoveCost(initialRegionMapping, clusterState);
+
+ double regionCountSkewCost = loadMultiplier * computeSkewLoadCost(clusterState);
+ double tableSkewCost = tableMultiplier * computeTableSkewLoadCost(clusterState);
+ double localityCost =
+ localityMultiplier * computeDataLocalityCost(initialRegionMapping, clusterState);
+
+ // TODO: Add Read and Write requests back in here after keeping a running average on per
+ // region load metrics.
+ double memstoreSizeCost =
+ memStoreSizeMultiplier
+ * computeRegionLoadCost(clusterState, RegionLoadCostType.MEMSTORE_SIZE);
+ double storefileSizeCost =
+ storeFileSizeMultiplier
+ * computeRegionLoadCost(clusterState, RegionLoadCostType.STOREFILE_SIZE);
+ double total =
+ moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost
+ + storefileSizeCost;
+ LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = "
+ + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = "
+ + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = "
+ + memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost);
+ return total;
+ }
+
+ /**
+ * Given the starting state of the regions and a potential ending state
+ * compute cost based upon the number of regions that have moved.
+ *
+ * @param initialRegionMapping The starting location of regions.
+ * @param clusterState The potential new cluster state.
+ * @return The cost. Between 0 and 1.
+ */
+ double computeMoveCost(Map<HRegionInfo, ServerName> initialRegionMapping,
+ Map<ServerName, List<HRegionInfo>> clusterState) {
+ float moveCost = 0;
+ for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+ for (HRegionInfo region : entry.getValue()) {
+ if (initialRegionMapping.get(region) != entry.getKey()) {
+ moveCost += 1;
+ }
+ }
+ }
+
+ //Don't let this single balance move more than the max moves.
+ //This allows better scaling to accurately represent the actual cost of a move.
+ if (moveCost > maxMoves) {
+ return 10000; //return a number much greater than any of the other cost functions
+ }
+
+ return scale(0, Math.min(maxMoves, initialRegionMapping.size()), moveCost);
+ }
+
+ /**
+ * Compute the cost of a potential cluster state from skew in number of
+ * regions on a cluster
+ *
+ * @param clusterState The proposed cluster state
+ * @return The cost of region load imbalance.
+ */
+ double computeSkewLoadCost(Map<ServerName, List<HRegionInfo>> clusterState) {
+ DescriptiveStatistics stats = new DescriptiveStatistics();
+ for (List<HRegionInfo> regions : clusterState.values()) {
+ int size = regions.size();
+ stats.addValue(size);
+ }
+ return costFromStats(stats);
+ }
+
+ /**
+ * Compute the cost of a potential cluster configuration based upon how evenly
+ * distributed tables are.
+ *
+ * @param clusterState Proposed cluster state.
+ * @return Cost of imbalance in table.
+ */
+ double computeTableSkewLoadCost(Map<ServerName, List<HRegionInfo>> clusterState) {
+
+ Map<String, MutableInt> tableRegionsTotal = new HashMap<String, MutableInt>();
+ Map<String, MutableInt> tableRegionsOnCurrentServer = new HashMap<String, MutableInt>();
+ Map<String, Integer> tableCostSeenSoFar = new HashMap<String, Integer>();
+ // Go through everything per server
+ for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+ tableRegionsOnCurrentServer.clear();
+
+ // For all of the regions count how many are from each table
+ for (HRegionInfo region : entry.getValue()) {
+ String tableName = region.getTableNameAsString();
+
+ // See if this table already has a count on this server
+ MutableInt regionsOnServerCount = tableRegionsOnCurrentServer.get(tableName);
+
+ // If this is the first time we've seen this table on this server
+ // create a new mutable int.
+ if (regionsOnServerCount == null) {
+ regionsOnServerCount = new MutableInt(0);
+ tableRegionsOnCurrentServer.put(tableName, regionsOnServerCount);
+ }
+
+ // Increment the count of how many regions from this table are host on
+ // this server
+ regionsOnServerCount.increment();
+
+ // Now count the number of regions in this table.
+ MutableInt totalCount = tableRegionsTotal.get(tableName);
+
+ // If this is the first region from this table create a new counter for
+ // this table.
+ if (totalCount == null) {
+ totalCount = new MutableInt(0);
+ tableRegionsTotal.put(tableName, totalCount);
+ }
+ totalCount.increment();
+ }
+
+ // Now go through all of the tables we have seen and keep the max number
+ // of regions of this table a single region server is hosting.
+ for (String tableName : tableRegionsOnCurrentServer.keySet()) {
+ Integer thisCount = tableRegionsOnCurrentServer.get(tableName).toInteger();
+ Integer maxCountSoFar = tableCostSeenSoFar.get(tableName);
+
+ if (maxCountSoFar == null || thisCount.compareTo(maxCountSoFar) > 0) {
+ tableCostSeenSoFar.put(tableName, thisCount);
+ }
+
+ }
+ }
+
+ double max = 0;
+ double min = 0;
+ double value = 0;
+
+ // Compute the min, value, and max.
+ for (String tableName : tableRegionsTotal.keySet()) {
+ max += tableRegionsTotal.get(tableName).doubleValue();
+ min += tableRegionsTotal.get(tableName).doubleValue() / (double) clusterState.size();
+ value += tableCostSeenSoFar.get(tableName).doubleValue();
+
+ }
+ return scale(min, max, value);
+ }
+
+ /**
+ * Compute a cost of a potential cluster configuration based upon where
+ * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
+ *
+ * @param clusterState The state of the cluster
+ * @return A cost between 0 and 1. 0 means all regions are on the server with
+ * the most local store files.
+ */
+ double computeDataLocalityCost(Map<String, ServerName> initialRegionMapping,
+ Map<ServerName, List<HRegionInfo>> clusterState) {
+
+ double max = 0;
+ double cost = 0;
+
+ // If there's no master then there's no way anything else works.
+ if (this.services == null) return cost;
+
+ for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+ ServerName sn = entry.getKey();
+ for (HRegionInfo region : entry.getValue()) {
+
+ max += 1;
+
+ // Only compute the data locality for moved regions.
+ if (initialRegionMapping.equals(sn)) {
+ continue;
+ }
+
+ List<ServerName> dataOnServers = regionFinder.getTopBlockLocations(region);
+
+ // If we can't find where the data is, getTopBlockLocations returns null,
+ // so count that as being the best possible.
+ if (dataOnServers == null) {
+ continue;
+ }
+
+ int index = dataOnServers.indexOf(sn);
+ if (index < 0) {
+ cost += 1;
+ } else {
+ cost += (double) index / (double) dataOnServers.size();
+ }
+
+ }
+ }
+ return scale(0, max, cost);
+ }
+
+ /** The cost's that can be derived from RegionLoad */
+ private enum RegionLoadCostType {
+ READ_REQUEST, WRITE_REQUEST, MEMSTORE_SIZE, STOREFILE_SIZE
+ }
+
+ /**
+ * Compute the cost of the current cluster state due to some RegionLoadCost type
+ *
+ * @param clusterState the cluster
+ * @param costType what type of cost to consider
+ * @return the scaled cost.
+ */
+ private double computeRegionLoadCost(Map<ServerName, List<HRegionInfo>> clusterState,
+ RegionLoadCostType costType) {
+
+ if (this.clusterStatus == null || this.loads == null || this.loads.size() == 0) return 0;
+
+ DescriptiveStatistics stats = new DescriptiveStatistics();
+
+ // For every server look at the cost of each region
+ for (List<HRegionInfo> regions : clusterState.values()) {
+ long cost = 0; //Cost this server has from RegionLoad
+
+ // For each region
+ for (HRegionInfo region : regions) {
+ // Try and get the region using the regionNameAsString
+ RegionLoad rl = loads.get(region.getRegionNameAsString());
+
+ // That could have failed if the RegionLoad is using the other regionName
+ if (rl == null) {
+ // Try getting the region load using encoded name.
+ rl = loads.get(region.getEncodedName());
+ }
+ // Now if we found a region load get the type of cost that was requested.
+ if (rl != null) {
+ cost += getRegionLoadCost(rl, costType);
+ }
+ }
+
+ // Add the total cost to the stats.
+ stats.addValue(cost);
+ }
+
+ // Now return the scaled cost from data held in the stats object.
+ return costFromStats(stats);
+ }
+
+ /**
+ * Get the un-scaled cost from a RegionLoad
+ *
+ * @param rl the Region load
+ * @param type The type of cost to extract
+ * @return the double representing the cost
+ */
+ private double getRegionLoadCost(RegionLoad rl, RegionLoadCostType type) {
+ switch (type) {
+ case READ_REQUEST:
+ return rl.getReadRequestsCount();
+ case WRITE_REQUEST:
+ return rl.getWriteRequestsCount();
+ case MEMSTORE_SIZE:
+ return rl.getMemStoreSizeMB();
+ case STOREFILE_SIZE:
+ return rl.getStorefileSizeMB();
+ default:
+ assert false : "RegionLoad cost type not supported.";
+ return 0;
+ }
+ }
+
+ /**
+ * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
+ * assumes that this is a zero sum set of costs. It assumes that the worst case
+ * possible is all of the elements in one region server and the rest having 0.
+ *
+ * @param stats the costs
+ * @return a scaled set of costs.
+ */
+ double costFromStats(DescriptiveStatistics stats) {
+ double totalCost = 0;
+ double mean = stats.getMean();
+
+ //Compute max as if all region servers had 0 and one had the sum of all costs. This must be
+ // a zero sum cost for this to make sense.
+ double max = ((stats.getN() - 1) * stats.getMean()) + (stats.getSum() - stats.getMean());
+ for (double n : stats.getValues()) {
+ totalCost += Math.abs(mean - n);
+
+ }
+
+ return scale(0, max, totalCost);
+ }
+
+ /**
+ * Scale the value between 0 and 1.
+ *
+ * @param min Min value
+ * @param max The Max value
+ * @param value The value to be scaled.
+ * @return The scaled value.
+ */
+ private double scale(double min, double max, double value) {
+ if (max == 0 || value == 0) {
+ return 0;
+ }
+
+ return Math.max(0d, Math.min(1d, (value - min) / max));
+ }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
index aa74cd5..63dc8dc 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -37,35 +38,52 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
/**
- * Test whether region rebalancing works. (HBASE-71)
+ * Test whether region re-balancing works. (HBASE-71)
*/
@Category(LargeTests.class)
+@RunWith(value = Parameterized.class)
public class TestRegionRebalancing {
- final Log LOG = LogFactory.getLog(this.getClass().getName());
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
- HTable table;
- HTableDescriptor desc;
- private static final byte [] FAMILY_NAME = Bytes.toBytes("col");
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- UTIL.startMiniCluster(1);
+
+ @Parameters
+ public static Collection