Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy)
@@ -40,12 +40,12 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.zookeeper.ZooKeeper;
@@ -60,7 +60,6 @@
* make changes to configuration parameters.
*/
public class HBaseTestingUtility {
-
private final Log LOG = LogFactory.getLog(getClass());
private final HBaseConfiguration conf;
@@ -68,6 +67,7 @@
private MiniDFSCluster dfsCluster = null;
private MiniHBaseCluster hbaseCluster = null;
private MiniMRCluster mrCluster = null;
+ // If non-null, then already a cluster running.
private File clusterTestBuildDir = null;
private HBaseAdmin hbaseAdmin = null;
@@ -78,7 +78,7 @@
public HBaseTestingUtility(HBaseConfiguration conf) {
this.conf = conf;
}
-
+
/** System property key to get test directory value.
*/
public static final String TEST_DIRECTORY_KEY = "test.build.data";
@@ -98,6 +98,36 @@
}
/**
+ * Home our cluster in a dir under build/test. Give it a random name
+ * so can have many concurrent clusters running if we need to. Need to
+ * amend the test.build.data System property. Its what minidfscluster bases
+ * it data dir on. Moding a System property is not the way to do concurrent
+ * instances -- another instance could grab the temporary
+ * value unintentionally -- but not anything can do about it at moment; its
+ * how the minidfscluster works.
+ * @return The calculated cluster test build directory.
+ */
+ File setupClusterTestBuildDir() {
+ String oldTestBuildDir =
+ System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
+ String randomStr = UUID.randomUUID().toString();
+ String dirStr = oldTestBuildDir + "." + randomStr;
+ File dir = new File(dirStr).getAbsoluteFile();
+ // Have it cleaned up on exit
+ dir.deleteOnExit();
+ return dir;
+ }
+
+ /**
+ * @throws IOException If cluster already running.
+ */
+ void isRunningCluster() throws IOException {
+ if (this.clusterTestBuildDir == null) return;
+ throw new IOException("Cluster already running at " +
+ this.clusterTestBuildDir);
+ }
+
+ /**
* @param subdirName
* @return Path to a subdirectory named subdirName under
* {@link #getTestDir()}.
@@ -114,16 +144,35 @@
startMiniCluster(1);
}
+ /**
+ * Call this if you only want a zk cluster.
+ * @see #startMiniZKCluster() if you want zk + dfs + hbase mini cluster.
+ * @throws Exception
+ * @see #shutdownMiniZKCluster()
+ */
public void startMiniZKCluster() throws Exception {
- // Note that this is done before we create the MiniHBaseCluster because we
- // need to edit the config to add the ZooKeeper servers.
+ isRunningCluster();
+ this.clusterTestBuildDir = setupClusterTestBuildDir();
+ startMiniZKCluster(this.clusterTestBuildDir);
+
+ }
+
+ private void startMiniZKCluster(final File dir) throws Exception {
this.zkCluster = new MiniZooKeeperCluster();
- int clientPort = this.zkCluster.startup(this.clusterTestBuildDir);
+ int clientPort = this.zkCluster.startup(dir);
this.conf.set("hbase.zookeeper.property.clientPort",
Integer.toString(clientPort));
}
/**
+ * @throws IOException
+ * @see #startMiniZKCluster()
+ */
+ public void shutdownMiniZKCluster() throws IOException {
+ if (this.zkCluster != null) this.zkCluster.shutdown();
+ }
+
+ /**
* Start up a minicluster of hbase, optinally dfs, and zookeeper.
* Modifies Configuration. Homes the cluster data directory under a random
* subdirectory in a directory under System property test.build.data.
@@ -138,27 +187,13 @@
throws Exception {
LOG.info("Starting up minicluster");
// If we already put up a cluster, fail.
- if (this.clusterTestBuildDir != null) {
- throw new IOException("Cluster already running at " +
- this.clusterTestBuildDir);
- }
- // Now, home our cluster in a dir under build/test. Give it a random name
- // so can have many concurrent clusters running if we need to. Need to
- // amend the test.build.data System property. Its what minidfscluster bases
- // it data dir on. Moding a System property is not the way to do concurrent
- // instances -- another instance could grab the temporary
- // value unintentionally -- but not anything can do about it at moment; its
- // how the minidfscluster works.
- String oldTestBuildDir =
+ isRunningCluster();
+ String oldBuildTestDir =
System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
- String randomStr = UUID.randomUUID().toString();
- String clusterTestBuildDirStr = oldTestBuildDir + "." + randomStr;
- this.clusterTestBuildDir =
- new File(clusterTestBuildDirStr).getAbsoluteFile();
- // Have it cleaned up on exit
- this.clusterTestBuildDir.deleteOnExit();
+ this.clusterTestBuildDir = setupClusterTestBuildDir();
+
// Set our random dir while minidfscluster is being constructed.
- System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDirStr);
+ System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
// Bring up mini dfs cluster. This spews a bunch of warnings about missing
// scheme. TODO: fix.
// Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
@@ -167,7 +202,8 @@
// Restore System property. minidfscluster accesses content of
// the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using,
// but otherwise, just in constructor.
- System.setProperty(TEST_DIRECTORY_KEY, oldTestBuildDir);
+ System.setProperty(TEST_DIRECTORY_KEY, oldBuildTestDir);
+
// Mangle conf so fs parameter points to minidfs we just started up
FileSystem fs = this.dfsCluster.getFileSystem();
this.conf.set("fs.defaultFS", fs.getUri().toString());
@@ -175,7 +211,7 @@
// It could be created before the cluster
if(this.zkCluster == null) {
- startMiniZKCluster();
+ startMiniZKCluster(this.clusterTestBuildDir);
}
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
@@ -192,8 +228,17 @@
}
/**
+ * @return Current mini hbase cluster. Only has something in it after a call
+ * to {@link #startMiniCluster()}.
+ * @see #startMiniCluster()
+ */
+ public MiniHBaseCluster getMiniHBaseCluster() {
+ return this.hbaseCluster;
+ }
+
+ /**
* @throws IOException
- * @see {@link #startMiniCluster(boolean, int)}
+ * @see {@link #startMiniCluster(int)}
*/
public void shutdownMiniCluster() throws IOException {
LOG.info("Shutting down minicluster");
@@ -202,7 +247,7 @@
// Wait till hbase is down before going on to shutdown zk.
this.hbaseCluster.join();
}
- if (this.zkCluster != null) this.zkCluster.shutdown();
+ shutdownMiniZKCluster();
if (this.dfsCluster != null) {
// The below throws an exception per dn, AsynchronousCloseException.
this.dfsCluster.shutdown();
@@ -369,10 +414,25 @@
*
* @param table The table to use for the data.
* @param columnFamily The family to insert the data into.
+ * @return count of regions created.
* @throws IOException When creating the regions fails.
*/
- public void createMultiRegions(HTable table, byte[] columnFamily)
+ public int createMultiRegions(HTable table, byte[] columnFamily)
throws IOException {
+ return createMultiRegions(getConfiguration(), table, columnFamily);
+ }
+
+ /**
+ * Creates many regions names "aaa" to "zzz".
+ * @param c Configuration to use.
+ * @param table The table to use for the data.
+ * @param columnFamily The family to insert the data into.
+ * @return count of regions created.
+ * @throws IOException When creating the regions fails.
+ */
+ public int createMultiRegions(final HBaseConfiguration c, final HTable table,
+ final byte[] columnFamily)
+ throws IOException {
byte[][] KEYS = {
HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
@@ -385,7 +445,6 @@
Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
};
- HBaseConfiguration c = getConfiguration();
HTable meta = new HTable(c, HConstants.META_TABLE_NAME);
HTableDescriptor htd = table.getTableDescriptor();
if(!htd.hasFamily(columnFamily)) {
@@ -398,6 +457,7 @@
// including the new start region from empty to "bbb". lg
List rows = getMetaTableRows();
// add custom ones
+ int count = 0;
for (int i = 0; i < KEYS.length; i++) {
int j = (i + 1) % KEYS.length;
HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(),
@@ -407,6 +467,7 @@
Writables.getBytes(hri));
meta.put(put);
LOG.info("createMultiRegions: inserted " + hri.toString());
+ count++;
}
// see comment above, remove "old" (or previous) single region
for (byte[] row : rows) {
@@ -417,6 +478,7 @@
// flush cache of regions
HConnection conn = table.getConnection();
conn.clearRegionCache();
+ return count;
}
/**
Index: src/test/org/apache/hadoop/hbase/TestInfoServers.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestInfoServers.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/TestInfoServers.java (working copy)
@@ -51,7 +51,7 @@
int port = cluster.getMaster().getInfoServer().getPort();
assertHasExpectedContent(new URL("http://localhost:" + port +
"/index.html"), "master");
- port = cluster.getRegionThreads().get(0).getRegionServer().
+ port = cluster.getRegionServerThreads().get(0).getRegionServer().
getInfoServer().getPort();
assertHasExpectedContent(new URL("http://localhost:" + port +
"/index.html"), "regionserver");
Index: src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java (working copy)
@@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* Test whether region rebalancing works. (HBASE-71)
@@ -195,7 +196,7 @@
private List getOnlineRegionServers() {
List list = new ArrayList();
- for (LocalHBaseCluster.RegionServerThread rst : cluster.getRegionThreads()) {
+ for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) {
if (rst.getRegionServer().isOnline()) {
list.add(rst.getRegionServer());
}
Index: src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
===================================================================
--- src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (working copy)
@@ -199,8 +199,8 @@
* regionservers and master threads are no long alive.
*/
public void threadDumpingJoin() {
- if (this.cluster.getRegionThreads() != null) {
- for(Thread t: this.cluster.getRegionThreads()) {
+ if (this.cluster.getRegionServerThreads() != null) {
+ for(Thread t: this.cluster.getRegionServerThreads()) {
threadDumpingJoin(t);
}
}
Index: src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java
===================================================================
--- src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java (working copy)
@@ -135,7 +135,7 @@
// When the META table can be opened, the region servers are running
new HTable(conf, HConstants.META_TABLE_NAME);
- this.server = cluster.getRegionThreads().get(0).getRegionServer();
+ this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
this.log = server.getLog();
// Create the test table and open it
@@ -219,7 +219,7 @@
// When the META table can be opened, the region servers are running
new HTable(conf, HConstants.META_TABLE_NAME);
- this.server = cluster.getRegionThreads().get(0).getRegionServer();
+ this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
this.log = server.getLog();
assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
Index: src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java
===================================================================
--- src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java (working copy)
@@ -39,6 +39,7 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* Tests region server failover when a region server exits both cleanly and
@@ -125,8 +126,8 @@
* is just shut down.
*/
private void stopOrAbortMetaRegionServer(boolean abort) {
- List regionThreads =
- cluster.getRegionThreads();
+ List regionThreads =
+ cluster.getRegionServerThreads();
int server = -1;
for (int i = 0; i < regionThreads.size() && server == -1; i++) {
Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
===================================================================
--- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy)
@@ -21,7 +21,10 @@
import java.io.IOException;
import java.net.BindException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,6 +33,7 @@
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* This class creates a single process HBase cluster. One thread is created for
@@ -53,12 +57,62 @@
init(numRegionServers);
}
+ /**
+ * Override Master so can add inject behaviors testing.
+ */
+ public static class MiniHBaseClusterMaster extends HMaster {
+ private final Map> messages =
+ new ConcurrentHashMap>();
+
+ public MiniHBaseClusterMaster(final HBaseConfiguration conf)
+ throws IOException {
+ super(conf);
+ }
+
+ /**
+ * Add a message to send to a regionserver next time it checks in.
+ * @param hsi RegionServer's HServerInfo.
+ * @param msg Message to add.
+ */
+ void addMessage(final HServerInfo hsi, HMsg msg) {
+ synchronized(this.messages) {
+ List hmsgs = this.messages.get(hsi);
+ if (hmsgs == null) {
+ hmsgs = new ArrayList();
+ this.messages.put(hsi, hmsgs);
+ }
+ hmsgs.add(msg);
+ }
+ }
+
+ @Override
+ protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi,
+ final HMsg[] msgs) {
+ HMsg [] answerMsgs = msgs;
+ synchronized (this.messages) {
+ List hmsgs = this.messages.get(hsi);
+ if (hmsgs != null && !hmsgs.isEmpty()) {
+ int size = answerMsgs.length;
+ HMsg [] newAnswerMsgs = new HMsg[size + hmsgs.size()];
+ System.arraycopy(answerMsgs, 0, newAnswerMsgs, 0, answerMsgs.length);
+ for (int i = 0; i < hmsgs.size(); i++) {
+ newAnswerMsgs[answerMsgs.length + i] = hmsgs.get(i);
+ }
+ answerMsgs = newAnswerMsgs;
+ hmsgs.clear();
+ }
+ }
+ return super.adornRegionServerAnswer(hsi, answerMsgs);
+ }
+ }
+
private void init(final int nRegionNodes) throws IOException {
try {
// start up a LocalHBaseCluster
while (true) {
try {
- hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes);
+ hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes,
+ MiniHBaseCluster.MiniHBaseClusterMaster.class);
hbaseCluster.startup();
} catch (BindException e) {
//this port is already in use. try to use another (for multiple testing)
@@ -83,8 +137,7 @@
* @return Name of regionserver started.
*/
public String startRegionServer() throws IOException {
- LocalHBaseCluster.RegionServerThread t =
- this.hbaseCluster.addRegionServer();
+ JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer();
t.start();
t.waitForServerOnline();
return t.getName();
@@ -106,18 +159,16 @@
}
/**
- * Cause a region server to exit without cleaning up
- *
+ * Cause a region server to exit doing basic clean up only on its way out.
* @param serverNumber Used as index into a list.
*/
- public void abortRegionServer(int serverNumber) {
+ public String abortRegionServer(int serverNumber) {
HRegionServer server = getRegionServer(serverNumber);
- try {
- LOG.info("Aborting " + server.getHServerInfo().toString());
- } catch (IOException e) {
- e.printStackTrace();
- }
+ // Don't run hdfs shutdown thread.
+ server.setHDFSShutdownThreadOnExit(null);
+ LOG.info("Aborting " + server.toString());
server.abort();
+ return server.toString();
}
/**
@@ -126,7 +177,7 @@
* @param serverNumber Used as index into a list.
* @return the region server that was stopped
*/
- public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber) {
+ public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
return stopRegionServer(serverNumber, true);
}
@@ -140,9 +191,9 @@
* before end of the test.
* @return the region server that was stopped
*/
- public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber,
+ public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
final boolean shutdownFS) {
- LocalHBaseCluster.RegionServerThread server =
+ JVMClusterUtil.RegionServerThread server =
hbaseCluster.getRegionServers().get(serverNumber);
LOG.info("Stopping " + server.toString());
if (!shutdownFS) {
@@ -154,8 +205,8 @@
}
/**
- * Wait for the specified region server to stop
- * Removes this thread from list of running threads.
+ * Wait for the specified region server to stop. Removes this thread from list
+ * of running threads.
* @param serverNumber
* @return Name of region server that just went down.
*/
@@ -185,7 +236,7 @@
* @throws IOException
*/
public void flushcache() throws IOException {
- for (LocalHBaseCluster.RegionServerThread t:
+ for (JVMClusterUtil.RegionServerThread t:
this.hbaseCluster.getRegionServers()) {
for(HRegion r: t.getRegionServer().getOnlineRegions()) {
r.flushcache();
@@ -196,7 +247,7 @@
/**
* @return List of region server threads.
*/
- public List getRegionThreads() {
+ public List getRegionServerThreads() {
return this.hbaseCluster.getRegionServers();
}
@@ -208,4 +259,38 @@
public HRegionServer getRegionServer(int serverNumber) {
return hbaseCluster.getRegionServer(serverNumber);
}
-}
+
+ /**
+ * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
+ * of HRS carrying .META. Returns -1 if none found.
+ */
+ public int getServerWithMeta() {
+ int index = -1;
+ int count = 0;
+ for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
+ HRegionServer hrs = rst.getRegionServer();
+ HRegion metaRegion =
+ hrs.getOnlineRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
+ if (metaRegion != null) {
+ index = count;
+ break;
+ }
+ count++;
+ }
+ return index;
+ }
+
+ /**
+ * Add a message to include in the responses send a regionserver when it
+ * checks back in.
+ * @param serverNumber Which server to send it to.
+ * @param msg The MESSAGE
+ * @throws IOException
+ */
+ public void addMessageToSendRegionServer(final int serverNumber,
+ final HMsg msg)
+ throws IOException {
+ HRegionServer hrs = getRegionServer(serverNumber);
+ ((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg);
+ }
+}
\ No newline at end of file
Index: src/test/org/apache/hadoop/hbase/master/TestRegionServerOperationQueue.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/TestRegionServerOperationQueue.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/master/TestRegionServerOperationQueue.java (revision 0)
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test the queue used to manage RegionServerOperations.
+ * Currently RegionServerOperationQueue is untestable because each
+ * RegionServerOperation has a {@link HMaster} reference. TOOD: Fix.
+ */
+public class TestRegionServerOperationQueue {
+ private RegionServerOperationQueue queue;
+ private Configuration conf;
+ private AtomicBoolean closed;
+
+ @Before
+ public void setUp() throws Exception {
+ this.closed = new AtomicBoolean(false);
+ this.conf = new Configuration();
+ this.queue = new RegionServerOperationQueue(this.conf, this.closed);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+}
\ No newline at end of file
Index: src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java (revision 0)
@@ -0,0 +1,262 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test transitions of state across the master.
+ */
+public class TestMasterTransistions {
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final String TABLENAME = "master_transitions";
+ private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
+ Bytes.toBytes("b"), Bytes.toBytes("c")};
+
+ /**
+ * Start up a mini cluster and put a small table of many empty regions into it.
+ * @throws Exception
+ */
+ @BeforeClass public static void beforeAllTests() throws Exception {
+ // Start a cluster of two regionservers.
+ TEST_UTIL.startMiniCluster(2);
+ // Create a table of three families. This will assign a region.
+ TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
+ HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+ int countOfRegions = TEST_UTIL.createMultiRegions(t, FAMILIES[0]);
+ waitUntilAllRegionsAssigned(countOfRegions);
+ }
+
+ @AfterClass public static void afterAllTests() throws IOException {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Listener for regionserver events testing hbase-2428 (Infinite loop of
+ * region closes if META region is offline). In particular, listen
+ * for the close of the 'metaServer' and when it comes in, requeue it with a
+ * delay as though there were an issue processing the shutdown. As part of
+ * the requeuing, send over a close of a region on 'otherServer' so it comes
+ * into a master that has its meta region marked as offline.
+ */
+ static class HBase2428Listener implements RegionServerOperationListener {
+ // Map of what we've delayed so we don't do do repeated delays.
+ private final Set postponed =
+ new CopyOnWriteArraySet();
+ private boolean done = false;;
+ private boolean metaShutdownReceived = false;
+ private final HServerAddress metaAddress;
+ private final MiniHBaseCluster cluster;
+ private final int otherServerIndex;
+ private final HRegionInfo hri;
+ private int closeCount = 0;
+ static final int SERVER_DURATION = 10 * 1000;
+ static final int CLOSE_DURATION = 1 * 1000;
+
+ HBase2428Listener(final MiniHBaseCluster c, final HServerAddress metaAddress,
+ final HRegionInfo closingHRI, final int otherServerIndex) {
+ this.cluster = c;
+ this.metaAddress = metaAddress;
+ this.hri = closingHRI;
+ this.otherServerIndex = otherServerIndex;
+ }
+
+ @Override
+ public boolean process(final RegionServerOperation op) throws IOException {
+ // If a regionserver shutdown and its of the meta server, then we want to
+ // delay the processing of the shutdown and send off a close of a region on
+ // the 'otherServer.
+ boolean result = true;
+ if (op instanceof ProcessServerShutdown) {
+ ProcessServerShutdown pss = (ProcessServerShutdown)op;
+ if (pss.getDeadServerAddress().equals(this.metaAddress)) {
+ // Don't postpone more than once.
+ if (!this.postponed.contains(pss)) {
+ // Close some region.
+ this.cluster.addMessageToSendRegionServer(this.otherServerIndex,
+ new HMsg(HMsg.Type.MSG_REGION_CLOSE, hri,
+ Bytes.toBytes("Forcing close in test")));
+ this.postponed.add(pss);
+ // Put off the processing of the regionserver shutdown processing.
+ pss.setExpirationDuration(SERVER_DURATION);
+ this.metaShutdownReceived = true;
+ // Return false. This will add this op to the delayed queue.
+ result = false;
+ }
+ }
+ } else {
+ // Have the close run frequently.
+ if (isWantedCloseOperation(op) != null) {
+ op.setExpirationDuration(CLOSE_DURATION);
+ // Count how many times it comes through here.
+ this.closeCount++;
+ }
+ }
+ return result;
+ }
+
+ public void processed(final RegionServerOperation op) {
+ if (isWantedCloseOperation(op) == null) return;
+ this.done = true;
+ }
+
+ /*
+ * @param op
+ * @return Null if not the wanted ProcessRegionClose, else op
+ * cast as a ProcessRegionClose.
+ */
+ private ProcessRegionClose isWantedCloseOperation(final RegionServerOperation op) {
+ // Count every time we get a close operation.
+ if (op instanceof ProcessRegionClose) {
+ ProcessRegionClose c = (ProcessRegionClose)op;
+ if (c.regionInfo.equals(hri)) {
+ return c;
+ }
+ }
+ return null;
+ }
+
+ boolean isDone() {
+ return this.done;
+ }
+
+ boolean isMetaShutdownReceived() {
+ return metaShutdownReceived;
+ }
+
+ int getCloseCount() {
+ return this.closeCount;
+ }
+ }
+
+ /**
+ * In 2428, the meta region has just been set offline and then a close comes
+ * in.
+ * @see HBASE-2428
+ */
+ @Test public void testRegionCloseWhenNoMetaHBase2428() throws Exception {
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ final HMaster master = cluster.getMaster();
+ int metaIndex = cluster.getServerWithMeta();
+ // Figure the index of the server that is not server the .META.
+ int otherServerIndex = -1;
+ for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+ if (i == metaIndex) continue;
+ otherServerIndex = i;
+ break;
+ }
+ final HRegionServer otherServer = cluster.getRegionServer(otherServerIndex);
+ final HRegionServer metaHRS = cluster.getRegionServer(metaIndex);
+
+ // Get a region out on the otherServer.
+ final HRegionInfo hri =
+ otherServer.getOnlineRegions().iterator().next().getRegionInfo();
+
+ // Add our ReionServerOperationsListener
+ HBase2428Listener listener = new HBase2428Listener(cluster,
+ metaHRS.getHServerInfo().getServerAddress(), hri, otherServerIndex);
+ master.getRegionServerOperationQueue().
+ registerRegionServerOperationListener(listener);
+ try {
+ // Now close the server carrying index.
+ cluster.abortRegionServer(metaIndex);
+
+ // First wait on receipt of meta server shutdown message.
+ while(!listener.metaShutdownReceived) Threads.sleep(100);
+ while(!listener.isDone()) Threads.sleep(10);
+ // We should not have retried the close more times than it took for the
+ // server shutdown message to exit the delay queue and get processed
+ // (Multiple by two to add in some slop in case of GC or something).
+ assertTrue(listener.getCloseCount() <
+ ((HBase2428Listener.SERVER_DURATION/HBase2428Listener.CLOSE_DURATION) * 2));
+
+ assertClosedRegionIsBackOnline(hri);
+ } finally {
+ master.getRegionServerOperationQueue().
+ unregisterRegionServerOperationListener(listener);
+ }
+ }
+
+ private void assertClosedRegionIsBackOnline(final HRegionInfo hri)
+ throws IOException {
+ // When we get here, region should be successfully deployed. Assert so.
+ // 'aaa' is safe as first row if startkey is EMPTY_BYTE_ARRAY because we
+ // loaded with HBaseTestingUtility#createMultiRegions.
+ byte [] row = Bytes.equals(HConstants.EMPTY_BYTE_ARRAY, hri.getStartKey())?
+ new byte [] {'a', 'a', 'a'}: hri.getStartKey();
+ Put p = new Put(row);
+ p.add(FAMILIES[0], FAMILIES[0], FAMILIES[0]);
+ HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+ t.put(p);
+ Get g = new Get(row);
+ assertTrue((t.get(g)).size() > 0);
+ }
+
+ /*
+ * Wait until all rows in .META. have a non-empty info:server. This means
+ * all regions have been deployed, master has been informed and updated
+ * .META. with the regions deployed server.
+ * @param countOfRegions How many regions in .META.
+ * @throws IOException
+ */
+ private static void waitUntilAllRegionsAssigned(final int countOfRegions)
+ throws IOException {
+ HTable meta = new HTable(TEST_UTIL.getConfiguration(),
+ HConstants.META_TABLE_NAME);
+ while (true) {
+ int rows = 0;
+ Scan scan = new Scan();
+ scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
+ ResultScanner s = meta.getScanner(scan);
+ for (Result r = null; (r = s.next()) != null;) {
+ byte [] b =
+ r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
+ if (b == null || b.length <= 0) break;
+ rows++;
+ }
+ s.close();
+ // If I got to hear and all rows have a Server, then all have been assigned.
+ if (rows == countOfRegions) break;
+ }
+ }
+}
\ No newline at end of file
Index: src/test/org/apache/hadoop/hbase/mapreduce/TestTableIndex.java
===================================================================
--- src/test/org/apache/hadoop/hbase/mapreduce/TestTableIndex.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/mapreduce/TestTableIndex.java (working copy)
@@ -192,7 +192,7 @@
private void verify() throws IOException {
// Force a cache flush for every online region to ensure that when the
// scanner takes its snapshot, all the updates have made it into the cache.
- for (HRegion r : cluster.getRegionThreads().get(0).getRegionServer().
+ for (HRegion r : cluster.getRegionServerThreads().get(0).getRegionServer().
getOnlineRegions()) {
HRegionIncommon region = new HRegionIncommon(r);
region.flushcache();
Index: src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java
===================================================================
--- src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (working copy)
@@ -197,7 +197,7 @@
private void verify() throws IOException {
// Force a cache flush for every online region to ensure that when the
// scanner takes its snapshot, all the updates have made it into the cache.
- for (HRegion r : cluster.getRegionThreads().get(0).getRegionServer().
+ for (HRegion r : cluster.getRegionServerThreads().get(0).getRegionServer().
getOnlineRegions()) {
HRegionIncommon region = new HRegionIncommon(r);
region.flushcache();
Index: src/test/org/apache/hadoop/hbase/util/TestMigration.java
===================================================================
--- src/test/org/apache/hadoop/hbase/util/TestMigration.java (revision 939172)
+++ src/test/org/apache/hadoop/hbase/util/TestMigration.java (working copy)
@@ -60,7 +60,8 @@
* @throws IOException
* @throws InterruptedException
*/
- public void testMigration() throws IOException, InterruptedException {
+ public void disabledTestMigration() throws IOException, InterruptedException {
+ // Currently disabled until we develop migration for next version.
Path rootdir = getUnitTestdir(getName());
Path hbasedir = loadTestData(fs, rootdir);
assertTrue(fs.exists(hbasedir));
Index: src/contrib/stargate/build.xml
===================================================================
--- src/contrib/stargate/build.xml (revision 939172)
+++ src/contrib/stargate/build.xml (working copy)
@@ -1,4 +1,6 @@