Index: src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java (revision 0) @@ -0,0 +1,384 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestLoadBalancer { + private static final Log LOG = LogFactory.getLog(TestLoadBalancer.class); + + private static Configuration conf; + + private static LoadBalancer loadBalancer; + + private static Random rand; + + @BeforeClass + public static void beforeAllTests() throws Exception { + conf = HBaseConfiguration.create(); + loadBalancer = new LoadBalancer(conf); + rand = new Random(); + } + + // int[testnum][servernumber] -> numregions + int [][] clusterStateMocks = new int [][] { + // 1 node + new int [] { 0 }, + new int [] { 1 }, + new int [] { 10 }, + // 2 node + new int [] { 0, 0 }, + new int [] { 2, 0 }, + new int [] { 2, 1 }, + new int [] { 2, 2 }, + new int [] { 2, 3 }, + new int [] { 2, 4 }, + new int [] { 1, 1 }, + new int [] { 0, 1 }, + new int [] { 10, 1 }, + new int [] { 14, 1432 }, + new int [] { 47, 53 }, + // 3 node + new int [] { 0, 1, 2 }, + new int [] { 1, 2, 3 }, + new int [] { 0, 2, 2 }, + new int [] { 0, 3, 0 }, + new int [] { 0, 4, 0 }, + new int [] { 20, 20, 0 }, + // 4 node + new int [] { 0, 1, 
2, 3 }, + new int [] { 4, 0, 0, 0 }, + new int [] { 5, 0, 0, 0 }, + new int [] { 6, 6, 0, 0 }, + new int [] { 6, 2, 0, 0 }, + new int [] { 6, 1, 0, 0 }, + new int [] { 6, 0, 0, 0 }, + new int [] { 4, 4, 4, 7 }, + new int [] { 4, 4, 4, 8 }, + new int [] { 0, 0, 0, 7 }, + // 5 node + new int [] { 1, 1, 1, 1, 4 }, + // more nodes + new int [] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, + new int [] { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 }, + new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 }, + new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 }, + new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 }, + new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 10 }, + new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 }, + new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 }, + new int [] { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 }, + new int [] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 } + }; + + int [][] regionsAndServersMocks = new int [][] { + // { num regions, num servers } + new int [] { 0, 0 }, + new int [] { 0, 1 }, + new int [] { 1, 1 }, + new int [] { 2, 1 }, + new int [] { 10, 1 }, + new int [] { 1, 2 }, + new int [] { 2, 2 }, + new int [] { 3, 2 }, + new int [] { 1, 3 }, + new int [] { 2, 3 }, + new int [] { 3, 3 }, + new int [] { 25, 3 }, + new int [] { 2, 10 }, + new int [] { 2, 100 }, + new int [] { 12, 10 }, + new int [] { 12, 100 }, + }; + + /** + * Test the load balancing algorithm. 
+ * + * Invariant is that all servers should be hosting either + * floor(average) or ceiling(average) + * + * @throws Exception + */ + @Test + public void testBalanceCluster() throws Exception { + + for(int [] mockCluster : clusterStateMocks) { + Map> servers = mockClusterServers(mockCluster); + LOG.info("Mock Cluster : " + printMock(servers) + " " + printStats(servers)); + List plans = loadBalancer.balanceCluster(servers); + List balancedCluster = reconcile(servers, plans); + LOG.info("Mock Balance : " + printMock(balancedCluster)); + assertClusterAsBalanced(balancedCluster); + for(Map.Entry> entry : servers.entrySet()) { + returnRegions(entry.getValue()); + returnServer(entry.getKey()); + } + } + + } + + /** + * Invariant is that all servers have between floor(avg) and ceiling(avg) + * number of regions. + */ + public void assertClusterAsBalanced(List servers) { + int numServers = servers.size(); + int numRegions = 0; + int maxRegions = 0; + int minRegions = Integer.MAX_VALUE; + for(HServerInfo server : servers) { + int nr = server.getLoad().getNumberOfRegions(); + if(nr > maxRegions) { + maxRegions = nr; + } + if(nr < minRegions) { + minRegions = nr; + } + numRegions += nr; + } + if(maxRegions - minRegions < 2) { + // less than 2 between max and min, can't balance + return; + } + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + + for(HServerInfo server : servers) { + assertTrue(server.getLoad().getNumberOfRegions() <= max); + assertTrue(server.getLoad().getNumberOfRegions() >= min); + } + } + + /** + * Tests immediate assignment. + * + * Invariant is that all regions have an assignment. 
+ * + * @throws Exception + */ + @Test + public void testImmediateAssignment() throws Exception { + for(int [] mock : regionsAndServersMocks) { + LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); + List regions = randomRegions(mock[0]); + List servers = randomServers(mock[1], 0); + Map assignments = + loadBalancer.immediateAssignment(regions, servers); + assertImmediateAssignment(regions, servers, assignments); + returnRegions(regions); + returnServers(servers); + } + } + + /** + * All regions have an assignment. + * @param regions + * @param servers + * @param assignments + */ + private void assertImmediateAssignment(List regions, + List servers, Map assignments) { + for(HRegionInfo region : regions) { + assertTrue(assignments.containsKey(region)); + } + } + + /** + * Tests the bulk assignment used during cluster startup. + * + * Round-robin. Should yield a balanced cluster so same invariant as the load + * balancer holds, all servers holding either floor(avg) or ceiling(avg). 
+ * + * @throws Exception + */ + @Test + public void testBulkAssignment() throws Exception { + for(int [] mock : regionsAndServersMocks) { + LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers"); + List regions = randomRegions(mock[0]); + List servers = randomServers(mock[1], 0); + Map> assignments = + loadBalancer.bulkAssignment(regions, servers); + float average = (float)regions.size()/servers.size(); + int min = (int)Math.floor(average); + int max = (int)Math.ceil(average); + if(assignments != null && !assignments.isEmpty()) { + for(List regionList : assignments.values()) { + assertTrue(regionList.size() == min || regionList.size() == max); + } + } + returnRegions(regions); + returnServers(servers); + } + } + + private String printStats(Map> servers) { + int numServers = servers.size(); + int totalRegions = 0; + for(HServerInfo server : servers.keySet()) { + totalRegions += server.getLoad().getNumberOfRegions(); + } + float average = (float)totalRegions / numServers; + int max = (int)Math.ceil(average); + int min = (int)Math.floor(average); + return "[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + average + " max=" + max + " min=" + min + "]"; + } + + private String printMock(Map> servers) { + return printMock(Arrays.asList(servers.keySet().toArray(new HServerInfo[servers.size()]))); + } + + private String printMock(List balancedCluster) { + SortedSet sorted = new TreeSet(balancedCluster); + HServerInfo [] arr = sorted.toArray(new HServerInfo[sorted.size()]); + StringBuilder sb = new StringBuilder(sorted.size() * 4 + 4); + sb.append("{ "); + for(int i=0;i reconcile( + Map> servers, List plans) { + if(plans != null) { + for(RegionPlan plan : plans) { + plan.getSource().getLoad().setNumberOfRegions( + plan.getSource().getLoad().getNumberOfRegions() - 1); + plan.getDestination().getLoad().setNumberOfRegions( + plan.getDestination().getLoad().getNumberOfRegions() + 1); + } + } + return 
Arrays.asList(servers.keySet().toArray(new HServerInfo[servers.size()])); + } + + private Map> mockClusterServers( + int [] mockCluster) { + int numServers = mockCluster.length; + Map> servers = + new TreeMap>(); + for(int i=0;i regions = randomRegions(numRegions); + servers.put(server, regions); + } + return servers; + } + + private Queue regionQueue = new LinkedList(); + + private List randomRegions(int numRegions) { + List regions = new ArrayList(numRegions); + byte [] start = new byte[16]; + byte [] end = new byte[16]; + rand.nextBytes(start); + rand.nextBytes(end); + for(int i=0;i regions) { + regionQueue.addAll(regions); + } + + private Queue serverQueue = new LinkedList(); + + private HServerInfo randomServer(int numRegions) { + if(!serverQueue.isEmpty()) { + HServerInfo server = this.serverQueue.poll(); + server.getLoad().setNumberOfRegions(numRegions); + return server; + } + String host = RandomStringUtils.random(16); + int port = rand.nextInt(60000); + long startCode = rand.nextLong(); + HServerInfo hsi = + new HServerInfo(new HServerAddress(host, port), startCode, port, host); + hsi.getLoad().setNumberOfRegions(numRegions); + return hsi; + } + + private List randomServers(int numServers, int numRegionsPerServer) { + List servers = new ArrayList(numServers); + for(int i=0;i servers) { + serverQueue.addAll(servers); + } +} Index: src/main/java/org/apache/hadoop/hbase/HServerInfo.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HServerInfo.java (revision 961064) +++ src/main/java/org/apache/hadoop/hbase/HServerInfo.java (working copy) @@ -23,6 +23,7 @@ import java.io.DataOutput; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Comparator; import java.util.Set; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -243,6 +244,17 @@ } /** + * Orders HServerInfos by load then name. Natural/ascending order. 
+ */ + public static class LoadComparator implements Comparator { + @Override + public int compare(HServerInfo left, HServerInfo right) { + int loadCompare = left.getLoad().compareTo(right.getLoad()); + return loadCompare != 0 ? loadCompare : left.compareTo(right); + } + } + + /** * Utility method that does a find of a servername or a hostandport combination * in the passed Set. * @param servers Set of server names Index: src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (revision 0) @@ -0,0 +1,585 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Random; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerInfo; + +/** + * Makes decisions about the placement and movement of Regions across + * RegionServers. + * + * Cluster-wide load balancing will occur only when there are no regions in + * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}. + * + * Inline region placement with {@link #immediateAssignment} can be used when + * the Master needs to handle closed regions that it currently does not have + * a destination set for. This can happen during master failover. + * + * On cluster startup, {@link #bulkAssignment} can be used to determine + * locations for all Regions in a cluster. + */ +public class LoadBalancer { + private static final Log LOG = LogFactory.getLog(LoadBalancer.class); + + // Number of seconds between each run of the load balancer + private final long balancerPeriod; + + private Random rand; + + /** + * Instantiate the load balancer with the specified configuration. + * + * This sets configuration parameters to be used by the balancing algorithms + * and launches a background thread to perform periodic load balancing. 
+ * @param conf + */ + public LoadBalancer(Configuration conf) { + balancerPeriod = conf.getLong("hbase.balancer.period", 300000); + rand = new Random(); + } + + /** + * Generate a global load balancing plan according to the specified map of + * server information to the most loaded regions of each server. + * + * The load balancing invariant is that all servers are within 1 region of the + * average number of regions per server. If the average is an integer number, + * all servers will be balanced to the average. Otherwise, all servers will + * have either floor(average) or ceiling(average) regions. + * + * The algorithm is currently implemented as such: + * + *
    + *
  1. Determine the two valid numbers of regions each server should have, + * MIN=floor(average) and MAX=ceiling(average). + * + *
  2. Iterate down the most loaded servers, shedding regions from each so + * each server hosts exactly MAX regions. Stop once you reach a + * server that already has <= MAX regions. + * + *
  3. Iterate down the least loaded servers, assigning regions so each server + * has exactly MIN regions. Stop once you reach a server that + * already has >= MIN regions. + * + * Regions being assigned to underloaded servers are those that were shed + * in the previous step. It is possible that there were not enough + * regions shed to fill each underloaded server to MIN. If so we + * end up with a number of regions required to do so, neededRegions. + * + * It is also possible that we were able fill each underloaded but ended + * up with regions that were unassigned from overloaded servers but that + * still do not have assignment. + * + * If neither of these conditions hold (no regions needed to fill the + * underloaded servers, no regions leftover from overloaded servers), + * we are done and return. Otherwise we handle these cases below. + * + *
  4. If neededRegions is non-zero (still have underloaded servers), + * we iterate the most loaded servers again, shedding a single server from + * each (this brings them from having MAX regions to having + * MIN regions). + * + *
  5. We now definitely have more regions that need assignment, either from + * the previous step or from the original shedding from overloaded servers. + * + * Iterate the least loaded servers filling each to MIN. + * + *
  6. If we still have more regions that need assignment, again iterate the + * least loaded servers, this time giving each one (filling them to + * MAX) until we run out. + * + *
  7. All servers will now either host MIN or MAX regions. + * + * In addition, any server hosting >= MAX regions is guaranteed + * to end up with MAX regions at the end of the balancing. This + * ensures the minimal number of regions possible are moved. + *
+ * + * TODO: We can at-most reassign the number of regions away from a particular + * server to be how many they report as most loaded. + * Should we just keep all assignment in memory? Any objections? + * Does this mean we need HeapSize on HMaster? Or just careful monitor? + * (current thinking is we will hold all assignments in memory) + * + * @param clusterState map of regionservers and their load/region information to + * a list of their most loaded regions + * @return a list of regions to be moved, including source and destination, + * or null if cluster is already balanced + */ + public List<RegionPlan> balanceCluster( + Map<HServerInfo, List<HRegionInfo>> clusterState) { + LOG.debug("Running load balancer"); + + long startTime = System.currentTimeMillis(); + + // Make a map sorted by load and count regions + TreeMap<HServerInfo, List<HRegionInfo>> serversByLoad = + new TreeMap<HServerInfo, List<HRegionInfo>>( + new HServerInfo.LoadComparator()); + int numServers = clusterState.size(); + int numRegions = 0; + // Iterate so we can count regions as we build the map + for(Map.Entry<HServerInfo, List<HRegionInfo>> server : + clusterState.entrySet()) { + numRegions += server.getKey().getLoad().getNumberOfRegions(); + serversByLoad.put(server.getKey(), server.getValue()); + } + + // Check if we even need to do any load balancing + float average = (float)numRegions / numServers; // for logging + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + if(serversByLoad.lastKey().getLoad().getNumberOfRegions() <= max && + serversByLoad.firstKey().getLoad().getNumberOfRegions() >= min) { + // Skipped because no server outside (min,max) range + if(LOG.isDebugEnabled()) { + LOG.debug("Skipping load balancing. 
servers=" + numServers + " " + + "regions=" + numRegions + " average=" + average + " " + + "mostloaded=" + serversByLoad.lastKey().getLoad().getNumberOfRegions() + + " leastloaded=" + serversByLoad.firstKey().getLoad().getNumberOfRegions()); + } + return null; + } + + // Balance the cluster + // TODO: Look at data block locality or a more complex load to do this + List<RegionPlan> regionsToMove = new ArrayList<RegionPlan>(); + int regionidx = 0; // track the index in above list for setting destination + + // Walk down most loaded, pruning each to the max + int serversOverloaded = 0; + Map<HServerInfo, BalanceInfo> serverBalanceInfo = + new TreeMap<HServerInfo, BalanceInfo>(); + for(Map.Entry<HServerInfo, List<HRegionInfo>> server : + serversByLoad.descendingMap().entrySet()) { + HServerInfo serverInfo = server.getKey(); + int regionCount = serverInfo.getLoad().getNumberOfRegions(); + if(regionCount <= max) { + serverBalanceInfo.put(serverInfo, new BalanceInfo(0, 0)); + break; + } + serversOverloaded++; + List<HRegionInfo> regions = server.getValue(); + int numToOffload = Math.min(regionCount - max, regions.size()); + for(int i=0; i<numToOffload; i++) { + regionsToMove.add(new RegionPlan(regions.get(i), serverInfo, null)); + } + serverBalanceInfo.put(serverInfo, + new BalanceInfo(numToOffload, (-1)*numToOffload)); + } + + // Walk down least loaded, filling each to the min + int serversUnderloaded = 0; + int neededRegions = 0; + for(Map.Entry<HServerInfo, List<HRegionInfo>> server : + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad().getNumberOfRegions(); + if(regionCount >= min) { + break; + } + serversUnderloaded++; + int numToTake = min - regionCount; + int numTaken = 0; + while(numTaken < numToTake && regionidx < regionsToMove.size()) { + regionsToMove.get(regionidx).setDestination(server.getKey()); + numTaken++; + regionidx++; + } + serverBalanceInfo.put(server.getKey(), new BalanceInfo(0, numTaken)); + // If we still want to take some, increment needed + if(numTaken < numToTake) { + neededRegions += (numToTake - numTaken); + } + } + + // If none needed to fill all to min and none left to drain all to max, + // we are done + if(neededRegions == 0 && regionidx == regionsToMove.size()) { + long endTime = System.currentTimeMillis(); + LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. 
" + + "Moving " + regionsToMove.size() + " regions off of " + + serversOverloaded + " overloaded servers onto " + + serversUnderloaded + " less loaded servers"); + return regionsToMove; + } + + // Need to do a second pass. + // Either more regions to assign out or servers that are still underloaded + + // If we need more to fill min, grab one from each most loaded until enough + if(neededRegions != 0) { + // Walk down most loaded, grabbing one from each until we get enough + for(Map.Entry<HServerInfo, List<HRegionInfo>> server : + serversByLoad.descendingMap().entrySet()) { + BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey()); + int idx = + balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload(); + HRegionInfo region = server.getValue().get(idx); + regionsToMove.add(new RegionPlan(region, server.getKey(), null)); + if(--neededRegions == 0) { + // No more regions needed, done shedding + break; + } + } + } + + // Now we have a set of regions that must be all assigned out + // Assign each underloaded up to the min, then if leftovers, assign to max + + // Walk down least loaded, assigning to each to fill up to min + for(Map.Entry<HServerInfo, List<HRegionInfo>> server : + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad().getNumberOfRegions(); + BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey()); + if(balanceInfo != null) { + regionCount += balanceInfo.getNumRegionsAdded(); + } + if(regionCount >= min) { + break; + } + int numToTake = min - regionCount; + int numTaken = 0; + while(numTaken < numToTake && regionidx < regionsToMove.size()) { + regionsToMove.get(regionidx).setDestination(server.getKey()); + numTaken++; + regionidx++; + } + } + + // If we still have regions to dish out, assign underloaded to max + if(regionidx != regionsToMove.size()) { + for(Map.Entry<HServerInfo, List<HRegionInfo>> server : + serversByLoad.entrySet()) { + int regionCount = server.getKey().getLoad().getNumberOfRegions(); + if(regionCount >= max) { + break; + } + regionsToMove.get(regionidx).setDestination(server.getKey()); 
+ regionidx++; + if(regionidx == regionsToMove.size()) { + break; + } + } + } + + long endTime = System.currentTimeMillis(); + + assert(regionidx == regionsToMove.size()); + assert(neededRegions == 0); + + // All done! + LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " + + "Moving " + regionsToMove.size() + " regions off of " + + serversOverloaded + " overloaded servers onto " + + serversUnderloaded + " less loaded servers"); + + return regionsToMove; + } + + /** + * Stores additional per-server information about the regions added/removed + * during the run of the balancing algorithm. + * + * For servers that receive additional regions, we are not updating the number + * of regions in HServerInfo once we decide to reassign regions to a server, + * but we need this information later in the algorithm. This is stored in + * numRegionsAdded. + * + * For servers that shed regions, we need to track which regions we have + * already shed. nextRegionForUnload contains the index in the list + * of regions on the server that is the next to be shed. + */ + private static class BalanceInfo { + + private final int nextRegionForUnload; + private final int numRegionsAdded; + + public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) { + this.nextRegionForUnload = nextRegionForUnload; + this.numRegionsAdded = numRegionsAdded; + } + + public int getNextRegionForUnload() { + return nextRegionForUnload; + } + + public int getNumRegionsAdded() { + return numRegionsAdded; + } + } + + /** + * Generates a bulk assignment plan to be used on cluster startup. + * + * Takes a list of all the regions and all the servers in the cluster and + * returns a map of each server to the regions that it should be assigned. + * + * Currently implemented as a round-robin assignment. Same invariant as + * load balancing, all servers holding floor(avg) or ceiling(avg). 
+ * + * TODO: Use block locations from HDFS to place regions with their blocks + * + * @param regions all regions + * @param servers all servers + * @return map of server to the regions it should take, or null if no + * assignment is possible (ie. no regions or no servers) + */ + public Map> bulkAssignment( + List regions, List servers) { + if(regions.size() == 0 || servers.size() == 0) { + return null; + } + Map> assignments = + new TreeMap>(); + int numRegions = regions.size(); + int numServers = servers.size(); + int max = (int)Math.ceil((float)numRegions/numServers); + int serverIdx = 0; + for(HServerInfo server : servers) { + List serverRegions = new ArrayList(max); + for(int i=serverIdx;i getTopBlockLocations(FileSystem fs, HRegionInfo region) + throws IOException { + String encodedName = region.getEncodedName(); + Path path = new Path("/hbase/table/" + encodedName); + FileStatus status = fs.getFileStatus(path); + BlockLocation [] blockLocations = + fs.getFileBlockLocations(status, 0, status.getLen()); + Map hostWeights = + new TreeMap(new HostAndWeight.HostComparator()); + for(BlockLocation bl : blockLocations) { + String [] hosts = bl.getHosts(); + long len = bl.getLength(); + for(String host : hosts) { + HostAndWeight haw = hostWeights.get(host); + if(haw == null) { + haw = new HostAndWeight(host, len); + hostWeights.put(haw, haw); + } else { + haw.addWeight(len); + } + } + } + NavigableSet orderedHosts = new TreeSet( + new HostAndWeight.WeightComparator()); + orderedHosts.addAll(hostWeights.values()); + List topHosts = new ArrayList(orderedHosts.size()); + for(HostAndWeight haw : orderedHosts.descendingSet()) { + topHosts.add(haw.getHost()); + } + return topHosts; + } + + /** + * Stores the hostname and weight for that hostname. + * + * This is used when determining the physical locations of the blocks making + * up a region. 
+ * + * To make a prioritized list of the hosts holding the most data of a region, + * this class is used to count the total weight for each host. The weight is + * currently just the size of the file. + */ + private static class HostAndWeight { + + private final String host; + private long weight; + + public HostAndWeight(String host, long weight) { + this.host = host; + this.weight = weight; + } + + public void addWeight(long weight) { + this.weight += weight; + } + + public String getHost() { + return host; + } + + public long getWeight() { + return weight; + } + + private static class HostComparator implements Comparator { + @Override + public int compare(HostAndWeight l, HostAndWeight r) { + return l.getHost().compareTo(r.getHost()); + } + } + + private static class WeightComparator implements Comparator { + @Override + public int compare(HostAndWeight l, HostAndWeight r) { + if(l.getWeight() == r.getWeight()) { + return l.getHost().compareTo(r.getHost()); + } + return l.getWeight() < r.getWeight() ? -1 : 1; + } + } + } + + /** + * Generates an immediate assignment plan to be used by a new master for + * regions in transition that do not have an already known destination. + * + * Takes a list of regions that need immediate assignment and a list of + * all available servers. Returns a map of regions to the server they + * should be assigned to. + * + * This method will return quickly and does not do any intelligent + * balancing. The goal is to make a fast decision not the best decision + * possible. + * + * Currently this is random. + * + * @param regions + * @param servers + * @return map of regions to the server it should be assigned to + */ + public Map immediateAssignment( + List regions, List servers) { + Map assignments = + new TreeMap(); + for(HRegionInfo region : regions) { + assignments.put(region, servers.get(rand.nextInt(servers.size()))); + } + return assignments; + } + + /** + * Stores the plan for the move of an individual region. 
+ * + * Contains info for the region being moved, info for the server the region + * should be moved from, and info for the server the region should be moved + * to. + * + * The comparable implementation of this class compares only the region + * information and not the source/dest server info. + */ + public static class RegionPlan implements Comparable { + + private final HRegionInfo region; + private final HServerInfo source; + private HServerInfo dest; + + /** + * Instantiate a plan for a region move, moving the specified region from + * the specified source server to the specified destination server. + * + * Destination server can be instantiated as null and later set + * with {@link #setDestination(HServerInfo)}. + * + * @param region region to be moved + * @param source regionserver region should be moved from + * @param dest regionserver region should be moved to + */ + public RegionPlan(HRegionInfo region, HServerInfo source, HServerInfo dest) { + this.region = region; + this.source = source; + this.dest = dest; + } + + /** + * Set the destination server for the plan for this region. + */ + public void setDestination(HServerInfo dest) { + this.dest = dest; + } + + /** + * Get the source server for the plan for this region. + * @return server info for source + */ + public HServerInfo getSource() { + return source; + } + + /** + * Get the destination server for the plan for this region. + * @return server info for destination + */ + public HServerInfo getDestination() { + return dest; + } + + /** + * Get the region information for the region this plan is for. + * @return region info + */ + public HRegionInfo getRegionInfo() { + return region; + } + + /** + * Compare the region info. 
+ * @param o region plan you are comparing against + */ + @Override + public int compareTo(RegionPlan o) { + return getRegionInfo().compareTo(o.getRegionInfo()); + } + } +} Property changes on: src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java ___________________________________________________________________ Added: svn:executable + *