diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 3081811..0d10531 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerName; @@ -66,16 +67,20 @@ public abstract class BaseLoadBalancer implements LoadBalancer { ArrayList tables; HRegionInfo[] regions; Deque[] regionLoads; + RegionLocationFinder regionFinder; int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality int[][] regionsPerServer; //serverIndex -> region list + float[] localityPerServer; int[] regionIndexToServerIndex; //regionIndex -> serverIndex int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) int[] regionIndexToTableIndex; //regionIndex -> tableIndex int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS + Integer[] serverIndicesSortedByRegionCount; + Integer[] serverIndicesSortedByLocality; Map serversToIndex; Map tablesToIndex; @@ -96,6 +101,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { //TODO: We should get the list of tables from master tables = new ArrayList(); + this.regionFinder = regionFinder; numRegions = 0; @@ -126,6 +132,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionLoads = new Deque[numRegions]; regionLocations = new int[numRegions][]; serverIndicesSortedByRegionCount = new Integer[numServers]; + serverIndicesSortedByLocality = new Integer[numServers]; + localityPerServer = new float[numServers]; int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; @@ -147,6 +155,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionsPerServer[serverIndex] = new int[entry.getValue().size()]; } serverIndicesSortedByRegionCount[serverIndex] = serverIndex; + serverIndicesSortedByLocality[serverIndex] = serverIndex; } for (Entry> entry : clusterState.entrySet()) { @@ -189,7 +198,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 : serversToIndex.get(loc.get(i).getHostAndPort())); } } - regionIndex++; } } @@ -303,10 +311,18 @@ public abstract class BaseLoadBalancer implements LoadBalancer { Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); } + void sortServersByLocality() { + Arrays.sort(serverIndicesSortedByLocality, localityComparator); + } + int getNumRegions(int server) { return regionsPerServer[server].length; } + float getLocality(int server) { + return localityPerServer[server]; + } + private Comparator numRegionsComparator = new Comparator() { @Override public int compare(Integer integer, Integer integer2) { @@ -314,6 +330,111 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } }; + private Comparator localityComparator = new Comparator() { + @Override + public int compare(Integer integer, Integer integer2) { + float locality1 = getLocality(integer); + float locality2 = getLocality(integer2); + if (locality1 < locality2) { + return -1; + } else if (locality1 > locality2) { + return 1; + } else { + return 0; + } + } + }; + + int getLowestLocalityRegionServer() { + if (regionFinder == null) { + return -1; + } else { + sortServersByLocality(); + // We want to find server with non zero regions having lowest locality. + int i = 0; + int lowestLocalityServerIndex = serverIndicesSortedByLocality[i]; + while (localityPerServer[lowestLocalityServerIndex] == 0 + && (regionsPerServer[lowestLocalityServerIndex].length == 0)) { + i++; + lowestLocalityServerIndex = serverIndicesSortedByLocality[i]; + } + LOG.debug("Lowest locality region server with non zero regions is " + + servers[lowestLocalityServerIndex].getHostname() + " with locality " + + localityPerServer[lowestLocalityServerIndex]); + return lowestLocalityServerIndex; + } + } + + int getLowestLocalityRegionOnServer(int serverIndex) { + if (regionFinder != null) { + float lowestLocality = 1.0f; + int lowestLocalityRegionIndex = 0; + if (regionsPerServer[serverIndex].length == 0) { + // No regions on that region server + return -1; + } + for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { + int regionIndex = regionsPerServer[serverIndex][j]; + HDFSBlocksDistribution distribution = regionFinder + .getBlockDistribution(regions[regionIndex]); + float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); + if (locality < lowestLocality) { + lowestLocality = locality; + lowestLocalityRegionIndex = j; + } + } + LOG.debug(" Lowest locality region index is " + lowestLocalityRegionIndex + + " and its region server contains " + regionsPerServer[serverIndex].length + + " regions"); + return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; + } else { + return -1; + } + } + + float getLocalityOfRegion(int region, int server) { + HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); + return distribution.getBlockLocalityIndex(servers[server].getHostname()); + } + + int getLeastLoadedTopServerForRegion(int region) { + if (regionFinder != null) { + List topLocalServers = regionFinder.getTopBlockLocations(regions[region]); + int leastLoadedServerIndex = -1; + int load = Integer.MAX_VALUE; + for (ServerName sn : topLocalServers) { + int index = serversToIndex.get(sn); + int tempLoad = regionsPerServer[index].length; + if (tempLoad <= load) { + leastLoadedServerIndex = index; + load = tempLoad; + } + } + return leastLoadedServerIndex; + } else { + return -1; + } + } + + void calculateRegionServerLocalities() { + if (regionFinder == null) { + LOG.warn("Region location finder found null, skipping locality calculations."); + return; + } + for (int i = 0; i < regionsPerServer.length; i++) { + HDFSBlocksDistribution distribution = new HDFSBlocksDistribution(); + if (regionsPerServer[i].length > 0) { + for (int j = 0; j < regionsPerServer[i].length; j++) { + int regionIndex = regionsPerServer[i][j]; + distribution.add(regionFinder.getBlockDistribution(regions[regionIndex])); + } + } else { + LOG.debug("Server " + servers[i].getHostname() + " had 0 regions."); + } + localityPerServer[i] = distribution.getBlockLocalityIndex(servers[i].getHostname()); + } + } + @Override public String toString() { String desc = "Cluster{" + @@ -350,7 +471,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { // slop for regions protected float slop; private Configuration config; - private static final Random RANDOM = new Random(System.currentTimeMillis()); + static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class); protected final MetricsBalancer metricsBalancer = new MetricsBalancer(); @@ -477,7 +598,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { */ @Override public Map immediateAssignment(List regions, - List servers) { + List servers) throws HBaseIOException { metricsBalancer.incrMiscInvocations(); Map assignments = new TreeMap(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index 690d8c9..4475d02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -31,17 +30,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.regionserver.HRegion; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; /** * This will find where data for a region is located in HDFS. It ranks @@ -54,31 +54,27 @@ class RegionLocationFinder { private static Log LOG = LogFactory.getLog(RegionLocationFinder.class); private Configuration conf; - private ClusterStatus status; + private volatile ClusterStatus status; private MasterServices services; - private CacheLoader> loader = - new CacheLoader>() { + private CacheLoader loader = + new CacheLoader() { - @Override - public List load(HRegionInfo key) throws Exception { - List servers = internalGetTopBlockLocation(key); - if (servers == null) { - return new LinkedList(); - } - return servers; - } - }; + @Override + public HDFSBlocksDistribution load(HRegionInfo key) throws Exception { + return internalGetTopBlockLocation(key); + } + }; // The cache for where regions are located. - private LoadingCache> cache = null; + private LoadingCache cache = null; /** * Create a cache for region to list of servers * @param mins Number of mins to cache * @return A new Cache. */ - private LoadingCache> createCache(int mins) { + private LoadingCache createCache(int mins) { return CacheBuilder.newBuilder().expireAfterAccess(mins, TimeUnit.MINUTES).build(loader); } @@ -100,14 +96,9 @@ class RegionLocationFinder { } protected List getTopBlockLocations(HRegionInfo region) { - List servers = null; - try { - servers = cache.get(region); - } catch (ExecutionException ex) { - servers = new LinkedList(); - } - return servers; - + HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region); + List topHosts = blocksDistribution.getTopHosts(); + return mapHostNameToServerName(topHosts); } /** @@ -119,22 +110,20 @@ class RegionLocationFinder { * @param region region * @return ordered list of hosts holding blocks of the specified region */ - protected List internalGetTopBlockLocation(HRegionInfo region) { - List topServerNames = null; + protected HDFSBlocksDistribution internalGetTopBlockLocation(HRegionInfo region) { try { HTableDescriptor tableDescriptor = getTableDescriptor(region.getTable()); if (tableDescriptor != null) { HDFSBlocksDistribution blocksDistribution = HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region); - List topHosts = blocksDistribution.getTopHosts(); - topServerNames = mapHostNameToServerName(topHosts); + return blocksDistribution; } } catch (IOException ioe) { - LOG.debug("IOException during HDFSBlocksDistribution computation. for " + "region = " + LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = " + region.getEncodedName(), ioe); } - return topServerNames; + return new HDFSBlocksDistribution(); } /** @@ -167,7 +156,10 @@ class RegionLocationFinder { */ protected List mapHostNameToServerName(List hosts) { if (hosts == null || status == null) { - return null; + if (hosts == null) { + LOG.warn("RegionLocationFinder top hosts is null"); + } + return Lists.newArrayList(); } List topServerNames = new ArrayList(); @@ -189,4 +181,25 @@ class RegionLocationFinder { } return topServerNames; } + + public HDFSBlocksDistribution getBlockDistribution(HRegionInfo hri) { + HDFSBlocksDistribution blockDistbn = null; + try { + if (cache.asMap().containsKey(hri)) { + blockDistbn = cache.get(hri); + return blockDistbn; + } else { + LOG.debug("HDFSBlocksDistribution not found in cache for region " + + hri.getRegionNameAsString()); + blockDistbn = internalGetTopBlockLocation(hri); + cache.put(hri, blockDistbn); + return blockDistbn; + } + } catch (ExecutionException e) { + LOG.warn("Error while fetching cache entry ", e); + blockDistbn = internalGetTopBlockLocation(hri); + cache.put(hri, blockDistbn); + return blockDistbn; + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 9e1c9f5..6797ee7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.balancer; + import java.util.ArrayDeque; import java.util.Collection; import java.util.Deque; @@ -30,7 +31,6 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.util.Bytes; @@ -136,11 +137,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { localityPicker = new LocalityBasedPicker(services); localityCost = new LocalityCostFunction(conf, services); - pickers = new RegionPicker[] { - new RandomRegionPicker(), - new LoadPicker(), - localityPicker - }; + pickers = new RegionPicker[] { new RandomRegionPicker(), new LoadPicker(), localityPicker }; regionLoadFunctions = new CostFromRegionLoadFunction[] { new ReadRequestCostFunction(conf), @@ -192,10 +189,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { */ @Override public List balanceCluster(Map> clusterState) { + if (!needsBalance(new ClusterLoadState(clusterState))) { return null; } - long startTime = EnvironmentEdgeManager.currentTimeMillis(); // On clusters with lots of HFileLinks or lots of reference files, @@ -206,14 +203,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { if (this.localityCost != null && this.localityCost.getMultiplier() > 0) { finder = this.regionFinder; } - + // Keep track of servers to iterate through them. Cluster cluster = new Cluster(clusterState, loads, finder); double currentCost = computeCost(cluster, Double.MAX_VALUE); double initCost = currentCost; double newCost = currentCost; - long computedMaxSteps = Math.min(this.maxSteps, ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); // Perform a stochastic walk to see if we can get a good fit. @@ -391,6 +387,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * suggested. */ protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) { + //TODO: Fix Ticket 7227684 which causes server to be -1. + if (server == -1) { + return -1; + } // Check to see if this is just a move. if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) { // signal a move only. @@ -467,7 +467,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @Override Pair, Pair> pick(Cluster cluster) { cluster.sortServersByRegionCount(); - int thisServer = pickMostLoadedServer(cluster, -1); + int thisServer = pickMostLoadedServer(cluster); int otherServer = pickLeastLoadedServer(cluster, thisServer); Pair regions = pickRandomRegions(cluster, thisServer, otherServer); @@ -491,17 +491,24 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return servers[index]; } - private int pickMostLoadedServer(final Cluster cluster, int thisServer) { + /** + * Pick a random server which is loaded above average. + * + * @param cluster + * @return index of the region server picked. + */ + private int pickMostLoadedServer(final Cluster cluster) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; - - int index = servers.length - 1; - while (servers[index] == null || servers[index] == thisServer) { - index--; - if (index < 0) { - return -1; + float averageLoad = (float) cluster.regions.length / cluster.servers.length; + int startIndex = 0; + for ( int i = 0 ; i < servers.length; i++) { + if (cluster.getNumRegions(servers[i]) >= averageLoad) { + startIndex = i; + break; } } - return servers[index]; + int randomServerIndex = RANDOM.nextInt(servers.length - startIndex) + startIndex; + return servers[randomServerIndex]; } } @@ -515,23 +522,30 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @Override Pair, Pair> pick(Cluster cluster) { + Pair, Pair> nothingPicked = + new Pair, Pair>( + new Pair(-1, -1), new Pair(-1, -1)); if (this.masterServices == null) { - return new Pair, Pair>( - new Pair(-1,-1), - new Pair(-1,-1) - ); + return nothingPicked; + } + cluster.calculateRegionServerLocalities(); + // Pick lowest local region server + int thisServer = pickLowestLocalityServer(cluster); + int thisRegion; + if ( thisServer == -1) { + LOG.warn("Could not pick lowest local region server"); + return nothingPicked; + } else { + // Pick lowest local region on this server + thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer); } - // Pick a random region server - int thisServer = pickRandomServer(cluster); - - // Pick a random region on this server - int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f); if (thisRegion == -1) { - return new Pair, Pair>( - new Pair(-1,-1), - new Pair(-1,-1) - ); + if (cluster.regionsPerServer[thisServer].length > 0) { + LOG.warn("Could not pick lowest local region even when region server held " + + cluster.regionsPerServer[thisServer].length + " regions"); + } + return nothingPicked; } // Pick the server with the highest locality @@ -546,10 +560,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { ); } + private int pickLowestLocalityServer(Cluster cluster) { + return cluster.getLowestLocalityRegionServer(); + } + + private int pickLowestLocalRegionOnServer(Cluster cluster, int server) { + return cluster.getLowestLocalityRegionOnServer(server); + } + private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) { int[] regionLocations = cluster.regionLocations[thisRegion]; if (regionLocations == null || regionLocations.length <= 1) { + LOG.warn("Picking random destination as pickHighestLocalityServer did not give good result"); return pickOtherRandomServer(cluster, thisServer); } @@ -560,6 +583,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } // no location found + LOG.warn("Picking random destination as pickHighestLocalityServer did not give good result"); return pickOtherRandomServer(cluster, thisServer); } @@ -631,8 +655,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return scaled; } - - private double getSum(double[] stats) { double total = 0; for(double s:stats) { @@ -734,7 +756,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { for (int i =0; i < cluster.numServers; i++) { stats[i] = cluster.regionsPerServer[i].length; } - return costFromArray(stats); } } @@ -820,11 +841,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } if (index < 0) { - if (regionLocations.length > 0) { - cost += 1; - } + cost += 1; } else { - cost += (double) index / (double) regionLocations.length; + cost += (1 - cluster.getLocalityOfRegion(i, index)); } } return scale(0, max, cost); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 9f8ea42..aac9fd6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -58,6 +58,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { Configuration conf = HBaseConfiguration.create(); conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f); conf.setFloat("hbase.regions.slop", 0.0f); + conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0); + conf.setInt("hbase.master.balancer.stochastic.maxSteps", 10000000); loadBalancer = new StochasticLoadBalancer(); loadBalancer.setConf(conf); } @@ -132,6 +134,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { new int[]{13, 14, 6, 10, 10, 10, 8, 10}, new int[]{130, 14, 60, 10, 100, 10, 80, 10}, new int[]{130, 140, 60, 100, 100, 100, 80, 100}, + new int[]{0, 5 , 5, 5, 5}, largeCluster, };