Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java	(revision 1244360)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java	(working copy)
@@ -39,6 +39,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -405,8 +406,9 @@
     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
     zooKeeper.registerListenerFirst(assignmentManager);
 
-    this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
-      this.serverManager);
+    this.regionServerTracker =
+      createRegionServerTracker(this.zooKeeper, this, this.serverManager);
+
     this.regionServerTracker.start();
 
     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
@@ -432,6 +434,18 @@
       ", cluster-up flag was=" + wasUp);
   }
 
+  /**
+   * Used in testing.  Exposed so tests can plug in a custom RegionServerTracker.
+   * @param zkw ZooKeeperWatcher the tracker registers with
+   * @param a Abortable to call if the tracker hits an unrecoverable error
+   * @param sm ServerManager the tracker notifies of region server changes
+   * @return Instance of RegionServerTracker
+   */
+  public RegionServerTracker createRegionServerTracker(final ZooKeeperWatcher zkw,
+      final Abortable a, final ServerManager sm) {
+    return new RegionServerTracker(zkw, a, sm);
+  }
+
   // Check if we should stop every second.
   private Sleeper stopSleeper = new Sleeper(1000, this);
   private void loop() {
@@ -512,8 +526,7 @@
 
     // TODO: Should do this in background rather than block master startup
     status.setStatus("Splitting logs after master startup");
-    this.fileSystemManager.
-      splitLogAfterStartup(this.serverManager.getOnlineServers().keySet());
+    splitLogAfterStartup(this.fileSystemManager, this.serverManager);
 
     // Make sure root and meta assigned before proceeding.
     assignRootAndMeta(status);
@@ -560,6 +573,16 @@
   }
 
   /**
+   * Used in tests.  Lets tests intercept the log splitting done at master startup.
+   * @param mfs MasterFileSystem that performs the actual log splitting
+   * @param sm ServerManager supplying the set of currently online servers
+   */
+  public void splitLogAfterStartup(final MasterFileSystem mfs,
+      final ServerManager sm) {
+    mfs.splitLogAfterStartup(sm.getOnlineServers().keySet());
+  }
+
+  /**
    * Check -ROOT- and .META. are assigned. If not,
    * assign them.
    * @throws InterruptedException
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1244360)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -83,7 +83,6 @@
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -93,6 +92,7 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Exec;
 import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
@@ -756,7 +756,7 @@
     // Interrupt catalog tracker here in case any regions being opened out in
     // handlers are stuck waiting on meta or root.
     if (this.catalogTracker != null) this.catalogTracker.stop();
-    if (this.fsOk) {
+    if (!this.killed && this.fsOk) {
       waitOnAllRegionsToClose(abortRequested);
       LOG.info("stopping server " + this.serverNameFromMasterPOV +
         "; all regions closed.");
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java	(revision 0)
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRSKilledWhenMasterInitializing {
+  private static final Log LOG =
+      LogFactory.getLog(TestRSKilledWhenMasterInitializing.class);
+
+  private static final HBaseTestingUtility TESTUTIL = new HBaseTestingUtility();
+  private static final int NUM_MASTERS = 1;
+  private static final int NUM_RS = 4;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set it so that this test runs with our custom master.
+    TESTUTIL.getConfiguration().setClass(HConstants.MASTER_IMPL,
+        TestingMaster.class, HMaster.class);
+    // Start up the cluster.
+    TESTUTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (!TESTUTIL.getHBaseCluster().getMaster().isInitialized()) {
+      // Master is not initialized and is waiting on something forever.
+      for (MasterThread mt : TESTUTIL.getHBaseCluster().getLiveMasterThreads()) {
+        mt.interrupt();
+      }
+    }
+    TESTUTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * An HMaster instance used in this test.  If 'TestingMaster.sleep' is set in
+   * the Configuration, then we'll sleep after the log is split and we'll also
+   * return a custom RegionServerTracker.
+   */
+  public static class TestingMaster extends HMaster {
+    private boolean isLogSplitted = false;
+
+    public TestingMaster(Configuration conf) throws IOException,
+        KeeperException, InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    public void splitLogAfterStartup(MasterFileSystem mfs, ServerManager sm) {
+      super.splitLogAfterStartup(mfs, sm);
+      isLogSplitted = true;
+      // If "TestingMaster.sleep" is set, sleep after log split.
+      if (getConfiguration().getBoolean("TestingMaster.sleep", false)) {
+        int duration = getConfiguration().getInt(
+            "TestingMaster.sleep.duration", 0);
+        Threads.sleep(duration);
+      }
+    }
+
+    @Override
+    public RegionServerTracker createRegionServerTracker(
+        final ZooKeeperWatcher zkw, final Abortable a, final ServerManager sm) {
+      // If "TestingMaster.sleep" is set, return our custom RegionServerTracker.
+      return getConfiguration().getBoolean("TestingMaster.sleep", false)
+          ? new GatedNodeDeleteRegionServerTracker(zkw, a, sm)
+          : super.createRegionServerTracker(zkw, a, sm);
+    }
+
+    public boolean isLogSplittedAfterStartup() {
+      return isLogSplitted;
+    }
+  }
+
+  /**
+   * A RegionServerTracker whose node-delete handling we can stall.  On
+   * nodeDeleted, it will block until the data member gate is cleared.
+   */
+  static class GatedNodeDeleteRegionServerTracker extends RegionServerTracker {
+    final AtomicBoolean gate = new AtomicBoolean(true);
+
+    public GatedNodeDeleteRegionServerTracker(ZooKeeperWatcher watcher,
+        Abortable abortable, ServerManager serverManager) {
+      super(watcher, abortable, serverManager);
+    }
+
+    @Override
+    public void nodeDeleted(final String path) {
+      if (path.startsWith(watcher.rsZNode)) {
+        // Defer the delete notification until the gate is opened.
+        Thread t = new Thread() {
+          public void run() {
+            while (gate.get()) {
+              Threads.sleep(100);
+            }
+            GatedNodeDeleteRegionServerTracker.super.nodeDeleted(path);
+          }
+        };
+        t.start();
+      }
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testCorrectnessWhenMasterFailOver() throws Exception {
+    final byte[] TABLENAME = Bytes.toBytes("testCorrectnessWhenMasterFailOver");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[][] SPLITKEYS = { Bytes.toBytes("b"), Bytes.toBytes("i") };
+
+    MiniHBaseCluster cluster = TESTUTIL.getHBaseCluster();
+
+    HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    HBaseAdmin hbaseAdmin = TESTUTIL.getHBaseAdmin();
+    hbaseAdmin.createTable(desc, SPLITKEYS);
+
+    assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+    HTable table = new HTable(TESTUTIL.getConfiguration(), TABLENAME);
+    List<Put> puts = new ArrayList<Put>();
+    Put put1 = new Put(Bytes.toBytes("a"));
+    put1.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
+    Put put2 = new Put(Bytes.toBytes("h"));
+    put2.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
+    Put put3 = new Put(Bytes.toBytes("o"));
+    put3.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
+    puts.add(put1);
+    puts.add(put2);
+    puts.add(put3);
+    table.put(puts);
+    ResultScanner resultScanner = table.getScanner(new Scan());
+    int count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    table.close();
+    assertEquals(3, count);
+
+    /* Starting test */
+    cluster.getConfiguration().setBoolean("TestingMaster.sleep", true);
+    cluster.getConfiguration().setInt("TestingMaster.sleep.duration", 10000);
+
+    /* NO.1 .META. region correctness */
+    // First abort master
+    for (MasterThread mt : cluster.getLiveMasterThreads()) {
+      if (mt.getMaster().isActiveMaster()) {
+        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
+        mt.join();
+        break;
+      }
+    }
+    LOG.debug("Master is aborted");
+    TestingMaster master = (TestingMaster) cluster.startMaster().getMaster();
+    while (!master.isLogSplittedAfterStartup()) {
+      Thread.sleep(1000);
+    }
+    LOG.debug("splitted:" + master.isLogSplittedAfterStartup()
+        + ",initialized:" + master.isInitialized());
+
+    // Second kill the meta server
+    int metaServerNum = cluster.getServerWithMeta();
+    int rootServerNum = cluster.getServerWith(HRegionInfo.ROOT_REGIONINFO
+        .getRegionName());
+    HRegionServer metaRS = cluster.getRegionServer(metaServerNum);
+    LOG.debug("Killing metaRS and carryingRoot = " + (metaServerNum == rootServerNum));
+    metaRS.kill();
+    metaRS.join();
+
+    Thread.sleep(10000 * 2);
+    ((GatedNodeDeleteRegionServerTracker) master.getRegionServerTracker()).gate
+        .set(false);
+
+    while (!master.isInitialized()) {
+      Thread.sleep(1000);
+    }
+    LOG.debug("master isInitialized");
+    // Third check whether data is correct in meta region
+    assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+    /* NO.2 -ROOT- region correctness */
+    if (rootServerNum != metaServerNum) {
+      // First abort master
+      for (MasterThread mt : cluster.getLiveMasterThreads()) {
+        if (mt.getMaster().isActiveMaster()) {
+          mt.getMaster().abort("Aborting for tests",
+              new Exception("Trace info"));
+          mt.join();
+          break;
+        }
+      }
+      LOG.debug("Master is aborted");
+      master = (TestingMaster) cluster.startMaster().getMaster();
+      while (!master.isLogSplittedAfterStartup()) {
+        Thread.sleep(1000);
+      }
+      LOG.debug("splitted:" + master.isLogSplittedAfterStartup()
+          + ",initialized:" + master.isInitialized());
+
+      // Second kill the root server
+      HRegionServer rootRS = cluster.getRegionServer(rootServerNum);
+      LOG.debug("Killing rootRS");
+      rootRS.kill();
+      rootRS.join();
+      Thread.sleep(10000 * 2);
+      ((GatedNodeDeleteRegionServerTracker) master.getRegionServerTracker()).gate
+          .set(false);
+
+      while (!master.isInitialized()) {
+        Thread.sleep(1000);
+      }
+      LOG.debug("master isInitialized");
+      // Third check whether data is correct in meta region
+      assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+    }
+
+    /* NO.3 data region correctness */
+    ServerManager serverManager = cluster.getMaster().getServerManager();
+    while (serverManager.areDeadServersInProgress()) {
+      Thread.sleep(1000);
+    }
+    table = new HTable(TESTUTIL.getConfiguration(), TABLENAME);
+    resultScanner = table.getScanner(new Scan());
+    count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    table.close();
+    assertEquals(3, count);
+  }
+}