Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1004853) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -70,8 +70,6 @@ import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.security.UserGroupInformation; import org.apache.zookeeper.ZooKeeper; -import org.eclipse.jdt.core.dom.ThisExpression; - import com.google.common.base.Preconditions; /** @@ -268,7 +266,7 @@ * @see {@link #shutdownMiniDFSCluster()} */ public MiniHBaseCluster startMiniCluster() throws Exception { - return startMiniCluster(1); + return startMiniCluster(1, 1); } /** @@ -276,17 +274,40 @@ * Modifies Configuration. Homes the cluster data directory under a random * subdirectory in a directory under System property test.build.data. * Directory is cleaned up on exit. - * @param servers Number of servers to start up. We'll start this many - * datanodes and regionservers. If servers is > 1, then make sure + * @param numSlaves Number of slaves to start up. We'll start this many + * datanodes and regionservers. If numSlaves is > 1, then make sure * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise * bind errors. * @throws Exception * @see {@link #shutdownMiniCluster()} * @return Mini hbase cluster instance created. */ - public MiniHBaseCluster startMiniCluster(final int servers) + public MiniHBaseCluster startMiniCluster(final int numSlaves) throws Exception { - LOG.info("Starting up minicluster"); + return startMiniCluster(1, numSlaves); + } + + /** + * Start up a minicluster of hbase, optionally dfs, and zookeeper. + * Modifies Configuration. Homes the cluster data directory under a random + * subdirectory in a directory under System property test.build.data. + * Directory is cleaned up on exit. + * @param numMasters Number of masters to start up. We'll start this many + * hbase masters. If numMasters > 1, you can find the active/primary master + * with {@link MiniHBaseCluster#getMaster()}. + * @param numSlaves Number of slaves to start up. We'll start this many + * datanodes and regionservers. If numSlaves is > 1, then make sure + * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise + * bind errors. + * @throws Exception + * @see {@link #shutdownMiniCluster()} + * @return Mini hbase cluster instance created. + */ + public MiniHBaseCluster startMiniCluster(final int numMasters, + final int numSlaves) + throws Exception { + LOG.info("Starting up minicluster with " + numMasters + " master(s) and " + + numSlaves + " regionserver(s) and datanode(s)"); // If we already put up a cluster, fail. String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null); isRunningCluster(testBuildPath); @@ -300,7 +321,7 @@ System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath()); // Bring up mini dfs cluster. This spews a bunch of warnings about missing // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. - startMiniDFSCluster(servers, this.clusterTestBuildDir); + startMiniDFSCluster(numSlaves, this.clusterTestBuildDir); // Mangle conf so fs parameter points to minidfs we just started up FileSystem fs = this.dfsCluster.getFileSystem(); @@ -319,7 +340,7 @@ this.conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString()); fs.mkdirs(hbaseRootdir); FSUtils.setVersion(fs, hbaseRootdir); - this.hbaseCluster = new MiniHBaseCluster(this.conf, servers); + this.hbaseCluster = new MiniHBaseCluster(this.conf, numMasters, numSlaves); // Don't leave here till we've done a successful scan of the .META. HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME); ResultScanner s = t.getScanner(new Scan()); @@ -852,7 +873,7 @@ * Returns a HBaseAdmin instance. * * @return The HBaseAdmin instance. - * @throws IOException + * @throws IOException */ public HBaseAdmin getHBaseAdmin() throws IOException { @@ -942,7 +963,7 @@ /** * @param dir Directory to delete * @return True if we deleted it. - * @throws IOException + * @throws IOException */ public boolean deleteDir(final Path dir) throws IOException { FileSystem fs = getTestFileSystem(); Index: src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 1004853) +++ src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -67,9 +67,22 @@ */ public MiniHBaseCluster(Configuration conf, int numRegionServers) throws IOException { + this(conf, 1, numRegionServers); + } + + /** + * Start a MiniHBaseCluster. + * @param conf Configuration to be used for cluster + * @param numMasters initial number of masters to start. + * @param numRegionServers initial number of region servers to start. + * @throws IOException + */ + public MiniHBaseCluster(Configuration conf, int numMasters, + int numRegionServers) + throws IOException { this.conf = conf; conf.set(HConstants.MASTER_PORT, "0"); - init(numRegionServers); + init(numMasters, numRegionServers); } /** @@ -203,6 +216,7 @@ } } + @Override public void kill() { super.kill(); } @@ -231,10 +245,11 @@ } } - private void init(final int nRegionNodes) throws IOException { + private void init(final int nMasterNodes, final int nRegionNodes) + throws IOException { try { // start up a LocalHBaseCluster - hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes, + hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, nRegionNodes, MiniHBaseCluster.MiniHBaseClusterMaster.class, MiniHBaseCluster.MiniHBaseClusterRegionServer.class); hbaseCluster.startup(); @@ -258,21 +273,6 @@ } /** - * @return Returns the rpc address actually used by the master server, because - * the supplied port is not necessarily the actual port used. - */ - public HServerAddress getHMasterAddress() { - return this.hbaseCluster.getMaster().getMasterAddress(); - } - - /** - * @return the HMaster - */ - public HMaster getMaster() { - return this.hbaseCluster.getMaster(); - } - - /** * Cause a region server to exit doing basic clean up only on its way out. * @param serverNumber Used as index into a list. */ @@ -322,7 +322,131 @@ return this.hbaseCluster.waitOnRegionServer(serverNumber); } + /** + * Starts a master thread running + * + * @throws IOException + * @return New RegionServerThread + */ + public JVMClusterUtil.MasterThread startMaster() throws IOException { + JVMClusterUtil.MasterThread t = this.hbaseCluster.addMaster(); + t.start(); + t.waitForServerOnline(); + return t; + } + + /** + * @return Returns the rpc address actually used by the currently active + * master server, because the supplied port is not necessarily the actual port + * used. + */ + public HServerAddress getHMasterAddress() { + return this.hbaseCluster.getActiveMaster().getMasterAddress(); + } + + /** + * Returns the current active master, if available. + * @return the active HMaster, null if none is active. + */ + public HMaster getMaster() { + return this.hbaseCluster.getActiveMaster(); + } + + /** + * Returns the master at the specified index, if available. + * @return the active HMaster, null if none is active. + */ + public HMaster getMaster(final int serverNumber) { + return this.hbaseCluster.getMaster(serverNumber); + } + + /** + * Cause a master to exit without shutting down entire cluster. + * @param serverNumber Used as index into a list. + */ + public String abortMaster(int serverNumber) { + HMaster server = getMaster(serverNumber); + LOG.info("Aborting " + server.toString()); + server.abort("Aborting for tests", new Exception("Trace info")); + return server.toString(); + } + + /** + * Shut down the specified master cleanly + * + * @param serverNumber Used as index into a list. + * @return the region server that was stopped + */ + public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { + return stopMaster(serverNumber, true); + } + + /** + * Shut down the specified master cleanly + * + * @param serverNumber Used as index into a list. + * @param shutdownFS True is we are to shutdown the filesystem as part of this + * master's shutdown. Usually we do but you do not want to do this if + * you are running multiple master in a test and you shut down one + * before end of the test. + * @return the master that was stopped + */ + public JVMClusterUtil.MasterThread stopMaster(int serverNumber, + final boolean shutdownFS) { + JVMClusterUtil.MasterThread server = + hbaseCluster.getMasters().get(serverNumber); + LOG.info("Stopping " + server.toString()); + server.getMaster().stop("Stopping master " + serverNumber); + return server; + } + + /** + * Wait for the specified master to stop. Removes this thread from list + * of running threads. + * @param serverNumber + * @return Name of master that just went down. + */ + public String waitOnMaster(final int serverNumber) { + return this.hbaseCluster.waitOnMaster(serverNumber); + } + + /** + * Blocks until there is an active master and that master has completed + * initialization. + * + * @return true if an active master becomes available. false if there are no + * masters left. + * @throws InterruptedException + */ + public boolean waitForActiveAndReadyMaster() throws InterruptedException { + List mts; + while ((mts = getMasterThreads()).size() > 0) { + for (JVMClusterUtil.MasterThread mt : mts) { + if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { + return true; + } + } + Thread.sleep(200); + } + return false; + } + + /** + * @return List of master threads. + */ + public List getMasterThreads() { + return this.hbaseCluster.getMasters(); + } + + /** + * @return List of live master threads (skips the aborted and the killed) + */ + public List getLiveMasterThreads() { + return this.hbaseCluster.getLiveMasters(); + } + + /** * Wait for Mini HBase Cluster to shut down. */ public void join() { Index: src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java (revision 0) @@ -0,0 +1,112 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; +import org.junit.Test; + +public class TestMasterFailover { + private static final Log LOG = LogFactory.getLog(TestMasterFailover.class); + + /** + * Simple test of master failover. + *

+ * Starts with three masters. Kills a backup master. Then kills the active + * master. Ensures the final master becomes active and we can still contact + * the cluster. + * @throws Exception + */ + @Test (timeout=180000) + public void testSimpleMasterFailover() throws Exception { + + final int NUM_MASTERS = 3; + final int NUM_RS = 3; + + // Start the cluster + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + + // get all the master threads + List masterThreads = cluster.getMasterThreads(); + + // wait for each to come online + for (MasterThread mt : masterThreads) { + assertTrue(mt.isAlive()); + } + + // verify only one is the active master and we have right number + int numActive = 0; + int activeIndex = -1; + String activeName = null; + for (int i = 0; i < masterThreads.size(); i++) { + if (masterThreads.get(i).getMaster().isActiveMaster()) { + numActive++; + activeIndex = i; + activeName = masterThreads.get(i).getMaster().getServerName(); + } + } + assertEquals(1, numActive); + assertEquals(NUM_MASTERS, masterThreads.size()); + + // attempt to stop one of the inactive masters + LOG.debug("\n\nStopping a backup master\n"); + int backupIndex = (activeIndex == 0 ? 1 : activeIndex - 1); + cluster.stopMaster(backupIndex, false); + cluster.waitOnMaster(backupIndex); + + // verify still one active master and it's the same + for (int i = 0; i < masterThreads.size(); i++) { + if (masterThreads.get(i).getMaster().isActiveMaster()) { + assertTrue(activeName.equals( + masterThreads.get(i).getMaster().getServerName())); + activeIndex = i; + } + } + assertEquals(1, numActive); + assertEquals(2, masterThreads.size()); + + // kill the active master + LOG.debug("\n\nStopping the active master\n"); + cluster.stopMaster(activeIndex, false); + cluster.waitOnMaster(activeIndex); + + // wait for an active master to show up and be ready + assertTrue(cluster.waitForActiveAndReadyMaster()); + + LOG.debug("\n\nVerifying backup master is now active\n"); + // should only have one master now + assertEquals(1, masterThreads.size()); + // and he should be active + assertTrue(masterThreads.get(0).getMaster().isActiveMaster()); + + // Stop the cluster + TEST_UTIL.shutdownMiniCluster(); + } +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1004853) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -239,9 +239,9 @@ switch(event.getState()) { case SyncConnected: // Update our identifier. Otherwise ignore. + LOG.info(this.identifier + " connected"); this.identifier = this.identifier + "-0x" + Long.toHexString(this.zooKeeper.getSessionId()); - LOG.info(this.identifier + " connected"); break; // Abort the server if Disconnected or Expired Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1004853) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -129,6 +129,9 @@ /** Default region server interface class name. */ public static final String DEFAULT_REGION_SERVER_CLASS = HRegionInterface.class.getName(); + /** Parameter name for what master implementation to use. */ + public static final String MASTER_IMPL= "hbase.master.impl"; + /** Parameter name for how often threads should wake up */ public static final String THREAD_WAKE_FREQUENCY = "hbase.server.thread.wakefrequency"; Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1004853) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -255,7 +255,7 @@ * * @param conf * @throws IOException - * @throws InterruptedException + * @throws InterruptedException */ public HRegionServer(Configuration conf) throws IOException, InterruptedException { this.fsOk = true; @@ -282,7 +282,36 @@ HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); - initialize(); + this.abortRequested = false; + this.stopped = false; + + // Server to handle client requests + String machineName = DNS.getDefaultHost(conf.get( + "hbase.regionserver.dns.interface", "default"), conf.get( + "hbase.regionserver.dns.nameserver", "default")); + String addressStr = machineName + ":" + + conf.get(HConstants.REGIONSERVER_PORT, + Integer.toString(HConstants.DEFAULT_REGIONSERVER_PORT)); + HServerAddress address = new HServerAddress(addressStr); + this.server = HBaseRPC.getServer(this, + new Class[]{HRegionInterface.class, HBaseRPCErrorHandler.class, + OnlineRegions.class}, + address.getBindAddress(), + address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), + conf.getInt("hbase.regionserver.metahandler.count", 10), + false, conf, QOS_THRESHOLD); + this.server.setErrorHandler(this); + this.server.setQosFunction(new QosFunction()); + + // HServerInfo can be amended by master. See below in reportForDuty. + this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress( + address.getBindAddress(), this.server.getListenerAddress().getPort())), + System.currentTimeMillis(), this.conf.getInt( + "hbase.regionserver.info.port", 60030), machineName); + if (this.serverInfo.getServerAddress() == null) { + throw new NullPointerException("Server address cannot be null; " + + "hbase-958 debugging"); + } } private static final int NORMAL_QOS = 0; @@ -370,39 +399,9 @@ * call it. * * @throws IOException - * @throws InterruptedException + * @throws InterruptedException */ private void initialize() throws IOException, InterruptedException { - this.abortRequested = false; - this.stopped = false; - - // Server to handle client requests - String machineName = DNS.getDefaultHost(conf.get( - "hbase.regionserver.dns.interface", "default"), conf.get( - "hbase.regionserver.dns.nameserver", "default")); - String addressStr = machineName + ":" + - conf.get(HConstants.REGIONSERVER_PORT, - Integer.toString(HConstants.DEFAULT_REGIONSERVER_PORT)); - HServerAddress address = new HServerAddress(addressStr); - this.server = HBaseRPC.getServer(this, - new Class[]{HRegionInterface.class, HBaseRPCErrorHandler.class, - OnlineRegions.class}, - address.getBindAddress(), - address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - conf.getInt("hbase.regionserver.metahandler.count", 10), - false, conf, QOS_THRESHOLD); - this.server.setErrorHandler(this); - this.server.setQosFunction(new QosFunction()); - - // HServerInfo can be amended by master. See below in reportForDuty. - this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress( - address.getBindAddress(), this.server.getListenerAddress().getPort())), - System.currentTimeMillis(), this.conf.getInt( - "hbase.regionserver.info.port", 60030), machineName); - if (this.serverInfo.getServerAddress() == null) { - throw new NullPointerException("Server address cannot be null; " - + "hbase-958 debugging"); - } initializeZooKeeper(); initializeThreads(); int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4); @@ -421,7 +420,7 @@ */ private void initializeZooKeeper() throws IOException, InterruptedException { // Open connection to zookeeper and set primary watcher - zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + + zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" + serverInfo.getServerAddress().getPort(), this); // Create the master address manager, register with zk, and start it. Then @@ -437,7 +436,7 @@ this.clusterStatusTracker.start(); this.clusterStatusTracker.blockUntilAvailable(); - // Create the catalog tracker and start it; + // Create the catalog tracker and start it; this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); @@ -477,6 +476,14 @@ * load/unload instructions. */ public void run() { + + try { + // Initialize threads and wait for a master + initialize(); + } catch (Exception e) { + abort("Fatal exception during initialization", e); + } + this.regionServerThread = Thread.currentThread(); boolean calledCloseUserRegions = false; try { @@ -622,9 +629,19 @@ this.serverInfo.setLoad(buildServerLoad()); this.requestCount.set(0); addOutboundMsgs(outboundMessages); - HMsg [] msgs = this.hbaseMaster.regionServerReport(this.serverInfo, - outboundMessages.toArray(HMsg.EMPTY_HMSG_ARRAY), - getMostLoadedRegions()); + HMsg [] msgs = null; + while (!this.stopped) { + try { + msgs = this.hbaseMaster.regionServerReport(this.serverInfo, + outboundMessages.toArray(HMsg.EMPTY_HMSG_ARRAY), + getMostLoadedRegions()); + break; + } catch (IOException ioe) { + // Couldn't connect to the master, get location from zk and reconnect + // Method blocks until new master is found or we are stopped + getMaster(); + } + } updateOutboundMsgs(outboundMessages); outboundMessages.clear(); @@ -754,7 +771,7 @@ // hack! Maps DFSClient => RegionServer for logs. HDFS made this // config param for task trackers, but we can piggyback off of it. if (this.conf.get("mapred.task.id") == null) { - this.conf.set("mapred.task.id", + this.conf.set("mapred.task.id", "hb_rs_" + this.serverInfo.getServerName() + "_" + System.currentTimeMillis()); } @@ -1297,18 +1314,17 @@ * Method will block until a master is available. You can break from this * block by requesting the server stop. * - * @return + * @return master address, or null if server has been stopped */ - private boolean getMaster() { + private HServerAddress getMaster() { HServerAddress masterAddress = null; while ((masterAddress = masterAddressManager.getMasterAddress()) == null) { if (stopped) { - return false; + return null; } LOG.debug("No master found, will retry"); sleeper.sleep(); } - LOG.info("Telling master at " + masterAddress + " that we are up"); HMasterRegionInterface master = null; while (!stopped && master == null) { try { @@ -1323,8 +1339,9 @@ sleeper.sleep(); } } + LOG.info("Connected to master at " + masterAddress); this.hbaseMaster = master; - return true; + return masterAddress; } /** @@ -1347,7 +1364,8 @@ * us by the master. */ private MapWritable reportForDuty() { - while (!stopped && !getMaster()) { + HServerAddress masterAddress = null; + while (!stopped && (masterAddress = getMaster()) == null) { sleeper.sleep(); LOG.warn("Unable to get master for initialization"); } @@ -1362,6 +1380,7 @@ ZKUtil.joinZNode(zooKeeper.rsZNode, ZKUtil.getNodeName(serverInfo)), this.serverInfo.getServerAddress()); this.serverInfo.setLoad(buildServerLoad()); + LOG.info("Telling master at " + masterAddress + " that we are up"); result = this.hbaseMaster.regionServerStartup(this.serverInfo); break; } catch (IOException e) { @@ -2373,7 +2392,7 @@ public CompactionRequestor getCompactionRequester() { return this.compactSplitThread; } - + // // Main program and support routines // Index: src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 1004853) +++ src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy) @@ -54,14 +54,17 @@ */ public class LocalHBaseCluster { static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class); - private final HMaster master; - private final List regionThreads; + private final List masterThreads = + new CopyOnWriteArrayList(); + private final List regionThreads = + new CopyOnWriteArrayList(); private final static int DEFAULT_NO = 1; /** local mode */ public static final String LOCAL = "local"; /** 'local:' */ public static final String LOCAL_COLON = LOCAL + ":"; private final Configuration conf; + private final Class masterClass; private final Class regionServerClass; /** @@ -83,37 +86,65 @@ */ public LocalHBaseCluster(final Configuration conf, final int noRegionServers) throws IOException { - this(conf, noRegionServers, HMaster.class, getRegionServerImplementation(conf)); + this(conf, 1, noRegionServers, getMasterImplementation(conf), + getRegionServerImplementation(conf)); } + /** + * Constructor. + * @param conf Configuration to use. Post construction has the active master + * address. + * @param noMasters Count of masters to start. + * @param noRegionServers Count of regionservers to start. + * @throws IOException + */ + public LocalHBaseCluster(final Configuration conf, final int noMasters, + final int noRegionServers) + throws IOException { + this(conf, noMasters, noRegionServers, getMasterImplementation(conf), + getRegionServerImplementation(conf)); + } + @SuppressWarnings("unchecked") private static Class getRegionServerImplementation(final Configuration conf) { return (Class)conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); } + @SuppressWarnings("unchecked") + private static Class getMasterImplementation(final Configuration conf) { + return (Class)conf.getClass(HConstants.MASTER_IMPL, + HMaster.class); + } + /** * Constructor. * @param conf Configuration to use. Post construction has the master's * address. + * @param noMasters Count of masters to start. * @param noRegionServers Count of regionservers to start. * @param masterClass + * @param regionServerClass * @throws IOException */ @SuppressWarnings("unchecked") - public LocalHBaseCluster(final Configuration conf, + public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers, final Class masterClass, final Class regionServerClass) throws IOException { this.conf = conf; - // Create the master - this.master = HMaster.constructMaster(masterClass, conf); - // Start the HRegionServers. Always have region servers come up on - // port '0' so there won't be clashes over default port as unit tests - // start/stop ports at different times during the life of the test. + // Always have masters and regionservers come up on port '0' so we don't + // clash over default ports. + conf.set(HConstants.MASTER_PORT, "0"); conf.set(HConstants.REGIONSERVER_PORT, "0"); - this.regionThreads = - new CopyOnWriteArrayList(); + // Start the HMasters. + this.masterClass = + (Class)conf.getClass(HConstants.MASTER_IMPL, + masterClass); + for (int i = 0; i < noMasters; i++) { + addMaster(i); + } + // Start the HRegionServers. this.regionServerClass = (Class)conf.getClass(HConstants.REGION_SERVER_IMPL, regionServerClass); @@ -138,6 +169,22 @@ return rst; } + public JVMClusterUtil.MasterThread addMaster() throws IOException { + return addMaster(this.masterThreads.size()); + } + + public JVMClusterUtil.MasterThread addMaster(final int index) + throws IOException { + // Create each master with its own Configuration instance so each has + // its HConnection instance rather than share (see HBASE_INSTANCES down in + // the guts of HConnectionManager. + JVMClusterUtil.MasterThread mt = + JVMClusterUtil.createMasterThread(new Configuration(this.conf), + this.masterClass, index); + this.masterThreads.add(mt); + return mt; + } + /** * @param serverNumber * @return region server @@ -147,13 +194,6 @@ } /** - * @return the HMaster thread - */ - public HMaster getMaster() { - return this.master; - } - - /** * @return Read-only list of region server threads. */ public List getRegionServers() { @@ -199,6 +239,73 @@ } /** + * @param serverNumber + * @return the HMaster thread + */ + public HMaster getMaster(int serverNumber) { + return masterThreads.get(serverNumber).getMaster(); + } + + /** + * Gets the current active master, if available. If no active master, returns + * null. + * @return the HMaster for the active master + */ + public HMaster getActiveMaster() { + for (JVMClusterUtil.MasterThread mt : masterThreads) { + if (mt.getMaster().isActiveMaster()) { + return mt.getMaster(); + } + } + return null; + } + + /** + * @return Read-only list of master threads. + */ + public List getMasters() { + return Collections.unmodifiableList(this.masterThreads); + } + + /** + * @return List of running master servers (Some servers may have been killed + * or aborted during lifetime of cluster; these servers are not included in + * this list). + */ + public List getLiveMasters() { + List liveServers = + new ArrayList(); + List list = getMasters(); + for (JVMClusterUtil.MasterThread mt: list) { + if (mt.isAlive()) { + liveServers.add(mt); + } + } + return liveServers; + } + + /** + * Wait for the specified master to stop + * Removes this thread from list of running threads. + * @param serverNumber + * @return Name of master that just went down. + */ + public String waitOnMaster(int serverNumber) { + JVMClusterUtil.MasterThread masterThread = + this.masterThreads.remove(serverNumber); + while (masterThread.isAlive()) { + try { + LOG.info("Waiting on " + + masterThread.getMaster().getServerName().toString()); + masterThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return masterThread.getName(); + } + + /** * Wait for Mini HBase Cluster to shut down. * Presumes you've already called {@link #shutdown()}. */ @@ -214,11 +321,15 @@ } } } - if (this.master != null && this.master.isAlive()) { - try { - this.master.join(); - } catch(InterruptedException e) { - // continue + if (this.masterThreads != null) { + for (Thread t : this.masterThreads) { + if (t.isAlive()) { + try { + t.join(); + } catch (InterruptedException e) { + // continue + } + } } } } @@ -227,14 +338,14 @@ * Start the cluster. */ public void startup() { - JVMClusterUtil.startup(this.master, this.regionThreads); + JVMClusterUtil.startup(this.masterThreads, this.regionThreads); } /** * Shut down the mini HBase cluster */ public void shutdown() { - JVMClusterUtil.shutdown(this.master, this.regionThreads); + JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads); } /** Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1004853) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; -import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableNotDisabledException; @@ -80,7 +79,6 @@ import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -131,17 +129,17 @@ // Address of the HMaster private final HServerAddress address; // file system manager for the master FS operations - private final MasterFileSystem fileSystemManager; + private MasterFileSystem fileSystemManager; - private final HConnection connection; + private HConnection connection; // server manager to deal with region server info - private final ServerManager serverManager; + private ServerManager serverManager; // manager of assignment nodes in zookeeper - final AssignmentManager assignmentManager; + AssignmentManager assignmentManager; // manager of catalog regions - private final CatalogTracker catalogTracker; + private CatalogTracker catalogTracker; // Cluster status zk tracker and local setter private ClusterStatusTracker clusterStatusTracker; @@ -150,6 +148,10 @@ private volatile boolean stopped = false; // Set on abort -- usually failure of our zk session. private volatile boolean abort = false; + // flag set after we become the active master (used for testing) + protected volatile boolean isActiveMaster = false; + // flag set after we complete initialization once active (used for testing) + protected volatile boolean isInitialized = false; // Instance of the hbase executor service. ExecutorService executorService; @@ -163,21 +165,22 @@ /** * Initializes the HMaster. The steps are as follows: - * + *

*

    *
  1. Initialize HMaster RPC and address - *
  2. Connect to ZooKeeper. Get count of regionservers still up. - *
  3. Block until becoming active master - *
  4. Initialize master components - server manager, region manager, - * region server queue, file system manager, etc + *
  5. Connect to ZooKeeper. *
+ *

+ * Remaining steps of initialization occur in {@link #run()} so that they + * run in their own thread rather than within the context of the constructor. * @throws InterruptedException */ public HMaster(final Configuration conf) throws IOException, KeeperException, InterruptedException { this.conf = conf; + /* - * 1. Determine address and initialize RPC server (but do not start). + * Determine address and initialize RPC server (but do not start). * The RPC server ports can be ephemeral. Create a ZKW instance. */ HServerAddress a = new HServerAddress(getMyAddress(this.conf)); @@ -201,66 +204,11 @@ "_" + System.currentTimeMillis()); } - this.zooKeeper = new ZooKeeperWatcher(conf, MASTER, this); - - /* - * 2. Count of regoinservers that are up. - */ - int count = ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode); - - /* - * 3. Block on becoming the active master. - * We race with other masters to write our address into ZooKeeper. If we - * succeed, we are the primary/active master and finish initialization. - * - * If we do not succeed, there is another active master and we should - * now wait until it dies to try and become the next active master. If we - * do not succeed on our first attempt, this is no longer a cluster startup. - */ - this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this); - this.zooKeeper.registerListener(activeMasterManager); - stallIfBackupMaster(this.conf, this.activeMasterManager); - activeMasterManager.blockUntilBecomingActiveMaster(); - - /* - * 4. We are active master now... go initialize components we need to run. - * Note, there may be dross in zk from previous runs; it'll get addressed - * when we enter {@link #run()} below. - */ - // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. - this.fileSystemManager = new MasterFileSystem(this); - this.connection = HConnectionManager.getConnection(conf); - this.executorService = new ExecutorService(getServerName()); - - this.serverManager = new ServerManager(this, this); - - this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, - this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); - this.catalogTracker.start(); - - this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.executorService); - zooKeeper.registerListener(assignmentManager); - - this.regionServerTracker = new RegionServerTracker(zooKeeper, this, - this.serverManager); - this.regionServerTracker.start(); - - // Set the cluster as up. If new RSs, they'll be waiting on this before - // going ahead with their startup. - this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); - boolean wasUp = this.clusterStatusTracker.isClusterUp(); - if (!wasUp) this.clusterStatusTracker.setClusterUp(); - this.clusterStatusTracker.start(); - - LOG.info("Server active/primary master; " + this.address + - ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + - ", ephemeral nodes still up in zk=" + count + - ", cluster-up flag was=" + wasUp); + this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + + address.getPort(), this); } - /* + /** * Stall startup if we are designated a backup master; i.e. we want someone * else to become the master before proceeding. * @param c @@ -272,7 +220,9 @@ throws InterruptedException { // If we're a backup master, stall until a primary to writes his address if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP, - HConstants.DEFAULT_MASTER_TYPE_BACKUP)) return; + HConstants.DEFAULT_MASTER_TYPE_BACKUP)) { + return; + } // This will only be a minute or so while the cluster starts up, // so don't worry about setting watches on the parent znode while (!amm.isActiveMaster()) { @@ -285,74 +235,173 @@ /** * Main processing loop for the HMaster. *

    - *
  1. Handle both fresh cluster start as well as failed over initialization of - * the HMaster
  2. - *
  3. Start the necessary services
  4. - *
  5. Reassign the root region
  6. - *
  7. The master is no longer closed - set "closed" to false
  8. + *
  9. Block until becoming active master + *
  10. Finish initialization via {@link #finishInitialization()} + *
  11. Enter loop until we are stopped + *
  12. Stop services and perform cleanup once stopped *
*/ @Override public void run() { try { - // start up all service threads. - startServiceThreads(); + /* + * Block on becoming the active master. + * + * We race with other masters to write our address into ZooKeeper. If we + * succeed, we are the primary/active master and finish initialization. + * + * If we do not succeed, there is another active master and we should + * now wait until it dies to try and become the next active master. If we + * do not succeed on our first attempt, this is no longer a cluster startup. + */ + this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this); + this.zooKeeper.registerListener(activeMasterManager); + stallIfBackupMaster(this.conf, this.activeMasterManager); + activeMasterManager.blockUntilBecomingActiveMaster(); - // Wait for region servers to report in. Returns count of regions. - int regionCount = this.serverManager.waitForRegionServers(); + // We are either the active master or we were asked to shutdown - // TODO: Should do this in background rather than block master startup - // TODO: Do we want to do this before/while/after RSs check in? - // It seems that this method looks at active RSs but happens concurrently - // with when we expect them to be checking in - this.fileSystemManager. - splitLogAfterStartup(this.serverManager.getOnlineServers()); + if (!this.stopped) { + // We are active master. Finish init and loop until we are closed. + finishInitialization(); + loop(); + // Once we break out of here, we are being shutdown - // Make sure root and meta assigned before proceeding. - assignRootAndMeta(); + // Stop balancer and meta catalog janitor + if (this.balancerChore != null) { + this.balancerChore.interrupt(); + } + if (this.catalogJanitorChore != null) { + this.catalogJanitorChore.interrupt(); + } - // Is this fresh start with no regions assigned or are we a master joining - // an already-running cluster? If regionsCount == 0, then for sure a - // fresh start. TOOD: Be fancier. If regionsCount == 2, perhaps the - // 2 are .META. and -ROOT- and we should fall into the fresh startup - // branch below. For now, do processFailover. - if (regionCount == 0) { - this.assignmentManager.cleanoutUnassigned(); - this.assignmentManager.assignAllUserRegions(); - } else { - this.assignmentManager.processFailover(); + // Wait for all the remaining region servers to report in IFF we were + // running a cluster shutdown AND we were NOT aborting. + if (!this.abort && this.serverManager.isClusterShutdown()) { + this.serverManager.letRegionServersShutdown(); + } + stopServiceThreads(); } - // Start balancer and meta catalog janitor after meta and regions have - // been assigned. - this.balancerChore = getAndStartBalancerChore(this); - this.catalogJanitorChore = - Threads.setDaemonThreadRunning(new CatalogJanitor(this, this)); + // Handle either a backup or active master being stopped - // Check if we should stop every second. - Sleeper sleeper = new Sleeper(1000, this); - while (!this.stopped) sleeper.sleep(); + // Stop services started for both backup and active masters + this.activeMasterManager.stop(); + HConnectionManager.deleteConnection(this.conf, true); + this.zooKeeper.close(); + LOG.info("HMaster main thread exiting"); + } catch (Throwable t) { abort("Unhandled exception. Starting shutdown.", t); } - // Stop balancer and meta catalog janitor - if (this.balancerChore != null) this.balancerChore.interrupt(); - if (this.catalogJanitorChore != null) this.catalogJanitorChore.interrupt(); + } - // Wait for all the remaining region servers to report in IFF we were - // running a cluster shutdown AND we were NOT aborting. - if (!this.abort && this.serverManager.isClusterShutdown()) { - this.serverManager.letRegionServersShutdown(); + private void loop() { + // Check if we should stop every second. + Sleeper sleeper = new Sleeper(1000, this); + while (!this.stopped) { + sleeper.sleep(); } - stopServiceThreads(); - // Stop services started up in the constructor. - this.activeMasterManager.stop(); - HConnectionManager.deleteConnection(this.conf, true); - this.zooKeeper.close(); - LOG.info("HMaster main thread exiting"); } /** + * Finish initialization of HMaster after becoming the primary master. + * + *
    + *
  1. Initialize master components - file system manager, server manager, + * assignment manager, region server tracker, catalog tracker, etc
  2. + *
  3. Start necessary service threads - rpc server, info server, + * executor services, etc
  4. + *
  5. Set cluster as UP in ZooKeeper
  6. + *
  7. Wait for RegionServers to check-in
  8. + *
  9. Split logs and perform data recovery, if necessary
  10. + *
  11. Ensure assignment of root and meta regions
  12. + *
  13. Handle either fresh cluster start or master failover
  14. + *
+ * + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + private void finishInitialization() + throws IOException, InterruptedException, KeeperException { + + isActiveMaster = true; + + /* + * We are active master now... go initialize components we need to run. + * Note, there may be dross in zk from previous runs; it'll get addressed + * below after we determine if cluster startup or failover. + */ + + // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. + this.fileSystemManager = new MasterFileSystem(this); + this.connection = HConnectionManager.getConnection(conf); + this.executorService = new ExecutorService(getServerName()); + + this.serverManager = new ServerManager(this, this); + + this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, + this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); + this.catalogTracker.start(); + + this.assignmentManager = new AssignmentManager(this, serverManager, + this.catalogTracker, this.executorService); + zooKeeper.registerListener(assignmentManager); + + this.regionServerTracker = new RegionServerTracker(zooKeeper, this, + this.serverManager); + this.regionServerTracker.start(); + + // Set the cluster as up. If new RSs, they'll be waiting on this before + // going ahead with their startup. + this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); + this.clusterStatusTracker.start(); + boolean wasUp = this.clusterStatusTracker.isClusterUp(); + if (!wasUp) this.clusterStatusTracker.setClusterUp(); + + LOG.info("Server active/primary master; " + this.address + + ", sessionid=0x" + + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + + ", cluster-up flag was=" + wasUp); + + // start up all service threads. + startServiceThreads(); + + // Wait for region servers to report in. Returns count of regions. + int regionCount = this.serverManager.waitForRegionServers(); + + // TODO: Should do this in background rather than block master startup + this.fileSystemManager. + splitLogAfterStartup(this.serverManager.getOnlineServers()); + + // Make sure root and meta assigned before proceeding. + assignRootAndMeta(); + + // Is this fresh start with no regions assigned or are we a master joining + // an already-running cluster? If regionsCount == 0, then for sure a + // fresh start. TOOD: Be fancier. If regionsCount == 2, perhaps the + // 2 are .META. and -ROOT- and we should fall into the fresh startup + // branch below. For now, do processFailover. + if (regionCount == 0) { + LOG.info("Master startup proceeding: cluster startup"); + this.assignmentManager.cleanoutUnassigned(); + this.assignmentManager.assignAllUserRegions(); + } else { + LOG.info("Master startup proceeding: master failover"); + this.assignmentManager.processFailover(); + } + + // Start balancer and meta catalog janitor after meta and regions have + // been assigned. + this.balancerChore = getAndStartBalancerChore(this); + this.catalogJanitorChore = + Threads.setDaemonThreadRunning(new CatalogJanitor(this, this)); + + isInitialized = true; + } + + /** * Check -ROOT- and .META. are assigned. If not, * assign them. * @throws InterruptedException @@ -374,7 +423,7 @@ assigned++; } LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit); - + // Work on meta region rit = this.assignmentManager. processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO); @@ -482,11 +531,7 @@ } } catch (IOException e) { if (e instanceof RemoteException) { - try { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } catch (IOException ex) { - LOG.warn("thread start", ex); - } + e = ((RemoteException)e).unwrapRemoteException(); } // Something happened during startup. Shut things down. abort("Failed startup", e); @@ -631,8 +676,10 @@ throws UnknownRegionException { Pair p = this.assignmentManager.getAssignment(encodedRegionName); - if (p == null) throw new UnknownRegionException(Bytes.toString(encodedRegionName)); - HServerInfo dest = this.serverManager.getServerInfo(new String(destServerName)); + if (p == null) + throw new UnknownRegionException(Bytes.toString(encodedRegionName)); + HServerInfo dest = + this.serverManager.getServerInfo(new String(destServerName)); RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest); this.assignmentManager.balance(rp); } @@ -865,6 +912,10 @@ public void stop(final String why) { LOG.info(why); this.stopped = true; + // If we are a backup master, we need to interrupt wait + synchronized (this.activeMasterManager.clusterHasActiveMaster) { + this.activeMasterManager.clusterHasActiveMaster.notifyAll(); + } } @Override @@ -872,6 +923,31 @@ return this.stopped; } + /** + * Report whether this master is currently the active master or not. + * If not active master, we are parked on ZK waiting to become active. + * + * This method is used for testing. + * + * @return true if active master, false if not. + */ + public boolean isActiveMaster() { + return isActiveMaster; + } + + /** + * Report whether this master has completed with its initialization and is + * ready. If ready, the master is also the active master. A standby master + * is never ready. + * + * This method is used for testing. + * + * @return true if master is ready to go, false if not. + */ + public boolean isInitialized() { + return isInitialized; + } + public void assignRegion(HRegionInfo hri) { assignmentManager.assign(hri); } Index: src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java (revision 1004853) +++ src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java (working copy) @@ -135,9 +135,9 @@ Integer.toString(clientPort)); // Need to have the zk cluster shutdown when master is shutdown. // Run a subclass that does the zk cluster shutdown on its way out. - LocalHBaseCluster cluster = new LocalHBaseCluster(conf, 1, + LocalHBaseCluster cluster = new LocalHBaseCluster(conf, 1, 1, LocalHMaster.class, HRegionServer.class); - ((LocalHMaster)cluster.getMaster()).setZKCluster(zooKeeperCluster); + ((LocalHMaster)cluster.getMaster(0)).setZKCluster(zooKeeperCluster); cluster.startup(); } else { HMaster master = HMaster.constructMaster(masterClass, conf); Index: src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (revision 1004853) +++ src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (working copy) @@ -76,12 +76,13 @@ * Call 'start' on the returned thread to make it run. * @param c Configuration to use. * @param hrsc Class to create. - * @param index Used distingushing the object returned. + * @param index Used distinguishing the object returned. * @throws IOException * @return Region server added. */ - public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c, - final Class hrsc, final int index) + public static JVMClusterUtil.RegionServerThread createRegionServerThread( + final Configuration c, final Class hrsc, + final int index) throws IOException { HRegionServer server; try { @@ -99,32 +100,121 @@ return new JVMClusterUtil.RegionServerThread(server, index); } + /** - * Start the cluster. + * Datastructure to hold Master Thread and Master instance + */ + public static class MasterThread extends Thread { + private final HMaster master; + + public MasterThread(final HMaster m, final int index) { + super(m, "Master:" + index + ";" + m.getServerName()); + this.master = m; + } + + /** @return the master */ + public HMaster getMaster() { + return this.master; + } + + /** + * Block until the master has come online, indicating it is ready + * to be used. + */ + public void waitForServerOnline() { + // The server is marked online after init begins but before race to become + // the active master. + while (!this.master.isAlive() && !this.master.isStopped()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // continue waiting + } + } + } + } + + /** + * Creates a {@link MasterThread}. + * Call 'start' on the returned thread to make it run. + * @param c Configuration to use. + * @param hmc Class to create. + * @param index Used distinguishing the object returned. + * @throws IOException + * @return Master added. + */ + public static JVMClusterUtil.MasterThread createMasterThread( + final Configuration c, final Class hmc, + final int index) + throws IOException { + HMaster server; + try { + server = hmc.getConstructor(Configuration.class).newInstance(c); + } catch (InvocationTargetException ite) { + Throwable target = ite.getTargetException(); + throw new RuntimeException("Failed construction of RegionServer: " + + hmc.toString() + ((target.getCause() != null)? + target.getCause().getMessage(): ""), target); + } catch (Exception e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } + return new JVMClusterUtil.MasterThread(server, index); + } + + /** + * Start the cluster. Waits until there is a primary master and returns its + * address. * @param m * @param regionServers - * @return Address to use contacting master. + * @return Address to use contacting primary master. */ - public static String startup(final HMaster m, + public static String startup(final List masters, final List regionservers) { - if (m != null) m.start(); + if (masters != null) { + for (JVMClusterUtil.MasterThread t : masters) { + t.start(); + } + } if (regionservers != null) { for (JVMClusterUtil.RegionServerThread t: regionservers) { t.start(); } } - return m == null? null: m.getMasterAddress().toString(); + if (masters == null || masters.isEmpty()) { + return null; + } + // Wait for an active master + while (true) { + for (JVMClusterUtil.MasterThread t : masters) { + if (t.master.isActiveMaster()) { + return t.master.getMasterAddress().toString(); + } + } + try { + Thread.sleep(1000); + } catch(InterruptedException e) { + // Keep waiting + } + } } /** * @param master * @param regionservers */ - public static void shutdown(final HMaster master, + public static void shutdown(final List masters, final List regionservers) { LOG.debug("Shutting down HBase Cluster"); - if (master != null) { - master.shutdown(); + if (masters != null) { + for (JVMClusterUtil.MasterThread t : masters) { + if (t.master.isActiveMaster()) { + t.master.shutdown(); + } else { + t.master.stopMaster(); + } + } } // regionServerThreads can never be null because they are initialized when // the class is constructed. @@ -137,20 +227,23 @@ } } } - if (master != null) { - while (master.isAlive()) { - try { - // The below has been replaced to debug sometime hangs on end of - // tests. - // this.master.join(): - Threads.threadDumpingIsAlive(master); - } catch(InterruptedException e) { - // continue + if (masters != null) { + for (JVMClusterUtil.MasterThread t : masters) { + while (t.master.isAlive()) { + try { + // The below has been replaced to debug sometime hangs on end of + // tests. + // this.master.join(): + Threads.threadDumpingIsAlive(t.master); + } catch(InterruptedException e) { + // continue + } } } } - LOG.info("Shutdown " + - ((regionservers != null)? master.getName(): "0 masters") + - " " + regionservers.size() + " region server(s)"); + LOG.info("Shutdown of " + + ((masters != null) ? masters.size() : "0") + " master(s) and " + + ((regionservers != null) ? regionservers.size() : "0") + + " regionserver(s) complete"); } }