Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -40,12 +40,12 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.zookeeper.ZooKeeper; @@ -60,7 +60,6 @@ * make changes to configuration parameters. */ public class HBaseTestingUtility { - private final Log LOG = LogFactory.getLog(getClass()); private final HBaseConfiguration conf; @@ -68,6 +67,7 @@ private MiniDFSCluster dfsCluster = null; private MiniHBaseCluster hbaseCluster = null; private MiniMRCluster mrCluster = null; + // If non-null, then already a cluster running. private File clusterTestBuildDir = null; private HBaseAdmin hbaseAdmin = null; @@ -78,7 +78,7 @@ public HBaseTestingUtility(HBaseConfiguration conf) { this.conf = conf; } - + /** System property key to get test directory value. */ public static final String TEST_DIRECTORY_KEY = "test.build.data"; @@ -98,6 +98,36 @@ } /** + * Home our cluster in a dir under build/test. Give it a random name + * so can have many concurrent clusters running if we need to. Need to + * amend the test.build.data System property. Its what minidfscluster bases + * it data dir on. 
Modifying a System property is not the way to do concurrent + * instances -- another instance could grab the temporary + * value unintentionally -- but there is nothing we can do about it at the moment; it is + * how the minidfscluster works. + * @return The calculated cluster test build directory. + */ + File setupClusterTestBuildDir() { + String oldTestBuildDir = + System.getProperty(TEST_DIRECTORY_KEY, "build/test/data"); + String randomStr = UUID.randomUUID().toString(); + String dirStr = oldTestBuildDir + "." + randomStr; + File dir = new File(dirStr).getAbsoluteFile(); + // Have it cleaned up on exit + dir.deleteOnExit(); + return dir; + } + + /** + * @throws IOException If cluster already running. + */ + void isRunningCluster() throws IOException { + if (this.clusterTestBuildDir == null) return; + throw new IOException("Cluster already running at " + + this.clusterTestBuildDir); + } + + /** * @param subdirName * @return Path to a subdirectory named subdirName under * {@link #getTestDir()}. @@ -114,16 +144,35 @@ startMiniCluster(1); } + /** + * Call this if you only want a zk cluster. + * @see #startMiniCluster() if you want zk + dfs + hbase mini cluster. + * @throws Exception + * @see #shutdownMiniZKCluster() + */ public void startMiniZKCluster() throws Exception { - // Note that this is done before we create the MiniHBaseCluster because we - // need to edit the config to add the ZooKeeper servers. 
+ isRunningCluster(); + this.clusterTestBuildDir = setupClusterTestBuildDir(); + startMiniZKCluster(this.clusterTestBuildDir); + + } + + private void startMiniZKCluster(final File dir) throws Exception { this.zkCluster = new MiniZooKeeperCluster(); - int clientPort = this.zkCluster.startup(this.clusterTestBuildDir); + int clientPort = this.zkCluster.startup(dir); this.conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort)); } /** + * @throws IOException + * @see #startMiniZKCluster() + */ + public void shutdownMiniZKCluster() throws IOException { + if (this.zkCluster != null) this.zkCluster.shutdown(); + } + + /** * Start up a minicluster of hbase, optinally dfs, and zookeeper. * Modifies Configuration. Homes the cluster data directory under a random * subdirectory in a directory under System property test.build.data. @@ -138,27 +187,13 @@ throws Exception { LOG.info("Starting up minicluster"); // If we already put up a cluster, fail. - if (this.clusterTestBuildDir != null) { - throw new IOException("Cluster already running at " + - this.clusterTestBuildDir); - } - // Now, home our cluster in a dir under build/test. Give it a random name - // so can have many concurrent clusters running if we need to. Need to - // amend the test.build.data System property. Its what minidfscluster bases - // it data dir on. Moding a System property is not the way to do concurrent - // instances -- another instance could grab the temporary - // value unintentionally -- but not anything can do about it at moment; its - // how the minidfscluster works. - String oldTestBuildDir = + isRunningCluster(); + String oldBuildTestDir = System.getProperty(TEST_DIRECTORY_KEY, "build/test/data"); - String randomStr = UUID.randomUUID().toString(); - String clusterTestBuildDirStr = oldTestBuildDir + "." 
+ randomStr; - this.clusterTestBuildDir = - new File(clusterTestBuildDirStr).getAbsoluteFile(); - // Have it cleaned up on exit - this.clusterTestBuildDir.deleteOnExit(); + this.clusterTestBuildDir = setupClusterTestBuildDir(); + // Set our random dir while minidfscluster is being constructed. - System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDirStr); + System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath()); // Bring up mini dfs cluster. This spews a bunch of warnings about missing // scheme. TODO: fix. // Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. @@ -167,7 +202,8 @@ // Restore System property. minidfscluster accesses content of // the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using, // but otherwise, just in constructor. - System.setProperty(TEST_DIRECTORY_KEY, oldTestBuildDir); + System.setProperty(TEST_DIRECTORY_KEY, oldBuildTestDir); + // Mangle conf so fs parameter points to minidfs we just started up FileSystem fs = this.dfsCluster.getFileSystem(); this.conf.set("fs.defaultFS", fs.getUri().toString()); @@ -175,7 +211,7 @@ // It could be created before the cluster if(this.zkCluster == null) { - startMiniZKCluster(); + startMiniZKCluster(this.clusterTestBuildDir); } // Now do the mini hbase cluster. Set the hbase.rootdir in config. @@ -192,8 +228,17 @@ } /** + * @return Current mini hbase cluster. Only has something in it after a call + * to {@link #startMiniCluster()}. + * @see #startMiniCluster() + */ + public MiniHBaseCluster getMiniHBaseCluster() { + return this.hbaseCluster; + } + + /** * @throws IOException - * @see {@link #startMiniCluster(boolean, int)} + * @see {@link #startMiniCluster(int)} */ public void shutdownMiniCluster() throws IOException { LOG.info("Shutting down minicluster"); @@ -202,7 +247,7 @@ // Wait till hbase is down before going on to shutdown zk. 
this.hbaseCluster.join(); } - if (this.zkCluster != null) this.zkCluster.shutdown(); + shutdownMiniZKCluster(); if (this.dfsCluster != null) { // The below throws an exception per dn, AsynchronousCloseException. this.dfsCluster.shutdown(); @@ -369,10 +414,25 @@ * * @param table The table to use for the data. * @param columnFamily The family to insert the data into. + * @return count of regions created. * @throws IOException When creating the regions fails. */ - public void createMultiRegions(HTable table, byte[] columnFamily) + public int createMultiRegions(HTable table, byte[] columnFamily) throws IOException { + return createMultiRegions(getConfiguration(), table, columnFamily); + } + + /** + * Creates many regions names "aaa" to "zzz". + * @param c Configuration to use. + * @param table The table to use for the data. + * @param columnFamily The family to insert the data into. + * @return count of regions created. + * @throws IOException When creating the regions fails. + */ + public int createMultiRegions(final HBaseConfiguration c, final HTable table, + final byte[] columnFamily) + throws IOException { byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), @@ -385,7 +445,6 @@ Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; - HBaseConfiguration c = getConfiguration(); HTable meta = new HTable(c, HConstants.META_TABLE_NAME); HTableDescriptor htd = table.getTableDescriptor(); if(!htd.hasFamily(columnFamily)) { @@ -398,6 +457,7 @@ // including the new start region from empty to "bbb". 
lg List rows = getMetaTableRows(); // add custom ones + int count = 0; for (int i = 0; i < KEYS.length; i++) { int j = (i + 1) % KEYS.length; HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(), @@ -407,6 +467,7 @@ Writables.getBytes(hri)); meta.put(put); LOG.info("createMultiRegions: inserted " + hri.toString()); + count++; } // see comment above, remove "old" (or previous) single region for (byte[] row : rows) { @@ -417,6 +478,7 @@ // flush cache of regions HConnection conn = table.getConnection(); conn.clearRegionCache(); + return count; } /** Index: src/test/org/apache/hadoop/hbase/TestInfoServers.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestInfoServers.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/TestInfoServers.java (working copy) @@ -51,7 +51,7 @@ int port = cluster.getMaster().getInfoServer().getPort(); assertHasExpectedContent(new URL("http://localhost:" + port + "/index.html"), "master"); - port = cluster.getRegionThreads().get(0).getRegionServer(). + port = cluster.getRegionServerThreads().get(0).getRegionServer(). getInfoServer().getPort(); assertHasExpectedContent(new URL("http://localhost:" + port + "/index.html"), "regionserver"); Index: src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; /** * Test whether region rebalancing works. 
(HBASE-71) @@ -195,7 +196,7 @@ private List getOnlineRegionServers() { List list = new ArrayList(); - for (LocalHBaseCluster.RegionServerThread rst : cluster.getRegionThreads()) { + for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) { if (rst.getRegionServer().isOnline()) { list.add(rst.getRegionServer()); } Index: src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java =================================================================== --- src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (working copy) @@ -199,8 +199,8 @@ * regionservers and master threads are no long alive. */ public void threadDumpingJoin() { - if (this.cluster.getRegionThreads() != null) { - for(Thread t: this.cluster.getRegionThreads()) { + if (this.cluster.getRegionServerThreads() != null) { + for(Thread t: this.cluster.getRegionServerThreads()) { threadDumpingJoin(t); } } Index: src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java (working copy) @@ -135,7 +135,7 @@ // When the META table can be opened, the region servers are running new HTable(conf, HConstants.META_TABLE_NAME); - this.server = cluster.getRegionThreads().get(0).getRegionServer(); + this.server = cluster.getRegionServerThreads().get(0).getRegionServer(); this.log = server.getLog(); // Create the test table and open it @@ -219,7 +219,7 @@ // When the META table can be opened, the region servers are running new HTable(conf, HConstants.META_TABLE_NAME); - this.server = cluster.getRegionThreads().get(0).getRegionServer(); + this.server = cluster.getRegionServerThreads().get(0).getRegionServer(); this.log = server.getLog(); assertTrue("Need HDFS-826 for this test", 
log.canGetCurReplicas()); Index: src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; /** * Tests region server failover when a region server exits both cleanly and @@ -125,8 +126,8 @@ * is just shut down. */ private void stopOrAbortMetaRegionServer(boolean abort) { - List regionThreads = - cluster.getRegionThreads(); + List regionThreads = + cluster.getRegionServerThreads(); int server = -1; for (int i = 0; i < regionThreads.size() && server == -1; i++) { Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -21,7 +21,10 @@ import java.io.IOException; import java.net.BindException; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +33,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.JVMClusterUtil; /** * This class creates a single process HBase cluster. One thread is created for @@ -53,12 +57,62 @@ init(numRegionServers); } + /** + * Override Master so can add inject behaviors testing. 
+ */ + public static class MiniHBaseClusterMaster extends HMaster { + private final Map> messages = + new ConcurrentHashMap>(); + + public MiniHBaseClusterMaster(final HBaseConfiguration conf) + throws IOException { + super(conf); + } + + /** + * Add a message to send to a regionserver next time it checks in. + * @param hsi RegionServer's HServerInfo. + * @param msg Message to add. + */ + void addMessage(final HServerInfo hsi, HMsg msg) { + synchronized(this.messages) { + List hmsgs = this.messages.get(hsi); + if (hmsgs == null) { + hmsgs = new ArrayList(); + this.messages.put(hsi, hmsgs); + } + hmsgs.add(msg); + } + } + + @Override + protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi, + final HMsg[] msgs) { + HMsg [] answerMsgs = msgs; + synchronized (this.messages) { + List hmsgs = this.messages.get(hsi); + if (hmsgs != null && !hmsgs.isEmpty()) { + int size = answerMsgs.length; + HMsg [] newAnswerMsgs = new HMsg[size + hmsgs.size()]; + System.arraycopy(answerMsgs, 0, newAnswerMsgs, 0, answerMsgs.length); + for (int i = 0; i < hmsgs.size(); i++) { + newAnswerMsgs[answerMsgs.length + i] = hmsgs.get(i); + } + answerMsgs = newAnswerMsgs; + hmsgs.clear(); + } + } + return super.adornRegionServerAnswer(hsi, answerMsgs); + } + } + private void init(final int nRegionNodes) throws IOException { try { // start up a LocalHBaseCluster while (true) { try { - hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes); + hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes, + MiniHBaseCluster.MiniHBaseClusterMaster.class); hbaseCluster.startup(); } catch (BindException e) { //this port is already in use. try to use another (for multiple testing) @@ -83,8 +137,7 @@ * @return Name of regionserver started. 
*/ public String startRegionServer() throws IOException { - LocalHBaseCluster.RegionServerThread t = - this.hbaseCluster.addRegionServer(); + JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer(); t.start(); t.waitForServerOnline(); return t.getName(); @@ -106,18 +159,16 @@ } /** - * Cause a region server to exit without cleaning up - * + * Cause a region server to exit doing basic clean up only on its way out. * @param serverNumber Used as index into a list. */ - public void abortRegionServer(int serverNumber) { + public String abortRegionServer(int serverNumber) { HRegionServer server = getRegionServer(serverNumber); - try { - LOG.info("Aborting " + server.getHServerInfo().toString()); - } catch (IOException e) { - e.printStackTrace(); - } + // Don't run hdfs shutdown thread. + server.setHDFSShutdownThreadOnExit(null); + LOG.info("Aborting " + server.toString()); server.abort(); + return server.toString(); } /** @@ -126,7 +177,7 @@ * @param serverNumber Used as index into a list. * @return the region server that was stopped */ - public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber) { + public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { return stopRegionServer(serverNumber, true); } @@ -140,9 +191,9 @@ * before end of the test. * @return the region server that was stopped */ - public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber, + public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, final boolean shutdownFS) { - LocalHBaseCluster.RegionServerThread server = + JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); LOG.info("Stopping " + server.toString()); if (!shutdownFS) { @@ -154,8 +205,8 @@ } /** - * Wait for the specified region server to stop - * Removes this thread from list of running threads. + * Wait for the specified region server to stop. Removes this thread from list + * of running threads. 
* @param serverNumber * @return Name of region server that just went down. */ @@ -185,7 +236,7 @@ * @throws IOException */ public void flushcache() throws IOException { - for (LocalHBaseCluster.RegionServerThread t: + for (JVMClusterUtil.RegionServerThread t: this.hbaseCluster.getRegionServers()) { for(HRegion r: t.getRegionServer().getOnlineRegions()) { r.flushcache(); @@ -196,7 +247,7 @@ /** * @return List of region server threads. */ - public List getRegionThreads() { + public List getRegionServerThreads() { return this.hbaseCluster.getRegionServers(); } @@ -208,4 +259,38 @@ public HRegionServer getRegionServer(int serverNumber) { return hbaseCluster.getRegionServer(serverNumber); } -} + + /** + * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} + * of HRS carrying .META. Returns -1 if none found. + */ + public int getServerWithMeta() { + int index = -1; + int count = 0; + for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) { + HRegionServer hrs = rst.getRegionServer(); + HRegion metaRegion = + hrs.getOnlineRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); + if (metaRegion != null) { + index = count; + break; + } + count++; + } + return index; + } + + /** + * Add a message to include in the responses send a regionserver when it + * checks back in. + * @param serverNumber Which server to send it to. 
+ * @param msg The MESSAGE + * @throws IOException + */ + public void addMessageToSendRegionServer(final int serverNumber, + final HMsg msg) + throws IOException { + HRegionServer hrs = getRegionServer(serverNumber); + ((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg); + } +} \ No newline at end of file Index: src/test/org/apache/hadoop/hbase/master/TestRegionServerOperationQueue.java =================================================================== --- src/test/org/apache/hadoop/hbase/master/TestRegionServerOperationQueue.java (revision 0) +++ src/test/org/apache/hadoop/hbase/master/TestRegionServerOperationQueue.java (revision 0) @@ -0,0 +1,47 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Before; + +/** + * Test the queue used to manage RegionServerOperations. + * Currently RegionServerOperationQueue is untestable because each + * RegionServerOperation has a {@link HMaster} reference. TODO: Fix. 
+ */ +public class TestRegionServerOperationQueue { + private RegionServerOperationQueue queue; + private Configuration conf; + private AtomicBoolean closed; + + @Before + public void setUp() throws Exception { + this.closed = new AtomicBoolean(false); + this.conf = new Configuration(); + this.queue = new RegionServerOperationQueue(this.conf, this.closed); + } + + @After + public void tearDown() throws Exception { + } +} \ No newline at end of file Index: src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java =================================================================== --- src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java (revision 0) +++ src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java (revision 0) @@ -0,0 +1,262 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test transitions of state across the master. + */ +public class TestMasterTransistions { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final String TABLENAME = "master_transitions"; + private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"), + Bytes.toBytes("b"), Bytes.toBytes("c")}; + + /** + * Start up a mini cluster and put a small table of many empty regions into it. + * @throws Exception + */ + @BeforeClass public static void beforeAllTests() throws Exception { + // Start a cluster of two regionservers. + TEST_UTIL.startMiniCluster(2); + // Create a table of three families. This will assign a region. 
+ TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES); + HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); + int countOfRegions = TEST_UTIL.createMultiRegions(t, FAMILIES[0]); + waitUntilAllRegionsAssigned(countOfRegions); + } + + @AfterClass public static void afterAllTests() throws IOException { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Listener for regionserver events testing hbase-2428 (Infinite loop of + * region closes if META region is offline). In particular, listen + * for the close of the 'metaServer' and when it comes in, requeue it with a + * delay as though there were an issue processing the shutdown. As part of + * the requeuing, send over a close of a region on 'otherServer' so it comes + * into a master that has its meta region marked as offline. + */ + static class HBase2428Listener implements RegionServerOperationListener { + // Set of what we've delayed so we don't do repeated delays. + private final Set postponed = + new CopyOnWriteArraySet(); + private boolean done = false;; + private boolean metaShutdownReceived = false; + private final HServerAddress metaAddress; + private final MiniHBaseCluster cluster; + private final int otherServerIndex; + private final HRegionInfo hri; + private int closeCount = 0; + static final int SERVER_DURATION = 10 * 1000; + static final int CLOSE_DURATION = 1 * 1000; + + HBase2428Listener(final MiniHBaseCluster c, final HServerAddress metaAddress, + final HRegionInfo closingHRI, final int otherServerIndex) { + this.cluster = c; + this.metaAddress = metaAddress; + this.hri = closingHRI; + this.otherServerIndex = otherServerIndex; + } + + @Override + public boolean process(final RegionServerOperation op) throws IOException { + // If this is a regionserver shutdown and it is of the meta server, then we want to + // delay the processing of the shutdown and send off a close of a region on + // the 'otherServer'. 
+ boolean result = true; + if (op instanceof ProcessServerShutdown) { + ProcessServerShutdown pss = (ProcessServerShutdown)op; + if (pss.getDeadServerAddress().equals(this.metaAddress)) { + // Don't postpone more than once. + if (!this.postponed.contains(pss)) { + // Close some region. + this.cluster.addMessageToSendRegionServer(this.otherServerIndex, + new HMsg(HMsg.Type.MSG_REGION_CLOSE, hri, + Bytes.toBytes("Forcing close in test"))); + this.postponed.add(pss); + // Put off the processing of the regionserver shutdown processing. + pss.setExpirationDuration(SERVER_DURATION); + this.metaShutdownReceived = true; + // Return false. This will add this op to the delayed queue. + result = false; + } + } + } else { + // Have the close run frequently. + if (isWantedCloseOperation(op) != null) { + op.setExpirationDuration(CLOSE_DURATION); + // Count how many times it comes through here. + this.closeCount++; + } + } + return result; + } + + public void processed(final RegionServerOperation op) { + if (isWantedCloseOperation(op) == null) return; + this.done = true; + } + + /* + * @param op + * @return Null if not the wanted ProcessRegionClose, else op + * cast as a ProcessRegionClose. + */ + private ProcessRegionClose isWantedCloseOperation(final RegionServerOperation op) { + // Count every time we get a close operation. + if (op instanceof ProcessRegionClose) { + ProcessRegionClose c = (ProcessRegionClose)op; + if (c.regionInfo.equals(hri)) { + return c; + } + } + return null; + } + + boolean isDone() { + return this.done; + } + + boolean isMetaShutdownReceived() { + return metaShutdownReceived; + } + + int getCloseCount() { + return this.closeCount; + } + } + + /** + * In 2428, the meta region has just been set offline and then a close comes + * in. 
+ * @see HBASE-2428 + */ + @Test public void testRegionCloseWhenNoMetaHBase2428() throws Exception { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + final HMaster master = cluster.getMaster(); + int metaIndex = cluster.getServerWithMeta(); + // Figure the index of the server that is not serving .META. + int otherServerIndex = -1; + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + if (i == metaIndex) continue; + otherServerIndex = i; + break; + } + final HRegionServer otherServer = cluster.getRegionServer(otherServerIndex); + final HRegionServer metaHRS = cluster.getRegionServer(metaIndex); + + // Get a region out on the otherServer. + final HRegionInfo hri = + otherServer.getOnlineRegions().iterator().next().getRegionInfo(); + + // Add our RegionServerOperationListener + HBase2428Listener listener = new HBase2428Listener(cluster, + metaHRS.getHServerInfo().getServerAddress(), hri, otherServerIndex); + master.getRegionServerOperationQueue(). + registerRegionServerOperationListener(listener); + try { + // Now abort the server carrying .META. + cluster.abortRegionServer(metaIndex); + + // First wait on receipt of meta server shutdown message. + while(!listener.metaShutdownReceived) Threads.sleep(100); + while(!listener.isDone()) Threads.sleep(10); + // We should not have retried the close more times than it took for the + // server shutdown message to exit the delay queue and get processed + // (Multiply by two to add in some slop in case of GC or something). + assertTrue(listener.getCloseCount() < + ((HBase2428Listener.SERVER_DURATION/HBase2428Listener.CLOSE_DURATION) * 2)); + + assertClosedRegionIsBackOnline(hri); + } finally { + master.getRegionServerOperationQueue(). + unregisterRegionServerOperationListener(listener); + } + } + + private void assertClosedRegionIsBackOnline(final HRegionInfo hri) + throws IOException { + // When we get here, region should be successfully deployed. Assert so. 
+ // 'aaa' is safe as first row if startkey is EMPTY_BYTE_ARRAY because we + // loaded with HBaseTestingUtility#createMultiRegions. + byte [] row = Bytes.equals(HConstants.EMPTY_BYTE_ARRAY, hri.getStartKey())? + new byte [] {'a', 'a', 'a'}: hri.getStartKey(); + Put p = new Put(row); + p.add(FAMILIES[0], FAMILIES[0], FAMILIES[0]); + HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); + t.put(p); + Get g = new Get(row); + assertTrue((t.get(g)).size() > 0); + } + + /* + * Wait until all rows in .META. have a non-empty info:server. This means + * all regions have been deployed, master has been informed and updated + * .META. with the region's deployed server. + * @param countOfRegions How many regions in .META. + * @throws IOException + */ + private static void waitUntilAllRegionsAssigned(final int countOfRegions) + throws IOException { + HTable meta = new HTable(TEST_UTIL.getConfiguration(), + HConstants.META_TABLE_NAME); + while (true) { + int rows = 0; + Scan scan = new Scan(); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + ResultScanner s = meta.getScanner(scan); + for (Result r = null; (r = s.next()) != null;) { + byte [] b = + r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + if (b == null || b.length <= 0) break; + rows++; + } + s.close(); + // If I got to here and all rows have a Server, then all have been assigned. + if (rows == countOfRegions) break; + } + } +} \ No newline at end of file Index: src/test/org/apache/hadoop/hbase/mapreduce/TestTableIndex.java =================================================================== --- src/test/org/apache/hadoop/hbase/mapreduce/TestTableIndex.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/mapreduce/TestTableIndex.java (working copy) @@ -192,7 +192,7 @@ private void verify() throws IOException { // Force a cache flush for every online region to ensure that when the // scanner takes its snapshot, all the updates have made it into the cache. 
- for (HRegion r : cluster.getRegionThreads().get(0).getRegionServer(). + for (HRegion r : cluster.getRegionServerThreads().get(0).getRegionServer(). getOnlineRegions()) { HRegionIncommon region = new HRegionIncommon(r); region.flushcache(); Index: src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java =================================================================== --- src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (working copy) @@ -197,7 +197,7 @@ private void verify() throws IOException { // Force a cache flush for every online region to ensure that when the // scanner takes its snapshot, all the updates have made it into the cache. - for (HRegion r : cluster.getRegionThreads().get(0).getRegionServer(). + for (HRegion r : cluster.getRegionServerThreads().get(0).getRegionServer(). getOnlineRegions()) { HRegionIncommon region = new HRegionIncommon(r); region.flushcache(); Index: src/test/org/apache/hadoop/hbase/util/TestMigration.java =================================================================== --- src/test/org/apache/hadoop/hbase/util/TestMigration.java (revision 939172) +++ src/test/org/apache/hadoop/hbase/util/TestMigration.java (working copy) @@ -60,7 +60,8 @@ * @throws IOException * @throws InterruptedException */ - public void testMigration() throws IOException, InterruptedException { + public void disabledTestMigration() throws IOException, InterruptedException { + // Currently disabled until we develop migration for next version. Path rootdir = getUnitTestdir(getName()); Path hbasedir = loadTestData(fs, rootdir); assertTrue(fs.exists(hbasedir)); Index: src/contrib/stargate/build.xml =================================================================== --- src/contrib/stargate/build.xml (revision 939172) +++ src/contrib/stargate/build.xml (working copy) @@ -1,4 +1,6 @@