Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1090390) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -882,25 +882,24 @@ } private synchronized void createTable(final HRegionInfo [] newRegions, - boolean sync) + final boolean sync) throws IOException { String tableName = newRegions[0].getTableDesc().getNameAsString(); if(MetaReader.tableExists(catalogTracker, tableName)) { throw new TableExistsException(tableName); } - for(HRegionInfo newRegion : newRegions) { - + for (HRegionInfo newRegion : newRegions) { // 1. Set table enabling flag up in zk. try { assignmentManager.getZKTable().setEnabledTable(tableName); } catch (KeeperException e) { throw new IOException("Unable to ensure that the table will be" + - " enabled because of a ZooKeeper issue", e); + " enabled because of a ZooKeeper issue", e); } // 2. Create HRegion HRegion region = HRegion.createHRegion(newRegion, - fileSystemManager.getRootDir(), conf); + fileSystemManager.getRootDir(), conf); // 3. Insert into META MetaEditor.addRegionToMeta(catalogTracker, region.getRegionInfo()); @@ -912,23 +911,18 @@ // 5. Trigger immediate assignment of the regions in round-robin fashion List servers = serverManager.getOnlineServersList(); - try { - this.assignmentManager.assignUserRegions(newRegions, servers); - } catch (InterruptedException ie) { - LOG.error("Caught " + ie + " during round-robin assignment"); - throw new IOException(ie); - } + this.assignmentManager.bulkAssignUserRegions(newRegions, servers, sync); - // 5. If sync, wait for assignment of regions - if(sync) { - LOG.debug("Waiting for " + newRegions.length + " region(s) to be " + - "assigned before returning"); - for(HRegionInfo regionInfo : newRegions) { + // 6. 
If sync, wait for assignment of regions + if (sync) { + LOG.debug("Waiting for " + newRegions.length + " region(s) to be assigned"); + for (HRegionInfo regionInfo : newRegions) { try { - assignmentManager.waitForAssignment(regionInfo); + this.assignmentManager.waitForAssignment(regionInfo); } catch (InterruptedException e) { LOG.info("Interrupted waiting for region to be assigned during " + - "create table call"); + "create table call", e); + Thread.currentThread().interrupt(); return; } } Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1090390) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -21,11 +21,11 @@ import java.io.DataInput; import java.io.DataOutput; -import java.io.EOFException; import java.io.IOException; -import java.net.ConnectException; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -893,8 +893,7 @@ } /** - * Bulk assign regions to destination. If we fail in any way, - * we'll abort the server. + * Bulk assign regions to destination. * @param destination * @param regions Regions to assign. */ @@ -931,14 +930,10 @@ if (count == total) break; Threads.sleep(1); } - // Move on to open regions. try { - // Send OPEN RPC. This can fail if the server on other end is is not up. - // If we fail, fail the startup by aborting the server. There is one - // exception we will tolerate: ServerNotRunningException. This is thrown - // between report of regionserver being up and long maxWaitTime = System.currentTimeMillis() + - this.master.getConfiguration().getLong("hbase.regionserver.rpc.startup.waittime", 60000); + this.master.getConfiguration(). 
+ getLong("hbase.regionserver.rpc.startup.waittime", 60000); while (!this.master.isStopped()) { try { this.serverManager.sendRegionOpen(destination, regions); @@ -953,10 +948,10 @@ Thread.sleep(1000); } } - } catch (Throwable t) { - this.master.abort("Failed assignment of regions to " + destination + - "; bulk assign FAILED", t); - return; + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); } LOG.debug("Bulk assigning done for " + destination.getServerName()); } @@ -1341,31 +1336,9 @@ } /** - * Assigns list of user regions in round-robin fashion, if any exist. + * Assigns all user regions, if any. Used during cluster startup. *

* This is a synchronous call and will return once every region has been - * assigned. If anything fails, an exception is thrown - * @throws InterruptedException - * @throws IOException - */ - public void assignUserRegions(HRegionInfo[] regions, List servers) throws IOException, InterruptedException { - if (regions == null) - return; - Map> bulkPlan = null; - // Generate a round-robin bulk assignment plan - bulkPlan = LoadBalancer.roundRobinAssignment(regions, servers); - LOG.info("Bulk assigning " + regions.length + " region(s) round-robin across " + - servers.size() + " server(s)"); - // Use fixed count thread pool assigning. - BulkAssigner ba = new BulkStartupAssigner(this.master, bulkPlan, this); - ba.bulkAssign(); - LOG.info("Bulk assigning done"); - } - - /** - * Assigns all user regions, if any exist. Used during cluster startup. - *

- This is a synchronous call and will return once every region has been * assigned. If anything fails, an exception is thrown and the cluster * should be shutdown. * @throws InterruptedException @@ -1390,26 +1363,30 @@ bulkPlan = LoadBalancer.retainAssignment(allRegions, servers); } else { // assign regions in round-robin fashion - assignUserRegions(allRegions.keySet().toArray(new HRegionInfo[allRegions.size()]), servers); - return; + HRegionInfo [] regions = + allRegions.keySet().toArray(new HRegionInfo[allRegions.size()]); + bulkPlan = LoadBalancer.roundRobinAssignment(regions, servers); } LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " + servers.size() + " server(s), retainAssignment=" + retainAssignment); // Use fixed count thread pool assigning. - BulkAssigner ba = new BulkStartupAssigner(this.master, bulkPlan, this); + BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this); ba.bulkAssign(); LOG.info("Bulk assigning done"); } /** - * Run bulk assign on startup. + * Run bulk assign on startup. Does one RPC per regionserver passing a + * batch of regions using {@link SingleServerBulkAssigner}. + * Uses default {@link #getUncaughtExceptionHandler()} + * which will abort the Server on an uncaught exception. */ - static class BulkStartupAssigner extends BulkAssigner { - private final Map> bulkPlan; - private final AssignmentManager assignmentManager; + static class StartupBulkAssigner extends BulkAssigner { + final Map> bulkPlan; + final AssignmentManager assignmentManager; - BulkStartupAssigner(final Server server, + StartupBulkAssigner(final Server server, final Map> bulkPlan, final AssignmentManager am) { super(server); @@ -1418,11 +1395,11 @@ } @Override - public boolean bulkAssign() throws InterruptedException { + public boolean bulkAssign(boolean sync) throws InterruptedException { // Disable timing out regions in transition up in zk while bulk assigning. 
this.assignmentManager.timeoutMonitor.bulkAssign(true); try { - return super.bulkAssign(); + return super.bulkAssign(sync); } finally { // Reenable timing out regions in transition up in zi. this.assignmentManager.timeoutMonitor.bulkAssign(false); @@ -1430,25 +1407,51 @@ } @Override - protected String getThreadNamePrefix() { - return super.getThreadNamePrefix() + "-startup"; - } + protected String getThreadNamePrefix() { + return this.server.getServerName() + "-StartupBulkAssigner"; + } @Override protected void populatePool(java.util.concurrent.ExecutorService pool) { for (Map.Entry> e: this.bulkPlan.entrySet()) { pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(), - this.assignmentManager)); + this.assignmentManager, true)); } } protected boolean waitUntilDone(final long timeout) throws InterruptedException { - return this.assignmentManager.waitUntilNoRegionsInTransition(timeout); + Set regionSet = new HashSet(); + for (List regionList : bulkPlan.values()) { + regionSet.addAll(regionList); + } + return this.assignmentManager.waitUntilNoRegionsInTransition(timeout, regionSet); } } /** + * Bulk user region assigner. + * If failed assign, lets timeout in RIT do cleanup. + */ + static class GeneralBulkAssigner extends StartupBulkAssigner { + GeneralBulkAssigner(final Server server, + final Map> bulkPlan, + final AssignmentManager am) { + super(server, bulkPlan, am); + } + + @Override + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Assigning regions in " + t.getName(), e); + } + }; + } + } + + /** * Manage bulk assigning to a server. 
*/ static class SingleServerBulkAssigner implements Runnable { @@ -1457,7 +1460,8 @@ private final AssignmentManager assignmentManager; SingleServerBulkAssigner(final HServerInfo regionserver, - final List regions, final AssignmentManager am) { + final List regions, final AssignmentManager am, + final boolean startUp) { this.regionserver = regionserver; this.regions = regions; this.assignmentManager = am; @@ -1495,6 +1499,40 @@ } /** + * Wait until none of the passed regions are in transition. + * @param timeout How long to wait. + * @param regions set of regions to wait for + * @return True if nothing in regions in transition. NOTE(review): the body returns {@code stillInTransition}, which reads as the inverse of this -- confirm the intended contract. + * @throws InterruptedException + */ + boolean waitUntilNoRegionsInTransition(final long timeout, Set regions) + throws InterruptedException { + // Blocks until there are no regions in transition. + long startTime = System.currentTimeMillis(); + long remaining = timeout; + boolean stillInTransition = true; + synchronized (regionsInTransition) { + while (regionsInTransition.size() > 0 && !this.master.isStopped() && + remaining > 0 && stillInTransition) { + int count = 0; + for (RegionState rs : regionsInTransition.values()) { + if (regions.contains(rs.getRegion())) { + count++; + break; + } + } + if (count == 0) { + stillInTransition = false; + break; + } + regionsInTransition.wait(remaining); + remaining = timeout - (System.currentTimeMillis() - startTime); + } + } + return stillInTransition; + } + + /** * Rebuild the list of user regions and assignment information. *

* Returns a map of servers that are not found to be online and the regions @@ -2018,6 +2056,30 @@ } /** + * Assigns list of user regions in round-robin fashion, if any. + * @param regions Regions to assign. + * @param servers Servers to assign the regions across. + * @param sync True if we are to wait on all assigns. + * @throws IOException If interrupted while bulk assigning. + */ + void bulkAssignUserRegions(final HRegionInfo [] regions, + final List servers, final boolean sync) + throws IOException { + Map> bulkPlan = + LoadBalancer.roundRobinAssignment(regions, servers); + LOG.info("Bulk assigning " + regions.length + " region(s) " + + "round-robin across " + servers.size() + " server(s)"); + // Use fixed count thread pool assigning. + BulkAssigner ba = new GeneralBulkAssigner(this.master, bulkPlan, this); + try { + ba.bulkAssign(sync); + } catch (InterruptedException e) { + throw new IOException("InterruptedException bulk assigning", e); + } + LOG.info("Bulk assigning done"); + } + + /** * State of a Region while undergoing transitions. */ public static class RegionState implements Writable { Index: src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (revision 1090390) +++ src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (working copy) @@ -403,8 +403,8 @@ * @return map of server to the regions it should take, or null if no * assignment is possible (ie. 
no regions or no servers) */ - public static Map> roundRobinAssignment( - HRegionInfo[] regions, List servers) { + public static Map> roundRobinAssignment( + HRegionInfo [] regions, List servers) { if(regions.length == 0 || servers.size() == 0) { return null; } Index: src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java (revision 1090390) +++ src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java (working copy) @@ -30,7 +30,9 @@ * Base class used bulk assigning and unassigning regions. * Encapsulates a fixed size thread pool of executors to run assignment/unassignment. * Implement {@link #populatePool(java.util.concurrent.ExecutorService)} and - * {@link #waitUntilDone(long)}. + * {@link #waitUntilDone(long)}. The default implementation of + * the {@link #getUncaughtExceptionHandler()} is to abort the hosting + * Server. */ public abstract class BulkAssigner { final Server server; @@ -42,8 +44,11 @@ this.server = server; } + /** + * @return What to use for a thread prefix when executor runs. + */ protected String getThreadNamePrefix() { - return this.server.getServerName() + "-BulkAssigner"; + return this.server.getServerName() + "-" + this.getClass().getName(); } protected UncaughtExceptionHandler getUncaughtExceptionHandler() { @@ -68,12 +73,17 @@ protected abstract void populatePool(final java.util.concurrent.ExecutorService pool); + public boolean bulkAssign() throws InterruptedException { + return bulkAssign(true); + } + /** * Run the bulk assign. + * @param sync Whether to assign synchronously. * @throws InterruptedException * @return True if done. 
*/ - public boolean bulkAssign() throws InterruptedException { + public boolean bulkAssign(boolean sync) throws InterruptedException { boolean result = false; ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setDaemon(true); @@ -86,7 +96,7 @@ populatePool(pool); // How long to wait on empty regions-in-transition. If we timeout, the // RIT monitor should do fixup. - result = waitUntilDone(getTimeoutOnRIT()); + if (sync) result = waitUntilDone(getTimeoutOnRIT()); } finally { // We're done with the pool. It'll exit when its done all in queue. pool.shutdown(); @@ -102,4 +112,4 @@ */ protected abstract boolean waitUntilDone(final long timeout) throws InterruptedException; -} +} \ No newline at end of file