Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1028470) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -715,7 +715,31 @@ return createMultiRegions(c, table, columnFamily, KEYS); } + /** + * Creates the specified number of regions in the specified table. + * @param c + * @param table + * @param columnFamily + * @param startKeys + * @return + * @throws IOException + */ public int createMultiRegions(final Configuration c, final HTable table, + final byte [] family, int numRegions) + throws IOException { + if (numRegions < 3) throw new IOException("Must create at least 3 regions"); + byte [] startKey = Bytes.toBytes("aaaaa"); + byte [] endKey = Bytes.toBytes("zzzzz"); + byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); + byte [][] regionStartKeys = new byte[splitKeys.length+1][]; + for (int i=0;i regions = getAllOnlineRegions(cluster); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // Add a new regionserver + log("Adding a fourth RS"); + RegionServerThread restarted = cluster.startRegionServer(); + expectedNumRS++; + restarted.waitForServerOnline(); + log("Additional RS is online"); + log("Waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // Master Restarts + List masterThreads = cluster.getMasterThreads(); + MasterThread activeMaster = null; + MasterThread backupMaster = null; + assertEquals(2, masterThreads.size()); + if (masterThreads.get(0).getMaster().isActiveMaster()) { + activeMaster = masterThreads.get(0); + backupMaster = masterThreads.get(1); + } else { + activeMaster = masterThreads.get(1); + backupMaster = masterThreads.get(0); + } + + // Bring down the backup master + LOG.debug("\n\nStopping backup master\n\n"); + backupMaster.getMaster().stop("Stop of backup during rolling restart"); + cluster.hbaseCluster.waitOnMaster(backupMaster); + + // Bring down the primary master + LOG.debug("\n\nStopping primary master\n\n"); + activeMaster.getMaster().stop("Stop of active during rolling restart"); + cluster.hbaseCluster.waitOnMaster(activeMaster); + + // Start primary master + LOG.debug("\n\nRestarting primary master\n\n"); + activeMaster = cluster.startMaster(); + cluster.waitForActiveAndReadyMaster(); + + // Start backup master + LOG.debug("\n\nRestarting backup master\n\n"); + backupMaster = cluster.startMaster(); + + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // RegionServer Restarts + + // Bring them down, one at a time, waiting between each to complete + List regionServers = + cluster.getLiveRegionServerThreads(); + int num = 1; + int total = regionServers.size(); + for (RegionServerThread rst : regionServers) { + String serverName = rst.getRegionServer().getServerName(); + log("Stopping region server " + num + " of " + total + " [ " + + serverName + "]"); + rst.getRegionServer().stop("Stopping RS during rolling restart"); + cluster.hbaseCluster.waitOnRegionServer(rst); + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, serverName); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + expectedNumRS--; + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + log("Restarting region server " + num + " of " + total); + restarted = cluster.startRegionServer(); + restarted.waitForServerOnline(); + expectedNumRS++; + log("Region server " + num + " is back online"); + log("Waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + num++; + } + Thread.sleep(2000); + assertRegionsAssigned(cluster, regions); + + // Bring the RS hosting ROOT down and the RS hosting META down at once + RegionServerThread rootServer = getServerHostingRoot(cluster); + RegionServerThread metaServer = getServerHostingMeta(cluster); + if (rootServer == metaServer) { + log("ROOT and META on the same server so killing another random server"); + int i=0; + while (rootServer == metaServer) { + metaServer = cluster.getRegionServerThreads().get(i); + i++; + } + } + log("Stopping server hosting ROOT"); + rootServer.getRegionServer().stop("Stopping ROOT server"); + log("Stopping server hosting META #1"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(rootServer); + log("Root server down"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down #1"); + expectedNumRS -= 2; + log("Waiting for meta server #1 RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("Waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // Kill off the server hosting META again + metaServer = getServerHostingMeta(cluster); + log("Stopping server hosting META #2"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down"); + expectedNumRS--; + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // Start 3 RS again + cluster.startRegionServer().waitForServerOnline(); + cluster.startRegionServer().waitForServerOnline(); + cluster.startRegionServer().waitForServerOnline(); + Thread.sleep(1000); + log("Waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + // Shutdown server hosting META + metaServer = getServerHostingMeta(cluster); + log("Stopping server hosting META (1 of 3)"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down (1 of 3)"); + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + + // Shutdown server hosting META again + metaServer = getServerHostingMeta(cluster); + log("Stopping server hosting META (2 of 3)"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down (2 of 3)"); + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + + // Shutdown server hosting META again + metaServer = getServerHostingMeta(cluster); + log("Stopping server hosting META (3 of 3)"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down (3 of 3)"); + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + + if (cluster.getRegionServerThreads().size() != 1) { + log("Online regionservers:"); + for (RegionServerThread rst : cluster.getRegionServerThreads()) { + log("RS: " + rst.getRegionServer().getServerName()); + } + } + assertEquals(1, cluster.getRegionServerThreads().size()); + + + // TODO: Bring random 3 of 4 RS down at the same time + + + // Stop the cluster + TEST_UTIL.shutdownMiniCluster(); + } + + private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster, + String serverName) throws InterruptedException { + ServerManager sm = activeMaster.getMaster().getServerManager(); + // First wait for it to be in dead list + while (!sm.deadservers.isDeadServer(serverName)) { + log("Waiting for [" + serverName + "] to be listed as dead in master"); + Thread.sleep(100); + } + log("Server [" + serverName + "] marked as dead, waiting for it to " + + "finish dead processing"); + while (sm.deadservers.isDeadServer(serverName)) { + log("Server [" + serverName + "] still marked as dead, waiting"); + Thread.sleep(100); + } + log("Server [" + serverName + "] done with server shutdown processing"); + } + + private void log(String msg) { + LOG.debug("\n\n" + msg + "\n"); + } + + private RegionServerThread getServerHostingMeta(MiniHBaseCluster cluster) { + return getServerHosting(cluster, HRegionInfo.FIRST_META_REGIONINFO); + } + + private RegionServerThread getServerHostingRoot(MiniHBaseCluster cluster) { + return getServerHosting(cluster, HRegionInfo.ROOT_REGIONINFO); + } + + private RegionServerThread getServerHosting(MiniHBaseCluster cluster, + HRegionInfo region) { + for (RegionServerThread rst : cluster.getRegionServerThreads()) { + if (rst.getRegionServer().getOnlineRegions().contains(region)) { + return rst; + } + } + return null; + } + + private void assertRegionsAssigned(MiniHBaseCluster cluster, + Set expectedRegions) { + int numFound = 0; + for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { + numFound += rst.getRegionServer().getNumberOfOnlineRegions(); + } + if (expectedRegions.size() != numFound) { + LOG.debug("Expected to find " + expectedRegions.size() + " but only found" + + " " + numFound); + NavigableSet foundRegions = getAllOnlineRegions(cluster); + for (String region : expectedRegions) { + if (!foundRegions.contains(region)) { + LOG.debug("Missing region: " + region); + } + } + assertEquals(expectedRegions.size(), numFound); + } else { + log("Success! Found expected number of " + numFound + " regions"); + } + } + + private NavigableSet getAllOnlineRegions(MiniHBaseCluster cluster) { + NavigableSet online = new TreeSet(); + for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { + for (HRegionInfo region : rst.getRegionServer().getOnlineRegions()) { + online.add(region.getRegionNameAsString()); + } + } + return online; + } + +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1028470) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -1093,6 +1093,9 @@ LOG.debug(zkw.prefix("Retrieved " + ((data == null)? 0: data.length) + " byte(s) of data from znode " + znode + (watcherSet? " and set watcher; ": "; data=") + - (data == null? "null": StringUtils.abbreviate(Bytes.toString(data), 32)))); + (data == null? "null": ( + znode.startsWith(zkw.assignmentZNode) ? + RegionTransitionData.fromBytes(data).toString() + : StringUtils.abbreviate(Bytes.toString(data), 32))))); } } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (revision 1028470) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (working copy) @@ -244,11 +244,22 @@ int version = ZKUtil.checkExists(zkw, node); if(version == -1) { ZKUtil.createAndWatch(zkw, node, data.getBytes()); - return true; } else { - return ZKUtil.setData(zkw, node, data.getBytes(), version); + if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) { + return false; + } else { + // We successfully forced to OFFLINE, reset watch and handle if + // the state changed in between our set and the watch + RegionTransitionData curData = + ZKAssign.getData(zkw, region.getEncodedName()); + if (curData.getEventType() != data.getEventType()) { + // state changed, need to process + return false; + } + } } } + return true; } /** @@ -404,6 +415,8 @@ "after verifying it was in OPENED state, we got a version mismatch")); return false; } + LOG.debug(zkw.prefix("Successfully deleted unassigned node for region " + + regionName + " in expected state " + expectedState)); return true; } } @@ -745,6 +758,8 @@ /** * Blocks until there are no node in regions in transition. + *

+ * Used in testing only. * @param zkw zk reference * @throws KeeperException * @throws InterruptedException @@ -759,11 +774,31 @@ LOG.debug("ZK RIT -> " + znode); } } - Thread.sleep(200); + Thread.sleep(100); } } /** + * Blocks until there is at least one node in regions in transition. + *

+ * Used in testing only. + * @param zkw zk reference + * @throws KeeperException + * @throws InterruptedException + */ + public static void blockUntilRIT(ZooKeeperWatcher zkw) + throws KeeperException, InterruptedException { + while (!ZKUtil.nodeHasChildren(zkw, zkw.assignmentZNode)) { + List znodes = + ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.assignmentZNode); + if (znodes == null || znodes.isEmpty()) { + LOG.debug("No RIT in ZK"); + } + Thread.sleep(100); + } + } + + /** * Verifies that the specified region is in the specified state in ZooKeeper. *

* Returns true if region is in transition and in the specified state in Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1028470) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -425,6 +425,7 @@ } catch (Throwable t) { // Call stop if error or process will stick around for ever since server // puts up non-daemon threads. + LOG.error("Stopping HRS because failed initialize", t); this.server.stop(); } } @@ -812,6 +813,7 @@ this.metrics = new RegionServerMetrics(); startServiceThreads(); LOG.info("Serving as " + this.serverInfo.getServerName() + + ", RPC listening on " + this.server.getListenerAddress() + ", sessionid=0x" + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); isOnline = true; Index: src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 1028470) +++ src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy) @@ -239,7 +239,34 @@ } /** + * Wait for the specified region server to stop + * Removes this thread from list of running threads. * @param serverNumber + * @return Name of region server that just went down. + */ + public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) { + while (rst.isAlive()) { + try { + LOG.info("Waiting on " + + rst.getRegionServer().getHServerInfo().toString()); + rst.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + for (int i=0;i