Index: src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java	(revision 1002947)
+++ src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java	(working copy)
@@ -79,7 +79,7 @@
   public void tearDown() throws Exception {
   }
 
-  /* REENALBE -- DISABLED UNTIL REPLICATION BROUGHT UP TO NEW MASTER @Test*/
+  /* REENABLE -- DISABLED UNTIL REPLICATION BROUGHT UP TO NEW MASTER @Test*/
   public void testLogCleaning() throws Exception{
     Configuration c = TEST_UTIL.getConfiguration();
     Path oldLogDir = new Path(HBaseTestingUtility.getTestDir(),
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1002947)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -1927,6 +1927,14 @@
   }
 
   @Override
+  public void openRegions(List<HRegionInfo> regions) {
+    LOG.info("Received request to open " + regions.size() + " region(s)");
+    for (HRegionInfo region: regions) {
+      openRegion(region);
+    }
+  }
+
+  @Override
   public boolean closeRegion(HRegionInfo region)
   throws NotServingRegionException {
     LOG.info("Received close region: " + region.getRegionNameAsString());
@@ -2453,5 +2461,4 @@
     new HRegionServerCommandLine(regionServerClass).doMain(args);
   }
 
-
 }
Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java	(revision 1002947)
+++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java	(working copy)
@@ -494,7 +494,7 @@
    * Open should not fail but can if server just crashed.
    * <p>
    * @param server server to open a region
