Index: src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java	(revision 1032133)
+++ src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java	(working copy)
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
@@ -28,11 +29,11 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -239,7 +240,7 @@
     List<HRegionInfo> regions = randomRegions(mock[0]);
     List<HServerInfo> servers = randomServers(mock[1], 0);
     Map<HServerInfo, List<HRegionInfo>> assignments =
-      LoadBalancer.bulkAssignment(regions, servers);
+      LoadBalancer.roundRobinAssignment(regions, servers);
     float average = (float)regions.size()/servers.size();
     int min = (int)Math.floor(average);
     int max = (int)Math.ceil(average);
@@ -253,6 +254,80 @@
     }
   }
 
+  /**
+   * Test the cluster startup bulk assignment which attempts to retain
+   * assignment info.
+   * @throws Exception
+   */
+  @Test
+  public void testRetainAssignment() throws Exception {
+    // Test simple case where all same servers are there
+    List<HServerInfo> servers = randomServers(10, 10);
+    List<HRegionInfo> regions = randomRegions(100);
+    Map<HRegionInfo, HServerAddress> existing =
+      new TreeMap<HRegionInfo, HServerAddress>();
+    for (int i = 0; i < regions.size(); i++) {
+      existing.put(regions.get(i),
+        servers.get(i % servers.size()).getServerAddress());
+    }
+    Map<HServerInfo, List<HRegionInfo>> assignment =
+      LoadBalancer.retainAssignment(existing, servers);
+    assertRetainedAssignment(existing, servers, assignment);
+
+    // Include two new servers that were not there before
+    List<HServerInfo> servers2 = new ArrayList<HServerInfo>(servers);
+    servers2.add(randomServer(10));
+    servers2.add(randomServer(10));
+    assignment = LoadBalancer.retainAssignment(existing, servers2);
+    assertRetainedAssignment(existing, servers2, assignment);
+
+    // Remove two of the servers that were previously there
+    List<HServerInfo> servers3 = new ArrayList<HServerInfo>(servers);
+    servers3.remove(servers3.size()-1);
+    servers3.remove(servers3.size()-2);
+    assignment = LoadBalancer.retainAssignment(existing, servers3);
+    assertRetainedAssignment(existing, servers3, assignment);
+  }
+
+  /**
+   * Asserts a valid retained assignment plan.
+   * <p>
+   * Must meet the following conditions:
+   * <ul>
+   *   <li>Every input region has an assignment, and to an online server
+   *   <li>If a region had an existing assignment to a server with the same
+   *       address as a currently online server, it will be assigned to it
+   * </ul>
+   * @param existing
+   * @param servers
+   * @param assignment
+   */
+  private void assertRetainedAssignment(
+      Map<HRegionInfo, HServerAddress> existing, List<HServerInfo> servers,
+      Map<HServerInfo, List<HRegionInfo>> assignment) {
+    // Verify condition 1, every region assigned, and to online server
+    Set<HServerInfo> onlineServerSet = new TreeSet<HServerInfo>(servers);
+    Set<HRegionInfo> assignedRegions = new TreeSet<HRegionInfo>();
+    for (Map.Entry<HServerInfo, List<HRegionInfo>> a : assignment.entrySet()) {
+      assertTrue("Region assigned to server that was not listed as online",
+        onlineServerSet.contains(a.getKey()));
+      for (HRegionInfo r : a.getValue()) assignedRegions.add(r);
+    }
+    assertEquals(existing.size(), assignedRegions.size());
+
+    // Verify condition 2, if server had existing assignment, must have same
+    Set<HServerAddress> onlineAddresses = new TreeSet<HServerAddress>();
+    for (HServerInfo s : servers) onlineAddresses.add(s.getServerAddress());
+    for (Map.Entry<HServerInfo, List<HRegionInfo>> a : assignment.entrySet()) {
+      for (HRegionInfo r : a.getValue()) {
+        HServerAddress address = existing.get(r);
+        if (address != null && onlineAddresses.contains(address)) {
+          assertTrue(a.getKey().getServerAddress().equals(address));
+        }
+      }
+    }
+  }
+
   private String printStats(Map<HServerInfo, List<HRegionInfo>> servers) {
     int numServers = servers.size();
     int totalRegions = 0;
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(revision 1032133)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(working copy)
@@ -46,6 +46,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -54,7 +55,6 @@
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
-import org.apache.hadoop.hbase.client.MetaScanner;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.RegionTransitionData;
@@ -1106,23 +1106,33 @@
    * should be shutdown.
    */
  public void assignAllUserRegions() throws IOException {
-    // First experiment at synchronous assignment
-    // Simpler because just wait for no regions in transition
-    // Scan META for all user regions; do not include offlined regions in list.
-    List<HRegionInfo> allRegions =
-      MetaScanner.listAllRegions(master.getConfiguration(), false);
-    if (allRegions == null || allRegions.isEmpty()) return;
+    Map<HServerInfo, List<HRegionInfo>> bulkPlan = null;
 
     // Get all available servers
     List<HServerInfo> servers = serverManager.getOnlineServersList();
+
+    // Scan META for all user regions, skipping any disabled tables
+    Map<HRegionInfo, HServerAddress> allRegions =
+      MetaReader.fullScan(catalogTracker, disabledTables);
+    if (allRegions == null || allRegions.isEmpty()) return;
+
+    // Determine what type of assignment to do on startup
+    boolean retainAssignment = master.getConfiguration().getBoolean(
+      "hbase.master.startup.retainassign", true);
+
+    if (retainAssignment) {
+      // Reuse existing assignment info
+      bulkPlan = LoadBalancer.retainAssignment(allRegions, servers);
+    } else {
+      // Generate a round-robin bulk assignment plan
+      bulkPlan = LoadBalancer.roundRobinAssignment(
+        new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
+    }
+
     LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
       servers.size() + " server(s)");
 
-    // Generate a cluster startup region placement plan
-    Map<HServerInfo, List<HRegionInfo>> bulkPlan =
-      LoadBalancer.bulkAssignment(allRegions, servers);
-
     // Make a fixed thread count pool to run bulk assignments. Thought is that
     // if a 1k cluster, running 1k bulk concurrent assignment threads will kill
     // master, HDFS or ZK?
Index: src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java	(revision 1032133)
+++ src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java	(working copy)
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 
 /**
@@ -51,7 +52,7 @@
  *
  * <p>On cluster startup, {@link #bulkAssignment} can be used to determine
  * locations for all Regions in a cluster.
- * 
+ *
  * <p>This classes produces plans for the {@link AssignmentManager} to execute.
  */
 public class LoadBalancer {
@@ -337,11 +338,12 @@
   }
 
   /**
-   * Generates a bulk assignment plan to be used on cluster startup.
-   * 
+   * Generates a bulk assignment plan to be used on cluster startup using a
+   * simple round-robin assignment.
+   * <p>
    * Takes a list of all the regions and all the servers in the cluster and
    * returns a map of each server to the regions that it should be assigned.
-   * 
+   * <p>
    * Currently implemented as a round-robin assignment. Same invariant as
    * load balancing, all servers holding floor(avg) or ceiling(avg).
   *
@@ -352,7 +354,7 @@
   * @return map of server to the regions it should take, or null if no
   * assignment is possible (ie. no regions or no servers)
   */
-  public static Map<HServerInfo, List<HRegionInfo>> bulkAssignment(
+  public static Map<HServerInfo, List<HRegionInfo>> roundRobinAssignment(
      List<HRegionInfo> regions, List<HServerInfo> servers) {
    if(regions.size() == 0 || servers.size() == 0) {
      return null;
@@ -375,6 +377,45 @@
   }
 
   /**
+   * Generates a bulk assignment startup plan, attempting to reuse the existing
+   * assignment information from META, but adjusting for the specified list of
+   * available/online servers.
+   * <p>
+   * Takes a map of all regions to their existing assignment from META. Also
+   * takes a list of online servers for regions to be assigned to. Attempts to
+   * retain all assignments, so in some instances initial assignment will not
+   * be completely balanced.
+   * <p>
+   * Any leftover regions without an existing server to be assigned to will be
+   * assigned randomly to available servers.
+   * @param regions regions and existing assignment from meta
+   * @param servers available servers
+   * @return map of servers and regions to be assigned to them
+   */
+  public static Map<HServerInfo, List<HRegionInfo>> retainAssignment(
+      Map<HRegionInfo, HServerAddress> regions, List<HServerInfo> servers) {
+    Map<HServerInfo, List<HRegionInfo>> assignments =
+      new TreeMap<HServerInfo, List<HRegionInfo>>();
+    // Build a map of server addresses to server info so we can match things up
+    Map<HServerAddress, HServerInfo> serverMap =
+      new TreeMap<HServerAddress, HServerInfo>();
+    for (HServerInfo server : servers) {
+      serverMap.put(server.getServerAddress(), server);
+      assignments.put(server, new ArrayList<HRegionInfo>());
+    }
+    for (Map.Entry<HRegionInfo, HServerAddress> region : regions.entrySet()) {
+      HServerInfo server = serverMap.get(region.getValue());
+      if (server != null) {
+        assignments.get(server).add(region.getKey());
+      } else {
+        assignments.get(servers.get(rand.nextInt(assignments.size()))).add(
+          region.getKey());
+      }
+    }
+    return assignments;
+  }
+
+  /**
    * Find the block locations for all of the files for the specified region.
    *
    * Returns an ordered list of hosts that are hosting the blocks for this
@@ -573,7 +614,7 @@
     public String getRegionName() {
       return this.hri.getEncodedName();
     }
-    
+
     public HRegionInfo getRegionInfo() {
       return this.hri;
     }
Index: src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java	(revision 1032133)
+++ src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java	(working copy)
@@ -24,7 +24,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -129,6 +131,25 @@
   public static Map<HRegionInfo, HServerAddress> fullScan(
       CatalogTracker catalogTracker)
   throws IOException {
+    return fullScan(catalogTracker, new TreeSet<String>());
+  }
+
+  /**
+   * Performs a full scan of .META., skipping regions from any
+   * tables in the specified set of disabled tables.
+   * <p>
+   * Returns a map of every region to its currently assigned server, according
+   * to META. If the region does not have an assignment it will have a null
+   * value in the map.
+   *
+   * @param catalogTracker
+   * @param disabledTables set of disabled tables that will not be returned
+   * @return map of regions to their currently assigned server
+   * @throws IOException
+   */
+  public static Map<HRegionInfo, HServerAddress> fullScan(
+      CatalogTracker catalogTracker, final Set<String> disabledTables)
+  throws IOException {
     final Map<HRegionInfo, HServerAddress> regions =
       new TreeMap<HRegionInfo, HServerAddress>();
     Visitor v = new Visitor() {
@@ -137,6 +158,8 @@
       if (r == null || r.isEmpty()) return true;
       Pair<HRegionInfo, HServerAddress> region = metaRowToRegionPair(r);
       if (region == null) return true;
+      if (disabledTables.contains(
+          region.getFirst().getTableDesc().getNameAsString())) return true;
       regions.put(region.getFirst(), region.getSecond());
       return true;
     }
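For reviewers, a minimal sketch (not part of the patch) of how the pieces above fit together on master startup. It only uses APIs introduced or touched by this patch; the fields catalogTracker, serverManager and disabledTables are assumed to be the existing AssignmentManager members.

  // Region -> last known server address recorded in META (disabled tables skipped).
  Map<HRegionInfo, HServerAddress> existing =
      MetaReader.fullScan(catalogTracker, disabledTables);
  List<HServerInfo> online = serverManager.getOnlineServersList();

  // Regions whose old address matches a currently online server keep their
  // placement; regions with no usable address are handed out randomly.
  Map<HServerInfo, List<HRegionInfo>> plan =
      LoadBalancer.retainAssignment(existing, online);

  // Setting hbase.master.startup.retainassign to false in the master
  // configuration falls back to the plain round-robin plan instead:
  //   LoadBalancer.roundRobinAssignment(
  //       new ArrayList<HRegionInfo>(existing.keySet()), online);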