Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1090069) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -26,6 +26,7 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1339,7 +1340,8 @@ * @throws InterruptedException * @throws IOException */ - public void assignUserRegions(HRegionInfo[] regions, List servers) throws IOException, InterruptedException { + public void assignUserRegions(HRegionInfo[] regions, List servers, boolean sync) + throws IOException, InterruptedException { if (regions == null) return; Map> bulkPlan = null; @@ -1349,7 +1351,7 @@ servers.size() + " server(s)"); // Use fixed count thread pool assigning. BulkAssigner ba = new BulkStartupAssigner(this.master, bulkPlan, this); - ba.bulkAssign(); + ba.bulkAssign(sync); LOG.info("Bulk assigning done"); } @@ -1381,7 +1383,8 @@ bulkPlan = LoadBalancer.retainAssignment(allRegions, servers); } else { // assign regions in round-robin fashion - assignUserRegions(allRegions.keySet().toArray(new HRegionInfo[allRegions.size()]), servers); + assignUserRegions(allRegions.keySet().toArray(new HRegionInfo[allRegions.size()]), + servers, true); return; } LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " + @@ -1409,11 +1412,11 @@ } @Override - public boolean bulkAssign() throws InterruptedException { + public boolean bulkAssign(boolean sync) throws InterruptedException { // Disable timing out regions in transition up in zk while bulk assigning. this.assignmentManager.timeoutMonitor.bulkAssign(true); try { - return super.bulkAssign(); + return super.bulkAssign(sync); } finally { // Reenable timing out regions in transition up in zi. 
this.assignmentManager.timeoutMonitor.bulkAssign(false); @@ -1435,7 +1438,11 @@ protected boolean waitUntilDone(final long timeout) throws InterruptedException { - return this.assignmentManager.waitUntilNoRegionsInTransition(timeout); + Set regionSet = new HashSet(); + for (List regionList : bulkPlan.values()) { + for (HRegionInfo region : regionList) regionSet.add(region); + } + return this.assignmentManager.waitUntilNoRegionsInTransition(timeout, regionSet); } } @@ -1455,7 +1462,12 @@ } @Override public void run() { - this.assignmentManager.assign(this.regionserver, this.regions); + try { + this.assignmentManager.assign(this.regionserver, this.regions); + } catch (Exception e) { + LOG.error("Error assigning " + this.regions.size() + " regions for " + + this.regionserver.getHostnamePort(), e); + } } } @@ -1486,6 +1498,40 @@ } /** + * Wait until no regions from set regions are in transition. + * @param timeout How long to wait, in milliseconds. + * @param regions set of regions to wait for + * @return True if none of the given regions is in transition; false if the timeout elapsed or the master stopped first. + * @throws InterruptedException + */ + boolean waitUntilNoRegionsInTransition(final long timeout, Set regions) + throws InterruptedException { + // Blocks until none of the given regions is in transition, the timeout elapses or the master stops. + long startTime = System.currentTimeMillis(); + long remaining = timeout; + boolean stillInTransition = true; + synchronized (regionsInTransition) { + while (stillInTransition) { + int count = 0; + for (RegionState rs : regionsInTransition.values()) { + if (regions.contains(rs.getRegion())) { + count++; + break; + } + } + if (count == 0) { + stillInTransition = false; + break; + } + if (this.master.isStopped() || remaining <= 0) break; // never call wait(0): it blocks forever + regionsInTransition.wait(remaining); + remaining = timeout - (System.currentTimeMillis() - startTime); + } + } + return !stillInTransition; + } + + /** * Rebuild the list of user regions and assignment information. *

* Returns a map of servers that are not found to be online and the regions Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1090069) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -301,6 +301,7 @@ if (this.catalogTracker != null) this.catalogTracker.stop(); if (this.serverManager != null) this.serverManager.stop(); if (this.assignmentManager != null) this.assignmentManager.stop(); + if (this.fileSystemManager != null) this.fileSystemManager.stop(); HConnectionManager.deleteConnection(this.conf, true); this.zooKeeper.close(); } @@ -913,13 +914,13 @@ // 5. Trigger immediate assignment of the regions in round-robin fashion List servers = serverManager.getOnlineServersList(); try { - this.assignmentManager.assignUserRegions(newRegions, servers); + this.assignmentManager.assignUserRegions(newRegions, servers, sync); } catch (InterruptedException ie) { LOG.error("Caught " + ie + " during round-robin assignment"); throw new IOException(ie); } - // 5. If sync, wait for assignment of regions + // 6. If sync, wait for assignment of regions if(sync) { LOG.debug("Waiting for " + newRegions.length + " region(s) to be " + "assigned before returning"); Index: src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java (revision 1090069) +++ src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java (working copy) @@ -68,12 +68,16 @@ protected abstract void populatePool(final java.util.concurrent.ExecutorService pool); + public boolean bulkAssign() throws InterruptedException { + return bulkAssign(true); + } + /** * Run the bulk assign. * @throws InterruptedException * @return True if done. 
*/ - public boolean bulkAssign() throws InterruptedException { + public boolean bulkAssign(boolean sync) throws InterruptedException { boolean result = false; ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setDaemon(true); @@ -86,7 +90,9 @@ populatePool(pool); // How long to wait on empty regions-in-transition. If we timeout, the // RIT monitor should do fixup. - result = waitUntilDone(getTimeoutOnRIT()); + if (sync) { + result = waitUntilDone(getTimeoutOnRIT()); + } } finally { // We're done with the pool. It'll exit when its done all in queue. pool.shutdown();