Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java	(revision 1090069)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java	(working copy)
@@ -301,6 +301,7 @@
     if (this.catalogTracker != null) this.catalogTracker.stop();
     if (this.serverManager != null) this.serverManager.stop();
     if (this.assignmentManager != null) this.assignmentManager.stop();
+    if (this.fileSystemManager != null) this.fileSystemManager.stop();
     HConnectionManager.deleteConnection(this.conf, true);
     this.zooKeeper.close();
   }
@@ -913,13 +914,13 @@
     // 5. Trigger immediate assignment of the regions in round-robin fashion
     List<HServerInfo> servers = serverManager.getOnlineServersList();
     try {
-      this.assignmentManager.assignUserRegions(newRegions, servers);
+      this.assignmentManager.assignUserRegions(newRegions, servers, sync);
     } catch (InterruptedException ie) {
       LOG.error("Caught " + ie + " during round-robin assignment");
       throw new IOException(ie);
     }
 
-    // 5. If sync, wait for assignment of regions
+    // 6. If sync, wait for assignment of regions
     if(sync) {
       LOG.debug("Waiting for " + newRegions.length + " region(s) to be " +
           "assigned before returning");
Index: src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java	(revision 1090069)
+++ src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java	(working copy)
@@ -68,12 +68,24 @@
 
   protected abstract void populatePool(final java.util.concurrent.ExecutorService pool);
 
+  /**
+   * Run the bulk assign and wait until done; same as
+   * <code>bulkAssign(true)</code>.
+   * @throws InterruptedException
+   * @return True if done.
+   */
+  public boolean bulkAssign() throws InterruptedException {
+    return bulkAssign(true);
+  }
+
   /**
    * Run the bulk assign.
+   * @param sync If true, wait for the assignments to complete (bounded by the
+   * RIT timeout); if false, return as soon as they have been queued.
    * @throws InterruptedException
-   * @return True if done.
+   * @return True if done.  Only meaningful when <code>sync</code> is true.
    */
-  public boolean bulkAssign() throws InterruptedException {
+  public boolean bulkAssign(boolean sync) throws InterruptedException {
     boolean result = false;
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
     builder.setDaemon(true);
@@ -86,7 +98,9 @@
       populatePool(pool);
       // How long to wait on empty regions-in-transition.  If we timeout, the
       // RIT monitor should do fixup.
-      result = waitUntilDone(getTimeoutOnRIT());
+      if (sync) {
+        result = waitUntilDone(getTimeoutOnRIT());
+      }
     } finally {
       // We're done with the pool.  It'll exit when its done all in queue.
       pool.shutdown();
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(revision 1090251)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(working copy)
@@ -23,9 +23,11 @@
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -890,7 +892,7 @@
    * @param regions Regions to assign.
    */
   void assign(final HServerInfo destination,
-      final List<HRegionInfo> regions) {
+      final List<HRegionInfo> regions, boolean startup) {
     LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
       destination.getServerName());
 
@@ -924,7 +926,7 @@
     }
     // Move on to open regions.
     try {
-      // Send OPEN RPC. This can fail if the server on other end is is not up.
+      // Send OPEN RPC. This can fail if the server on other end is not up.
       // If we fail, fail the startup by aborting the server.  There is one
       // exception we will tolerate: ServerNotRunningException.  This is thrown
       // between report of regionserver being up and
@@ -945,9 +947,15 @@
         }
       }
     } catch (Throwable t) {
-      this.master.abort("Failed assignment of regions to " + destination +
-        "; bulk assign FAILED", t);
-      return;
+      String msg = "Failed assignment of regions to " + destination +
+        "; bulk assign FAILED";
+      if (startup) {
+        this.master.abort(msg, t);
+      } else {
+        // Not startup (e.g. table creation): log instead of aborting the master.
+        LOG.error(msg, t);
+      }
+      return;
     }
     LOG.debug("Bulk assigning done for " + destination.getServerName());
   }
