Index: src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java (revision 0) @@ -0,0 +1,351 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.*; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.Test; + +/** + * Tests the restarting of everything as done during rolling restarts. + */ +public class TestRollingRestart { + private static final Log LOG = LogFactory.getLog(TestRollingRestart.class); + + @Test + public void testBasicRollingRestart() throws Exception { + + // Start a cluster with 2 masters and 3 regionservers + final int NUM_MASTERS = 2; + final int NUM_RS = 3; + + int expectedNumRS = 3; + + // Start the cluster + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + cluster.waitForActiveAndReadyMaster(); + Configuration conf = TEST_UTIL.getConfiguration(); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testRollingRestart", + null); + + // Create a table with regions + byte [] table = Bytes.toBytes("tableRestart"); + byte [] family = Bytes.toBytes("family"); + HTable ht = TEST_UTIL.createTable(table, family); + int numRegions = TEST_UTIL.createMultiRegions(ht, family); + numRegions += 2; // catalogs + LOG.debug("\n\nWaiting for no more RIT\n"); + ZKAssign.blockUntilNoRIT(zkw); + LOG.debug("\n\nDisabling 
table\n"); + TEST_UTIL.getHBaseAdmin().disableTable(table); + LOG.debug("\n\nWaiting for no more RIT\n"); + ZKAssign.blockUntilNoRIT(zkw); + LOG.debug("\n\nEnabling table\n"); + TEST_UTIL.getHBaseAdmin().enableTable(table); + LOG.debug("\n\nWaiting for no more RIT\n"); + ZKAssign.blockUntilNoRIT(zkw); + LOG.debug("\n\nVerifying there are " + numRegions + " assigned on cluster\n"); + NavigableSet regions = getAllOnlineRegions(cluster); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // Add a new regionserver + log("Adding a fourth RS"); + RegionServerThread restarted = cluster.startRegionServer(); + expectedNumRS++; + restarted.waitForServerOnline(); + log("Additional RS is online"); + log("Waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // Master Restarts + List masterThreads = cluster.getMasterThreads(); + MasterThread activeMaster = null; + MasterThread backupMaster = null; + assertEquals(2, masterThreads.size()); + if (masterThreads.get(0).getMaster().isActiveMaster()) { + activeMaster = masterThreads.get(0); + backupMaster = masterThreads.get(1); + } else { + activeMaster = masterThreads.get(1); + backupMaster = masterThreads.get(0); + } + + // Bring down the backup master + LOG.debug("\n\nStopping backup master\n\n"); + backupMaster.getMaster().stop("Stop of backup during rolling restart"); + cluster.hbaseCluster.waitOnMaster(backupMaster); + + // Bring down the primary master + LOG.debug("\n\nStopping primary master\n\n"); + activeMaster.getMaster().stop("Stop of active during rolling restart"); + cluster.hbaseCluster.waitOnMaster(activeMaster); + + // Start primary master + LOG.debug("\n\nRestarting primary master\n\n"); + activeMaster = cluster.startMaster(); + 
cluster.waitForActiveAndReadyMaster(); + + // Start backup master + LOG.debug("\n\nRestarting backup master\n\n"); + backupMaster = cluster.startMaster(); + + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // RegionServer Restarts + + // Bring them down, one at a time, waiting between each to complete + List regionServers = + cluster.getLiveRegionServerThreads(); + int num = 1; + int total = regionServers.size(); + for (RegionServerThread rst : regionServers) { + String serverName = rst.getRegionServer().getServerName(); + log("Stopping region server " + num + " of " + total + " [ " + + serverName + "]"); + rst.getRegionServer().stop("Stopping RS during rolling restart"); + cluster.hbaseCluster.waitOnRegionServer(rst); + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, serverName); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + expectedNumRS--; + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + log("Restarting region server " + num + " of " + total); + restarted = cluster.startRegionServer(); + restarted.waitForServerOnline(); + expectedNumRS++; + log("Region server " + num + " is back online"); + log("Waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + num++; + } + Thread.sleep(2000); + assertRegionsAssigned(cluster, regions); + + // Bring the RS hosting ROOT down and the RS hosting META down at once + RegionServerThread rootServer = getServerHostingRoot(cluster); + RegionServerThread metaServer = getServerHostingMeta(cluster); + if (rootServer == metaServer) { + log("ROOT and META on the same server so killing another 
random server"); + int i=0; + while (rootServer == metaServer) { + metaServer = cluster.getRegionServerThreads().get(i); + i++; + } + } + log("Stopping server hosting ROOT"); + rootServer.getRegionServer().stop("Stopping ROOT server"); + log("Stopping server hosting META #1"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(rootServer); + log("Root server down"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down #1"); + expectedNumRS -= 2; + log("Waiting for meta server #1 RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("Waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // Kill off the server hosting META again + metaServer = getServerHostingMeta(cluster); + log("Stopping server hosting META #2"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down"); + expectedNumRS--; + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); + + // Start 3 RS again + cluster.startRegionServer().waitForServerOnline(); + cluster.startRegionServer().waitForServerOnline(); + cluster.startRegionServer().waitForServerOnline(); + Thread.sleep(1000); + log("Waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned 
on cluster"); + assertRegionsAssigned(cluster, regions); + // Shutdown server hosting META + metaServer = getServerHostingMeta(cluster); + log("Stopping server hosting META (1 of 3)"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down (1 of 3)"); + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + + // Shutdown server hosting META again + metaServer = getServerHostingMeta(cluster); + log("Stopping server hosting META (2 of 3)"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down (2 of 3)"); + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + + // Shutdown server hosting META again + metaServer = getServerHostingMeta(cluster); + log("Stopping server hosting META (3 of 3)"); + metaServer.getRegionServer().stop("Stopping META server"); + cluster.hbaseCluster.waitOnRegionServer(metaServer); + log("Meta server down (3 of 3)"); + log("Waiting for RS shutdown to be handled by master"); + waitForRSShutdownToStartAndFinish(activeMaster, + metaServer.getRegionServer().getServerName()); + log("RS shutdown done, waiting for no more RIT"); + ZKAssign.blockUntilNoRIT(zkw); + log("Verifying there are " + numRegions + " assigned on cluster"); + assertRegionsAssigned(cluster, regions); + + if 
(cluster.getRegionServerThreads().size() != 1) { + log("Online regionservers:"); + for (RegionServerThread rst : cluster.getRegionServerThreads()) { + log("RS: " + rst.getRegionServer().getServerName()); + } + } + assertEquals(1, cluster.getRegionServerThreads().size()); + + + // TODO: Bring random 3 of 4 RS down at the same time + + + // Stop the cluster + TEST_UTIL.shutdownMiniCluster(); + } + + private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster, + String serverName) throws InterruptedException { + ServerManager sm = activeMaster.getMaster().getServerManager(); + // First wait for it to be in dead list + while (!sm.deadservers.isDeadServer(serverName)) { + log("Waiting for [" + serverName + "] to be listed as dead in master"); + Thread.sleep(10); + } + log("Server [" + serverName + "] marked as dead, waiting for it to " + + "finish dead processing"); + while (sm.deadservers.isDeadServer(serverName)) { + log("Server [" + serverName + "] still marked as dead, waiting"); + Thread.sleep(10); + } + log("Server [" + serverName + "] done with server shutdown processing"); + } + + private void log(String msg) { + LOG.debug("\n\n" + msg + "\n"); + } + + private RegionServerThread getServerHostingMeta(MiniHBaseCluster cluster) { + return getServerHosting(cluster, HRegionInfo.FIRST_META_REGIONINFO); + } + + private RegionServerThread getServerHostingRoot(MiniHBaseCluster cluster) { + return getServerHosting(cluster, HRegionInfo.ROOT_REGIONINFO); + } + + private RegionServerThread getServerHosting(MiniHBaseCluster cluster, + HRegionInfo region) { + for (RegionServerThread rst : cluster.getRegionServerThreads()) { + if (rst.getRegionServer().getOnlineRegions().contains(region)) { + return rst; + } + } + return null; + } + + private void assertRegionsAssigned(MiniHBaseCluster cluster, + Set expectedRegions) { + int numFound = 0; + for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { + numFound += 
rst.getRegionServer().getNumberOfOnlineRegions(); + } + if (expectedRegions.size() != numFound) { + LOG.debug("Expected to find " + expectedRegions.size() + " but only found" + + " " + numFound); + NavigableSet foundRegions = getAllOnlineRegions(cluster); + for (String region : expectedRegions) { + if (!foundRegions.contains(region)) { + LOG.debug("Missing region: " + region); + } + } + assertEquals(expectedRegions.size(), numFound); + } else { + log("Success! Found expected number of " + numFound + " regions"); + } + } + + private NavigableSet getAllOnlineRegions(MiniHBaseCluster cluster) { + NavigableSet online = new TreeSet(); + for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { + for (HRegionInfo region : rst.getRegionServer().getOnlineRegions()) { + online.add(region.getRegionNameAsString()); + } + } + return online; + } + +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1027683) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -1058,6 +1058,9 @@ LOG.debug(zkw.prefix("Retrieved " + ((data == null)? 0: data.length) + " byte(s) of data from znode " + znode + (watcherSet? " and set watcher; ": "; data=") + - (data == null? "null": StringUtils.abbreviate(Bytes.toString(data), 32)))); + (data == null? "null": ( + znode.startsWith(zkw.assignmentZNode) ? 
+ RegionTransitionData.fromBytes(data).toString() + : StringUtils.abbreviate(Bytes.toString(data), 32))))); } } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (revision 1027683) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (working copy) @@ -244,11 +244,22 @@ int version = ZKUtil.checkExists(zkw, node); if(version == -1) { ZKUtil.createAndWatch(zkw, node, data.getBytes()); - return true; } else { - return ZKUtil.setData(zkw, node, data.getBytes(), version); + if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) { + return false; + } else { + // We successfully forced to OFFLINE, reset watch and handle if + // the state changed in between our set and the watch + RegionTransitionData curData = + ZKAssign.getData(zkw, region.getEncodedName()); + if (curData.getEventType() != data.getEventType()) { + // state changed, need to process + return false; + } + } } } + return true; } /** @@ -404,6 +415,8 @@ "after verifying it was in OPENED state, we got a version mismatch")); return false; } + LOG.debug(zkw.prefix("Successfully deleted unassigned node for region " + + regionName + " in expected state " + expectedState)); return true; } } @@ -745,6 +758,8 @@ /** * Blocks until there are no node in regions in transition. + *

+ * Used in testing only. * @param zkw zk reference * @throws KeeperException * @throws InterruptedException @@ -759,11 +774,31 @@ LOG.debug("ZK RIT -> " + znode); } } - Thread.sleep(200); + Thread.sleep(100); } } /** + * Blocks until there is at least one node in regions in transition. + *

+ * Used in testing only. + * @param zkw zk reference + * @throws KeeperException + * @throws InterruptedException + */ + public static void blockUntilRIT(ZooKeeperWatcher zkw) + throws KeeperException, InterruptedException { + while (!ZKUtil.nodeHasChildren(zkw, zkw.assignmentZNode)) { + List znodes = + ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.assignmentZNode); + if (znodes == null || znodes.isEmpty()) { + LOG.debug("No RIT in ZK"); + } + Thread.sleep(100); + } + } + + /** * Verifies that the specified region is in the specified state in ZooKeeper. *

* Returns true if region is in transition and in the specified state in Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1027683) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -425,6 +425,7 @@ } catch (Throwable t) { // Call stop if error or process will stick around for ever since server // puts up non-daemon threads. + LOG.error("Stopping HRS because failed initialize", t); this.server.stop(); } } @@ -815,6 +816,8 @@ ", sessionid=0x" + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); isOnline = true; + LOG.debug("RPC server is listening on: " + + this.server.getListenerAddress()); } catch (Throwable e) { this.isOnline = false; stop("Failed initialization"); Index: src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 1027683) +++ src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy) @@ -239,7 +239,34 @@ } /** + * Wait for the specified region server to stop + * Removes this thread from list of running threads. * @param serverNumber + * @return Name of region server that just went down. + */ + public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) { + while (rst.isAlive()) { + try { + LOG.info("Waiting on " + + rst.getRegionServer().getHServerInfo().toString()); + rst.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + for (int i=0;i