-   * @param regionName region to open
+   * @param region region to open
    */
   public void sendRegionOpen(HServerInfo server, HRegionInfo region) {
     HRegionInterface hri = getServerConnection(server);
@@ -507,6 +507,24 @@
   }
 
   /**
+   * Sends an OPEN RPC to the specified server to open the specified regions.
+   * <p>
+   * Open should not fail but can if server just crashed.
+   * <p>
+   * @param server server to open the regions on
+   * @param regions regions to open
+   */
+  public void sendRegionOpen(HServerInfo server, List<HRegionInfo> regions) {
+    HRegionInterface hri = getServerConnection(server);
+    if (hri == null) {
+      LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName() +
+        " failed because no RPC connection found to this server");
+      return;
+    }
+    hri.openRegions(regions);
+  }
+
+  /**
    * Sends an CLOSE RPC to the specified server to close the specified region.
    * <p>
    * A region server could reject the close request because it either does not
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(revision 1002947)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(working copy)
@@ -22,6 +22,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,6 +34,7 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,6 +67,8 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Manages and performs region assignment.
  * <p>
@@ -475,9 +479,9 @@
     synchronized (this.regionsInTransition) {
       RegionState rs =
         this.regionsInTransition.remove(regionInfo.getEncodedName());
-      if (rs != null) {
-        this.regionsInTransition.notifyAll();
-        LOG.warn("Asked online a region that was already in " +
-          "regionsInTransition: " + rs);
+      this.regionsInTransition.notifyAll();
+      if (rs == null) {
+        LOG.warn("Asked online a region that was not in " +
+          "regionsInTransition: " + regionInfo.getEncodedName());
       }
     }
@@ -550,33 +554,119 @@
-   * @param regionName server to be assigned
+   * @param region region to be assigned
    */
   public void assign(HRegionInfo region) {
-    // Grab the state of this region and synchronize on it
-    String encodedName = region.getEncodedName();
-    RegionState state;
-    synchronized (regionsInTransition) {
-      state = regionsInTransition.get(encodedName);
-      if (state == null) {
-        state = new RegionState(region, RegionState.State.OFFLINE);
-        regionsInTransition.put(encodedName, state);
-      }
-    }
-    // This here gap between synchronizations looks like a hole but it should
-    // be ok because the assign below would protect against being called with
-    // a state instance that is not in the right 'state' -- St.Ack 20100920.
+    RegionState state = addToRegionsInTransition(region);
     synchronized (state) {
       assign(state);
     }
   }
 
   /**
+   * Bulk assign regions to destination.  If we fail in any way,
+   * we'll abort the master.
+   * @param destination Server to open the regions on.
+   * @param regions Regions to assign.
+   */
+  public void assign(final HServerInfo destination,
+      final List<HRegionInfo> regions) {
+    List<RegionState> states = new ArrayList<RegionState>(regions.size());
+    synchronized (this.regionsInTransition) {
+      for (HRegionInfo region: regions) {
+        states.add(forceRegionStateToOffline(region));
+      }
+    }
+    for (RegionState state: states) {
+      // TODO: PROBLEM!!!!! We need to wait on the zk callback to say state
+      // has been set to OFFLINE else what can happen is that the below where
+      // we set state to PENDING_OPEN can happen BEFORE the setting of state
+      // to OFFLINE up in zk has completed.
+      if (!setOfflineInZooKeeper(state)) {
+        RuntimeException re =
+          new RuntimeException("Failed setting region OFFLINE");
+        this.master.abort("destination=" + destination + ", state=" + state, re);
+        return;
+      }
+    }
+    for (RegionState state: states) {
+      // Move to PENDING_OPEN BEFORE the RPC so a late update here cannot
+      // overwrite a state change the regionserver already reported via zk.
+      state.update(RegionState.State.PENDING_OPEN);
+    }
+    try {
+      LOG.debug("Assigning " + regions.size() + " region(s) to " +
+        destination.getServerName());
+      // Send OPEN RPC. This can fail if the server on other end is not up.
+      this.serverManager.sendRegionOpen(destination, regions);
+    } catch (Throwable t) {
+      this.master.abort("Failed assignment of regions to " + destination, t);
+    }
+  }
+
+  /**
+   * Add region to regionsInTransition in OFFLINE state if it is not already
+   * present there.
+   * @param region Region to add.
+   * @return The region's RegionState, existing or newly added.
+   */
+  private RegionState addToRegionsInTransition(final HRegionInfo region) {
+    synchronized (regionsInTransition) {
+      return forceRegionStateToOffline(region);
+    }
+  }
+
+  /**
+   * Get the {@link RegionState} for <code>region</code> from
+   * regionsInTransition, adding a new one in
+   * {@link RegionState.State#OFFLINE} if none is present.  An existing
+   * state is returned as is; it is NOT forced to OFFLINE.
+   * Caller must hold lock on this.regionsInTransition.
+   * @param region Region to look up.
+   * @return Existing or newly added RegionState.
+   */
+  private RegionState forceRegionStateToOffline(final HRegionInfo region) {
+    String encodedName = region.getEncodedName();
+    RegionState state = this.regionsInTransition.get(encodedName);
+    if (state == null) {
+      state = new RegionState(region, RegionState.State.OFFLINE);
+      this.regionsInTransition.put(encodedName, state);
+    }
+    return state;
+  }
+
+  /**
    * Caller must hold lock on the passed state object.
    * @param state
    */
   private void assign(final RegionState state) {
+    if (!setOfflineInZooKeeper(state)) return;
+    RegionPlan plan = getRegionPlan(state);
+    try {
+      LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
+        " to " + plan.getDestination().getServerName());
+      // Send OPEN RPC. This can fail if the server on other end is not up.
+      serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
+      // Transition RegionState to PENDING_OPEN
+      state.update(RegionState.State.PENDING_OPEN);
+    } catch (Throwable t) {
+      LOG.warn("Failed assignment of " +
+        state.getRegion().getRegionNameAsString() + " to " +
+        plan.getDestination(), t);
+      // Clean out plan we failed to execute and one that doesn't look like it'll
+      // succeed anyways; we need a new plan!
+      synchronized(regionPlans) {
+        this.regionPlans.remove(state.getRegion().getEncodedName());
+      }
+    }
+  }
+
+  /**
+   * Set region as OFFLINE up in zookeeper.
+   * @param state Region to set OFFLINE; must be in CLOSED or OFFLINE state.
+   * @return True if we succeeded, false otherwise.
+   */
+  boolean setOfflineInZooKeeper(final RegionState state) {
     if (!state.isClosed() && !state.isOffline()) {
-      LOG.info("Attempting to assign region but it is in transition and in " +
-        "an unexpected state:" + state);
-      return;
+      LOG.warn("On assign, unexpected state: " + state);
+      return false;
     } else {
       state.update(RegionState.State.OFFLINE);
     }
@@ -584,23 +674,33 @@
       if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
           state.getRegion(), master.getServerName())) {
         LOG.warn("Attempted to create/force node into OFFLINE state before " +
-          "completing assignment but failed to do so");
-        return;
+          "completing assignment but failed to do so for " + state);
+        return false;
       }
     } catch (KeeperException e) {
       master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
-      return;
+      return false;
     }