@@ -1339,7 +1347,10 @@
    * @throws InterruptedException
    * @throws IOException
    */
-  public void assignUserRegions(HRegionInfo[] regions, List<HServerInfo> servers) throws IOException, InterruptedException {
+  private void assignUserRegions(HRegionInfo[] regions, List<HServerInfo> servers,
+      boolean sync, boolean startup) throws IOException, InterruptedException {
+    // sync: wait for the regions to leave transition before returning.
+    // startup: use startup semantics, i.e. a failed assign aborts the master.
     if (regions == null)
       return;
     Map<HServerInfo, List<HRegionInfo>> bulkPlan = null;
@@ -1348,8 +1359,10 @@
     LOG.info("Bulk assigning " + regions.length +
       " region(s) round-robin across " + servers.size() + " server(s)");
     // Use fixed count thread pool assigning.
-    BulkAssigner ba = new BulkStartupAssigner(this.master, bulkPlan, this);
-    ba.bulkAssign();
+    BulkAssigner ba = startup ?
+      new BulkStartupAssigner(this.master, bulkPlan, this) :
+      new BulkTableAssigner(this.master, bulkPlan, this);
+    ba.bulkAssign(sync);
     LOG.info("Bulk assigning done");
   }
 
@@ -1381,7 +1394,9 @@
       bulkPlan = LoadBalancer.retainAssignment(allRegions, servers);
     } else {
       // assign regions in round-robin fashion
-      assignUserRegions(allRegions.keySet().toArray(new HRegionInfo[allRegions.size()]), servers);
+      // Startup semantics: wait for completion; a failed assign aborts.
+      assignUserRegions(allRegions.keySet().toArray(new HRegionInfo[allRegions.size()]),
+        servers, true, true);
       return;
     }
     LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
@@ -1393,12 +1408,64 @@
     LOG.info("Bulk assigning done");
   }
 
+  /**
+   * Assigns user regions round-robin across <code>servers</code>, e.g. the
+   * regions of a newly created table.  Unlike assignment on master startup,
+   * a failure to assign is logged rather than aborting the master.
+   * @param regions Regions to assign; if null, nothing is done.
+   * @param servers Servers to assign the regions across.
+   * @param sync If true, wait for the regions to leave transition (bounded by
+   * the bulk assign RIT timeout); if false, return once assigning has started.
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void assignUserRegions(HRegionInfo[] regions, List<HServerInfo> servers,
+      boolean sync) throws IOException, InterruptedException {
+    assignUserRegions(regions, servers, sync, false);
+  }
+
+  /**
+   * Bulk assigner used outside of master startup (e.g. for table creation):
+   * failures are logged rather than aborting the master.
+   */
+  static class BulkTableAssigner extends BulkStartupAssigner {
+    BulkTableAssigner(final Server server,
+        final Map<HServerInfo, List<HRegionInfo>> bulkPlan,
+        final AssignmentManager am) {
+      super(server, bulkPlan, am);
+    }
+
+    @Override
+    protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+      return new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+          LOG.error("Error assigning regions in " + t.getName(), e);
+        }
+      };
+    }
+
+    @Override
+    protected String getThreadNamePrefix() {
+      return super.getThreadNamePrefix() + "-table";
+    }
+
+    @Override
+    protected void populatePool(java.util.concurrent.ExecutorService pool) {
+      for (Map.Entry<HServerInfo, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
+        // startup == false: a failed assign is logged, not fatal.
+        pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
+          this.assignmentManager, false));
+      }
+    }
+  }
+
   /**
    * Run bulk assign on startup.
    */
   static class BulkStartupAssigner extends BulkAssigner {
-    private final Map<HServerInfo, List<HRegionInfo>> bulkPlan;
-    private final AssignmentManager assignmentManager;
+    protected final Map<HServerInfo, List<HRegionInfo>> bulkPlan;
+    protected final AssignmentManager assignmentManager;
 
     BulkStartupAssigner(final Server server,
         final Map<HServerInfo, List<HRegionInfo>> bulkPlan,
@@ -1409,11 +1476,11 @@
     }
 
     @Override
-    public boolean bulkAssign() throws InterruptedException {
+    public boolean bulkAssign(boolean sync) throws InterruptedException {
       // Disable timing out regions in transition up in zk while bulk assigning.
       this.assignmentManager.timeoutMonitor.bulkAssign(true);
       try {
-        return super.bulkAssign();
+        return super.bulkAssign(sync);
       } finally {
         // Reenable timing out regions in transition up in zi.
         this.assignmentManager.timeoutMonitor.bulkAssign(false);
@@ -1421,21 +1488,37 @@
     }
 
     @Override
     protected String getThreadNamePrefix() {
       return super.getThreadNamePrefix() + "-startup";
     }
 
     @Override
+    protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+      return new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+          server.abort("Uncaught exception in " + t.getName(), e);
+        }
+      };
+    }
+
+    @Override
     protected void populatePool(java.util.concurrent.ExecutorService pool) {
       for (Map.Entry<HServerInfo, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
         pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
-          this.assignmentManager));
+          this.assignmentManager, true));
       }
     }
 
     protected boolean waitUntilDone(final long timeout)
     throws InterruptedException {
-      return this.assignmentManager.waitUntilNoRegionsInTransition(timeout);
+      // Only wait on the regions of this bulk plan, not on unrelated RIT.
+      Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
+      for (List<HRegionInfo> regionList : this.bulkPlan.values()) {
+        regionSet.addAll(regionList);
+      }
+      return this.assignmentManager.waitUntilNoRegionsInTransition(timeout,
+        regionSet);
     }
   }
 
