Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1244360)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -39,6 +39,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -405,8 +406,9 @@
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
zooKeeper.registerListenerFirst(assignmentManager);
- this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
- this.serverManager);
+ this.regionServerTracker =
+ createRegionServerTracker(this.zooKeeper, this, this.serverManager);
+
this.regionServerTracker.start();
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
@@ -432,6 +434,18 @@
", cluster-up flag was=" + wasUp);
}
+ /**
+ * Used for testing; lets subclasses supply a custom tracker.
+ * @param zkw the cluster ZooKeeper watcher
+ * @param a Abortable; here the master itself
+ * @param sm the master's ServerManager
+ * @return Instance of RegionServerTracker
+ */
+ public RegionServerTracker createRegionServerTracker(final ZooKeeperWatcher zkw,
+ final Abortable a, final ServerManager sm) {
+ return new RegionServerTracker(zkw, a, sm);
+ }
+
// Check if we should stop every second.
private Sleeper stopSleeper = new Sleeper(1000, this);
private void loop() {
@@ -512,8 +526,7 @@
// TODO: Should do this in background rather than block master startup
status.setStatus("Splitting logs after master startup");
- this.fileSystemManager.
- splitLogAfterStartup(this.serverManager.getOnlineServers().keySet());
+ splitLogAfterStartup(this.fileSystemManager, this.serverManager);
// Make sure root and meta assigned before proceeding.
assignRootAndMeta(status);
@@ -560,6 +573,16 @@
}
/**
+ * Used by tests to intercept log splitting on master startup.
+ * @param mfs the master's MasterFileSystem
+ * @param sm the master's ServerManager
+ */
+ public void splitLogAfterStartup(final MasterFileSystem mfs,
+ final ServerManager sm) {
+ mfs.splitLogAfterStartup(sm.getOnlineServers().keySet());
+ }
+
+ /**
* Check -ROOT- and .META. are assigned. If not,
* assign them.
* @throws InterruptedException
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1244360)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -83,7 +83,6 @@
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -93,6 +92,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
@@ -756,7 +756,7 @@
// Interrupt catalog tracker here in case any regions being opened out in
// handlers are stuck waiting on meta or root.
if (this.catalogTracker != null) this.catalogTracker.stop();
- if (this.fsOk) {
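+ // If this server was deliberately killed, skip waiting for regions to close.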
+ if (!this.killed && this.fsOk) {
waitOnAllRegionsToClose(abortRequested);
LOG.info("stopping server " + this.serverNameFromMasterPOV +
"; all regions closed.");
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java (revision 0)
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRSKilledWhenMasterInitializing {
+ private static final Log LOG = LogFactory.getLog(TestRSKilledWhenMasterInitializing.class);
+
+ private static final HBaseTestingUtility TESTUTIL = new HBaseTestingUtility();
+ private static final int NUM_MASTERS = 1;
+ private static final int NUM_RS = 4;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Set it so that this test runs with our custom TestingMaster
+ TESTUTIL.getConfiguration().setClass(HConstants.MASTER_IMPL,
+ TestingMaster.class, HMaster.class);
+ // Start up the cluster.
+ TESTUTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (!TESTUTIL.getHBaseCluster().getMaster().isInitialized()) {
+ // The master never finished initializing and is blocked waiting;
+ // interrupt it so shutdown can proceed.
+ for (MasterThread mt : TESTUTIL.getHBaseCluster().getLiveMasterThreads()) {
+ mt.interrupt();
+ }
+ }
+ TESTUTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * An HMaster subclass used by this test. If 'TestingMaster.sleep' is set in
+ * the Configuration, it sleeps after the logs are split and also returns a
+ * custom, gated RegionServerTracker.
+ */
+ public static class TestingMaster extends HMaster {
+ private boolean isLogSplitted = false;
+ public TestingMaster(Configuration conf) throws IOException,
+ KeeperException, InterruptedException {
+ super(conf);
+ }
+
+ @Override
+ public void splitLogAfterStartup(MasterFileSystem mfs, ServerManager sm) {
+ super.splitLogAfterStartup(mfs, sm);
+ isLogSplitted = true;
+ // If "TestingMaster.sleep" is set, sleep after log split.
+ if (getConfiguration().getBoolean("TestingMaster.sleep", false)) {
+ int duration = getConfiguration().getInt(
+ "TestingMaster.sleep.duration", 0);
+ Threads.sleep(duration);
+ }
+ }
+
+ @Override
+ public RegionServerTracker createRegionServerTracker(
+ final ZooKeeperWatcher zkw, final Abortable a, final ServerManager sm) {
+ // If "TestingMaster.sleep", then return our custom RegionServerTracker
+ return getConfiguration().getBoolean("TestingMaster.sleep", false) ? new GatedNodeDeleteRegionServerTracker(
+ zkw, a, sm) : super.createRegionServerTracker(zkw, a, sm);
+ }
+
+ public boolean isLogSplittedAfterStartup() {
+ return isLogSplitted;
+ }
+ }
+
+ /**
+ * A RegionServerTracker whose node-deleted handling we can stall. On
+ * nodeDeleted, it blocks until the gate data member is cleared.
+ */
+ static class GatedNodeDeleteRegionServerTracker extends RegionServerTracker {
+ final AtomicBoolean gate = new AtomicBoolean(true);
+
+ public GatedNodeDeleteRegionServerTracker(ZooKeeperWatcher watcher,
+ Abortable abortable, ServerManager serverManager) {
+ super(watcher, abortable, serverManager);
+ }
+
+ @Override
+ public void nodeDeleted(final String path) {
+ if (path.startsWith(watcher.rsZNode)) {
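+ // Defer the region server znode deletion on a separate thread until the
+ // test clears the gate.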
+ Thread t = new Thread() {
+ public void run() {
+ while (gate.get()) {
+ Threads.sleep(100);
+ }
+ GatedNodeDeleteRegionServerTracker.super.nodeDeleted(path);
+ }
+ };
+ t.start();
+ }
+ }
+ }
+
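+ /**
+ * Restarts the master as a TestingMaster that sleeps after splitting logs,
+ * kills the region server carrying .META. (and, if hosted elsewhere, the one
+ * carrying -ROOT-) while the master is still initializing, then releases the
+ * gated tracker and verifies the table stays available and all three rows
+ * remain readable.
+ */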
+ @Test(timeout = 120000)
+ public void testCorrectnessWhenMasterFailOver() throws Exception {
+ final byte[] TABLENAME = Bytes.toBytes("testCorrectnessWhenMasterFailOver");
+ final byte[] FAMILY = Bytes.toBytes("family");
+ final byte[][] SPLITKEYS = { Bytes.toBytes("b"), Bytes.toBytes("i") };
+
+ MiniHBaseCluster cluster = TESTUTIL.getHBaseCluster();
+
+ HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+ desc.addFamily(new HColumnDescriptor(FAMILY));
+ HBaseAdmin hbaseAdmin = TESTUTIL.getHBaseAdmin();
+ hbaseAdmin.createTable(desc, SPLITKEYS);
+
+ assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+ HTable table = new HTable(TESTUTIL.getConfiguration(), TABLENAME);
+ List<Put> puts = new ArrayList<Put>();
+ Put put1 = new Put(Bytes.toBytes("a"));
+ put1.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
+ Put put2 = new Put(Bytes.toBytes("h"));
+ put2.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
+ Put put3 = new Put(Bytes.toBytes("o"));
+ put3.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
+ puts.add(put1);
+ puts.add(put2);
+ puts.add(put3);
+ table.put(puts);
+ ResultScanner resultScanner = table.getScanner(new Scan());
+ int count = 0;
+ while (resultScanner.next() != null) {
+ count++;
+ }
+ resultScanner.close();
+ table.close();
+ assertEquals(3, count);
+
+ /* Starting test */
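+ // Make the master started below (a TestingMaster) sleep 10 seconds after
+ // splitting logs and use the gated RegionServerTracker.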
+ cluster.getConfiguration().setBoolean("TestingMaster.sleep", true);
+ cluster.getConfiguration().setInt("TestingMaster.sleep.duration", 10000);
+
+ /* NO.1 .META. region correctness */
+ // First abort master
+ for (MasterThread mt : cluster.getLiveMasterThreads()) {
+ if (mt.getMaster().isActiveMaster()) {
+ mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
+ mt.join();
+ break;
+ }
+ }
+ LOG.debug("Master is aborted");
+ TestingMaster master = (TestingMaster) cluster.startMaster().getMaster();
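+ // Block until the restarted master has split the logs (it then sleeps).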
+ while (!master.isLogSplittedAfterStartup()) {
+ Thread.sleep(1000);
+ }
+
+ LOG.debug("splitted:" + master.isLogSplittedAfterStartup()
+ + ",initialized:" + master.isInitialized());
+
+ // Second kill meta server
+ int metaServerNum = cluster.getServerWithMeta();
+ int rootServerNum = cluster.getServerWith(HRegionInfo.ROOT_REGIONINFO
+ .getRegionName());
+ HRegionServer metaRS = cluster.getRegionServer(metaServerNum);
+ LOG.debug("Killing metaRS and carryingRoot = "+ (metaServerNum == rootServerNum));
+ metaRS.kill();
+ metaRS.join();
+
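+ // Wait past the master's 10s sleep, then release the gate so the master
+ // can finally process the dead region server's znode deletion.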
+ Thread.sleep(10000 * 2);
+ ((GatedNodeDeleteRegionServerTracker) master.getRegionServerTracker()).gate
+ .set(false);
+
+ while (!master.isInitialized()) {
+ Thread.sleep(1000);
+ }
+ LOG.debug("master isInitialized");
+ // Third check whether data is correct in meta region
+ assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+ /* NO.2 -ROOT- region correctness */
+ if (rootServerNum != metaServerNum) {
+ // First abort master
+ for (MasterThread mt : cluster.getLiveMasterThreads()) {
+ if (mt.getMaster().isActiveMaster()) {
+ mt.getMaster().abort("Aborting for tests",
+ new Exception("Trace info"));
+ mt.join();
+ break;
+ }
+ }
+ LOG.debug("Master is aborted");
+ master = (TestingMaster) cluster.startMaster().getMaster();
+ while (!master.isLogSplittedAfterStartup()) {
+ Thread.sleep(1000);
+ }
+ LOG.debug("splitted:" + master.isLogSplittedAfterStartup()
+ + ",initialized:" + master.isInitialized());
+
+ // Second kill root server
+ HRegionServer rootRS = cluster.getRegionServer(rootServerNum);
+ LOG.debug("Killing rootRS");
+ rootRS.kill();
+ rootRS.join();
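+ // As above: wait past the master's sleep, then release the gate.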
+ Thread.sleep(10000 * 2);
+ ((GatedNodeDeleteRegionServerTracker) master.getRegionServerTracker()).gate
+ .set(false);
+
+ while (!master.isInitialized()) {
+ Thread.sleep(1000);
+ }
+ LOG.debug("master isInitialized");
+ // Third check that the table is still available after -ROOT- is reassigned
+ assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+ }
+
+ /* NO.3 data region correctness */
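+ // Wait for dead-server processing (log splitting and reassignment) to
+ // finish before verifying the rows.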
+ ServerManager serverManager = cluster.startMaster().getMaster().getServerManager();
+ while (serverManager.areDeadServersInProgress()) {
+ Thread.sleep(1000);
+ }
+ table = new HTable(TESTUTIL.getConfiguration(), TABLENAME);
+ resultScanner = table.getScanner(new Scan());
+ count = 0;
+ while (resultScanner.next() != null) {
+ count++;
+ }
+ resultScanner.close();
+ table.close();
+ assertEquals(3, count);
+ }
+
+}