+    return true;
+  }
+
+  /**
+   * Get the existing plan for the region or, if there is none, make and
+   * record a new one that targets a randomly chosen online server.
+   * @param state State of the region we want a plan for.
+   * @return The region's plan.
+   */
+  RegionPlan getRegionPlan(final RegionState state) {
     // Pickup existing plan or make a new one
     String encodedName = state.getRegion().getEncodedName();
     RegionPlan plan;
-    synchronized(regionPlans) {
+    synchronized (regionPlans) {
       plan = regionPlans.get(encodedName);
       if (plan == null) {
         LOG.debug("No previous transition plan for " +
-          state.getRegion().getRegionNameAsString() +
-          " so generating a random one; " + serverManager.countOfRegionServers() +
-          " (online=" + serverManager.getOnlineServers().size() + ") available servers");
+            state.getRegion().getRegionNameAsString() +
+            " so generating a random one; " + serverManager.countOfRegionServers() +
+            " (online=" + serverManager.getOnlineServers().size() + ") available servers");
         plan = new RegionPlan(state.getRegion(), null,
           LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
         regionPlans.put(encodedName, plan);
@@ -608,24 +708,7 @@
         LOG.debug("Using preexisting plan=" + plan);
       }
     }
-    try {
-      LOG.debug("Assigning region " +
-        state.getRegion().getRegionNameAsString() + " to " +
-        plan.getDestination().getServerName());
-      // Send OPEN RPC. This can fail if the server on other end is is not up.
-      serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
-      // Transition RegionState to PENDING_OPEN
-      state.update(RegionState.State.PENDING_OPEN);
-    } catch (Throwable t) {
-      LOG.warn("Failed assignment of " +
-        state.getRegion().getRegionNameAsString() + " to " +
-        plan.getDestination(), t);
-      // Clean out plan we failed execute and one that doesn't look like it'll
-      // succeed anyways; we need a new plan!
-      synchronized(regionPlans) {
-        this.regionPlans.remove(encodedName);
-      }
-    }
+    return plan;
   }
 
   /**
@@ -736,66 +819,72 @@
 
     // Get all available servers
     List<HServerInfo> servers = serverManager.getOnlineServersList();
-    LOG.info("Assigning " + allRegions.size() + " region(s) across " +
+    LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
       servers.size() + " server(s)");
 
     // Generate a cluster startup region placement plan
     Map<HServerInfo, List<HRegionInfo>> bulkPlan =
       LoadBalancer.bulkAssignment(allRegions, servers);
 
-    // Now start a thread per server to run assignment.
-    for (Map.Entry<HServerInfo,List<HRegionInfo>> entry: bulkPlan.entrySet()) {
-      Thread t = new BulkAssignServer(entry.getKey(), entry.getValue(), this.master);
-      t.start();
-    }
-
-    // Wait for no regions to be in transition
+    // Make a fixed thread count pool to run bulk assignments.  Thought is that
+    // if a 1k cluster, running 1k bulk assignments will kill HDFS or ZK?
+    int threadCount =
+      this.master.getConfiguration().getInt("hbase.bulk.assignment.threadpool.size", 10);
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setDaemon(true);
+    builder.setNameFormat(this.master.getServerName() + "-BulkAssigner-%1$d");
+    builder.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread t, Throwable e) {
+        // Abort if exception of any kind.
+        master.abort("Uncaught exception bulk assigning in " + t.getName(), e);
+      }
+    });
+    java.util.concurrent.ExecutorService pool =
+      Executors.newFixedThreadPool(threadCount, builder.build());
+    // Disable timing out regions in transition up in zk while bulk assigning.
+    this.timeoutMonitor.bulkAssign(true);
     try {
-      waitUntilNoRegionsInTransition();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for regions to be assigned", e);
-      throw new IOException(e);
+      // One task per destination server; each task does a single bulk OPEN.
+      for (Map.Entry<HServerInfo, List<HRegionInfo>> e: bulkPlan.entrySet()) {
+        pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue()));
+      }
+      // Accept no more tasks and wait until all have run so every region has
+      // been added to regionsInTransition before we wait on it below.
+      pool.shutdown();
+      try {
+        while (!pool.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
+          if (this.master.isStopped()) break;
+        }
+        // Wait for no regions to be in transition
+        waitUntilNoRegionsInTransition();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted waiting for regions to be assigned", e);
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    } finally {
+      pool.shutdown();
+      // Reenable timing out regions in transition up in zk.
+      this.timeoutMonitor.bulkAssign(false);
     }
-
-    LOG.info("All user regions have been assigned");
+    LOG.info("Bulk assigning done");
   }
 
   /**
-   * Class to run bulk assign to a single server.
+   * Runnable that bulk assigns a list of regions to a single server.
    */
-  class BulkAssignServer extends Thread {
+  class SingleServerBulkAssigner implements Runnable {
+    private final HServerInfo regionserver;
     private final List<HRegionInfo> regions;
-    private final HServerInfo server;
-    private final Stoppable stopper;
-
-    BulkAssignServer(final HServerInfo server,
-        final List<HRegionInfo> regions, final Stoppable stopper) {
-      super("serverassign-" + server.getServerName());
-      setDaemon(true);
-      this.server = server;
+    SingleServerBulkAssigner(final HServerInfo regionserver,
+        final List<HRegionInfo> regions) {
+      this.regionserver = regionserver;
       this.regions = regions;
-      this.stopper = stopper;
     }
-
     @Override
     public void run() {
-      // Insert a plan for each region with 'server' as the target regionserver.
-      // Below, we run through regions one at a time.  The call to assign will
-      // move the region into the regionsInTransition which starts up a timer.
-      // if the region is not out of the regionsInTransition by a certain time,
-      // it will be reassigned.  We don't want that to happen.  So, do it this
-      // way a region at a time for now.  Presumably the regionserver will put
-      // up a back pressure if opening a region takes time which is good since
-      // this will block our adding new regions to regionsInTransition.  Later
-      // make it so we can send over a lump of regions in one rpc with the
-      // regionserver on remote side tickling zk on a period to prevent our
-      // regionsInTransition timing out.  Currently its not possible given the
-      // Executor architecture on the regionserver side.  St.Ack 20100920.
-      for (HRegionInfo region : regions) {
-        regionPlans.put(region.getEncodedName(), new RegionPlan(region, null, server));
-        assign(region);
-        if (this.stopper.isStopped()) break;
-      }
+      assign(this.regionserver, this.regions);
     }
   }
 
@@ -838,9 +927,11 @@
    * @throws InterruptedException
    */
   public void waitUntilNoRegionsInTransition() throws InterruptedException {
-    synchronized(regionsInTransition) {
-      while(regionsInTransition.size() > 0) {
-        regionsInTransition.wait();
+    synchronized (this.regionsInTransition) {
+      while (this.regionsInTransition.size() > 0 &&
+          !this.master.isStopped()) {
+        // Timed wait so we notice if the master is stopped.
+        this.regionsInTransition.wait(100);
       }
     }
   }
@@ -849,14 +940,18 @@
    * @return A copy of the Map of regions currently in transition.
    */
   public NavigableMap<String, RegionState> getRegionsInTransition() {
-    return new TreeMap<String, RegionState>(this.regionsInTransition);
+    synchronized (this.regionsInTransition) {
+      return new TreeMap<String, RegionState>(this.regionsInTransition);
+    }
   }
 
   /**
    * @return True if regions in transition.
    */
   public boolean isRegionsInTransition() {
-    return !this.regionsInTransition.isEmpty();
+    synchronized (this.regionsInTransition) {
+      return !this.regionsInTransition.isEmpty();
+    }
   }
 
   /**
@@ -956,11 +1051,11 @@
   }
 
   /**
-   * Unsets the specified table as disabled (enables it).
+   * Monitor to check for time outs on region transition operations.
    */
   public class TimeoutMonitor extends Chore {
-
     private final int timeout;
+    private volatile boolean bulkAssign = false;
 
     /**
      * Creates a periodic monitor to check for time outs on region transition
@@ -977,8 +1072,21 @@
       this.timeout = timeout;
     }
 
+    /**
+     * @param bulkAssign If true, we'll suspend checking regions in transition
+     * up in zookeeper.  If false, will reenable check.
+     * @return Old setting for bulkAssign.
+     */
+    public boolean bulkAssign(final boolean bulkAssign) {
+      boolean result = this.bulkAssign;
+      this.bulkAssign = bulkAssign;
+      return result;
+    }
+
     @Override
     protected void chore() {
+      // If bulkAssign in progress, suspend checks
+      if (this.bulkAssign) return;
       synchronized (regionsInTransition) {
         // Iterate all regions in transition checking for time outs
         long now = System.currentTimeMillis();
@@ -1140,6 +1248,9 @@
       unassign(plan.getRegionInfo());
     }
 
+  /**
+   * State of a Region while undergoing transitions.
+   */
   public static class RegionState implements Writable {
     private HRegionInfo region;
 
Index: src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java	(revision 1002947)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java	(working copy)
@@ -85,7 +85,8 @@
 
   @Override
   public void process() {
-    LOG.debug("Handling OPENED event; deleting unassigned node");
+    LOG.debug("Handling OPENED event for " + this.regionInfo.getEncodedName() +
+      "; deleting unassigned node");
     // TODO: should we check if this table was disabled and get it closed?
     // Remove region from in-memory transition and unassigned node from ZK
     try {
Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(revision 1002947)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(working copy)
@@ -303,6 +303,12 @@
   public void openRegion(final HRegionInfo region);
 
   /**
+   * Opens the specified regions.
+   * @param regions regions to open
+   */
+  public void openRegions(final List<HRegionInfo> regions);
+
+  /**
    * Closes the specified region.
    * @param region region to close
    * @return true if closing region, false if not