Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 999150) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -33,7 +33,6 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -118,8 +117,6 @@ private final SortedMap regions = new TreeMap(); - private final ReentrantLock assignLock = new ReentrantLock(); - private final ExecutorService executorService; /** @@ -141,9 +138,9 @@ this.executorService = service; Configuration conf = master.getConfiguration(); this.timeoutMonitor = new TimeoutMonitor( - conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000), + conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000), master, - conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 15000)); + conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 30000)); Threads.setDaemonThreadRunning(timeoutMonitor, master.getServerName() + ".timeoutMonitor"); } @@ -493,25 +490,16 @@ // Grab the state of this region and synchronize on it String encodedName = region.getEncodedName(); RegionState state; - // This assignLock is used bridging the two synchronization blocks. Once - // we've made it into the 'state' synchronization block, then we can let - // go of this lock. There must be a better construct that this -- St.Ack 20100811 - this.assignLock.lock(); - try { - synchronized (regionsInTransition) { - state = regionsInTransition.get(encodedName); - if(state == null) { - state = new RegionState(region, RegionState.State.OFFLINE); - regionsInTransition.put(encodedName, state); - } + synchronized (regionsInTransition) { + state = regionsInTransition.get(encodedName); + if (state == null) { + state = new RegionState(region, RegionState.State.OFFLINE); + regionsInTransition.put(encodedName, state); } - synchronized(state) { - this.assignLock.unlock(); - assign(state); - } - } finally { - if (this.assignLock.isHeldByCurrentThread()) this.assignLock.unlock(); } + synchronized(state) { + assign(state); + } } /** @@ -689,18 +677,10 @@ Map> bulkPlan = LoadBalancer.bulkAssignment(allRegions, servers); - // For each server, create OFFLINE nodes and send OPEN RPCs - for (Map.Entry> entry : bulkPlan.entrySet()) { - HServerInfo server = entry.getKey(); - List regions = entry.getValue(); - LOG.debug("Assigning " + regions.size() + " regions to " + server); - for (HRegionInfo region : regions) { - LOG.debug("Assigning " + region.getRegionNameAsString() + " to " + server); - String regionName = region.getEncodedName(); - RegionPlan plan = new RegionPlan(region, null,server); - regionPlans.put(regionName, plan); - assign(region); - } + // For each server, create OFFLINE nodes and send OPEN RPCs. + for (Map.Entry> entry : bulkPlan.entrySet()) { + BulkAssignThread t = new BulkAssignThread(entry.getKey(), entry.getValue()); + t.start(); } // Wait for no regions to be in transition @@ -714,6 +694,31 @@ LOG.info("All user regions have been assigned"); } + /** + * Thread that runs a bulk assign to a single server. + */ + class BulkAssignThread extends Thread { + private final HServerInfo server; + private final List hris; + + BulkAssignThread(final HServerInfo server, final List hris) { + this.server = server; + this.hris = hris; + } + + @Override + public void run() { + LOG.debug("Assigning " + this.hris.size() + " to " + this.server); + for (HRegionInfo region : this.hris) { + LOG.debug("Assigning " + region.getRegionNameAsString() + " to " + this.server); + String regionName = region.getEncodedName(); + RegionPlan plan = new RegionPlan(region, null,server); + regionPlans.put(regionName, plan); + assign(region); + } + } + } + private void rebuildUserRegions() throws IOException { Map allRegions = MetaReader.fullScan(catalogTracker); @@ -898,7 +903,7 @@ // Iterate all regions in transition checking for time outs long now = System.currentTimeMillis(); for (RegionState regionState : regionsInTransition.values()) { - if(regionState.getStamp() + timeout <= now) { + if (regionState.getStamp() + timeout <= now) { HRegionInfo regionInfo = regionState.getRegion(); LOG.info("Regions in transition timed out: " + regionState); // Expired! Do a retry.