diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index f527931..42bf65f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerName; @@ -100,6 +101,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { ArrayList tables; HRegionInfo[] regions; Deque[] regionLoads; + private RegionLocationFinder regionFinder; int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality @@ -124,12 +126,14 @@ public abstract class BaseLoadBalancer implements LoadBalancer { boolean hasRegionReplicas = false; //whether there is regions with replicas Integer[] serverIndicesSortedByRegionCount; + Integer[] serverIndicesSortedByLocality; Map serversToIndex; Map hostsToIndex; Map racksToIndex; Map tablesToIndex; Map regionsToIndex; + float[] localityPerServer; int numServers; int numHosts; @@ -177,6 +181,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { List> serversPerHostList = new ArrayList>(); List> serversPerRackList = new ArrayList>(); this.clusterState = clusterState; + this.regionFinder = regionFinder; // Use servername and port as there can be dead servers in this list. We want everything with // a matching hostname and port to have the same index. 
@@ -220,6 +225,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionLoads = new Deque[numRegions]; regionLocations = new int[numRegions][]; serverIndicesSortedByRegionCount = new Integer[numServers]; + serverIndicesSortedByLocality = new Integer[numServers]; + localityPerServer = new float[numServers]; serverIndexToHostIndex = new int[numServers]; serverIndexToRackIndex = new int[numServers]; @@ -251,6 +258,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length]; serverIndicesSortedByRegionCount[serverIndex] = serverIndex; + serverIndicesSortedByLocality[serverIndex] = serverIndex; } hosts = new String[numHosts]; @@ -753,6 +761,154 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } }; + /** Sorts serverIndicesSortedByLocality by ascending server locality. */ + void sortServersByLocality() { + Arrays.sort(serverIndicesSortedByLocality, localityComparator); + } + + /** Returns the locality currently stored in localityPerServer for the given server index. */ + float getLocality(int server) { + return localityPerServer[server]; + } + + /** Orders server indices by ascending locality. */ + private Comparator localityComparator = new Comparator() { + @Override + public int compare(Integer integer, Integer integer2) { + float locality1 = getLocality(integer); + float locality2 = getLocality(integer2); + if (locality1 < locality2) { + return -1; + } else if (locality1 > locality2) { + return 1; + } else { + return 0; + } + } + }; + + /** + * Finds the lowest-locality server, skipping servers with zero locality and no regions. + * + * @return index of that server, or -1 if there is no region finder or no such server + */ + int getLowestLocalityRegionServer() { + if (regionFinder == null) { + return -1; + } else { + sortServersByLocality(); + // We want to find server with non zero regions having lowest locality.
+ // Bound the scan: when every server is empty there is no candidate to return. + int i = 0; + while (i < serverIndicesSortedByLocality.length + && localityPerServer[serverIndicesSortedByLocality[i]] == 0 + && regionsPerServer[serverIndicesSortedByLocality[i]].length == 0) { + i++; + } + if (i == serverIndicesSortedByLocality.length) { + return -1; + } + int lowestLocalServerIndex = serverIndicesSortedByLocality[i]; + LOG.debug("Lowest locality region server with non zero regions is " + + servers[lowestLocalServerIndex].getHostname() + " with locality " + + localityPerServer[lowestLocalServerIndex]); + return lowestLocalServerIndex; + } + } + + /** + * Finds the region on the given server whose block locality on that server's host is lowest. + * + * @param serverIndex index of the server to inspect + * @return the region index, or -1 if there is no region finder or the server has no regions + */ + int getLowestLocalityRegionOnServer(int serverIndex) { + if (regionFinder != null) { + float lowestLocality = 1.0f; + int lowestLocalRegionIndex = 0; + if (regionsPerServer[serverIndex].length == 0) { + // No regions on that region server + return -1; + } + for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { + int regionIndex = regionsPerServer[serverIndex][j]; + HDFSBlocksDistribution distribution = regionFinder + .getBlockDistribution(regions[regionIndex]); + float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); + if (locality < lowestLocality) { + lowestLocality = locality; + lowestLocalRegionIndex = j; + } + } + LOG.debug(" Lowest locality region index is " + lowestLocalRegionIndex + + " and its region server contains " + regionsPerServer[serverIndex].length + + " regions"); + return regionsPerServer[serverIndex][lowestLocalRegionIndex]; + } else { + return -1; + } + } + + /** Returns the region's block locality on the server's host, or 0 without a region finder. */ + float getLocalityOfRegion(int region, int server) { + if (regionFinder != null) { + HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); + return distribution.getBlockLocalityIndex(servers[server].getHostname()); + } else { + return 0f; + } + } + + /** + * Picks, among the region's top block locations, the server that hosts the fewest regions. + * + * @param region index of the region to place + * @return the server index, or -1 if there is no region finder or no usable candidate + */ + int getLeastLoadedTopServerForRegion(int region) { + if (regionFinder != null) { + List topLocalServers = regionFinder.getTopBlockLocations(regions[region]); + int leastLoadedServerIndex = -1; + int load = Integer.MAX_VALUE; + for (ServerName sn : topLocalServers) {
+ // NOTE(review): assumes serversToIndex is keyed by host and port -- confirm. + // A block location that is not an indexed server is skipped instead of unboxing null. + Integer index = serversToIndex.get(sn.getHostAndPort()); + if (index == null) { + continue; + } + int tempLoad = regionsPerServer[index].length; + if (tempLoad <= load) { + leastLoadedServerIndex = index; + load = tempLoad; + } + } + return leastLoadedServerIndex; + } else { + return -1; + } + } + + /** Recomputes localityPerServer from the block distributions of each server's regions. */ + void calculateRegionServerLocalities() { + if (regionFinder == null) { + LOG.warn("Region location finder found null, skipping locality calculations."); + return; + } + for (int i = 0; i < regionsPerServer.length; i++) { + HDFSBlocksDistribution distribution = new HDFSBlocksDistribution(); + if (regionsPerServer[i].length > 0) { + for (int j = 0; j < regionsPerServer[i].length; j++) { + int regionIndex = regionsPerServer[i][j]; + distribution.add(regionFinder.getBlockDistribution(regions[regionIndex])); + } + } else { + LOG.debug("Server " + servers[i].getHostname() + " had 0 regions."); + } + localityPerServer[i] = distribution.getBlockLocalityIndex(servers[i].getHostname()); + } + } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION", justification="Not important but should be fixed") @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index f201417..c0c05d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -31,17 +30,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; -import
org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.regionserver.HRegion; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; /** * This will find where data for a region is located in HDFS. It ranks @@ -54,31 +54,27 @@ class RegionLocationFinder { private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class); private Configuration conf; - private ClusterStatus status; + private volatile ClusterStatus status; private MasterServices services; - private CacheLoader> loader = - new CacheLoader>() { + private CacheLoader loader = + new CacheLoader() { - @Override - public List load(HRegionInfo key) throws Exception { - List servers = internalGetTopBlockLocation(key); - if (servers == null) { - return new LinkedList(); - } - return servers; - } - }; + @Override + public HDFSBlocksDistribution load(HRegionInfo key) throws Exception { + return internalGetTopBlockLocation(key); + } + }; // The cache for where regions are located. - private LoadingCache> cache = null; + private LoadingCache cache = null; /** * Create a cache for region to list of servers * @param mins Number of mins to cache * @return A new Cache.
*/ - private LoadingCache> createCache(int mins) { + private LoadingCache createCache(int mins) { return CacheBuilder.newBuilder().expireAfterAccess(mins, TimeUnit.MINUTES).build(loader); } @@ -100,14 +96,9 @@ class RegionLocationFinder { } protected List getTopBlockLocations(HRegionInfo region) { - List servers = null; - try { - servers = cache.get(region); - } catch (ExecutionException ex) { - servers = new LinkedList(); - } - return servers; - + HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region); + List topHosts = blocksDistribution.getTopHosts(); + return mapHostNameToServerName(topHosts); } /** @@ -119,22 +110,20 @@ class RegionLocationFinder { * @param region region * @return ordered list of hosts holding blocks of the specified region */ - protected List internalGetTopBlockLocation(HRegionInfo region) { - List topServerNames = null; + protected HDFSBlocksDistribution internalGetTopBlockLocation(HRegionInfo region) { try { HTableDescriptor tableDescriptor = getTableDescriptor(region.getTable()); if (tableDescriptor != null) { HDFSBlocksDistribution blocksDistribution = HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region); - List topHosts = blocksDistribution.getTopHosts(); - topServerNames = mapHostNameToServerName(topHosts); + return blocksDistribution; } } catch (IOException ioe) { - LOG.debug("IOException during HDFSBlocksDistribution computation. for " + "region = " + LOG.warn("IOException during HDFSBlocksDistribution computation.
for " + "region = " + region.getEncodedName(), ioe); } - return topServerNames; + return new HDFSBlocksDistribution(); } /** @@ -167,7 +156,10 @@ class RegionLocationFinder { */ protected List mapHostNameToServerName(List hosts) { if (hosts == null || status == null) { - return null; + if (hosts == null) { + LOG.warn("RegionLocationFinder top hosts is null"); + } + return Lists.newArrayList(); } List topServerNames = new ArrayList(); @@ -189,4 +181,33 @@ class RegionLocationFinder { } return topServerNames; } + + /** + * Returns the region's HDFS block distribution from the cache when an entry is present. + * When it is absent, or the cache lookup fails, the distribution is computed with + * internalGetTopBlockLocation and put into the cache. + * + * @param hri region to look up + * @return the block distribution of the region + */ + public HDFSBlocksDistribution getBlockDistribution(HRegionInfo hri) { + HDFSBlocksDistribution blockDistbn = null; + try { + if (cache.asMap().containsKey(hri)) { + blockDistbn = cache.get(hri); + return blockDistbn; + } else { + LOG.debug("HDFSBlocksDistribution not found in cache for region " + + hri.getRegionNameAsString()); + blockDistbn = internalGetTopBlockLocation(hri); + cache.put(hri, blockDistbn); + return blockDistbn; + } + } catch (ExecutionException e) { + LOG.warn("Error while fetching cache entry ", e); + blockDistbn = internalGetTopBlockLocation(hri); + cache.put(hri, blockDistbn); + return blockDistbn; + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index b6b4691..3a3a0d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -291,7 +291,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { break; } } - long endTime = EnvironmentEdgeManager.currentTime(); metricsBalancer.balanceCluster(endTime - startTime); @@ -540,7 +539,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @Override Cluster.Action generate(Cluster cluster) { cluster.sortServersByRegionCount(); - int thisServer =
pickMostLoadedServer(cluster, -1); + int thisServer = pickMostLoadedServer(cluster); int otherServer = pickLeastLoadedServer(cluster, thisServer); return pickRandomRegions(cluster, thisServer, otherServer); @@ -559,17 +558,28 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return servers[index]; } - private int pickMostLoadedServer(final Cluster cluster, int thisServer) { + /** + * Pick a random server which is loaded above average. + * + * @param cluster + * @return index of the region server picked, or -1 if the cluster has no servers. + */ + private int pickMostLoadedServer(final Cluster cluster) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; - - int index = servers.length - 1; - while (servers[index] == null || servers[index] == thisServer) { - index--; - if (index < 0) { - return -1; + if (servers.length == 0 || cluster.servers.length == 0) { + // Empty cluster: the average-load division and RANDOM.nextInt(0) below would both throw. + return -1; + } + int averageLoad = cluster.regions.length / cluster.servers.length; + int startIndex = 0; + for (int i = 0 ; i < servers.length; i++) { + if (cluster.getNumRegions(servers[i]) >= averageLoad) { + startIndex = i; + break; } } - return servers[index]; + int randomServerIndex = RANDOM.nextInt(servers.length - startIndex) + startIndex; + return servers[randomServerIndex]; } } @@ -586,44 +596,42 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { if (this.masterServices == null) { return Cluster.NullAction; } - // Pick a random region server - int thisServer = pickRandomServer(cluster); - // Pick a random region on this server - int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f); + cluster.calculateRegionServerLocalities(); + // Pick server with lowest locality + int thisServer = pickLowestLocalityServer(cluster); + int thisRegion; + if (thisServer == -1) { + LOG.warn("Could not pick lowest local region server"); + return Cluster.NullAction; + } else { + // Pick lowest local region on this server + thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer); + } if (thisRegion == -1) { return Cluster.NullAction; } - // Pick the server with the highest
locality - int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion); + // Pick the least loaded server with good locality for the region + int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion); if (otherServer == -1) { return Cluster.NullAction; } - // pick an region on the other server to potentially swap - int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f); + // Let the candidate region be moved to its highest local server. + int otherRegion = -1; return getAction(thisServer, thisRegion, otherServer, otherRegion); } - private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) { - int[] regionLocations = cluster.regionLocations[thisRegion]; - - if (regionLocations == null || regionLocations.length <= 1) { - return pickOtherRandomServer(cluster, thisServer); - } - - for (int loc : regionLocations) { - if (loc >= 0 && loc != thisServer) { // find the first suitable server - return loc; - } - } + private int pickLowestLocalityServer(Cluster cluster) { + return cluster.getLowestLocalityRegionServer(); + } - // no location found - return pickOtherRandomServer(cluster, thisServer); + private int pickLowestLocalRegionOnServer(Cluster cluster, int server) { + return cluster.getLowestLocalityRegionOnServer(server); } void setServices(MasterServices services) { @@ -1067,11 +1075,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } if (index < 0) { - if (regionLocations.length > 0) { - cost += 1; - } + cost += 1; } else { - cost += (double) index / (double) regionLocations.length; + // NOTE(review): index looks like a position in regionLocations, not a server index -- confirm. + cost += (1 - cluster.getLocalityOfRegion(i, regionLocations[index])); } } return scale(0, max, cost); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 000e331..e2c7549 100644 ---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -48,14 +48,14 @@ import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; import org.apache.hadoop.hbase.testclassification.FlakeyTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.net.DNSToSwitchMapping; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({FlakeyTests.class, MediumTests.class}) +@Category({FlakeyTests.class, LargeTests.class}) public class TestStochasticLoadBalancer extends BalancerTestBase { public static final String REGION_KEY = "testRegion"; private static StochasticLoadBalancer loadBalancer; @@ -68,6 +68,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class); conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f); conf.setFloat("hbase.regions.slop", 0.0f); + conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0); loadBalancer = new StochasticLoadBalancer(); loadBalancer.setConf(conf); } @@ -142,6 +143,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { new int[]{13, 14, 6, 10, 10, 10, 8, 10}, new int[]{130, 14, 60, 10, 100, 10, 80, 10}, new int[]{130, 140, 60, 100, 100, 100, 80, 100}, + new int[]{0, 5 , 5, 5, 5}, largeCluster, }; @@ -496,6 +498,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { @Test public void testLargeCluster() { + conf.setInt("hbase.master.balancer.stochastic.stepsPerRegion", 1000); + 
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L); int numNodes = 1000; int numRegions = 100000; //100 regions per RS int numRegionsPerServer = 80; //all servers except one @@ -528,11 +532,12 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } - @Test (timeout = 800000) + @Test (timeout = 1000000) public void testRegionReplicasOnLargeCluster() { + conf.setInt("hbase.master.balancer.stochastic.stepsPerRegion", 5000); + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L); conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); - conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); - conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 900 * 1000); // 900 sec loadBalancer.setConf(conf); int numNodes = 1000; int numRegions = 20 * numNodes; // 20 * replication regions per RS