@@ -1446,16 +1529,21 @@
     private final HServerInfo regionserver;
     private final List<HRegionInfo> regions;
     private final AssignmentManager assignmentManager;
+    // If true, a failed assign aborts the master; otherwise it is only logged.
+    private final boolean startup;
 
     SingleServerBulkAssigner(final HServerInfo regionserver,
-        final List<HRegionInfo> regions, final AssignmentManager am) {
+        final List<HRegionInfo> regions, final AssignmentManager am,
+        final boolean startup) {
       this.regionserver = regionserver;
       this.regions = regions;
       this.assignmentManager = am;
+      this.startup = startup;
     }
     @Override
     public void run() {
-      this.assignmentManager.assign(this.regionserver, this.regions);
+      this.assignmentManager.assign(this.regionserver, this.regions,
+        this.startup);
     }
   }
 
@@ -1486,6 +1574,42 @@
   }
 
   /**
+   * Wait until none of the given regions is in transition.
+   * @param timeout How long to wait, in milliseconds.
+   * @param regions Set of regions to wait for.
+   * @return True if none of <code>regions</code> is in transition; false if
+   * the timeout elapsed or the master was stopped before that happened.
+   * @throws InterruptedException
+   */
+  boolean waitUntilNoRegionsInTransition(final long timeout,
+      final Set<HRegionInfo> regions)
+  throws InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long remaining = timeout;
+    synchronized (regionsInTransition) {
+      boolean inTransition = isAnyInTransition(regions);
+      while (inTransition && !this.master.isStopped() && remaining > 0) {
+        regionsInTransition.wait(remaining);
+        remaining = timeout - (System.currentTimeMillis() - startTime);
+        inTransition = isAnyInTransition(regions);
+      }
+      return !inTransition;
+    }
+  }
+
+  /**
+   * @param regions Regions to look for.
+   * @return True if any of <code>regions</code> is currently in
+   * regionsInTransition.  Caller must hold the regionsInTransition lock.
+   */
+  private boolean isAnyInTransition(final Set<HRegionInfo> regions) {
+    for (RegionState rs : regionsInTransition.values()) {
+      if (regions.contains(rs.getRegion())) return true;
+    }
+    return false;
+  }
+
+  /**
    * Rebuild the list of user regions and assignment information.
    * <p>
    * Returns a map of servers that are not found to be online and the regions