Index: BRANCH_TODO.txt =================================================================== --- BRANCH_TODO.txt (revision 0) +++ BRANCH_TODO.txt (revision 0) @@ -0,0 +1,38 @@ +List of things todo for branch, including comments from reviews not yet +implemented. + + +Now: + +* synchronize all access to the boolean in ActiveMasterManager + + +Think about: + +* renaming master file manager? MasterFS/MasterFileSystem + + +Later: + +* ServerStatus/MasterStatus + + - These need new names to be more descriptive (ServerControl?) + - They should have a very clear purpose that adds value beyond passing + HMaster directly + - Current idea is these things would just have accessors/setters to + the server status booleans and abort() methods (like closed, closing, + abortRequested) + +* HBaseEventHandler/HBaseEventType/HBaseExecutorService + + - After ZK changes, renamed to EventHandler/EventType + - Currently multiple types map to a single handler, we may want 1-to-1 + - Need to do a full review of the semantics of these once bulk of + master rewrite is done + +* LoadBalancer + + - Need to finish or back out code related to block locations + (if finish, need to use files not directory, and use right location) + - Put notes from reviewboard/jira into LB javadoc or hbase "book" + Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (revision 961971) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (working copy) @@ -40,12 +40,12 @@ private static final Log LOG = LogFactory.getLog(TestMasterAddressManager.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); } - + @AfterClass public static void tearDownAfterClass() throws 
Exception { TEST_UTIL.shutdownMiniZKCluster(); @@ -57,29 +57,29 @@ */ @Test public void testMasterAddressManagerFromZK() throws Exception { - - ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + + ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "testMasterAddressManagerFromZK", null); zk.createZNodeIfNotExists(zk.baseZNode); - + // Should not have a master yet - MasterAddressManager addressManager = new MasterAddressManager(zk); + MasterAddressManager addressManager = new MasterAddressManager(zk, null); addressManager.monitorMaster(); assertFalse(addressManager.hasMaster()); zk.registerListener(addressManager); - + // Use a listener to capture when the node is actually created NodeCreationListener listener = new NodeCreationListener(zk, zk.masterAddressZNode); zk.registerListener(listener); - + // Create the master node with a dummy address String host = "hostname"; int port = 1234; HServerAddress dummyAddress = new HServerAddress(host, port); LOG.info("Creating master node"); - zk.createZNodeIfNotExists(zk.masterAddressZNode, + zk.createZNodeIfNotExists(zk.masterAddressZNode, Bytes.toBytes(dummyAddress.toString()), CreateMode.EPHEMERAL, false); - + // Wait for the node to be created LOG.info("Waiting for master address manager to be notified"); listener.waitForCreation(); @@ -87,21 +87,21 @@ assertTrue(addressManager.hasMaster()); HServerAddress pulledAddress = addressManager.getMasterAddress(); assertTrue(pulledAddress.equals(dummyAddress)); - + } - + public static class NodeCreationListener extends ZooKeeperListener { private static final Log LOG = LogFactory.getLog(NodeCreationListener.class); - + private Semaphore lock; private String node; - + public NodeCreationListener(ZooKeeperWatcher watcher, String node) { super(watcher); lock = new Semaphore(0); this.node = node; } - + @Override public void nodeCreated(String path) { if(path.equals(node)) { @@ -109,7 +109,7 @@ lock.release(); } } - + public void 
waitForCreation() throws InterruptedException { lock.acquire(); } Index: src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java (revision 0) @@ -0,0 +1,269 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.ServerConnection; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestActiveMasterManager { + private static final Log LOG = LogFactory.getLog(TestActiveMasterManager.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + /** + * Unit tests that uses ZooKeeper but does not use the master-side methods + * but rather acts directly on ZK. 
+ * @throws Exception + */ + @Test + public void testActiveMasterManagerFromZK() throws Exception { + + ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + "testActiveMasterManagerFromZK", null); + zk.createZNodeIfNotExists(zk.baseZNode); + try { + zk.deleteZNode(zk.masterAddressZNode); + } catch(KeeperException.NoNodeException nne) {} + + // Create the master node with a dummy address + HServerAddress firstMasterAddress = new HServerAddress("firstMaster", 1234); + HServerAddress secondMasterAddress = new HServerAddress("secondMaster", 1234); + + // Should not have a master yet + DummyMasterStatus ms1 = new DummyMasterStatus(); + ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk, + firstMasterAddress, ms1); + zk.registerListener(activeMasterManager); + assertFalse(activeMasterManager.clusterHasActiveMaster.get()); + + // First test becoming the active master uninterrupted + activeMasterManager.blockUntilBecomingActiveMaster(); + assertTrue(activeMasterManager.clusterHasActiveMaster.get()); + assertMaster(zk, firstMasterAddress); + + // New manager will now try to become the active master in another thread + WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress); + zk.registerListener(t.manager); + t.start(); + // Wait for this guy to figure out there is another active master + // Wait for 1 second at most + int sleeps = 0; + while(!t.manager.clusterHasActiveMaster.get() && sleeps < 100) { + Thread.sleep(10); + sleeps++; + } + + // Both should see that there is an active master + assertTrue(activeMasterManager.clusterHasActiveMaster.get()); + assertTrue(t.manager.clusterHasActiveMaster.get()); + // But secondary one should not be the active master + assertFalse(t.isActiveMaster); + + // Close the first server and delete it's master node + ms1.setClosed(); + + // Use a listener to capture when the node is actually deleted + NodeDeletionListener listener = new NodeDeletionListener(zk, 
zk.masterAddressZNode); + zk.registerListener(listener); + + LOG.info("Deleting master node"); + zk.deleteZNode(zk.masterAddressZNode); + + // Wait for the node to be deleted + LOG.info("Waiting for active master manager to be notified"); + listener.waitForDeletion(); + LOG.info("Master node deleted"); + + // Now we expect the secondary manager to have and be the active master + // Wait for 1 second at most + sleeps = 0; + while(!t.isActiveMaster && sleeps < 100) { + Thread.sleep(10); + sleeps++; + } + LOG.debug("Slept " + sleeps + " times"); + + assertTrue(t.manager.clusterHasActiveMaster.get()); + assertTrue(t.isActiveMaster); + } + + /** + * Assert there is an active master and that it has the specified address. + * @param zk + * @param thisMasterAddress + * @throws KeeperException + */ + private void assertMaster(ZooKeeperWatcher zk, + HServerAddress expectedAddress) throws KeeperException { + HServerAddress readAddress = ZKUtil.getDataAsAddress(zk, zk.masterAddressZNode); + assertNotNull(readAddress); + assertTrue(expectedAddress.equals(readAddress)); + } + + public static class WaitToBeMasterThread extends Thread { + + ActiveMasterManager manager; + boolean isActiveMaster; + + public WaitToBeMasterThread(ZooKeeperWatcher zk, + HServerAddress address) { + this.manager = new ActiveMasterManager(zk, address, + new DummyMasterStatus()); + isActiveMaster = false; + } + + @Override + public void run() { + manager.blockUntilBecomingActiveMaster(); + LOG.info("Second master has become the active master!"); + isActiveMaster = true; + } + } + + public static class NodeDeletionListener extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(NodeDeletionListener.class); + + private Semaphore lock; + private String node; + + public NodeDeletionListener(ZooKeeperWatcher watcher, String node) { + super(watcher); + lock = new Semaphore(0); + this.node = node; + } + + @Override + public void nodeDeleted(String path) { + if(path.equals(node)) { + 
LOG.debug("nodeDeleted(" + path + ")"); + lock.release(); + } + } + + public void waitForDeletion() throws InterruptedException { + lock.acquire(); + } + } + + public static class DummyMasterStatus implements MasterStatus { + + private AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public AtomicBoolean getClosed() { + return closed; + } + + @Override + public FileSystemManager getFileSystemManager() { + return null; + } + + @Override + public RegionManager getRegionManager() { + return null; + } + + @Override + public ServerConnection getServerConnection() { + return null; + } + + @Override + public ServerManager getServerManager() { + return null; + } + + @Override + public AtomicBoolean getShutdownRequested() { + return null; + } + + @Override + public boolean isClosed() { + return closed.get(); + } + + @Override + public boolean isClusterStartup() { + return false; + } + + @Override + public void setClosed() { + closed.set(true); + } + + @Override + public void setClusterStartup(boolean isClusterStartup) {} + + @Override + public void shutdown() {} + + @Override + public void startShutdown() {} + + @Override + public void abortServer() {} + + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public HServerAddress getHServerAddress() { + return null; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return null; + } + + } +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 961971) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -88,6 +88,12 @@ this.server = server; info("Connected to ZooKeeper"); setNodeNames(conf); + try { + ZKUtil.createIfNotExists(this, baseZNode); + } catch (KeeperException e) { + error("Unexpected KeeperException creating base node", e); + throw new 
IOException(e); + } } /** @@ -198,9 +204,10 @@ // Abort the server if Disconnected or Expired // TODO: Åny reason to handle these two differently? case Disconnected: + info("Received Disconnected from ZooKeeper, ignoring"); + break; case Expired: - error("Received Disconnected/Expired [" + event.getState() + "] " + - "from ZooKeeper, aborting server"); + error("Received Expired from ZooKeeper, aborting server"); if(server != null) { server.abortServer(); } @@ -213,13 +220,15 @@ * * This may be temporary but for now this gives one place to deal with these. * - * TODO: Currently this method aborts the server. + * TODO: Currently this method rethrows the exception to let the caller handle * * @param ke + * @throws KeeperException */ - public void keeperException(KeeperException ke) { - error("Received unexpected KeeperException, aborting server", ke); - server.abortServer(); + public void keeperException(KeeperException ke) + throws KeeperException { + error("Received unexpected KeeperException, re-throwing exception", ke); + throw ke; } /** @@ -229,6 +238,7 @@ * * TODO: Currently, this method does nothing. * Is this ever expected to happen? Do we abort or can we let it run? + * Maybe this should be logged as WARN? It shouldn't happen? 
* * @param ie */ Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 961971) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; +import java.util.List; import java.util.Properties; import org.apache.commons.logging.Log; @@ -27,28 +28,34 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** * Internal HBase utility class for ZooKeeper. - * - * Contains only static methods and constants. + * + *
Contains only static methods and constants. + * + *
Methods all throw {@link KeeperException} if there is an unexpected
+ * zookeeper exception, so callers of these methods must handle appropriately.
+ * If ZK is required for the operation, the server will need to be aborted.
*/
public class ZKUtil {
private static final Log LOG = LogFactory.getLog(ZKUtil.class);
// TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
private static final char ZNODE_PATH_SEPARATOR = '/';
-
+
/**
* Creates a new connection to ZooKeeper, pulling settings and quorum config
* from the specified configuration object using methods from {@link ZKConfig}.
- *
+ *
* Sets the connection status monitoring watcher to the specified watcher.
- *
+ *
* @param conf configuration to pull quorum and other settings from
* @param watcher watcher to monitor connection changes
* @return connection to zookeeper
@@ -66,12 +73,16 @@
return new ZooKeeper(quorum, timeout, watcher);
}
+ //
+ // Helper methods
+ //
+
/**
* Join the prefix znode name with the suffix znode name to generate a proper
* full znode name.
- *
+ *
* Assumes prefix does not end with slash and suffix does not begin with it.
- *
+ *
* @param prefix beginning of znode name
* @param suffix ending of znode name
* @return result of properly joining prefix with suffix
@@ -80,16 +91,22 @@
return prefix + ZNODE_PATH_SEPARATOR + suffix;
}
+ //
+ // Existence checks and watches
+ //
+
/**
* Watch the specified znode for delete/create/change events. The watcher is
* set whether or not the node exists. If the node already exists, the method
* returns true. If the node does not exist, the method returns false.
- *
+ *
* @param zkw zk reference
* @param znode path of node to watch
* @return true if znode exists, false if does not exist or error
+ * @throws KeeperException if unexpected zookeeper exception
*/
- public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) {
+ public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
+ throws KeeperException {
try {
Stat s = zkw.getZooKeeper().exists(znode, zkw);
zkw.debug("Set watcher on existing znode (" + znode + ")");
@@ -104,21 +121,103 @@
return false;
}
}
-
+
+ //
+ // Znode listings
+ //
+
/**
+ * Lists the children znodes of the specified znode. Also sets a watch on
+ * the specified znode which will capture a NodeDeleted event on the specified
+ * znode as well as NodeChildrenChanged if any children of the specified znode
+ * are created or deleted.
+ *
+ * Returns null if the specified node does not exist. Otherwise returns a
+ * list of children of the specified node. If the node exists but it has no
+ * children, an empty list will be returned.
+ *
+ * @param zkw zk reference
+ * @param znode path of node to list and watch children of
+ * @return list of children of the specified node, an empty list if the node
+ * exists but has no children, and null if the node does not exist
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static List Used by the Master. Waits on the master address ZNode delete event. When
- * multiple masters are brought up, they race to become master by writing their
- * address to ZooKeeper. Whoever wins becomes the master, and the rest wait for
- * that ephemeral node in ZooKeeper to evaporate (meaning the master went down),
- * at which point they try to write their own address to become the new master.
- */
-class ZKMasterAddressWatcher implements Watcher {
- private static final Log LOG = LogFactory.getLog(ZKMasterAddressWatcher.class);
-
- private ZooKeeperWrapper zookeeper;
- private final AtomicBoolean requestShutdown;
-
- /**
- * Create this watcher using passed ZooKeeperWrapper instance.
- * @param zk ZooKeeper
- * @param flag Flag to set to request shutdown.
- */
- ZKMasterAddressWatcher(final ZooKeeperWrapper zk, final AtomicBoolean flag) {
- this.requestShutdown = flag;
- this.zookeeper = zk;
- }
-
- /**
- * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
- */
- @Override
- public synchronized void process (WatchedEvent event) {
- EventType type = event.getType();
- LOG.debug(("Got event " + type + " with path " + event.getPath()));
- if (type.equals(EventType.NodeDeleted)) {
- if (event.getPath().equals(this.zookeeper.clusterStateZNode)) {
- LOG.info("Cluster shutdown while waiting, shutting down" +
- " this master.");
- this.requestShutdown.set(true);
- } else {
- LOG.debug("Master address ZNode deleted, notifying waiting masters");
- notifyAll();
- }
- } else if(type.equals(EventType.NodeCreated) &&
- event.getPath().equals(this.zookeeper.clusterStateZNode)) {
- LOG.debug("Resetting watch on cluster state node.");
- this.zookeeper.setClusterStateWatch();
- }
- }
-
- /**
- * Wait for master address to be available. This sets a watch in ZooKeeper and
- * blocks until the master address ZNode gets deleted.
- */
- public synchronized void waitForMasterAddressAvailability() {
- while (zookeeper.readMasterAddress(this) != null) {
- try {
- LOG.debug("Waiting for master address ZNode to be deleted " +
- "(Also watching cluster state node)");
- this.zookeeper.setClusterStateWatch();
- wait();
- } catch (InterruptedException e) {
- }
- }
- }
-
- /**
- * Write address to zookeeper. Parks here until we successfully write our
- * address (or until cluster shutdown).
- * @param address Address whose format is HServerAddress.toString
- */
- boolean writeAddressToZooKeeper(
- final HServerAddress address, boolean retry) {
- do {
- waitForMasterAddressAvailability();
- // Check if we need to shutdown instead of taking control
- if (this.requestShutdown.get()) {
- LOG.debug("Won't start Master because cluster is shuting down");
- return false;
- }
- if(this.zookeeper.writeMasterAddress(address)) {
- this.zookeeper.setClusterState(true);
- this.zookeeper.setClusterStateWatch();
- // Watch our own node
- this.zookeeper.readMasterAddress(this);
- return true;
- }
- } while(retry);
- return false;
- }
-
- /**
- * Reset the ZK in case a new connection is required
- * @param zookeeper new instance
- */
- public void setZookeeper(ZooKeeperWrapper zookeeper) {
- this.zookeeper = zookeeper;
- }
-}
\ No newline at end of file
+ *
*/
public HMaster(Configuration conf) throws IOException {
+ // initialize some variables
this.conf = conf;
-
- // Get my address and create an rpc server instance. The rpc-server port
- // can be ephemeral...ensure we have the correct info
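+ // set the thread name. NOTE(review): this.address is only assigned further down in this constructor, so it may still be null here - confirm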
+ setName(MASTER + "-" + this.address);
+
+ /*
+ * 1. Determine address and initialize RPC server (but do not start)
+ *
+ * Get the master address and create an RPC server instance. The RPC
+ * server ports can be ephemeral.
+ */
HServerAddress a = new HServerAddress(getMyAddress(this.conf));
- this.rpcServer = HBaseRPC.getServer(this, a.getBindAddress(),
- a.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
- this.address = new HServerAddress(this.rpcServer.getListenerAddress());
+ int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
+ this.rpcServer = HBaseRPC.getServer(this, a.getBindAddress(), a.getPort(),
+ numHandlers, false, conf);
+ this.address = new HServerAddress(rpcServer.getListenerAddress());
- this.numRetries = conf.getInt("hbase.client.retries.number", 2);
- int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
- this.sleeper = new Sleeper(threadWakeFrequency, this.closed);
- this.connection = ServerConnectionManager.getConnection(conf);
-
- // Figure out if this is a fresh cluster start. This is done by checking the
- // number of RS ephemeral nodes. RS ephemeral nodes are created only after
- // the primary master has written the address to ZK. So this has to be done
- // before we race to write our address to zookeeper.
- zooKeeperWrapper =
+ /*
+ * 2. Determine if this is a fresh cluster startup or failed over master
+ *
+ * This is done by checking for the existence of any ephemeral
+ * RegionServer nodes in ZooKeeper. These nodes are created by RSs on
+ * their initialization but only after they find the primary master. As
+ * long as this check is done before we write our address into ZK, this
+ * will work. Note that multiple masters could find this to be true on
+ * startup (none have become active master yet), which is why there is
+ * an additional check if this master does not become primary on its
+ * first attempt.
+ */
+ zooKeeperWrapper =
new ZooKeeperWatcher(conf, getHServerAddress().toString(), this);
isClusterStartup = (zooKeeperWrapper.scanRSDirectory().size() == 0);
-
- // Create the filesystem manager, which in turn does the following:
- // - Creates the root hbase directory in the FS
- // - Checks the FS to make sure the root directory is readable
- // - Creates the archive directory for logs
+
+ /*
+ * 3. Initialize master components.
+ *
+ * This includes the filesystem manager, server manager, region manager,
+ * metrics, queues, sleeper, etc...
+ */
+ this.connection = ServerConnectionManager.getConnection(conf);
+ this.regionServerOperationQueue = new RegionServerOperationQueue(conf, closed);
+ this.metrics = new MasterMetrics(this.getName());
fileSystemManager = new FileSystemManager(conf, this);
-
- // Get our zookeeper wrapper and then try to write our address to zookeeper.
- // We'll succeed if we are only master or if we win the race when many
- // masters. Otherwise we park here inside in writeAddressToZooKeeper.
- // TODO: Bring up the UI to redirect to active Master.
- zooKeeperWrapper.registerListener(this);
- this.zkMasterAddressWatcher =
- new ZKMasterAddressWatcher(this.zooKeeperWrapper, this.shutdownRequested);
- zooKeeperWrapper.registerListener(zkMasterAddressWatcher);
- this.zkMasterAddressWatcher.writeAddressToZooKeeper(this.address, true);
- this.regionServerOperationQueue =
- new RegionServerOperationQueue(this.conf, this.closed);
+ serverManager = new ServerManager(this, metrics, regionServerOperationQueue);
+ regionManager = new RegionManager(this);
+ // create a sleeper to sleep for a configured wait frequency
+ int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+ this.sleeper = new Sleeper(threadWakeFrequency, this.closed);
- // set the thread name
- setName(MASTER);
- // create the master metrics object
- this.metrics = new MasterMetrics(MASTER);
+ /*
+ * 4. Block on becoming the active master.
+ *
+ * We race with other masters to write our address into ZooKeeper. If we
+ * succeed, we are the primary/active master and finish initialization.
+ *
+ * If we do not succeed, there is another active master and we should
+ * now wait until it dies to try and become the next active master. If
+ * we do not succeed on our first attempt, this is no longer a cluster
+ * startup.
+ */
+ activeMasterManager = new ActiveMasterManager(zooKeeperWrapper, address,
+ this);
+ zooKeeperWrapper.registerListener(activeMasterManager);
+ // Wait here until we are the active master
+ activeMasterManager.blockUntilBecomingActiveMaster();
- serverManager = new ServerManager(this, metrics, regionServerOperationQueue);
+ // We are the active master now.
-
- // Start the unassigned watcher - which will create the unassigned region
- // in ZK. This is needed before RegionManager() constructor tries to assign
- // the root region.
- ZKUnassignedWatcher.start(this.conf, this);
- // start the "close region" executor service
- HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString());
- // start the "open region" executor service
- HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString());
+ LOG.info("Server has become the active/primary master. Address is " +
+ this.address.toString());
-
- // start the region manager
- regionManager = new RegionManager(this);
+ // run() is executed next
+ }
- // We're almost open for business
- this.closed.set(false);
- LOG.info("HMaster initialized on " + this.address.toString());
+ /**
+ * Main processing loop for the HMaster.
+ * 1. Handle both fresh cluster start as well as failed over initialization of
+ * the HMaster.
+ * 2. Start the necessary services
+ * 3. Reassign the root region
+ * 4. The master is no longer closed - set "closed" to false
+ */
+ @Override
+ public void run() {
+ try {
+ // If this is a fresh cluster start, make sure the root region exists.
+ if(isClusterStartup()) {
+ // Initialize the filesystem, which does the following:
+ // - Creates the root hbase directory in the FS
+ // - Checks the FS to make sure the root directory is readable
+ // - Creates the archive directory for logs
+ fileSystemManager.initialize();
+ // Do any log splitting necessary
+ // TODO: Should do this in background rather than block master startup
+ fileSystemManager.splitLogAfterStartup();
+ }
+ // TODO: fix the logic and naming for joinCluster()
+ joinCluster();
+ // start up all service threads.
+ startServiceThreads();
+ // assign the root region
+ regionManager.reassignRootRegion();
+ // set the master as opened
+ this.closed.set(false);
+ LOG.info("HMaster started on " + this.address.toString());
+
+ while (!this.closed.get()) {
+ // check if we should be shutting down
+ if (this.shutdownRequested.get()) {
+ // The region servers won't all exit until we stop scanning the
+ // meta regions
+ this.regionManager.stopScanners();
+ if (this.serverManager.numServers() == 0) {
+ startShutdown();
+ break;
+ }
+ else {
+ LOG.debug("Waiting on " +
+ this.serverManager.getServersToServerInfo().keySet().toString());
+ }
+ }
+
+ // process the operation, handle the result
+ ProcessingResultCode resultCode = regionServerOperationQueue.process();
+ // If FAILED op processing, bad. Will exit.
+ if(resultCode == ProcessingResultCode.FAILED) {
+ break;
+ }
+ // If bad filesystem, exit
+ else if(resultCode == ProcessingResultCode.REQUEUED_BUT_PROBLEM) {
+ if (!fileSystemManager.checkFileSystem()) {
+ break;
+ }
+ }
+ // Continue run loop if conditions are PROCESSED, NOOP, REQUEUED
+ }
+ } catch (Throwable t) {
+ LOG.fatal("Unhandled exception. Starting shutdown.", t);
+ setClosed();
+ }
+
+ // Wait for all the remaining region servers to report in.
+ this.serverManager.letRegionServersShutdown();
+
+ // Clean up and close up shop
+ if (this.infoServer != null) {
+ LOG.info("Stopping infoServer");
+ try {
+ this.infoServer.stop();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ this.rpcServer.stop();
+ this.regionManager.stop();
+ this.zooKeeperWrapper.close();
+ HBaseExecutorService.shutdown();
+ LOG.info("HMaster main thread exiting");
}
-
+
/**
- * Returns true if this master process was responsible for starting the
- * cluster.
+ * Returns true if this master process was responsible for starting the
+ * cluster, false if not.
*/
public boolean isClusterStartup() {
return isClusterStartup;
}
-
- public void resetClusterStartup() {
- isClusterStartup = false;
+
+ /**
+ * Sets whether this is a cluster startup or not. Used by the
+ * {@link ActiveMasterManager} to set to false if we determine another master
+ * has become the primary.
+ * @param isClusterStartup false if another master became active before us
+ */
+ public void setClusterStartup(boolean isClusterStartup) {
+ this.isClusterStartup = isClusterStartup;
}
-
+
public HServerAddress getHServerAddress() {
return address;
}
@@ -274,7 +376,7 @@
public InfoServer getInfoServer() {
return this.infoServer;
}
-
+
/**
* Return the file systen manager instance
*/
@@ -304,7 +406,7 @@
public void setClosed() {
this.closed.set(true);
}
-
+
public AtomicBoolean getClosed() {
return this.closed;
}
@@ -330,71 +432,11 @@
public Path getRootDir() {
return fileSystemManager.getRootDir();
}
-
+
public RegionServerOperationQueue getRegionServerOperationQueue() {
return this.regionServerOperationQueue;
}
- /** Main processing loop */
- @Override
- public void run() {
- joinCluster();
- startServiceThreads();
- /* Main processing loop */
- try {
- FINISHED: while (!this.closed.get()) {
- // check if we should be shutting down
- if (this.shutdownRequested.get()) {
- // The region servers won't all exit until we stop scanning the
- // meta regions
- this.regionManager.stopScanners();
- if (this.serverManager.numServers() == 0) {
- startShutdown();
- break;
- } else {
- LOG.debug("Waiting on " +
- this.serverManager.getServersToServerInfo().keySet().toString());
- }
- }
- switch (this.regionServerOperationQueue.process()) {
- case FAILED:
- // If FAILED op processing, bad. Exit.
- break FINISHED;
- case REQUEUED_BUT_PROBLEM:
- if (!fileSystemManager.checkFileSystem())
- // If bad filesystem, exit.
- break FINISHED;
- default:
- // Continue run loop if conditions are PROCESSED, NOOP, REQUEUED
- break;
- }
- }
- } catch (Throwable t) {
- LOG.fatal("Unhandled exception. Starting shutdown.", t);
- setClosed();
- }
-
- // Wait for all the remaining region servers to report in.
- this.serverManager.letRegionServersShutdown();
-
- /*
- * Clean up and close up shop
- */
- if (this.infoServer != null) {
- LOG.info("Stopping infoServer");
- try {
- this.infoServer.stop();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- this.rpcServer.stop();
- this.regionManager.stop();
- this.zooKeeperWrapper.close();
- HBaseExecutorService.shutdown();
- LOG.info("HMaster main thread exiting");
- }
-
/*
* Joins cluster. Checks to see if this instance of HBase is fresh or the
* master was started following a failover. In the second case, it inspects
@@ -461,6 +503,15 @@
*/
private void startServiceThreads() {
try {
+ // Start the unassigned watcher, which will create the unassigned region
+ // in ZK. This is needed before the region manager, started below, tries
+ // to assign the root region.
+ ZKUnassignedWatcher.start(this.conf, this);
+ // start the "close region" executor service
+ HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString());
+ // start the "open region" executor service
+ HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString());
+ // start the region manager
this.regionManager.start();
// Put up info server.
int port = this.conf.getInt("hbase.master.info.port", 60010);
@@ -578,7 +629,8 @@
startKey = endKey;
}
}
- for (int tries = 0; tries < this.numRetries; tries++) {
+ int numRetries = conf.getInt("hbase.client.retries.number", 2);
+ for (int tries = 0; tries < numRetries; tries++) {
try {
// We can not create a table unless meta regions have already been
// assigned and scanned.
@@ -594,7 +646,7 @@
} catch (TableExistsException e) {
throw e;
} catch (IOException e) {
- if (tries == this.numRetries - 1) {
+ if (tries == numRetries - 1) {
throw RemoteExceptionHandler.checkIOException(e);
}
this.sleeper.sleep();
@@ -688,11 +740,14 @@
new MetaScannerVisitor() {
@Override
public boolean processRow(Result data) throws IOException {
- if (data == null || data.size() <= 0)
+ if (data == null || data.size() <= 0) {
return true;
+ }
Pair
+ *
+ */
+ public void initialize() throws IOException {
+ // check if the root directory exists
checkRootDir(this.rootdir, conf, this.fs);
// Make sure the region servers can archive their old logs
- this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
if(!this.fs.exists(this.oldLogDir)) {
this.fs.mkdirs(this.oldLogDir);
}
Index: src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java (revision 961971)
+++ src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java (working copy)
@@ -1,129 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
-/**
- * ZooKeeper watcher for the master address. Also watches the cluster state
- * flag so will shutdown this master if cluster has been shutdown.
- *