Index: src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java	(revision 1029935)
+++ src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java	(working copy)
@@ -339,18 +339,23 @@
      * ZK = CLOSING
      */
 
-    // Region of enabled table being closed but not complete
-    // Region is already assigned, don't say anything to RS but set ZK closing
-    region = enabledAndAssignedRegions.remove(0);
-    regionsThatShouldBeOnline.add(region);
-    ZKAssign.createNodeClosing(zkw, region, serverName);
+// Disabled test of CLOSING. This case is invalid after HBASE-3181.
+// How can an RS stop a CLOSING w/o deleting the node? If it did ever fail
+// and left the node in CLOSING, the RS would have aborted and we'd process
+// these regions in server shutdown
+//
+//    // Region of enabled table being closed but not complete
+//    // Region is already assigned, don't say anything to RS but set ZK closing
+//    region = enabledAndAssignedRegions.remove(0);
+//    regionsThatShouldBeOnline.add(region);
+//    ZKAssign.createNodeClosing(zkw, region, serverName);
+//
+//    // Region of disabled table being closed but not complete
+//    // Region is already assigned, don't say anything to RS but set ZK closing
+//    region = disabledAndAssignedRegions.remove(0);
+//    regionsThatShouldBeOffline.add(region);
+//    ZKAssign.createNodeClosing(zkw, region, serverName);
 
-    // Region of disabled table being closed but not complete
-    // Region is already assigned, don't say anything to RS but set ZK closing
-    region = disabledAndAssignedRegions.remove(0);
-    regionsThatShouldBeOffline.add(region);
-    ZKAssign.createNodeClosing(zkw, region, serverName);
-
     /*
      * ZK = CLOSED
      */
@@ -797,26 +802,32 @@
 
     // Let's add some weird states to master in-memory state
 
+    // After HBASE-3181, we need to have some ZK state if we're PENDING_OPEN
+    // b/c it is impossible for us to get into this state w/o a zk node
+    // this is not true of PENDING_CLOSE
+
     // PENDING_OPEN and enabled
     region = enabledRegions.remove(0);
     regionsThatShouldBeOnline.add(region);
     master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
-        new RegionState(region, RegionState.State.PENDING_OPEN));
+        new RegionState(region, RegionState.State.PENDING_OPEN, 0));
+    ZKAssign.createNodeOffline(zkw, region, master.getServerName());
 
     // PENDING_OPEN and disabled
     region = disabledRegions.remove(0);
     regionsThatShouldBeOffline.add(region);
     master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
-        new RegionState(region, RegionState.State.PENDING_OPEN));
+        new RegionState(region, RegionState.State.PENDING_OPEN, 0));
+    ZKAssign.createNodeOffline(zkw, region, master.getServerName());
 
     // PENDING_CLOSE and enabled
     region = enabledRegions.remove(0);
     regionsThatShouldBeOnline.add(region);
     master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
-        new RegionState(region, RegionState.State.PENDING_CLOSE));
+        new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
 
     // PENDING_CLOSE and disabled
     region = disabledRegions.remove(0);
     regionsThatShouldBeOffline.add(region);
     master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
-        new RegionState(region, RegionState.State.PENDING_CLOSE));
+        new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
 
     // Failover should be completed, now wait for no RIT
     log("Waiting for no more RIT");
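
A note on the RegionState change above: the new third constructor argument appears to be the in-transition timestamp, and the test passes 0 so the region looks like it has been in transition since the epoch and the timeout monitor picks it up on its first pass. A minimal sketch of that idea, with illustrative names (RegionStateSketch is not the patched class):

    // Illustrative sketch, not the patched class: a region-in-transition
    // state that records when the state was entered, so the master's
    // TimeoutMonitor can decide whether the transition has stalled.
    final class RegionStateSketch {
      enum State { OFFLINE, PENDING_OPEN, OPENING, OPEN,
                   PENDING_CLOSE, CLOSING, CLOSED }

      private final State state;
      private final long stamp; // millis when the state was entered

      RegionStateSketch(State state, long stamp) {
        this.state = state;
        this.stamp = stamp;
      }

      // A stamp of 0 (the epoch) makes the region look ancient, so the very
      // first TimeoutMonitor pass treats it as timed out -- which is exactly
      // what a failover test wants.
      boolean isTimedOut(long now, long timeoutMillis) {
        return now - stamp > timeoutMillis;
      }
    }
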
Index: src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java	(revision 1029935)
+++ src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java	(working copy)
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 
 /**
@@ -51,38 +53,50 @@
     // Start a cluster with 2 masters and 4 regionservers
     final int NUM_MASTERS = 2;
     final int NUM_RS = 3;
-    final int NUM_REGIONS_TO_CREATE = 27;
+    final int NUM_REGIONS_TO_CREATE = 20;
     int expectedNumRS = 3;
 
     // Start the cluster
-    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    log("Starting cluster");
+    Configuration conf = HBaseConfiguration.create();
+    conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
+    conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
     TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    log("Waiting for active/ready master");
     cluster.waitForActiveAndReadyMaster();
-    Configuration conf = TEST_UTIL.getConfiguration();
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testRollingRestart",
         null);
+    HMaster master = cluster.getMaster();
 
     // Create a table with regions
     byte [] table = Bytes.toBytes("tableRestart");
     byte [] family = Bytes.toBytes("family");
+    log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
     HTable ht = TEST_UTIL.createTable(table, family);
     int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family,
         NUM_REGIONS_TO_CREATE);
     numRegions += 2; // catalogs
-    LOG.debug("\n\nWaiting for no more RIT\n");
-    ZKAssign.blockUntilNoRIT(zkw);
-    LOG.debug("\n\nDisabling table\n");
+    log("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    log("Disabling table\n");
     TEST_UTIL.getHBaseAdmin().disableTable(table);
-    LOG.debug("\n\nWaiting for no more RIT\n");
-    ZKAssign.blockUntilNoRIT(zkw);
-    LOG.debug("\n\nEnabling table\n");
+    log("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    NavigableSet<String> regions = getAllOnlineRegions(cluster);
+    log("Verifying only catalog regions are assigned\n");
+    if (regions.size() != 2) {
+      for (String oregion : regions) log("Region still online: " + oregion);
+    }
+    assertEquals(2, regions.size());
+    log("Enabling table\n");
     TEST_UTIL.getHBaseAdmin().enableTable(table);
-    LOG.debug("\n\nWaiting for no more RIT\n");
-    ZKAssign.blockUntilNoRIT(zkw);
-    LOG.debug("\n\nVerifying there are " + numRegions + " assigned on cluster\n");
-    NavigableSet<String> regions = getAllOnlineRegions(cluster);
+    log("Waiting for no more RIT\n");
+    blockUntilNoRIT(zkw, master);
+    log("Verifying there are " + numRegions + " assigned on cluster\n");
+    regions = getAllOnlineRegions(cluster);
     assertRegionsAssigned(cluster, regions);
     assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
 
@@ -93,7 +107,7 @@
     restarted.waitForServerOnline();
     log("Additional RS is online");
     log("Waiting for no more RIT");
-    ZKAssign.blockUntilNoRIT(zkw);
+    blockUntilNoRIT(zkw, master);
     log("Verifying there are " + numRegions + " assigned on cluster");
     assertRegionsAssigned(cluster, regions);
     assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@@ -112,22 +126,23 @@
     }
 
     // Bring down the backup master
-    LOG.debug("\n\nStopping backup master\n\n");
+    log("Stopping backup master\n\n");
     backupMaster.getMaster().stop("Stop of backup during rolling restart");
     cluster.hbaseCluster.waitOnMaster(backupMaster);
 
     // Bring down the primary master
-    LOG.debug("\n\nStopping primary master\n\n");
+    log("Stopping primary master\n\n");
     activeMaster.getMaster().stop("Stop of active during rolling restart");
     cluster.hbaseCluster.waitOnMaster(activeMaster);
 
     // Start primary master
-    LOG.debug("\n\nRestarting primary master\n\n");
+    log("Restarting primary master\n\n");
     activeMaster = cluster.startMaster();
     cluster.waitForActiveAndReadyMaster();
+    master = activeMaster.getMaster();
 
     // Start backup master
-    LOG.debug("\n\nRestarting backup master\n\n");
+    log("Restarting backup master\n\n");
     backupMaster = cluster.startMaster();
 
     assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@@ -148,7 +163,7 @@
     log("Waiting for RS shutdown to be handled by master");
     waitForRSShutdownToStartAndFinish(activeMaster, serverName);
     log("RS shutdown done, waiting for no more RIT");
-    ZKAssign.blockUntilNoRIT(zkw);
+    blockUntilNoRIT(zkw, master);
     log("Verifying there are " + numRegions + " assigned on cluster");
     assertRegionsAssigned(cluster, regions);
     expectedNumRS--;
@@ -159,7 +174,7 @@
     expectedNumRS++;
     log("Region server " + num + " is back online");
     log("Waiting for no more RIT");
-    ZKAssign.blockUntilNoRIT(zkw);
+    blockUntilNoRIT(zkw, master);
     log("Verifying there are " + numRegions + " assigned on cluster");
     assertRegionsAssigned(cluster, regions);
     assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@@ -192,7 +207,7 @@
     waitForRSShutdownToStartAndFinish(activeMaster,
         metaServer.getRegionServer().getServerName());
     log("Waiting for no more RIT");
-    ZKAssign.blockUntilNoRIT(zkw);
+    blockUntilNoRIT(zkw, master);
     log("Verifying there are " + numRegions + " assigned on cluster");
     assertRegionsAssigned(cluster, regions);
     assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@@ -208,7 +223,7 @@
     waitForRSShutdownToStartAndFinish(activeMaster,
         metaServer.getRegionServer().getServerName());
     log("RS shutdown done, waiting for no more RIT");
-    ZKAssign.blockUntilNoRIT(zkw);
+    blockUntilNoRIT(zkw, master);
     log("Verifying there are " + numRegions + " assigned on cluster");
     assertRegionsAssigned(cluster, regions);
     assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@@ -219,7 +234,7 @@
     cluster.startRegionServer().waitForServerOnline();
     Thread.sleep(1000);
     log("Waiting for no more RIT");
-    ZKAssign.blockUntilNoRIT(zkw);
+    blockUntilNoRIT(zkw, master);
     log("Verifying there are " + numRegions + " assigned on cluster");
     assertRegionsAssigned(cluster, regions);
     // Shutdown server hosting META
@@ -232,7 +247,7 @@
     waitForRSShutdownToStartAndFinish(activeMaster,
         metaServer.getRegionServer().getServerName());
     log("RS shutdown done, waiting for no more RIT");
-    ZKAssign.blockUntilNoRIT(zkw);
+    blockUntilNoRIT(zkw, master);
     log("Verifying there are " + numRegions + " assigned on cluster");
     assertRegionsAssigned(cluster, regions);
 
@@ -246,7 +261,7 @@
     waitForRSShutdownToStartAndFinish(activeMaster,
         metaServer.getRegionServer().getServerName());
     log("RS shutdown done, waiting for no more RIT");
-    ZKAssign.blockUntilNoRIT(zkw);
+    blockUntilNoRIT(zkw, master);
     log("Verifying there are " + numRegions + " assigned on cluster");
     assertRegionsAssigned(cluster, regions);
 
@@ -260,7 +275,7 @@
     waitForRSShutdownToStartAndFinish(activeMaster,
         metaServer.getRegionServer().getServerName());
     log("RS shutdown done, waiting for no more RIT");
-    ZKAssign.blockUntilNoRIT(zkw);
+    blockUntilNoRIT(zkw, master);
     log("Verifying there are " + numRegions + " assigned on cluster");
     assertRegionsAssigned(cluster, regions);
 
@@ -280,6 +295,12 @@
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
+  throws KeeperException, InterruptedException {
+    ZKAssign.blockUntilNoRIT(zkw);
+    master.assignmentManager.waitUntilNoRegionsInTransition(60000);
+  }
+
   private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
       String serverName) throws InterruptedException {
     ServerManager sm = activeMaster.getMaster().getServerManager();
@@ -298,7 +319,7 @@
   }
 
   private void log(String msg) {
-    LOG.debug("\n\n" + msg + "\n");
+    LOG.debug("\n\nTRR: " + msg + "\n");
   }
 
   private RegionServerThread getServerHostingMeta(MiniHBaseCluster cluster) {
@@ -325,16 +346,25 @@
     for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
       numFound += rst.getRegionServer().getNumberOfOnlineRegions();
     }
-    if (expectedRegions.size() != numFound) {
-      LOG.debug("Expected to find " + expectedRegions.size() + " but only found"
+    if (expectedRegions.size() > numFound) {
+      log("Expected to find " + expectedRegions.size() + " but only found"
           + " " + numFound);
       NavigableSet<String> foundRegions = getAllOnlineRegions(cluster);
       for (String region : expectedRegions) {
         if (!foundRegions.contains(region)) {
-          LOG.debug("Missing region: " + region);
+          log("Missing region: " + region);
         }
       }
       assertEquals(expectedRegions.size(), numFound);
+    } else if (expectedRegions.size() < numFound) {
+      int doubled = numFound - expectedRegions.size();
+      log("Expected to find " + expectedRegions.size() + " but found"
+          + " " + numFound + " (" + doubled + " double assignments?)");
+      NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
+      for (String region : doubleRegions) {
+        log("Region is double assigned: " + region);
+      }
+      assertEquals(expectedRegions.size(), numFound);
     } else {
       log("Success! Found expected number of " + numFound + " regions");
     }
@@ -350,4 +380,18 @@
     return online;
   }
 
+  private NavigableSet<String> getDoubleAssignedRegions(
+      MiniHBaseCluster cluster) {
+    NavigableSet<String> online = new TreeSet<String>();
+    NavigableSet<String> doubled = new TreeSet<String>();
+    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+      for (HRegionInfo region : rst.getRegionServer().getOnlineRegions()) {
+        if(!online.add(region.getRegionNameAsString())) {
+          doubled.add(region.getRegionNameAsString());
+        }
+      }
+    }
+    return doubled;
+  }
+
 }
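
The blockUntilNoRIT helper added above waits on two views of regions-in-transition: the ZK unassigned directory and the master's in-memory map, which can briefly disagree. A rough polling sketch of the combined wait, under assumed simplified types (RitView and BlockUntilNoRit are illustrative, not HBase API):

    // Rough polling sketch of the combined wait; illustrative types only.
    interface RitView {
      boolean hasRegionsInTransition();
    }

    final class BlockUntilNoRit {
      static void await(RitView zkView, RitView masterView, long timeoutMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        // ZK's unassigned directory can drain before the master's in-memory
        // map does (and vice versa), so wait until both report empty.
        while (zkView.hasRegionsInTransition()
            || masterView.hasRegionsInTransition()) {
          if (System.currentTimeMillis() > deadline) {
            throw new IllegalStateException("regions still in transition");
          }
          Thread.sleep(100); // the real helper uses ZK watches where it can
        }
      }
    }
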
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java	(revision 1029935)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java	(working copy)
@@ -742,6 +742,33 @@
   }
 
   /**
+   * Gets the current data in the unassigned node for the specified region name
+   * or fully-qualified path.
+   *
+   * <p>
+   * Returns null if the region does not currently have a node.
+   *
+   * <p>
+   * Does not set a watch.
+   *
+   * @param watcher zk reference
+   * @param pathOrRegionName fully-specified path or region name
+   * @param stat object to store node info into on getData call
+   * @return data for the unassigned node
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static RegionTransitionData getDataNoWatch(ZooKeeperWatcher zkw,
+      String pathOrRegionName, Stat stat)
+  throws KeeperException {
+    String node = pathOrRegionName.startsWith("/") ?
+      pathOrRegionName : getNodeName(zkw, pathOrRegionName);
+    byte [] data = ZKUtil.getDataNoWatch(zkw, node, stat);
+    if(data == null) {
+      return null;
+    }
+    return RegionTransitionData.fromBytes(data);
+  }
+
+  /**
    * Delete the assignment node regardless of its current state.
    * <p>
    * Fail silent even if the node does not exist at all.
Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java	(revision 1029935)
+++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java	(working copy)
@@ -580,12 +580,16 @@
    */
   public boolean sendRegionClose(HServerInfo server, HRegionInfo region)
   throws IOException {
-    if (server == null) return false;
+    if (server == null) {
+      LOG.debug("Unable to send region close because server is null; region=" +
+          region.getRegionNameAsString());
+      return false;
+    }
     HRegionInterface hri = getServerConnection(server);
     if(hri == null) {
       LOG.warn("Attempting to send CLOSE RPC to server " +
-        server.getServerName() + " failed because no RPC connection found " +
-        "to this server");
+        server.getServerName() + " for region " + region.getRegionNameAsString() +
+        " failed because no RPC connection found to this server");
       return false;
     }
     return hri.closeRegion(region);
Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java	(revision 1029935)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java	(working copy)
@@ -751,7 +751,7 @@
       region.getLog().closeAndDelete();
 
       // 4. Trigger immediate assignment of this region
-      assignmentManager.assign(region.getRegionInfo());
+      assignmentManager.assign(region.getRegionInfo(), true);
     }
 
     // 5. If sync, wait for assignment of regions
@@ -958,7 +958,7 @@
   }
 
   public void assignRegion(HRegionInfo hri) {
-    assignmentManager.assign(hri);
+    assignmentManager.assign(hri, true);
   }
 
   /**
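
The boolean threaded through assign() above controls whether the master forces the region's ZK unassigned node to OFFLINE before assigning. The companion forceNewPlan flag (introduced in AssignmentManager below) matters most when a cached plan points at a server we now know is bad. A condensed sketch of the plan-selection rule, with simplified, made-up types (PlanCache is not the real class):

    // Condensed sketch of the selection rule in getRegionPlan() below;
    // simplified, illustrative types only.
    final class PlanCache {
      private final java.util.Map<String, String> plans =
          new java.util.HashMap<String, String>();
      private final java.util.Random random = new java.util.Random();

      // Returns a destination for the region: a fresh random pick when no
      // plan exists, when the caller forces one, or when the cached
      // destination is the server we are excluding as bad.
      synchronized String getPlan(String region, String serverToExclude,
          boolean forceNewPlan, java.util.List<String> liveServers) {
        if (liveServers.isEmpty()) return null; // nowhere to assign
        String existing = plans.get(region);
        if (existing == null || forceNewPlan
            || existing.equals(serverToExclude)) {
          String fresh = liveServers.get(random.nextInt(liveServers.size()));
          plans.put(region, fresh);
          return fresh;
        }
        return existing;
      }
    }
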
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(revision 1029935)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(working copy)
@@ -34,6 +34,7 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
@@ -97,13 +98,19 @@
 
   private TimeoutMonitor timeoutMonitor;
 
-  /** Regions currently in transition. */
+  /**
+   * Regions currently in transition. Map of encoded region names to the
+   * master in-memory state for that region.
+   */
   final ConcurrentSkipListMap<String, RegionState> regionsInTransition =
     new ConcurrentSkipListMap<String, RegionState>();
 
   /** Plans for region movement. Key is the encoded version of a region name*/
   // TODO: When do plans get cleaned out? Ever? In server open and in server
   // shutdown processing -- St.Ack
+  // TODO: Better to just synchronize access around regionPlans? I think that
+  // would be better than a concurrent structure since we do more than
+  // one operation at a time -- jgray
   final ConcurrentNavigableMap<String, RegionPlan> regionPlans =
     new ConcurrentSkipListMap<String, RegionPlan>();
 
@@ -152,9 +159,9 @@
     this.executorService = service;
     Configuration conf = master.getConfiguration();
     this.timeoutMonitor = new TimeoutMonitor(
-      conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
+      conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
       master,
-      conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 15000));
+      conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 30000));
     Threads.setDaemonThreadRunning(timeoutMonitor,
       master.getServerName() + ".timeoutMonitor");
   }
@@ -272,14 +279,14 @@
       // Region is closed, insert into RIT and handle it
       regionsInTransition.put(encodedRegionName, new RegionState(
           regionInfo, RegionState.State.CLOSED, data.getStamp()));
-      new ClosedRegionHandler(master, this, data, regionInfo).process();
+      new ClosedRegionHandler(master, this, regionInfo).process();
       break;
 
     case M_ZK_REGION_OFFLINE:
       // Region is offline, insert into RIT and handle it like a closed
       regionsInTransition.put(encodedRegionName, new RegionState(
           regionInfo, RegionState.State.OFFLINE, data.getStamp()));
-      new ClosedRegionHandler(master, this, data, regionInfo).process();
+      new ClosedRegionHandler(master, this, regionInfo).process();
       break;
 
     case RS_ZK_REGION_OPENING:
@@ -303,7 +310,7 @@
             "; letting RIT timeout so will be assigned elsewhere");
         break;
       }
-      new OpenedRegionHandler(master, this, data, regionInfo, hsi).process();
+      new OpenedRegionHandler(master, this, regionInfo, hsi).process();
       break;
     }
   }
@@ -367,7 +374,7 @@
         // what follows will fail because not in expected state.
         regionState.update(RegionState.State.CLOSED, data.getStamp());
         this.executorService.submit(new ClosedRegionHandler(master,
-          this, data, regionState.getRegion()));
+          this, regionState.getRegion()));
         break;
 
       case RS_ZK_REGION_OPENING:
@@ -400,7 +407,7 @@
         // Handle OPENED by removing from transition and deleted zk node
         regionState.update(RegionState.State.OPEN, data.getStamp());
         this.executorService.submit(
-          new OpenedRegionHandler(master, this, data, regionState.getRegion(),
+          new OpenedRegionHandler(master, this, regionState.getRegion(),
             this.serverManager.getServerInfo(data.getServerName())));
         break;
     }
@@ -600,7 +607,8 @@
   public void offlineDisabledRegion(HRegionInfo regionInfo) {
     // Disabling so should not be reassigned, just delete the CLOSED node
     LOG.debug("Table being disabled so deleting ZK node and removing from " +
-        "regions in transition, skipping assignment");
+        "regions in transition, skipping assignment of region " +
+        regionInfo.getRegionNameAsString());
     try {
       if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
         // Could also be in OFFLINE mode
@@ -632,8 +640,15 @@
    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
    *
    * @param regionName server to be assigned
+   * @param setOfflineInZK whether ZK node should be created/transitioned to an
+   *                       OFFLINE state before assigning the region
    */
-  public void assign(HRegionInfo region) {
+  public void assign(HRegionInfo region, boolean setOfflineInZK) {
+    assign(region, setOfflineInZK, false);
+  }
+
+  public void assign(HRegionInfo region, boolean setOfflineInZK,
+      boolean forceNewPlan) {
     String tableName = region.getTableDesc().getNameAsString();
     if (isTableDisabled(tableName)) {
       LOG.info("Table " + tableName + " disabled; skipping assign of " +
@@ -648,7 +663,7 @@
     }
     RegionState state = addToRegionsInTransition(region);
     synchronized (state) {
-      assign(state);
+      assign(state, setOfflineInZK, forceNewPlan);
     }
   }
 
@@ -800,13 +815,14 @@
    * Caller must hold lock on the passed <code>state</code> object.
    * @param state
    */
-  private void assign(final RegionState state) {
-    if (!setOfflineInZooKeeper(state)) return;
+  private void assign(final RegionState state, final boolean setOfflineInZK,
+      final boolean forceNewPlan) {
+    if (setOfflineInZK && !setOfflineInZooKeeper(state)) return;
     if (this.master.isStopped()) {
       LOG.debug("Server stopped; skipping assign of " + state);
       return;
     }
-    RegionPlan plan = getRegionPlan(state);
+    RegionPlan plan = getRegionPlan(state, forceNewPlan);
     if (plan == null) return; // Should get reassigned later when RIT times out.
     try {
       LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
@@ -823,12 +839,13 @@
       // succeed anyways; we need a new plan!
       // Transition back to OFFLINE
       state.update(RegionState.State.OFFLINE);
-      // Remove the plan
-      this.regionPlans.remove(state.getRegion().getEncodedName());
-      // Put in place a new plan and reassign. Calling getRegionPlan will add
-      // a plan if none exists (We removed it in line above).
-      if (getRegionPlan(state, plan.getDestination()) == null) return;
-      assign(state);
+      // Force a new plan and reassign.
+      if (getRegionPlan(state, plan.getDestination(), true) == null) {
+        LOG.warn("Unable to find a viable location to assign region " +
+            state.getRegion().getRegionNameAsString());
+        return;
+      }
+      assign(state, false, false);
     }
   }
 
@@ -890,43 +907,48 @@
    * @return Plan for passed <code>state</code> (If none currently, it creates one or
    *         if no servers to assign, it returns null).
    */
-  RegionPlan getRegionPlan(final RegionState state) {
-    return getRegionPlan(state, null);
+  RegionPlan getRegionPlan(final RegionState state,
+      final boolean forceNewPlan) {
+    return getRegionPlan(state, null, forceNewPlan);
   }
 
   /**
    * @param state
    * @param serverToExclude Server to exclude (we know its bad). Pass null if
    *        all servers are thought to be assignable.
+   * @param forceNewPlan If true, then if an existing plan exists, a new plan
+   *        will be generated.
    * @return Plan for passed <code>state</code> (If none currently, it creates one or
    *         if no servers to assign, it returns null).
    */
   RegionPlan getRegionPlan(final RegionState state,
-      final HServerInfo serverToExclude) {
+      final HServerInfo serverToExclude, final boolean forceNewPlan) {
     // Pickup existing plan or make a new one
     String encodedName = state.getRegion().getEncodedName();
     List<HServerInfo> servers = this.serverManager.getOnlineServersList();
     // The remove below hinges on the fact that the call to
     // serverManager.getOnlineServersList() returns a copy
     if (serverToExclude != null) servers.remove(serverToExclude);
-    if (servers.size() < 0) return null;
-    RegionPlan newPlan = new RegionPlan(state.getRegion(), null,
+    if (servers.size() <= 0) return null;
+    RegionPlan randomPlan = new RegionPlan(state.getRegion(), null,
       LoadBalancer.randomAssignment(servers));
-    RegionPlan existingPlan = this.regionPlans.putIfAbsent(encodedName, newPlan);
-    RegionPlan plan = null;
-    if (existingPlan == null) {
-      LOG.debug("No previous transition plan for " +
-        state.getRegion().getRegionNameAsString() +
-        " so generated a random one; " + newPlan + "; " +
-        serverManager.countOfRegionServers() +
-        " (online=" + serverManager.getOnlineServers().size() +
-        ", exclude=" + serverToExclude + ") available servers");
-      plan = newPlan;
-    } else {
-      LOG.debug("Using preexisting plan=" + existingPlan);
-      plan = existingPlan;
+    synchronized (this.regionPlans) {
+      RegionPlan existingPlan = this.regionPlans.get(encodedName);
+      if (existingPlan == null || forceNewPlan ||
+          existingPlan.getDestination().equals(serverToExclude)) {
+        LOG.debug("No previous transition plan was found (or we are ignoring " +
+          "an existing plan) for " + state.getRegion().getRegionNameAsString() +
+          " so generated a random one; " + randomPlan + "; " +
+          serverManager.countOfRegionServers() +
+          " (online=" + serverManager.getOnlineServers().size() +
+          ", exclude=" + serverToExclude + ") available servers");
+        this.regionPlans.put(encodedName, randomPlan);
+        return randomPlan;
+      }
+      LOG.debug("Using pre-existing plan for region " +
+        state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan);
+      return existingPlan;
     }
-    return plan;
   }
 
@@ -974,10 +996,10 @@
       if (state == null) {
         state = new RegionState(region, RegionState.State.PENDING_CLOSE);
         regionsInTransition.put(encodedName, state);
-      } else if (force && (state.isClosing() || state.isPendingClose())) {
+      } else if (force && state.isPendingClose()) {
         LOG.debug("Attempting to unassign region " +
-          region.getRegionNameAsString() + " which is already closing but " +
-          "forcing an additional close");
+          region.getRegionNameAsString() + " which is already pending close " +
+          "but forcing an additional close");
         state.update(RegionState.State.PENDING_CLOSE);
       } else {
         LOG.debug("Attempting to unassign region " +
@@ -987,20 +1009,26 @@
       }
     }
     // Send CLOSE RPC
+    HServerInfo server = null;
+    synchronized (this.regions) {
+      server = regions.get(region);
+    }
     try {
       // TODO: We should consider making this look more like it does for the
       // region open where we catch all throwables and never abort
-      if(serverManager.sendRegionClose(regions.get(region),
-          state.getRegion())) {
-        LOG.debug("Sent CLOSE to " + regions.get(region) + " for region " +
+      if(serverManager.sendRegionClose(server, state.getRegion())) {
+        LOG.debug("Sent CLOSE to " + server + " for region " +
           region.getRegionNameAsString());
         return;
       }
+      LOG.debug("Server " + server + " region CLOSE RPC returned false");
     } catch (NotServingRegionException nsre) {
       // Failed to close, so pass through and reassign
+      LOG.debug("Server " + server + " returned NotServingRegionException");
     } catch (RemoteException re) {
       if (re.unwrapRemoteException() instanceof NotServingRegionException) {
         // Failed to close, so pass through and reassign
+        LOG.debug("Server " + server + " returned NotServingRegionException");
       } else {
         this.master.abort("Remote unexpected exception",
           re.unwrapRemoteException());
@@ -1011,13 +1039,13 @@
       this.master.abort("Remote unexpected exception", t);
     }
     // Did not CLOSE, so set region offline and assign it
-    LOG.debug("Attempted to send CLOSE to " + regions.get(region) +
+    LOG.debug("Attempted to send CLOSE to " + server +
       " for region " + region.getRegionNameAsString() + " but failed, " +
       "setting region as OFFLINE and reassigning");
     synchronized (regionsInTransition) {
       forceRegionStateToOffline(region);
-      assign(region);
     }
+    assign(region, true);
   }
 
@@ -1049,7 +1077,7 @@
    */
   public void assignRoot() throws KeeperException {
     RootLocationEditor.deleteRootLocation(this.master.getZooKeeper());
-    assign(HRegionInfo.ROOT_REGIONINFO);
+    assign(HRegionInfo.ROOT_REGIONINFO, true);
   }
 
   /**
@@ -1062,7 +1090,7 @@
    */
   public void assignMeta() {
     // Force assignment to a random server
-    assign(HRegionInfo.FIRST_META_REGIONINFO);
+    assign(HRegionInfo.FIRST_META_REGIONINFO, true);
  }
 
@@ -1460,67 +1488,73 @@
           LOG.info("Regions in transition timed out: " + regionState);
           // Expired! Do a retry.
           switch (regionState.getState()) {
-            case OFFLINE:
             case CLOSED:
-              LOG.info("Region has been OFFLINE or CLOSED for too long, " +
-                "reassigning " + regionInfo.getRegionNameAsString());
-              assign(regionState.getRegion());
+              LOG.info("Region has been CLOSED for too long, " +
+                "retriggering ClosedRegionHandler");
+              AssignmentManager.this.executorService.submit(
+                  new ClosedRegionHandler(master, AssignmentManager.this,
+                      regionState.getRegion()));
               break;
+            case OFFLINE:
+              LOG.info("Region has been OFFLINE for too long, " +
+                "reassigning " + regionInfo.getRegionNameAsString() +
+                " to a random server");
+              assign(regionState.getRegion(), false);
+              break;
             case PENDING_OPEN:
               LOG.info("Region has been PENDING_OPEN for too " +
                   "long, reassigning region=" +
                   regionInfo.getRegionNameAsString());
-              // Should have a ZK node in OFFLINE state or no node at all
-              try {
-                if (ZKUtil.watchAndCheckExists(watcher,
-                    ZKAssign.getNodeName(watcher,
-                        regionInfo.getEncodedName())) &&
-                    !ZKAssign.verifyRegionState(watcher, regionInfo,
-                        EventType.M_ZK_REGION_OFFLINE)) {
-                  LOG.info("Region exists and not in expected OFFLINE " +
-                      "state so skipping timeout, region=" +
-                      regionInfo.getRegionNameAsString());
-                  break;
-                }
-              } catch (KeeperException ke) {
-                LOG.error("Unexpected ZK exception timing out " +
-                    "PENDING_CLOSE region", ke);
-                break;
-              }
-              AssignmentManager.this.setOffline(regionState.getRegion());
-              regionState.update(RegionState.State.OFFLINE);
-              assign(regionState.getRegion());
-              break;
+              assign(regionState.getRegion(), false, true);
+              break;
             case OPENING:
               LOG.info("Region has been OPENING for too " +
                 "long, reassigning region=" +
                 regionInfo.getRegionNameAsString());
               // Should have a ZK node in OPENING state
               try {
-                if (ZKUtil.watchAndCheckExists(watcher,
-                    ZKAssign.getNodeName(watcher,
-                        regionInfo.getEncodedName())) &&
-                    ZKAssign.transitionNode(watcher, regionInfo,
-                        HMaster.MASTER, EventType.RS_ZK_REGION_OPENING,
-                        EventType.M_ZK_REGION_OFFLINE, -1) == -1) {
-                  LOG.info("Region transitioned out of OPENING so " +
-                      "skipping timeout, region=" +
-                      regionInfo.getRegionNameAsString());
+                String node = ZKAssign.getNodeName(watcher,
+                    regionInfo.getEncodedName());
+                Stat stat = new Stat();
+                RegionTransitionData data = ZKAssign.getDataNoWatch(watcher,
+                    node, stat);
+                if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+                  LOG.debug("Region has transitioned to OPENED, allowing " +
+                      "watched event handlers to process");
                   break;
+                } else if (data.getEventType() !=
+                    EventType.RS_ZK_REGION_OPENING) {
+                  LOG.warn("While timing out a region in state OPENING, " +
+                      "found ZK node in unexpected state: " +
+                      data.getEventType());
+                  break;
                 }
+                // Attempt to transition node into OFFLINE
+                try {
+                  data = new RegionTransitionData(
+                      EventType.M_ZK_REGION_OFFLINE,
+                      regionInfo.getRegionName());
+                  if (ZKUtil.setData(watcher, node, data.getBytes(),
+                      stat.getVersion())) {
+                    // Node is now OFFLINE, let's trigger another assignment
+                    ZKUtil.getDataAndWatch(watcher, node); // re-set the watch
+                    LOG.info("Successfully transitioned region=" +
+                        regionInfo.getRegionNameAsString() + " into OFFLINE" +
+                        " and forcing a new assignment");
+                    assign(regionState, false, true);
+                  }
+                } catch (KeeperException.NoNodeException nne) {
+                  // Node did not exist, can't time this out
+                }
               } catch (KeeperException ke) {
                 LOG.error("Unexpected ZK exception timing out CLOSING region",
                     ke);
                 break;
               }
-              AssignmentManager.this.setOffline(regionState.getRegion());
-              regionState.update(RegionState.State.OFFLINE);
-              assign(regionState.getRegion());
               break;
             case OPEN:
-              LOG.warn("Long-running region in OPEN state? This should " +
-                "not happen; region=" + regionInfo.getRegionNameAsString());
+              LOG.error("Region has been OPEN for too long, " +
+                "we don't know where region was opened so can't do anything");
               break;
             case PENDING_CLOSE:
               LOG.info("Region has been PENDING_CLOSE for too " +
@@ -1544,20 +1578,8 @@
               break;
             case CLOSING:
               LOG.info("Region has been CLOSING for too " +
-                "long, running forced unassign again on region=" +
-                regionInfo.getRegionNameAsString());
-              try {
-                if (ZKAssign.deleteClosingNode(watcher,
-                    regionInfo.getEncodedName())) {
-                  unassign(regionInfo, true);
-                }
-              } catch (NoNodeException e) {
-                LOG.debug("Node no longer existed so not forcing another " +
-                  "unassignment");
-              } catch (KeeperException e) {
-                LOG.warn("Unexpected ZK exception timing out a region " +
-                  "close", e);
-              }
+                "long, this should eventually complete or the server will " +
+                "expire, doing nothing");
               break;
           }
         }
@@ -1569,54 +1591,42 @@
   /**
    * Process shutdown server removing any assignments.
    * @param hsi Server that went down.
+   * @return set of regions on this server that are not in transition
    */
-  public void processServerShutdown(final HServerInfo hsi) {
-    // Clean out any exisiting assignment plans for this server
-    for (Iterator <Map.Entry<String, RegionPlan>> i =
-        this.regionPlans.entrySet().iterator(); i.hasNext();) {
-      Map.Entry<String, RegionPlan> e = i.next();
-      if (e.getValue().getDestination().equals(hsi)) {
-        // Use iterator's remove else we'll get CME
-        i.remove();
+  public List<HRegionInfo> processServerShutdown(final HServerInfo hsi) {
+    // Clean out any existing assignment plans for this server
+    synchronized (this.regionPlans) {
+      for (Iterator <Map.Entry<String, RegionPlan>> i =
+          this.regionPlans.entrySet().iterator(); i.hasNext();) {
+        Map.Entry<String, RegionPlan> e = i.next();
+        if (e.getValue().getDestination().equals(hsi)) {
+          // Use iterator's remove else we'll get CME
+          i.remove();
+        }
       }
     }
-    // Remove assignment info related to the downed server. Remove the downed
-    // server from list of servers else it looks like a server w/ no load.
+    // TODO: Do we want to sync on RIT here?
+    // Remove this server from map of servers to regions, and remove all regions
+    // of this server from online map of regions.
+    Set<HRegionInfo> deadRegions = null;
     synchronized (this.regions) {
-      Set<HRegionInfo> hris = new HashSet<HRegionInfo>();
-      for (Map.Entry<HRegionInfo, HServerInfo> e: this.regions.entrySet()) {
-        // Add to a Set -- don't call setOffline in here else we get a CME.
-        if (e.getValue().equals(hsi)) hris.add(e.getKey());
+      deadRegions = new TreeSet<HRegionInfo>(this.servers.remove(hsi));
+      for (HRegionInfo region : deadRegions) {
+        this.regions.remove(region);
       }
-      for (HRegionInfo hri: hris) setOffline(hri);
-      this.servers.remove(hsi);
     }
-    // If anything in transition related to the server, clean it up.
+    // See if any of the regions that were online on this server were in RIT
+    // If they are, normal timeouts will deal with them appropriately so
+    // let's skip a manual re-assignment.
+    List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
    synchronized (regionsInTransition) {
-      // Iterate all regions in transition checking if were on this server
-      final String serverName = hsi.getServerName();
-      for (Map.Entry<String, RegionState> e: this.regionsInTransition.entrySet()) {
-        if (!e.getKey().equals(serverName)) continue;
-        RegionState regionState = e.getValue();
-        switch(regionState.getState()) {
-          case PENDING_OPEN:
-          case OPENING:
-          case OFFLINE:
-          case CLOSED:
-          case PENDING_CLOSE:
-          case CLOSING:
-            LOG.info("Region " + regionState.getRegion().getRegionNameAsString() +
-              " was in state=" + regionState.getState() + " on shutdown server=" +
-              serverName + ", reassigning");
-            assign(regionState.getRegion());
-            break;
-
-          case OPEN:
-            LOG.warn("Long-running region in OPEN state? Should not happen");
-            break;
+      for (RegionState region : this.regionsInTransition.values()) {
+        if (deadRegions.remove(region.getRegion())) {
+          rits.add(region.getRegion());
         }
       }
     }
+    return rits;
   }
 
   /**
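
processServerShutdown() now hands back the dead server's regions that were already in transition so the shutdown handler can skip them; the timeout monitor owns their recovery. A small sketch of that partitioning step, with assumed simplified types (ShutdownSketch and its method are illustrative, not the patched API):

    // Sketch of the partition step: regions of the dead server that are
    // already in transition are returned for the caller to skip; only what
    // remains in regionsOnDeadServer gets reassigned directly.
    final class ShutdownSketch {
      static java.util.List<String> regionsToSkip(
          java.util.Set<String> regionsOnDeadServer,
          java.util.Set<String> regionsInTransition) {
        java.util.List<String> skip = new java.util.ArrayList<String>();
        for (String region : regionsInTransition) {
          if (regionsOnDeadServer.remove(region)) {
            skip.add(region); // in transition; do not assign it again here
          }
        }
        return skip;
      }
    }
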
Index: src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java	(revision 1029935)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java	(working copy)
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.master.handler;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 
@@ -97,7 +98,8 @@
     // doing after log splitting. Could do some states before -- OPENING?
     // OFFLINE? -- and then others after like CLOSING that depend on log
     // splitting.
-    this.services.getAssignmentManager().processServerShutdown(this.hsi);
+    List<HRegionInfo> regionsInTransition =
+      this.services.getAssignmentManager().processServerShutdown(this.hsi);
 
     // Assign root and meta if we were carrying them.
     if (isCarryingRoot()) { // -ROOT-
@@ -113,41 +115,66 @@
     if (isCarryingMeta()) this.services.getAssignmentManager().assignMeta();
 
     // Wait on meta to come online; we need it to progress.
-    try {
-      this.server.getCatalogTracker().waitForMeta();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted", e);
+    // TODO: Best way to hold strictly here? We should build this retry logic
+    // into the MetaReader operations themselves.
+    NavigableMap<HRegionInfo, Result> hris = null;
+    while (!this.server.isStopped()) {
+      try {
+        this.server.getCatalogTracker().waitForMeta();
+        hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
+            this.hsi);
+        break;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted", e);
+      } catch (IOException ioe) {
+        LOG.info("Received exception accessing META during server shutdown of " +
+            serverName + ", retrying META read");
+      }
     }
-    NavigableMap<HRegionInfo, Result> hris =
-      MetaReader.getServerUserRegions(this.server.getCatalogTracker(), this.hsi);
-    LOG.info("Reassigning the " + hris.size() + " region(s) that " + serverName +
-      " was carrying");
+    // Remove regions that were in transition
+    for (HRegionInfo rit : regionsInTransition) hris.remove(rit);
+    LOG.info("Reassigning the " + hris.size() + " region(s) that " + serverName +
+      " was carrying (skipping " + regionsInTransition.size() +
+      " region(s) that are in transition)");
 
-    // We should encounter -ROOT- and .META. first in the Set given how its
-    // a sorted set.
+    // Iterate regions that were on this server and assign them
     for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
-      processDeadRegion(e.getKey(), e.getValue(),
+      if (processDeadRegion(e.getKey(), e.getValue(),
         this.services.getAssignmentManager(),
-        this.server.getCatalogTracker());
-      this.services.getAssignmentManager().assign(e.getKey());
+        this.server.getCatalogTracker())) {
+        this.services.getAssignmentManager().assign(e.getKey(), true);
+      }
     }
     this.deadServers.remove(serverName);
     LOG.info("Finished processing of shutdown of " + serverName);
   }
 
-  public static void processDeadRegion(HRegionInfo hri, Result result,
+  /**
+   * Process a dead region from a dead RS. Checks if the region is disabled
+   * or if the region has a partially completed split.
+   * <p>
+   * Returns true if specified region should be assigned, false if not.
+   * @param hri
+   * @param result
+   * @param assignmentManager
+   * @param catalogTracker
+   * @return true if specified region should be assigned
+   * @throws IOException
+   */
+  public static boolean processDeadRegion(HRegionInfo hri, Result result,
     AssignmentManager assignmentManager, CatalogTracker catalogTracker)
   throws IOException {
     // If table is not disabled but the region is offlined,
     boolean disabled = assignmentManager.isTableDisabled(
       hri.getTableDesc().getNameAsString());
-    if (disabled) return;
+    if (disabled) return false;
     if (hri.isOffline() && hri.isSplit()) {
       fixupDaughters(result, assignmentManager, catalogTracker);
-      return;
+      return false;
     }
+    return true;
   }
 
   /**
@@ -183,7 +210,7 @@
       if (pair == null || pair.getFirst() == null) {
        LOG.info("Fixup; missing daughter " + hri.getEncodedName());
         MetaEditor.addDaughter(catalogTracker, hri, null);
-        assignmentManager.assign(hri);
+        assignmentManager.assign(hri, true);
       }
     }
   }
Index: src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java	(revision 1029935)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java	(working copy)
@@ -24,7 +24,6 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 
 /**
@@ -39,7 +38,6 @@
   private static final Log LOG = LogFactory.getLog(ClosedRegionHandler.class);
   private final AssignmentManager assignmentManager;
-  private final RegionTransitionData data;
   private final HRegionInfo regionInfo;
   private final ClosedPriority priority;
 
@@ -58,12 +56,10 @@
     }
   };
 
-  public ClosedRegionHandler(Server server,
-      AssignmentManager assignmentManager, RegionTransitionData data,
+  public ClosedRegionHandler(Server server, AssignmentManager assignmentManager,
       HRegionInfo regionInfo) {
     super(server, EventType.RS_ZK_REGION_CLOSED);
     this.assignmentManager = assignmentManager;
-    this.data = data;
     this.regionInfo = regionInfo;
     if(regionInfo.isRootRegion()) {
       priority = ClosedPriority.ROOT;
@@ -94,6 +90,6 @@
     }
     // ZK Node is in CLOSED state, assign it.
     assignmentManager.setOffline(regionInfo);
-    assignmentManager.assign(regionInfo);
+    assignmentManager.assign(regionInfo, true);
   }
 }
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java	(revision 1029935)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java	(working copy)
@@ -77,7 +77,7 @@
     assignmentManager.undisableTable(this.tableNameStr);
     // Verify all regions of table are disabled
     for (HRegionInfo region : regions) {
-      assignmentManager.assign(region);
+      assignmentManager.assign(region, true);
     }
     // Wait on table's regions to clear region in transition.
     for (HRegionInfo region: regions) {
Index: src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java	(revision 1029935)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java	(working copy)
@@ -25,7 +25,6 @@
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.RegionTransitionData;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.zookeeper.KeeperException;
@@ -55,8 +54,8 @@
   };
 
   public OpenedRegionHandler(Server server,
-      AssignmentManager assignmentManager, RegionTransitionData data,
-      HRegionInfo regionInfo, HServerInfo serverInfo) {
+      AssignmentManager assignmentManager, HRegionInfo regionInfo,
+      HServerInfo serverInfo) {
     super(server, EventType.RS_ZK_REGION_OPENED);
     this.assignmentManager = assignmentManager;
     this.regionInfo = regionInfo;
Index: src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java	(revision 1029935)
+++ src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java	(working copy)
@@ -498,9 +498,9 @@
       Result result;
       while((result = metaServer.next(scannerid)) != null) {
         if (result != null && result.size() > 0) {
-          Pair<HRegionInfo, HServerAddress> pair = metaRowToRegionPair(result);
-          if (pair.getSecond() == null ||
-              !pair.getSecond().equals(hsi.getServerAddress())) {
+          Pair<HRegionInfo, HServerInfo> pair =
+            metaRowToRegionPairWithInfo(result);
+          if (pair.getSecond() == null || !pair.getSecond().equals(hsi)) {
             continue;
           }
           hris.put(pair.getFirst(), result);
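
The MetaReader change above compares the full HServerInfo rather than just the server address. The reason, as I understand it, is that a restarted regionserver reuses the same host:port but carries a new start code, so catalog rows written by the dead instance must not match the live one. An illustrative equality sketch (ServerId is a made-up type, not HBase's):

    // Made-up type showing why full server identity, not just the address,
    // decides whether a META row belongs to the dead server instance.
    final class ServerId {
      final String hostAndPort;
      final long startCode; // distinguishes restarts on the same host:port

      ServerId(String hostAndPort, long startCode) {
        this.hostAndPort = hostAndPort;
        this.startCode = startCode;
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof ServerId)) return false;
        ServerId other = (ServerId) o;
        return hostAndPort.equals(other.hostAndPort)
            && startCode == other.startCode;
      }

      @Override
      public int hashCode() {
        return hostAndPort.hashCode() ^ (int) startCode;
      }
    }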