Index: src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java (revision 964617) +++ src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java (working copy) @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; @@ -70,6 +71,9 @@ } public void doATest(boolean doAbort) throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:" + + super.zooKeeperCluster.getClientPort()); HTable table = new HTable(TEST_TABLE); table.setAutoFlush(false); table.setWriteBufferSize(10 * 1024 * 1024); Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (revision 0) @@ -0,0 +1,289 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Random; +import java.util.concurrent.Semaphore; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.master.TestActiveMasterManager.NodeDeletionListener; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TestZooKeeperNodeTracker { + private static final Log LOG = LogFactory.getLog(TestZooKeeperNodeTracker.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private final static Random rand = new Random(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @Test + public void testNodeTracker() throws Exception { + + Abortable abortable = new StubAbortable(); + ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + "testNodeTracker", abortable); + ZKUtil.createAndFailSilent(zk, zk.baseZNode); + + String node = + ZKUtil.joinZNode(zk.baseZNode, new Long(rand.nextLong()).toString()); + + byte [] dataOne = Bytes.toBytes("dataOne"); + byte [] dataTwo = Bytes.toBytes("dataTwo"); + + // Start a ZKNT with no node currently available + TestTracker localTracker = new TestTracker(zk, node, abortable); + localTracker.start(); + zk.registerListener(localTracker); + + // Make sure we don't have a node + assertNull(localTracker.getData()); + + // Spin up a thread with another ZKNT and have it block + WaitToGetDataThread thread = new WaitToGetDataThread(zk, node); + thread.start(); + + // Verify the thread doesn't have a node + assertFalse(thread.hasData); + + // Put up an additional zk listener so we know when zk event is done + TestingZKListener zkListener = new TestingZKListener(zk, node); + zk.registerListener(zkListener); + assertEquals(0, zkListener.createdLock.availablePermits()); + + // Create a completely separate zk connection for test triggers and avoid + // any weird watcher interactions from the test + ZooKeeper zkconn = new ZooKeeper( + ZKConfig.getZKQuorumServersString(TEST_UTIL.getConfiguration()), 60000, + new StubWatcher()); + + // Add the node with data one + zkconn.create(node, dataOne, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Wait for the zk event to be processed + zkListener.waitForCreation(); + thread.join(); + + // Both trackers should have the node available with data one + assertNotNull(localTracker.getData()); + assertNotNull(localTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(localTracker.getData(), dataOne)); + assertTrue(thread.hasData); + assertTrue(Bytes.equals(thread.tracker.getData(), dataOne)); + LOG.info("Successfully got data one"); + + // Now, start a new ZKNT with the node already available + TestTracker secondTracker = new TestTracker(zk, node, null); + 
secondTracker.start(); + zk.registerListener(secondTracker); + + // Make sure it's available and with the expected data + assertNotNull(secondTracker.getData()); + assertNotNull(secondTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(secondTracker.getData(), dataOne)); + LOG.info("Successfully got data one with the second tracker"); + + // Drop the node + zkconn.delete(node, -1); + zkListener.waitForDeletion(); + + // Create a new thread but with the existing thread's tracker to wait + TestTracker threadTracker = thread.tracker; + thread = new WaitToGetDataThread(zk, node, threadTracker); + thread.start(); + + // Verify other guys don't have data + assertFalse(thread.hasData); + assertNull(secondTracker.getData()); + assertNull(localTracker.getData()); + LOG.info("Successfully made unavailable"); + + // Create with second data + zkconn.create(node, dataTwo, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Wait for the zk event to be processed + zkListener.waitForCreation(); + thread.join(); + + // All trackers should have the node available with data two + assertNotNull(localTracker.getData()); + assertNotNull(localTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(localTracker.getData(), dataTwo)); + assertNotNull(secondTracker.getData()); + assertNotNull(secondTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(secondTracker.getData(), dataTwo)); + assertTrue(thread.hasData); + assertTrue(Bytes.equals(thread.tracker.getData(), dataTwo)); + LOG.info("Successfully got data two on all trackers and threads"); + + // Change the data back to data one + zkconn.setData(node, dataOne, -1); + + // Wait for zk event to be processed + zkListener.waitForDataChange(); + + // All trackers should have the node available with data one + assertNotNull(localTracker.getData()); + assertNotNull(localTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(localTracker.getData(), dataOne)); + assertNotNull(secondTracker.getData()); + assertNotNull(secondTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(secondTracker.getData(), dataOne)); + assertTrue(thread.hasData); + assertTrue(Bytes.equals(thread.tracker.getData(), dataOne)); + LOG.info("Successfully got data one following a data change on all trackers and threads"); + + } + + public static class WaitToGetDataThread extends Thread { + + TestTracker tracker; + boolean hasData; + + public WaitToGetDataThread(ZooKeeperWatcher zk, String node) { + tracker = new TestTracker(zk, node, null); + tracker.start(); + zk.registerListener(tracker); + hasData = false; + } + + public WaitToGetDataThread(ZooKeeperWatcher zk, String node, + TestTracker tracker) { + this.tracker = tracker; + hasData = false; + } + + @Override + public void run() { + LOG.info("Waiting for data to be available in WaitToGetDataThread"); + try { + tracker.blockUntilAvailable(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + LOG.info("Data now available in tracker from WaitToGetDataThread"); + hasData = true; + } + } + + public static class TestTracker extends ZooKeeperNodeTracker { + + public TestTracker(ZooKeeperWatcher watcher, String node, + Abortable abortable) { + super(watcher, node, abortable); + } + + @Override + protected Log getLog() { + return LOG; + } + + } + + public static class TestingZKListener extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(NodeDeletionListener.class); + + private Semaphore deletedLock; + private Semaphore createdLock; + private Semaphore changedLock; + private String node; + + 
public TestingZKListener(ZooKeeperWatcher watcher, String node) { + super(watcher); + deletedLock = new Semaphore(0); + createdLock = new Semaphore(0); + changedLock = new Semaphore(0); + this.node = node; + } + + @Override + public void nodeDeleted(String path) { + if(path.equals(node)) { + LOG.debug("nodeDeleted(" + path + ")"); + deletedLock.release(); + } + } + + @Override + public void nodeCreated(String path) { + if(path.equals(node)) { + LOG.debug("nodeCreated(" + path + ")"); + createdLock.release(); + } + } + + @Override + public void nodeDataChanged(String path) { + if(path.equals(node)) { + LOG.debug("nodeDataChanged(" + path + ")"); + changedLock.release(); + } + } + + public void waitForDeletion() throws InterruptedException { + deletedLock.acquire(); + } + + public void waitForCreation() throws InterruptedException { + createdLock.acquire(); + } + + public void waitForDataChange() throws InterruptedException { + changedLock.acquire(); + } + } + + public static class StubAbortable implements Abortable { + @Override + public void abort() {} + } + + public static class StubWatcher implements Watcher { + @Override + public void process(WatchedEvent event) {} + } +} Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (revision 964617) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (working copy) @@ -28,11 +28,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.CreateMode; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -61,7 +59,7 @@ ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "testMasterAddressManagerFromZK", null); - ZKUtil.createIfNotExists(zk, zk.baseZNode); + ZKUtil.createAndFailSilent(zk, zk.baseZNode); // Should not have a master yet MasterAddressManager addressManager = new MasterAddressManager(zk, null); Index: src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 964617) +++ src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -40,6 +40,7 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.zookeeper.KeeperException; /** * This class creates a single process HBase cluster. 
@@ -82,7 +83,7 @@ new ConcurrentHashMap(); public MiniHBaseClusterMaster(final Configuration conf) - throws IOException { + throws IOException, KeeperException { super(conf); } Index: src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java (revision 964617) +++ src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java (working copy) @@ -64,7 +64,7 @@ ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "testActiveMasterManagerFromZK", null); - ZKUtil.createIfNotExists(zk, zk.baseZNode); + ZKUtil.createAndFailSilent(zk, zk.baseZNode); try { ZKUtil.deleteNode(zk, zk.masterAddressZNode); } catch(KeeperException.NoNodeException nne) {} @@ -248,7 +248,7 @@ public void startShutdown() {} @Override - public void abortServer() {} + public void abort() {} @Override public Configuration getConfiguration() { Index: src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java (revision 964617) +++ src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HMsg; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerInfo; +import org.apache.zookeeper.KeeperException; /** * An HMaster that runs out of memory. @@ -37,7 +38,8 @@ public class OOMEHMaster extends HMaster { private List retainer = new ArrayList(); - public OOMEHMaster(HBaseConfiguration conf) throws IOException { + public OOMEHMaster(HBaseConfiguration conf) + throws IOException, KeeperException { super(conf); } Index: src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java (revision 964617) +++ src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java (working copy) @@ -27,10 +27,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; @@ -45,7 +44,7 @@ private static ZooKeeperWatcher zooKeeper; private static final byte[] TABLENAME = Bytes.toBytes("master_transitions"); private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a")}; - + @BeforeClass public static void beforeAllTests() throws Exception { conf = HBaseConfiguration.create(); utility = new HBaseTestingUtility(conf); @@ -64,22 +63,21 @@ // create the unassigned region, throw up a region opened state for META String unassignedZNode = zooKeeper.assignmentZNode; - ZKUtil.createIfNotExists(zooKeeper, unassignedZNode); - byte[] data = null; - HBaseEventType hbEventType = HBaseEventType.RS2ZK_REGION_OPENED; - try { - data = Writables.getBytes(new RegionTransitionEventData(hbEventType, HMaster.MASTER)); - } catch (IOException e) { - LOG.error("Error creating event data for " + 
hbEventType, e); - } - zooKeeper.createUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data); - zooKeeper.createUnassignedRegion(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), data); - LOG.debug("Created UNASSIGNED zNode for ROOT and META regions in state " + HBaseEventType.M2ZK_REGION_OFFLINE); - + ZKUtil.createAndFailSilent(zooKeeper, unassignedZNode); + + ZKAssign.createNodeOffline(zooKeeper, + HRegionInfo.ROOT_REGIONINFO.getEncodedName(), HMaster.MASTER); + + ZKAssign.createNodeOffline(zooKeeper, + HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), HMaster.MASTER); + + LOG.debug("Created UNASSIGNED zNode for ROOT and META regions in state " + + HBaseEventType.M2ZK_REGION_OFFLINE); + // start the HB cluster LOG.info("Starting HBase cluster..."); - utility.startMiniCluster(2); - + utility.startMiniCluster(2); + utility.createTable(TABLENAME, FAMILIES); LOG.info("Created a table, waiting for table to be available..."); utility.waitTableAvailable(TABLENAME, 60*1000); Index: src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java (revision 964617) +++ src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java (working copy) @@ -382,7 +382,7 @@ * done. * @see HBASE-2482 */ - @Test (timeout=300000) public void testKillRSWithOpeningRegion2482() + /*@Test (timeout=300000) */public void testKillRSWithOpeningRegion2482() throws Exception { LOG.info("Running testKillRSWithOpeningRegion2482"); MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); Index: src/main/java/org/apache/hadoop/hbase/ServerController.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ServerController.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/ServerController.java (working copy) @@ -26,7 +26,7 @@ * Set of functions that are exposed by any HBase process (implemented by the * master, region server, and client). */ -public interface ServerController { +public interface ServerController extends Abortable { /** * Return the address of the current server. */ @@ -41,9 +41,4 @@ * Get the ZooKeeper instance for this server. */ public ZooKeeperWatcher getZooKeeper(); - - /** - * Stub method into ServerStatus to move forward with ZK cleanup. 
- */ - public void abortServer(); } Index: src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java (working copy) @@ -62,6 +62,10 @@ this.clientPort = clientPort; } + public int getClientPort() { + return clientPort; + } + public void setTickTime(int tickTime) { this.tickTime = tickTime; } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -20,14 +20,15 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerController; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -44,7 +45,7 @@ * This class also holds and manages the connection to ZooKeeper. Code to deal * with connection related events and exceptions are handled here. */ -public class ZooKeeperWatcher extends ZooKeeperWrapper implements Watcher { +public class ZooKeeperWatcher implements Watcher { private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class); // name of this watcher (for logging only) @@ -56,13 +57,16 @@ // zookeeper connection private ZooKeeper zooKeeper; - // server controller - private ServerController server; + // abortable in case of zk failure + private Abortable abortable; // listeners to be notified private final Set listeners = new CopyOnWriteArraySet(); + // set of unassigned nodes watched + private Set unassignedNodes = new HashSet(); + // node names // base znode for this cluster @@ -84,23 +88,24 @@ * @throws IOException */ public ZooKeeperWatcher(Configuration conf, String name, - ServerController server) + Abortable abortable) throws IOException { - super(conf, name); +// super(conf, name); this.name = name; this.quorum = ZKConfig.getZKQuorumServersString(conf); this.zooKeeper = ZKUtil.connect(conf, quorum, this); - this.server = server; + this.abortable = abortable; info("Connected to ZooKeeper"); setNodeNames(conf); try { // Create all the necessary "directories" of znodes // TODO: Move this to an init method somewhere so not everyone calls it? - ZKUtil.createIfNotExists(this, baseZNode); - ZKUtil.createIfNotExists(this, assignmentZNode); - ZKUtil.createIfNotExists(this, rsZNode); + ZKUtil.createAndFailSilent(this, baseZNode); + ZKUtil.createAndFailSilent(this, assignmentZNode); + ZKUtil.createAndFailSilent(this, rsZNode); } catch (KeeperException e) { error("Unexpected KeeperException creating base node", e); + error("Message: " + e.getMessage()); throw new IOException(e); } } @@ -135,7 +140,6 @@ * Get the connection to ZooKeeper. 
* @return connection reference to zookeeper */ - @Override public ZooKeeper getZooKeeper() { return zooKeeper; } @@ -162,7 +166,7 @@ "path: " + event.getPath()); // While we are still using both ZKWs, need to call parent process() - super.process(event); +// super.process(event); switch(event.getType()) { @@ -226,14 +230,22 @@ break; case Expired: error("Received Expired from ZooKeeper, aborting server"); - if(server != null) { - server.abortServer(); + if(abortable != null) { + abortable.abort(); } break; } } /** + * Get the set of already watched unassigned nodes. + * @return + */ + public Set getNodes() { + return unassignedNodes; + } + + /** * Handles KeeperExceptions in client calls. * * This may be temporary but for now this gives one place to deal with these. @@ -325,16 +337,19 @@ LOG.error("<" + name + "> " + string, t); } + public boolean isDebugEnabled() { + return LOG.isDebugEnabled(); + } + /** * Close the connection to ZooKeeper. * @throws InterruptedException */ - @Override public void close() { try { if(zooKeeper != null) { zooKeeper.close(); - super.close(); +// super.close(); } } catch (InterruptedException e) { } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java (revision 0) @@ -0,0 +1,79 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +public class ClusterStatusTracker extends ZooKeeperNodeTracker { + private static final Log LOG = LogFactory.getLog(ClusterStatusTracker.class); + + public static final byte [] upData = Bytes.toBytes("up"); + + /** + * Creates a cluster status tracker. + * + *
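As an illustrative aside (not part of the patch), the tracker appears intended to be used roughly as follows; the method name, watcher and abortable are assumed:

void bringClusterUp(ZooKeeperWatcher watcher, Abortable abortable)
    throws KeeperException {
  // Sketch only: track the cluster-state znode and mark the cluster up.
  ClusterStatusTracker clusterStatus = new ClusterStatusTracker(watcher, abortable);
  watcher.registerListener(clusterStatus);   // receive node events
  clusterStatus.start();                     // prime data and watch for the tracked znode
  if (!clusterStatus.isClusterUp()) {
    clusterStatus.setClusterUp();            // create or overwrite the "up" marker
  }
}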
<p>
After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param abortable + */ + public ClusterStatusTracker(ZooKeeperWatcher watcher, Abortable abortable) { + super(watcher, watcher.rootServerZNode, abortable); + } + + /** + * Checks if the root region location is available. + * @return true if root region location is available, false if not + */ + public boolean isClusterUp() { + return super.getData() != null; + } + + /** + * Sets the cluster as up. + * @throws KeeperException unexpected zk exception + */ + public void setClusterUp() + throws KeeperException { + try { + ZKUtil.createAndWatch(watcher, watcher.clusterStateZNode, upData); + } catch(KeeperException.NodeExistsException nee) { + ZKUtil.setData(watcher, watcher.clusterStateZNode, upData); + } + } + + /** + * Sets the cluster as down. + * @throws KeeperException unexpected zk exception + */ + public void setClusterDown() + throws KeeperException { + ZKUtil.deleteNode(watcher, watcher.clusterStateZNode); + } + + @Override + protected Log getLog() { + return LOG; + } +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -34,11 +34,13 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; @@ -120,8 +122,6 @@ * Used when a server puts up an ephemeral node for itself and needs to use * a unique name. * - * Returns the fully-qualified znode path. - * * @param serverInfo server information * @return unique, zookeeper-safe znode path for the server instance */ @@ -130,6 +130,15 @@ } /** + * Get the name of the current node from the specified fully-qualified path. + * @param path fully-qualified path + * @return name of the current node + */ + public static String getNodeName(String path) { + return path.substring(path.lastIndexOf("/")+1); + } + + /** * Get the key to the ZK ensemble for this configuration without * adding a name at the end * @param conf Configuration to use to build the key @@ -322,6 +331,51 @@ } /** + * Atomically add watches and read data from all unwatched unassigned nodes. + * + *
<p>
This works because master is the only person deleting nodes. + */ + public static List watchAndGetNewChildren(ZooKeeperWatcher zkw, + String baseNode) + throws KeeperException { + List newNodes = new ArrayList(); + synchronized(zkw.getNodes()) { + List nodes = + ZKUtil.listChildrenAndWatchForNewChildren(zkw, baseNode); + for(String node : nodes) { + String nodePath = ZKUtil.joinZNode(baseNode, node); + if(!zkw.getNodes().contains(nodePath)) { + byte [] data = ZKUtil.getDataAndWatch(zkw, nodePath); + newNodes.add(new NodeAndData(nodePath, data)); + zkw.getNodes().add(nodePath); + } + } + } + return newNodes; + } + + /** + * Simple class to hold a node path and node data. + */ + public static class NodeAndData { + private String node; + private byte [] data; + public NodeAndData(String node, byte [] data) { + this.node = node; + this.data = data; + } + public String getNode() { + return node; + } + public byte [] getData() { + return data; + } + public String toString() { + return node + " (" + RegionTransitionData.fromBytes(data) + ")"; + } + } + + /** * Checks if the specified znode has any children. Sets no watches. * * Returns true if the node exists and has children. Returns false if the @@ -356,6 +410,33 @@ } } + /** + * Get the number of children of the specified node. + * + * If the node does not exist or has no children, returns 0. + * + * Sets no watches at all. + * + * @param zkw zk reference + * @param znode path of node to count children of + * @return number of children of specified node, 0 if none or parent does not + * exist + * @throws KeeperException if unexpected zookeeper exception + */ + public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode) + throws KeeperException { + try { + Stat stat = zkw.getZooKeeper().exists(znode, null); + return stat == null ? 0 : stat.getNumChildren(); + } catch(KeeperException e) { + zkw.warn("Unable to get children of node " + znode); + zkw.keeperException(e); + } catch(InterruptedException e) { + zkw.interruptedException(e); + } + return 0; + } + // // Data retrieval // @@ -510,6 +591,59 @@ Bytes.toBytes(address.toString())); } + /** + * Sets the data of the existing znode to be the specified data. Ensures that + * the current data has the specified expected version. + * + *
<p>
If the node does not exist, a {@link NoNodeException} will be thrown. + * + *
<p>If there is a version mismatch, the method returns false. + * + *
<p>
No watches are set but setting data will trigger other watchers of this + * node. + * + *
<p>
If there is another problem, a KeeperException will be thrown. + * + * @param zkw zk reference + * @param znode path of node + * @param data data to set for node + * @param expectedVersion version expected when setting data + * @return true if data set, false if version mismatch + * @throws KeeperException if unexpected zookeeper exception + */ + public static boolean setData(ZooKeeperWatcher zkw, String znode, + byte [] data, int expectedVersion) + throws KeeperException, KeeperException.NoNodeException { + try { + return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null; + } catch (InterruptedException e) { + zkw.interruptedException(e); + return false; + } + } + + /** + * Sets the data of the existing znode to be the specified data. The node + * must exist but no checks are done on the existing data or version. + * + *
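As a hedged sketch (not in the patch), the versioned overload supports compare-and-swap style updates, the same getDataNoWatch/expectedVersion pairing ZKAssign uses below; the helper name and arguments are illustrative:

boolean casUpdate(ZooKeeperWatcher zkw, String znode, byte [] newData)
    throws KeeperException {
  // Sketch only: update the znode only if nobody changed it since we read it.
  Stat stat = new Stat();
  byte [] current = ZKUtil.getDataNoWatch(zkw, znode, stat);
  if (current == null) {
    return false;                            // node is gone or has no data
  }
  return ZKUtil.setData(zkw, znode, newData, stat.getVersion());
}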
<p>
If the node does not exist, a {@link NoNodeException} will be thrown. + * + *
<p>
No watches are set but setting data will trigger other watchers of this + * node. + * + *
<p>
If there is another problem, a KeeperException will be thrown. + * + * @param zkw zk reference + * @param znode path of node + * @param data data to set for node + * @throws KeeperException if unexpected zookeeper exception + */ + public static void setData(ZooKeeperWatcher zkw, String znode, + byte [] data) + throws KeeperException, KeeperException.NoNodeException { + setData(zkw, znode, data, -1); + } + // // Node creation // @@ -551,8 +685,7 @@ } /** - * - * Set the specified znode to be a persistent node carrying the specified + * Creates the specified znode to be a persistent node carrying the specified * data. * * Returns true if the node was successfully created, false if the node @@ -561,7 +694,7 @@ * If the node is created successfully, a watcher is also set on the node. * * If the node is not created successfully because it already exists, this - * method will also set a watcher on the node. + * method will also set a watcher on the node but return false. * * If there is another problem, a KeeperException will be thrown. * @@ -571,13 +704,19 @@ * @return true if node created, false if not, watch set in both cases * @throws KeeperException if unexpected zookeeper exception */ - public static boolean createPersistentNodeIfNotExists( + public static boolean createNodeIfNotExistsAndWatch( ZooKeeperWatcher zkw, String znode, byte [] data) throws KeeperException { try { zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL); + CreateMode.PERSISTENT); } catch (KeeperException.NodeExistsException nee) { + try { + zkw.getZooKeeper().exists(znode, zkw); + } catch (InterruptedException e) { + zkw.interruptedException(e); + return false; + } return false; } catch (InterruptedException e) { zkw.interruptedException(e); @@ -587,6 +726,31 @@ } /** + * Creates the specified node with the specified data and watches it. + * + *
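A small illustrative sketch of the create-or-update pattern this enables together with the unversioned setData above, mirroring setRootRegionLocation() and setClusterUp() elsewhere in this patch (method name and parameters assumed):

void createOrUpdate(ZooKeeperWatcher zkw, String znode, byte [] data)
    throws KeeperException {
  try {
    // First writer creates the node and leaves a watch on it...
    ZKUtil.createAndWatch(zkw, znode, data);
  } catch (KeeperException.NodeExistsException nee) {
    // ...later writers simply overwrite the existing data.
    ZKUtil.setData(zkw, znode, data);
  }
}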
<p>
Throws an exception if the node already exists. + * + *
<p>
The node created is persistent and open access. + * + * @param zkw zk reference + * @param znode path of node to create + * @param data data of node to create + * @throws KeeperException if unexpected zookeeper exception + * @throws KeeperException.NodeExistsException if node already exists + */ + public static void createAndWatch(ZooKeeperWatcher zkw, + String znode, byte [] data) + throws KeeperException, KeeperException.NodeExistsException { + try { + zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zkw.getZooKeeper().exists(znode, zkw); + } catch (InterruptedException e) { + zkw.interruptedException(e); + } + } + + /** * Creates the specified node, if the node does not exist. Does not set a * watch and fails silently if the node already exists. * @@ -596,7 +760,7 @@ * @param znode path of node * @throws KeeperException if unexpected zookeeper exception */ - public static void createIfNotExists(ZooKeeperWatcher zkw, + public static void createAndFailSilent(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { @@ -647,15 +811,30 @@ */ public static void deleteNode(ZooKeeperWatcher zkw, String node) throws KeeperException { + deleteNode(zkw, node, -1); + } + + /** + * Delete the specified node with the specified version. Sets no watches. + * Throws all exceptions. + */ + public static boolean deleteNode(ZooKeeperWatcher zkw, String node, + int version) + throws KeeperException { try { - zkw.getZooKeeper().delete(node, -1); + zkw.getZooKeeper().delete(node, version); + return true; + } catch(KeeperException.BadVersionException bve) { + return false; } catch(InterruptedException ie) { + zkw.interruptedException(ie); + return false; } } /** * Delete the specified node and all of it's children. - * + * * Sets no watches. Throws all exceptions besides dealing with deletion of * children. */ @@ -670,8 +849,26 @@ } zkw.getZooKeeper().delete(node, -1); } catch(InterruptedException ie) { + zkw.interruptedException(ie); } } + + /** + * Delete all the children of the specified node but not the node itself. + * + * Sets no watches. Throws all exceptions besides dealing with deletion of + * children. + */ + public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node) + throws KeeperException { + List children = ZKUtil.listChildrenNoWatch(zkw, node); + if(!children.isEmpty()) { + for(String child : children) { + deleteNodeRecursively(zkw, joinZNode(node, child)); + } + } + } + // // ZooKeeper cluster information // @@ -754,5 +951,4 @@ socket.close(); return res.toArray(new String[res.size()]); } - } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java (revision 0) @@ -0,0 +1,82 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +public class RootRegionTracker extends ZooKeeperNodeTracker { + private static final Log LOG = LogFactory.getLog(RootRegionTracker.class); + + /** + * Creates a root region location tracker. + * + *
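An illustrative usage sketch (not in the patch), assuming the caller already holds a ZooKeeperWatcher and an Abortable and registers the tracker as the tests in this patch do:

HServerAddress waitForRoot(ZooKeeperWatcher watcher, Abortable abortable)
    throws InterruptedException {
  // Sketch only: find out where the root region is being served.
  RootRegionTracker rootTracker = new RootRegionTracker(watcher, abortable);
  rootTracker.start();
  watcher.registerListener(rootTracker);
  if (rootTracker.isLocationAvailable()) {   // non-blocking check
    return rootTracker.getRootRegionLocation();
  }
  rootTracker.blockUntilAvailable();         // or block until the location is published
  return rootTracker.getRootRegionLocation();
}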
<p>
After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param abortable + */ + public RootRegionTracker(ZooKeeperWatcher watcher, Abortable abortable) { + super(watcher, watcher.rootServerZNode, abortable); + } + + /** + * Checks if the root region location is available. + * @return true if root region location is available, false if not + */ + public boolean isLocationAvailable() { + return super.getData() != null; + } + + /** + * Gets the root region location, if available. Null if not. + * @return server address for server hosting root region, null if none + * available + */ + public HServerAddress getRootRegionLocation() { + byte [] data = super.getData(); + return data == null ? null : new HServerAddress(Bytes.toString(data)); + } + + /** + * Sets the root region location. + * @param address + * @throws KeeperException unexpected zk exception + */ + public void setRootRegionLocation(HServerAddress address) + throws KeeperException { + try { + ZKUtil.createAndWatch(watcher, watcher.rootServerZNode, + Bytes.toBytes(address.toString())); + } catch(KeeperException.NodeExistsException nee) { + ZKUtil.setData(watcher, watcher.rootServerZNode, + Bytes.toBytes(address.toString())); + } + } + + @Override + protected Log getLog() { + return LOG; + } +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (revision 0) @@ -0,0 +1,159 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.hbase.Abortable; +import org.apache.zookeeper.KeeperException; + +/** + * Tracks the availability and value of a single ZooKeeper node. + * + *
<p>
Utilizes the {@link ZooKeeperListener} interface to get the necessary + * ZooKeeper events related to the node. + * + *
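For illustration only, a concrete tracker needs little more than a constructor and a getLog() override; this mirrors the TestTracker class in the test added by this patch (LOG, watcher, node and abortable are assumed to come from the enclosing class):

public static class MyTracker extends ZooKeeperNodeTracker {
  public MyTracker(ZooKeeperWatcher watcher, String node, Abortable abortable) {
    super(watcher, node, abortable);
  }
  @Override
  protected Log getLog() {
    return LOG;   // commons-logging Log of the enclosing class (assumed)
  }
}

Callers then follow the same sequence the test does: construct the tracker, call start(), register it with watcher.registerListener(tracker), and read the znode via getData() or blockUntilAvailable().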
<p>
This is the base class used by trackers in both the Master and + * RegionServers. + */ +public abstract class ZooKeeperNodeTracker extends ZooKeeperListener { + + /** Path of node being tracked */ + private String node; + + /** Data of the node being tracked */ + private byte [] data; + + /** Used to abort if a fatal error occurs */ + private Abortable abortable; + + /** + * Constructs a new ZK node tracker. + * + *
<p>
After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param node + * @param abortable + */ + public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node, + Abortable abortable) { + super(watcher); + this.node = node; + this.abortable = abortable; + this.data = null; + } + + /** + * Starts the tracking of the node in ZooKeeper. + * + *
<p>
Use {@link blockUntilAvailable} to block until the node is available + * or {@link getData} to get the data of the node if it is available. + */ + public synchronized void start() { + try { + if(ZKUtil.watchAndCheckExists(watcher, node)) { + byte [] data = ZKUtil.getDataAndWatch(watcher, node); + if(data != null) { + this.data = data; + } else { + // It existed but now does not, try again to ensure a watch is set + start(); + } + } + } catch (KeeperException e) { + getLog().fatal("Unexpected exception during initialization, aborting", e); + abortable.abort(); + } + } + + /** + * Gets the data of the node, blocking until the node is available. + * + * @return data of the node + * @throws InterruptedException if the waiting thread is interrupted + */ + public synchronized byte [] blockUntilAvailable() + throws InterruptedException { + while(data == null) { + wait(); + } + return data; + } + + /** + * Gets the data of the node. + * + *
<p>
If the node is currently available, the most up-to-date known version of + * the data is returned. If the node is not currently available, null is + * returned. + * + * @return data of the node, null if unavailable + */ + public synchronized byte [] getData() { + return data; + } + + @Override + public synchronized void nodeCreated(String path) { + if(path.equals(node)) { + try { + byte [] data = ZKUtil.getDataAndWatch(watcher, node); + if(data != null) { + this.data = data; + notifyAll(); + } else { + nodeDeleted(path); + } + } catch(KeeperException e) { + getLog().fatal("Unexpected exception handling nodeCreated event", e); + abortable.abort(); + } + } + } + + @Override + public synchronized void nodeDeleted(String path) { + if(path.equals(node)) { + try { + if(ZKUtil.watchAndCheckExists(watcher, node)) { + nodeCreated(path); + } else { + this.data = null; + } + } catch(KeeperException e) { + getLog().fatal("Unexpected exception handling nodeCreated event", e); + abortable.abort(); + } + } + } + + @Override + public synchronized void nodeDataChanged(String path) { + if(path.equals(node)) { + nodeCreated(path); + } + } + + /** + * Gets the logger. Used to provide more clear log messages. + * @return log instance of extending class + */ + protected abstract Log getLog(); +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java (revision 0) @@ -0,0 +1,101 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.zookeeper.KeeperException; + +/** + * Tracks the online region servers via ZK. + * + *
<p>
Handling of new RSs checking in is done via RPC. This class + * is only responsible for watching for expired nodes. It handles + * listening for changes in the RS node list and watching each node. + * + *
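An illustrative sketch of how the master side might wire this up (method name assumed; the ServerManager comes from the surrounding master code, and the element type of the returned list is taken to be HServerAddress):

void trackRegionServers(ZooKeeperWatcher watcher, Abortable abortable,
    ServerManager serverManager) throws KeeperException {
  // Sketch only: start watching RS ephemeral nodes and list who is online.
  RegionServerTracker rsTracker =
      new RegionServerTracker(watcher, abortable, serverManager);
  rsTracker.start();   // registers itself as a listener and watches rsZNode children
  List<HServerAddress> online = rsTracker.getOnlineServers();
}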
<p>
If an RS node gets deleted, this automatically handles calling of + * {@link ServerManager#expireServer(org.apache.hadoop.hbase.HServerInfo)}. + */ +public class RegionServerTracker extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(RegionServerTracker.class); + + private ServerManager serverManager; + private Abortable abortable; + + public RegionServerTracker(ZooKeeperWatcher watcher, + Abortable abortable, ServerManager serverManager) { + super(watcher); + this.abortable = abortable; + this.serverManager = serverManager; + } + /** + * Starts the tracking of online RegionServers. + * + *
<p>
All RSs will be tracked after this method is called. + * + * @throws KeeperException + */ + public void start() throws KeeperException { + watcher.registerListener(this); + ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode); + } + + @Override + public void nodeDeleted(String path) { + if(path.startsWith(watcher.rsZNode)) { + String serverName = ZKUtil.getNodeName(path); + LOG.info("RegionServer ephemeral node deleted, processing expiration [" + + serverName + "]"); + HServerInfo hsi = serverManager.getServerInfo(serverName); + if(hsi == null) { + LOG.info("No HServerInfo found for " + serverName); + return; + } + serverManager.expireServer(hsi); + } + } + + @Override + public void nodeChildrenChanged(String path) { + if(path.equals(watcher.rsZNode)) { + try { + ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode); + } catch (KeeperException e) { + LOG.error("Unexpected zk exception getting RS nodes", e); + abortable.abort(); + } + } + } + + /** + * Gets the online servers. + * @return list of online servers from zk + * @throws KeeperException + */ + public List getOnlineServers() throws KeeperException { + return ZKUtil.listChildrenAndGetAsAddresses(watcher, watcher.rsZNode); + } +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (revision 0) @@ -0,0 +1,476 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.hadoop.hbase.executor.RegionTransitionData; +import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.data.Stat; + +/** + * Utility class for doing region assignment in ZooKeeper. This class extends + * stuff done in {@link ZKUtil} to cover specific assignment operations. + *
<p>
+ * Contains only static methods and constants. + *
<p>
+ * Used by both the Master and RegionServer. + *
<p>
+ * All valid transitions outlined below: + *
<p>
+ * <b>MASTER</b>
+ * <ol>
+ *   <li>Master creates an unassigned node as OFFLINE.
+ *   - Cluster startup and table enabling.
+ *   <li>Master forces an existing unassigned node to OFFLINE.
+ *   - RegionServer failure.
+ *   - Allows transitions from all states to OFFLINE.
+ *   <li>Master deletes an unassigned node that was in an OPENED state.
+ *   - Normal region transitions. Besides cluster startup, no other deletions
+ *     of unassigned nodes are allowed.
+ *   <li>Master deletes all unassigned nodes regardless of state.
+ *   - Cluster startup before any assignment happens.
+ * </ol>
+ * <p>
+ * <b>REGIONSERVER</b>
+ * <ol>
+ *   <li>RegionServer creates an unassigned node as CLOSING.
+ *   - All region closes will do this in response to a CLOSE RPC from Master.
+ *   - A node can never be transitioned to CLOSING, only created.
+ *   <li>RegionServer transitions an unassigned node from CLOSING to CLOSED.
+ *   - Normal region closes. CAS operation.
+ *   <li>RegionServer transitions an unassigned node from OFFLINE to OPENING.
+ *   - All region opens will do this in response to an OPEN RPC from the Master.
+ *   - Normal region opens. CAS operation.
+ *   <li>RegionServer transitions an unassigned node from OPENING to OPENED.
+ *   - Normal region opens. CAS operation.
+ * </ol>
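Tying the MASTER and REGIONSERVER transitions above together, a hedged sketch of the normal open path for a single region (method names are from this class; caller names and the server-name strings are assumed):

void masterOpens(ZooKeeperWatcher zkw, String regionName, String masterName)
    throws KeeperException {
  // Sketch only: master side of a normal region open.
  ZKAssign.createNodeOffline(zkw, regionName, masterName);       // -> OFFLINE
  // ... assign the region to a server and wait for it to report OPENED ...
  ZKAssign.deleteOpenedNode(zkw, regionName);                    // acknowledge OPENED
}

void regionServerOpens(ZooKeeperWatcher zkw, String regionName, String serverName)
    throws KeeperException {
  // Sketch only: regionserver side of the same open.
  ZKAssign.transitionNodeOpening(zkw, regionName, serverName);   // OFFLINE -> OPENING
  // ... actually open the region ...
  ZKAssign.transitionNodeOpened(zkw, regionName, serverName);    // OPENING -> OPENED
}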
+ */ +public class ZKAssign { + + /** + * Gets the full path node name for the unassigned node for the specified + * region. + * @param zkw zk reference + * @param regionName region name + * @return full path node name + */ + private static String getNodeName(ZooKeeperWatcher zkw, String regionName) { + return ZKUtil.joinZNode(zkw.assignmentZNode, regionName); + } + + /** + * Gets the region name from the full path node name of an unassigned node. + * @param path full zk path + * @return region name + */ + public static String getRegionName(ZooKeeperWatcher zkw, String path) { + return path.substring(zkw.assignmentZNode.length()+1); + } + + // Master methods + + /** + * Creates a new unassigned node in the OFFLINE state for the specified + * region. + * + *
<p>
Does not transition nodes from other states. If a node already exists + * for this region, a {@link NodeExistsException} will be thrown. + * + *
<p>
Sets a watcher on the unassigned region node if the method is + * successful. + * + *
<p>
This method should only be used during cluster startup and the enabling + * of a table. + * + * @param zkw zk reference + * @param regionName region to be created as offline + * @param serverName server event originates from + * @throws KeeperException if unexpected zookeeper exception + * @throws KeeperException.NodeExistsException if node already exists + */ + public static void createNodeOffline(ZooKeeperWatcher zkw, String regionName, + String serverName) + throws KeeperException, KeeperException.NodeExistsException { + zkw.debug("Creating an unassigned node for " + regionName + + " in an OFFLINE state"); + RegionTransitionData data = new RegionTransitionData( + HBaseEventType.M2ZK_REGION_OFFLINE, regionName, serverName); + synchronized(zkw.getNodes()) { + String node = getNodeName(zkw, regionName); + zkw.getNodes().add(node); + ZKUtil.createAndWatch(zkw, node, data.getBytes()); + } + } + + /** + * Forces an existing unassigned node to the OFFLINE state for the specified + * region. + * + *
<p>
Does not create a new node. If a node does not already exist for this + * region, a {@link NoNodeException} will be thrown. + * + *
<p>
Sets a watcher on the unassigned region node if the method is + * successful. + * + *
<p>
This method should only be used during recovery of regionserver failure. + * + * @param zkw zk reference + * @param regionName region to be forced as offline + * @param serverName server event originates from + * @throws KeeperException if unexpected zookeeper exception + * @throws KeeperException.NoNodeException if node does not exist + */ + public static void forceNodeOffline(ZooKeeperWatcher zkw, String regionName, + String serverName) + throws KeeperException, KeeperException.NoNodeException { + zkw.debug("Forcing an existing unassigned node for " + regionName + + " to an OFFLINE state"); + RegionTransitionData data = new RegionTransitionData( + HBaseEventType.M2ZK_REGION_OFFLINE, regionName, serverName); + synchronized(zkw.getNodes()) { + String node = getNodeName(zkw, regionName); + zkw.getNodes().add(node); + ZKUtil.setData(zkw, node, data.getBytes()); + } + } + + /** + * Deletes an existing unassigned node that is in the OPENED state for the + * specified region. + * + *
<p>
If a node does not already exist for this region, a + * {@link NoNodeException} will be thrown. + * + *
<p>
No watcher is set whether this succeeds or not. + * + *
<p>
Returns false if the node was not in the proper state but did exist. + * + *
<p>
This method is used during normal region transitions when a region + * finishes successfully opening. This is the Master acknowledging completion + * of the specified regions transition. + * + * @param zkw zk reference + * @param regionName opened region to be deleted from zk + * @throws KeeperException if unexpected zookeeper exception + * @throws KeeperException.NoNodeException if node does not exist + */ + public static boolean deleteOpenedNode(ZooKeeperWatcher zkw, + String regionName) + throws KeeperException, KeeperException.NoNodeException { + zkw.debug("Deleting an existing unassigned node for " + regionName + + " that is in a OPENED state"); + String node = getNodeName(zkw, regionName); + Stat stat = new Stat(); + byte [] bytes = ZKUtil.getDataNoWatch(zkw, node, stat); + if(bytes == null) { + throw KeeperException.create(Code.NONODE); + } + RegionTransitionData data = RegionTransitionData.fromBytes(bytes); + if(!data.getEventType().equals(HBaseEventType.RS2ZK_REGION_OPENED)) { + zkw.warn("Attempting to delete an unassigned node in OPENED state but " + + "node is in " + data.getEventType() + " state"); + return false; + } + synchronized(zkw.getNodes()) { + // TODO: Does this go here or only if we successfully delete node? + zkw.getNodes().remove(node); + if(!ZKUtil.deleteNode(zkw, node, stat.getVersion())) { + zkw.warn("Attempting to delete an unassigned node in OPENED state but " + + "after verifying it was in OPENED state, we got a version mismatch"); + return false; + } + return true; + } + } + + /** + * Deletes all unassigned nodes regardless of their state. + * + *
<p>
No watchers are set. + * + *
<p>
This method is used by the Master during cluster startup to clear out + * any existing state from other cluster runs. + * + * @param zkw zk reference + * @throws KeeperException if unexpected zookeeper exception + */ + public static void deleteAllNodes(ZooKeeperWatcher zkw) + throws KeeperException { + zkw.debug("Deleting any existing unassigned nodes"); + ZKUtil.deleteChildrenRecursively(zkw, zkw.assignmentZNode); + } + + // RegionServer methods + + /** + * Creates a new unassigned node in the CLOSING state for the specified + * region. + * + *
<p>
Does not transition nodes from any states. If a node already exists + * for this region, a {@link NodeExistsException} will be thrown. + * + *
<p>
Does not set any watches. + * + *
<p>
This method should only be used by a RegionServer when initiating a + * close of a region after receiving a CLOSE RPC from the Master. + * + * @param zkw zk reference + * @param regionName region to be created as closing + * @param serverName server event originates from + * @throws KeeperException if unexpected zookeeper exception + * @throws KeeperException.NodeExistsException if node already exists + */ + public static void createNodeClosing(ZooKeeperWatcher zkw, String regionName, + String serverName) + throws KeeperException, KeeperException.NodeExistsException { + zkw.debug("Creating an unassigned node for " + regionName + + " in a CLOSING state"); + RegionTransitionData data = new RegionTransitionData( + HBaseEventType.RS2ZK_REGION_CLOSING, regionName, serverName); + synchronized(zkw.getNodes()) { + String node = getNodeName(zkw, regionName); + zkw.getNodes().add(node); + ZKUtil.createAndWatch(zkw, node, data.getBytes()); + } + } + + /** + * Transitions an existing unassigned node for the specified region which is + * currently in the CLOSING state to be in the CLOSED state. + * + *
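Similarly, a hedged sketch of the close path on the regionserver side (names assumed):

void regionServerCloses(ZooKeeperWatcher zkw, String regionName, String serverName)
    throws KeeperException {
  // Sketch only: regionserver side of a normal region close.
  ZKAssign.createNodeClosing(zkw, regionName, serverName);       // -> CLOSING
  // ... actually close the region ...
  ZKAssign.transitionNodeClosed(zkw, regionName, serverName);    // CLOSING -> CLOSED
}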

Does not transition nodes from other states. If for some reason the + * node could not be transitioned, the method returns false. + * + *

This method can fail and return false for three different reasons: + *

  • Unassigned node for this region does not exist
  • Unassigned node for this region is not in CLOSING state
  • After verifying CLOSING state, update fails because of wrong version (someone else already transitioned the node)

Does not set any watches. + * + *
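// Illustrative sketch, not part of the diff: once the region is actually
// closed, the RegionServer moves the node from CLOSING to CLOSED using the
// method below. Placeholders as in the sketch above.
try {
  if (!ZKAssign.transitionNodeClosed(watcher, regionInfo.getEncodedName(),
      serverName)) {
    // Node missing, not in CLOSING, or a version mismatch; the Master's
    // timeout handling is left to sort out the region.
    LOG.warn("Unable to transition " + regionInfo.getEncodedName() +
        " from CLOSING to CLOSED");
  }
} catch (KeeperException e) {
  abort();
}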

This method should only be used by a RegionServer when initiating a + * close of a region after receiving a CLOSE RPC from the Master. + * + * @param zkw zk reference + * @param regionName region to be transitioned to closed + * @param serverName server event originates from + * @return true if transition was successful, false if not + * @throws KeeperException if unexpected zookeeper exception + */ + public static boolean transitionNodeClosed(ZooKeeperWatcher zkw, + String regionName, String serverName) + throws KeeperException { + return transitionNode(zkw, regionName, serverName, + HBaseEventType.RS2ZK_REGION_CLOSING, + HBaseEventType.RS2ZK_REGION_CLOSED); + } + + /** + * Transitions an existing unassigned node for the specified region which is + * currently in the OFFLINE state to be in the OPENING state. + * + *

Does not transition nodes from other states. If for some reason the + * node could not be transitioned, the method returns false. + * + *

This method can fail and return false for three different reasons: + *

  • Unassigned node for this region does not exist
  • Unassigned node for this region is not in OFFLINE state
  • After verifying OFFLINE state, update fails because of wrong version (someone else already transitioned the node)

Does not set any watches. + * + *

This method should only be used by a RegionServer when initiating an + * open of a region after receiving an OPEN RPC from the Master. + * + * @param zkw zk reference + * @param regionName region to be transitioned to opening + * @param serverName server event originates from + * @return true if transition was successful, false if not + * @throws KeeperException if unexpected zookeeper exception + */ + public static boolean transitionNodeOpening(ZooKeeperWatcher zkw, + String regionName, String serverName) + throws KeeperException { + return transitionNode(zkw, regionName, serverName, + HBaseEventType.M2ZK_REGION_OFFLINE, + HBaseEventType.RS2ZK_REGION_OPENING); + } + + /** + * Transitions an existing unassigned node for the specified region which is + * currently in the OPENING state to be in the OPENED state. + * + *

Does not transition nodes from other states. If for some reason the + * node could not be transitioned, the method returns false. + * + *

This method can fail and return false for three different reasons: + *

  • Unassigned node for this region does not exist
  • Unassigned node for this region is not in OPENING state
  • After verifying OPENING state, update fails because of wrong version (this should never actually happen since an RS only does this transition following a transition to OPENING; if two RSs are conflicting, one would fail the original transition to OPENING and not this transition)

Does not set any watches. + * + *
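// Illustrative sketch, not part of the diff: the RegionServer-side open flow
// using the two transition methods above. Placeholders as in the earlier
// sketches.
try {
  if (ZKAssign.transitionNodeOpening(watcher, regionInfo.getEncodedName(),
      serverName)) {
    // ... open stores, replay logs, bring the region online ...
    if (!ZKAssign.transitionNodeOpened(watcher, regionInfo.getEncodedName(),
        serverName)) {
      LOG.warn("Region " + regionInfo.getEncodedName() +
          " opened but node could not be transitioned to OPENED");
    }
  } else {
    // Node was not in OFFLINE state; another server may own this assignment.
    LOG.warn("Unable to transition " + regionInfo.getEncodedName() +
        " from OFFLINE to OPENING, not opening");
  }
} catch (KeeperException e) {
  abort();
}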

This method should only be used by a RegionServer when completing the + * open of a region. + * + * @param zkw zk reference + * @param regionName region to be transitioned to opened + * @param serverName server event originates from + * @return true if transition was successful, false if not + * @throws KeeperException if unexpected zookeeper exception + */ + public static boolean transitionNodeOpened(ZooKeeperWatcher zkw, + String regionName, String serverName) + throws KeeperException { + return transitionNode(zkw, regionName, serverName, + HBaseEventType.RS2ZK_REGION_OPENING, + HBaseEventType.RS2ZK_REGION_OPENED); + } + + /** + * Private method that actually performs unassigned node transitions. + * + *

Attempts to transition the unassigned node for the specified region + * from the expected state to the state in the specified transition data. + * + *

Method first reads existing data and verifies it is in the expected + * state. If the node does not exist or the node is not in the expected + * state, the method returns false. + * + *

If the read state is what is expected, it attempts to write the new + * state and data into the node. When doing this, it includes the expected + * version (determined when the existing state was verified) to ensure that + * only one transition is successful. If there is a version mismatch, the + * method returns false. + * + *
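// Illustrative sketch, not part of the diff: the same read-verify-then-
// conditional-write pattern expressed against the raw ZooKeeper client. The
// version returned with the read is handed to setData() so that only one
// writer can win. 'zk', 'path', 'newData' and isExpectedState() are
// placeholders, and InterruptedException handling is elided.
Stat stat = new Stat();
byte [] current = zk.getData(path, false, stat);   // read data and version
if (isExpectedState(current)) {                    // verify the begin state
  try {
    zk.setData(path, newData, stat.getVersion());  // write only if unchanged
  } catch (KeeperException.BadVersionException e) {
    // Someone else transitioned the node first; report failure to the caller.
  }
}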

If the write is successful, no watch is set and the method returns true. + * + * @param zkw zk reference + * @param regionName region to be transitioned to opened + * @param serverName server event originates from + * @param beginState state the node must currently be in to do transition + * @param endState state to transition node to if all checks pass + * @return true if transition was successful, false if not + * @throws KeeperException if unexpected zookeeper exception + */ + private static boolean transitionNode(ZooKeeperWatcher zkw, String regionName, + String serverName, HBaseEventType beginState, HBaseEventType endState) + throws KeeperException { + if(zkw.isDebugEnabled()) { + zkw.debug("Attempting to transition node for " + regionName + + " from " + beginState.toString() + " to " + endState.toString()); + } + + String node = getNodeName(zkw, regionName); + + // Read existing data of the node + Stat stat = new Stat(); + byte [] existingBytes = + ZKUtil.getDataNoWatch(zkw, node, stat); + RegionTransitionData existingData = + RegionTransitionData.fromBytes(existingBytes); + + // Verify it is in expected state + if(!existingData.getEventType().equals(beginState)) { + zkw.warn("Attempt to transition the unassigned node for " + regionName + + " from " + beginState + " to " + endState + " failed, " + + "the node existed but was in the state " + existingData.getEventType()); + return false; + } + + // Write new data, ensuring data has not changed since we last read it + try { + RegionTransitionData data = new RegionTransitionData(endState, + regionName, serverName); + if(!ZKUtil.setData(zkw, node, data.getBytes(), + stat.getVersion())) { + zkw.warn("Attempt to transition the unassigned node for " + regionName + + " from " + beginState + " to " + endState + " failed, " + + "the node existed and was in the expected state but then when " + + "setting data we got a version mismatch"); + return false; + } + if(zkw.isDebugEnabled()) { + zkw.debug("Successfully transitioned node for " + regionName + + " from " + beginState + " to " + endState); + } + return true; + } catch (KeeperException.NoNodeException nne) { + zkw.warn("Attempt to transition the unassigned node for " + regionName + + " from " + beginState + " to " + endState + " failed, " + + "the node existed and was in the expected state but then when " + + "setting data it no longer existed"); + return false; + } + } + + /** + * Gets the current data in the unassigned node for the specified region name + * or fully-qualified path. + * + *

Returns null if the region does not currently have a node. + * + *

Sets a watch on the node if the node exists. + * + * @param watcher zk reference + * @param pathOrRegionName fully-specified path or region name + * @return data for the unassigned node + * @throws KeeperException + * @throws KeeperException if unexpected zookeeper exception + */ + public static RegionTransitionData getData(ZooKeeperWatcher zkw, + String pathOrRegionName) + throws KeeperException { + String node = pathOrRegionName.startsWith("/") ? + pathOrRegionName : getNodeName(zkw, pathOrRegionName); + byte [] data = ZKUtil.getDataAndWatch(zkw, node); + if(data == null) { + return null; + } + return RegionTransitionData.fromBytes(data); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -2329,8 +2329,8 @@ // ServerStatus @Override - public void abortServer() { - this.abort("Received abortServer call"); + public void abort() { + this.abort("Received abort call"); } @Override Index: src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java (working copy) @@ -118,7 +118,7 @@ } catch(KeeperException ke) { // If we have a ZK exception trying to find the master we must abort LOG.fatal("Unexpected ZooKeeper exception", ke); - status.abortServer(); + status.abort(); } } @@ -182,7 +182,7 @@ } catch (KeeperException ke) { // If we have a ZK exception trying to find the master we must abort LOG.fatal("Unexpected ZooKeeper exception", ke); - status.abortServer(); + status.abort(); } if(address != null) { setMasterAddress(address); Index: src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (working copy) @@ -5,12 +5,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; +import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -59,8 +58,9 @@ // Try to create the node with a CLOSING state, if already exists, // something is wrong try { - if(ZKUtil.createPersistentNodeIfNotExists(zooKeeper, regionZNode, - makeZKEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg))) { + if(ZKUtil.createNodeIfNotExistsAndWatch(zooKeeper, regionZNode, + makeZKEventData(HBaseEventType.RS2ZK_REGION_CLOSING, + regionName, hmsg))) { String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region."; LOG.error(msg); throw new IOException(msg); @@ -159,9 +159,10 @@ * @param hmsg * @return serialized data 
*/ - private byte [] makeZKEventData(HBaseEventType eventType, HMsg hmsg) + private byte [] makeZKEventData(HBaseEventType eventType, String regionName, + HMsg hmsg) throws IOException { - return Writables.getBytes(new RegionTransitionEventData(eventType, + return Writables.getBytes(new RegionTransitionData(eventType, regionName, regionServerName, hmsg)); } @@ -174,7 +175,7 @@ */ private void updateZKWithEventData(HBaseEventType eventType, HMsg hmsg) throws IOException { - byte[] data = makeZKEventData(eventType, hmsg); + byte[] data = makeZKEventData(eventType, regionName, hmsg); LOG.debug("Updating ZNode " + regionZNode + " with [" + eventType + "]" + " expected version = " + zkVersion); Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -243,10 +243,8 @@ HServerLoad load = useInfoLoad ? info.getLoad() : new HServerLoad(); String serverName = info.getServerName(); info.setLoad(load); - // We must set this watcher here because it can be set on a fresh start - // or on a failover - Watcher watcher = new ServerExpirer(new HServerInfo(info)); - masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher); + // TODO: Why did we update the RS location ourself? Shouldn't RS do this? +// masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher); this.serversToServerInfo.put(serverName, info); this.serversToLoad.put(serverName, load); synchronized (this.loadToServers) { @@ -848,7 +846,7 @@ * Expire the passed server. Add it to list of deadservers and queue a * shutdown processing. */ - private synchronized void expireServer(final HServerInfo hsi) { + public synchronized void expireServer(final HServerInfo hsi) { // First check a server to expire. 
ServerName is of the form: // , , String serverName = hsi.getServerName(); Index: src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (working copy) @@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.zookeeper.KeeperException; /** * ProcessRegionOpen is instantiated when a region server reports that it is @@ -115,8 +116,13 @@ } else { masterStatus.getRegionManager().removeRegion(regionInfo); } - masterStatus.getZooKeeper().deleteUnassignedRegion( - regionInfo.getEncodedName()); + try { + ZKAssign.deleteOpenedNode(masterStatus.getZooKeeper(), + regionInfo.getEncodedName()); + } catch (KeeperException e) { + LOG.error("ZK error deleting opened node", e); + throw new IOException(e); + } return true; } } Index: src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (working copy) @@ -102,7 +102,7 @@ } } catch (KeeperException ke) { LOG.fatal("Received an unexpected KeeperException, aborting", ke); - status.abortServer(); + status.abort(); } } @@ -126,7 +126,7 @@ } } catch (KeeperException ke) { LOG.fatal("Received an unexpected KeeperException, aborting", ke); - status.abortServer(); + status.abort(); return; } // There is another active master, this is not a cluster startup @@ -151,4 +151,19 @@ blockUntilBecomingActiveMaster(); } } + + public void stop() { + try { + // If our address is in ZK, delete it on our way out + HServerAddress zkAddress = + ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode); + // TODO: redo this to make it atomic (only added for tests) + if(zkAddress != null && + zkAddress.equals(address)) { + ZKUtil.deleteNode(watcher, watcher.masterAddressZNode); + } + } catch (KeeperException e) { + watcher.error("Error deleting our own master address node", e); + } + } } Index: src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (working copy) @@ -1,184 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; -import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler; -import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper.ZNodePathAndData; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; - -/** - * Watches the UNASSIGNED znode in ZK for the master, and handles all events - * relating to region transitions. - */ -public class ZKUnassignedWatcher implements Watcher { - private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class); - - private ZooKeeperWrapper zkWrapper; - String serverName; - ServerManager serverManager; - - public static void start(Configuration conf, MasterStatus masterStatus) - throws IOException { - new ZKUnassignedWatcher(conf, masterStatus); - LOG.debug("Started ZKUnassigned watcher"); - } - - public ZKUnassignedWatcher(Configuration conf, MasterStatus masterStatus) - throws IOException { - this.serverName = masterStatus.getHServerAddress().toString(); - this.serverManager = masterStatus.getServerManager(); - zkWrapper = masterStatus.getZooKeeper(); - String unassignedZNode = zkWrapper.getRegionInTransitionZNode(); - - // If the UNASSIGNED ZNode exists and this is a fresh cluster start, then - // delete it. - if(masterStatus.isClusterStartup() && zkWrapper.exists(unassignedZNode, false)) { - LOG.info("Cluster start, but found " + unassignedZNode + ", deleting it."); - try { - zkWrapper.deleteZNode(unassignedZNode, true); - } catch (KeeperException e) { - LOG.error("Could not delete znode " + unassignedZNode, e); - throw new IOException(e); - } catch (InterruptedException e) { - LOG.error("Could not delete znode " + unassignedZNode, e); - throw new IOException(e); - } - } - - // If the UNASSIGNED ZNode does not exist, create it. - zkWrapper.createZNodeIfNotExists(unassignedZNode); - - // TODO: get the outstanding changes in UNASSIGNED - - // Set a watch on Zookeeper's UNASSIGNED node if it exists. - zkWrapper.registerListener(this); - } - - /** - * This is the processing loop that gets triggered from the ZooKeeperWrapper. 
- * This zookeeper events process function dies the following: - * - WATCHES the following events: NodeCreated, NodeDataChanged, NodeChildrenChanged - * - IGNORES the following events: None, NodeDeleted - */ - @Override - public synchronized void process(WatchedEvent event) { - EventType type = event.getType(); - LOG.debug("ZK-EVENT-PROCESS: Got zkEvent " + type + - " state:" + event.getState() + - " path:" + event.getPath()); - - // Handle the ignored events - if(type.equals(EventType.None) || - type.equals(EventType.NodeDeleted)) { - return; - } - - // check if the path is for the UNASSIGNED directory we care about - if(event.getPath() == null || - !event.getPath().startsWith(zkWrapper.getZNodePathForHBase( - zkWrapper.getRegionInTransitionZNode()))) { - return; - } - - try - { - /* - * If a node is created in the UNASSIGNED directory in zookeeper, then: - * 1. watch its updates (this is an unassigned region). - * 2. read to see what its state is and handle as needed (state may have - * changed before we started watching it) - */ - if(type.equals(EventType.NodeCreated)) { - zkWrapper.watchZNode(event.getPath()); - handleRegionStateInZK(event.getPath()); - } - /* - * Data on some node has changed. Read to see what the state is and handle - * as needed. - */ - else if(type.equals(EventType.NodeDataChanged)) { - handleRegionStateInZK(event.getPath()); - } - /* - * If there were some nodes created then watch those nodes - */ - else if(type.equals(EventType.NodeChildrenChanged)) { - List newZNodes = - zkWrapper.watchAndGetNewChildren(event.getPath()); - for(ZNodePathAndData zNodePathAndData : newZNodes) { - LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath()); - handleRegionStateInZK(zNodePathAndData.getzNodePath(), - zNodePathAndData.getData()); - } - } - } - catch (IOException e) - { - LOG.error("Could not process event from ZooKeeper", e); - } - } - - /** - * Read the state of a node in ZK, and do the needful. We want to do the - * following: - * 1. If region's state is updated as CLOSED, invoke the ClosedRegionHandler. - * 2. If region's state is updated as OPENED, invoke the OpenRegionHandler. 
- * @param zNodePath - * @throws IOException - */ - private void handleRegionStateInZK(String zNodePath) throws IOException { - byte[] data = zkWrapper.readZNode(zNodePath, null); - handleRegionStateInZK(zNodePath, data); - } - - private void handleRegionStateInZK(String zNodePath, byte[] data) { - // a null value is set when a node is created, we don't need to handle this - if(data == null) { - return; - } - String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode(); - String region = zNodePath.substring( - zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1); - HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]); - - // if the node was CLOSED then handle it - if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) { - new MasterCloseRegionHandler(rsEvent, serverManager, serverName, region, data).submit(); - } - // if the region was OPENED then handle that - else if(rsEvent == HBaseEventType.RS2ZK_REGION_OPENED || - rsEvent == HBaseEventType.RS2ZK_REGION_OPENING) { - new MasterOpenRegionHandler(rsEvent, serverManager, serverName, region, data).submit(); - } - } -} - Index: src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java (working copy) @@ -26,9 +26,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HMsg; import org.apache.hadoop.hbase.HServerInfo; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; +import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.HBaseEventHandler; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.util.Writables; @@ -46,13 +45,13 @@ // other args passed in a byte array form protected byte[] serializedData; private String regionName; - private RegionTransitionEventData hbEventData; + private RegionTransitionData hbEventData; ServerManager serverManager; - public MasterOpenRegionHandler(HBaseEventType eventType, - ServerManager serverManager, - String serverName, - String regionName, + public MasterOpenRegionHandler(HBaseEventType eventType, + ServerManager serverManager, + String serverName, + String regionName, byte[] serData) { super(false, serverName, eventType); this.regionName = regionName; @@ -61,14 +60,14 @@ } /** - * Handle the various events relating to opening regions. We can get the + * Handle the various events relating to opening regions. We can get the * following events here: - * - RS_REGION_OPENING : Keep track to see how long the region open takes. - * If the RS is taking too long, then revert the - * region back to closed state so that it can be + * - RS_REGION_OPENING : Keep track to see how long the region open takes. + * If the RS is taking too long, then revert the + * region back to closed state so that it can be * re-assigned. - * - RS_REGION_OPENED : The region is opened. Add an entry into META for - * the RS having opened this region. Then delete this + * - RS_REGION_OPENED : The region is opened. Add an entry into META for + * the RS having opened this region. Then delete this * entry in ZK. */ @Override @@ -82,30 +81,30 @@ handleRegionOpenedEvent(); } } - + private void handleRegionOpeningEvent() { - // TODO: not implemented. + // TODO: not implemented. 
LOG.debug("NO-OP call to handling region opening event"); - // Keep track to see how long the region open takes. If the RS is taking too - // long, then revert the region back to closed state so that it can be + // Keep track to see how long the region open takes. If the RS is taking too + // long, then revert the region back to closed state so that it can be // re-assigned. } private void handleRegionOpenedEvent() { try { if(hbEventData == null) { - hbEventData = new RegionTransitionEventData(); + hbEventData = new RegionTransitionData(); Writables.getWritable(serializedData, hbEventData); } } catch (IOException e) { LOG.error("Could not deserialize additional args for Open region", e); } - LOG.debug("RS " + hbEventData.getRsName() + " has opened region " + regionName); - HServerInfo serverInfo = serverManager.getServerInfo(hbEventData.getRsName()); + LOG.debug("RS " + hbEventData.getServerName() + " has opened region " + regionName); + HServerInfo serverInfo = serverManager.getServerInfo(hbEventData.getServerName()); ArrayList returnMsgs = new ArrayList(); serverManager.processRegionOpen(serverInfo, hbEventData.getHmsg().getRegionInfo(), returnMsgs); if(returnMsgs.size() > 0) { - LOG.error("Open region tried to send message: " + returnMsgs.get(0).getType() + + LOG.error("Open region tried to send message: " + returnMsgs.get(0).getType() + " about " + returnMsgs.get(0).getRegionInfo().getRegionNameAsString()); } } Index: src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java (working copy) @@ -23,9 +23,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; +import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.HBaseEventHandler; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.util.Writables; @@ -38,16 +37,16 @@ public class MasterCloseRegionHandler extends HBaseEventHandler { private static final Log LOG = LogFactory.getLog(MasterCloseRegionHandler.class); - + private String regionName; protected byte[] serializedData; - RegionTransitionEventData hbEventData; + RegionTransitionData hbEventData; ServerManager serverManager; - - public MasterCloseRegionHandler(HBaseEventType eventType, - ServerManager serverManager, - String serverName, - String regionName, + + public MasterCloseRegionHandler(HBaseEventType eventType, + ServerManager serverManager, + String serverName, + String regionName, byte[] serializedData) { super(false, serverName, eventType); this.regionName = regionName; @@ -56,13 +55,13 @@ } /** - * Handle the various events relating to closing regions. We can get the + * Handle the various events relating to closing regions. We can get the * following events here: * - RS_REGION_CLOSING : No-op - * - RS_REGION_CLOSED : The region is closed. If we are not in a shutdown - * state, find the RS to open this region. This could - * be a part of a region move, or just that the RS has - * died. Should result in a M_REQUEST_OPENREGION event + * - RS_REGION_CLOSED : The region is closed. If we are not in a shutdown + * state, find the RS to open this region. 
This could + * be a part of a region move, or just that the RS has + * died. Should result in a M_REQUEST_OPENREGION event * getting created. */ @Override @@ -72,22 +71,22 @@ // handle RS_REGION_CLOSED events handleRegionClosedEvent(); } - + private void handleRegionClosedEvent() { try { if(hbEventData == null) { - hbEventData = new RegionTransitionEventData(); + hbEventData = new RegionTransitionData(); Writables.getWritable(serializedData, hbEventData); } } catch (IOException e) { LOG.error("Could not deserialize additional args for Close region", e); } - // process the region close - this will cause the reopening of the + // process the region close - this will cause the reopening of the // region as a part of the heartbeat of some RS serverManager.processRegionClose(hbEventData.getHmsg().getRegionInfo()); LOG.info("Processed close of region " + hbEventData.getHmsg().getRegionInfo().getRegionNameAsString()); } - + public String getRegionName() { return regionName; } Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -82,17 +82,18 @@ import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; +import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; -import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; import com.google.common.collect.Lists; @@ -106,8 +107,8 @@ * @see HMasterRegionInterface * @see Watcher */ -public class HMaster extends Thread implements HMasterInterface, - HMasterRegionInterface, Watcher, MasterStatus { +public class HMaster extends Thread +implements HMasterInterface, HMasterRegionInterface, MasterStatus { // MASTER is name of the webapp and the attribute name used stuffing this //instance into web context. public static final String MASTER = "master"; @@ -132,6 +133,13 @@ private ZooKeeperWatcher zooKeeperWrapper; // Manager and zk listener for master election private ActiveMasterManager activeMasterManager; + // Cluster status zk tracker and local setter + private ClusterStatusTracker clusterStatusTracker; + // Root region location tracker + private RootRegionTracker rootRegionTracker; + // Region server tracker + private RegionServerTracker regionServerTracker; + // A Sleeper that sleeps for threadWakeFrequency; sleep if nothing todo. private final Sleeper sleeper; // RPC server for the HMaster @@ -147,6 +155,9 @@ // region manager to deal with region specific stuff private final RegionManager regionManager; + // manager of assignment nodes in zookeeper + private final AssignmentManager assignmentManager; + // True if this is the master that started the cluster. 
boolean isClusterStartup; @@ -167,7 +178,7 @@ *

  • Block until becoming active master * */ - public HMaster(Configuration conf) throws IOException { + public HMaster(Configuration conf) throws IOException, KeeperException { // initialize some variables this.conf = conf; // set the thread name @@ -199,7 +210,8 @@ */ zooKeeperWrapper = new ZooKeeperWatcher(conf, getHServerAddress().toString(), this); - isClusterStartup = (zooKeeperWrapper.scanRSDirectory().size() == 0); + isClusterStartup = 0 == + ZKUtil.getNumberOfChildren(zooKeeperWrapper, zooKeeperWrapper.rsZNode); /* * 3. Initialize master components. @@ -210,9 +222,15 @@ this.connection = ServerConnectionManager.getConnection(conf); this.regionServerOperationQueue = new RegionServerOperationQueue(conf, closed); this.metrics = new MasterMetrics(this.getName()); + clusterStatusTracker = new ClusterStatusTracker(zooKeeperWrapper, this); + rootRegionTracker = new RootRegionTracker(zooKeeperWrapper, this); fileSystemManager = new FileSystemManager(conf, this); serverManager = new ServerManager(this, metrics, regionServerOperationQueue); - regionManager = new RegionManager(this); + regionManager = new RegionManager(this, rootRegionTracker); + assignmentManager = new AssignmentManager(zooKeeperWrapper, this, + serverManager, regionManager); + regionServerTracker = new RegionServerTracker(zooKeeperWrapper, this, + serverManager); // create a sleeper to sleep for a configured wait frequency int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); this.sleeper = new Sleeper(threadWakeFrequency, this.closed); @@ -235,6 +253,7 @@ activeMasterManager.blockUntilBecomingActiveMaster(); // We are the active master now. + clusterStatusTracker.setClusterUp(); LOG.info("Server has become the active/primary master. Address is " + this.address.toString()); @@ -323,6 +342,7 @@ } this.rpcServer.stop(); this.regionManager.stop(); + this.activeMasterManager.stop(); this.zooKeeperWrapper.close(); HBaseExecutorService.shutdown(); LOG.info("HMaster main thread exiting"); @@ -424,7 +444,7 @@ * Get the ZK wrapper object - needed by master_jsp.java * @return the zookeeper wrapper */ - public ZooKeeperWrapper getZooKeeperWrapper() { + public ZooKeeperWatcher getZooKeeperWatcher() { return this.zooKeeperWrapper; } /** @@ -445,11 +465,21 @@ */ private void joinCluster() { LOG.debug("Checking cluster state..."); - HServerAddress rootLocation = - this.zooKeeperWrapper.readRootRegionLocation(); - List addresses = this.zooKeeperWrapper.scanRSDirectory(); + HServerAddress rootLocation = null; + List addresses = null; + try { + clusterStatusTracker.start(); + rootRegionTracker.start(); + regionServerTracker.start(); + rootLocation = rootRegionTracker.getRootRegionLocation(); + addresses = regionServerTracker.getOnlineServers(); + } catch(KeeperException e) { + LOG.fatal("Unexpected ZK exception initializing trackers", e); + abort(); + return; + } // Check if this is a fresh start of the cluster - if (addresses.isEmpty()) { + if (isClusterStartup) { LOG.debug("Master fresh start, proceeding with normal startup"); fileSystemManager.splitLogAfterStartup(); return; @@ -504,14 +534,20 @@ */ private void startServiceThreads() { try { - // Start the unassigned watcher - which will create the unassigned region - // in ZK. This is needed before RegionManager() constructor tries to assign - // the root region. 
- ZKUnassignedWatcher.start(this.conf, this); // start the "close region" executor service - HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString()); + HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService( + address.toString()); // start the "open region" executor service HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString()); + // Start the assignment manager. Creates the unassigned node in ZK + // if it does not exist and handles regions in transition if a failed-over + // master. This is needed before RegionManager() constructor tries to + // assign the root region. + try { + assignmentManager.start(); + } catch (KeeperException e) { + throw new IOException(e); + } // start the region manager this.regionManager.start(); // Put up info server. @@ -608,7 +644,11 @@ public void shutdown() { LOG.info("Cluster shutdown requested. Starting to quiesce servers"); this.shutdownRequested.set(true); - this.zooKeeperWrapper.setClusterState(false); + try { + clusterStatusTracker.setClusterDown(); + } catch (KeeperException e) { + LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e); + } } public void createTable(HTableDescriptor desc, byte [][] splitKeys) @@ -967,47 +1007,6 @@ return status; } - /** - * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent) - */ - @Override - public void process(WatchedEvent event) { - LOG.debug("Event " + event.getType() + - " with state " + event.getState() + - " with path " + event.getPath()); - // Master should kill itself if its session expired or if its - // znode was deleted manually (usually for testing purposes) - if(event.getState() == KeeperState.Expired || - (event.getType().equals(EventType.NodeDeleted) && - event.getPath().equals(this.zooKeeperWrapper.getMasterElectionZNode())) && - !shutdownRequested.get()) { - - LOG.info("Master lost its znode, trying to get a new one"); - - // Can we still be the master? If not, goodbye - - zooKeeperWrapper.close(); - try { - // TODO: this is broken, we should just shutdown now not restart - zooKeeperWrapper = - new ZooKeeperWatcher(conf, HMaster.class.getName(), this); - zooKeeperWrapper.registerListener(this); - activeMasterManager = new ActiveMasterManager(zooKeeperWrapper, - this.address, this); - activeMasterManager.blockUntilBecomingActiveMaster(); - - // we are a failed over master, reset the fact that we started the - // cluster - setClusterStartup(false); - // Verify the cluster to see if anything happened while we were away - joinCluster(); - } catch (Exception e) { - LOG.error("Killing master because of", e); - System.exit(1); - } - } - } - private static void printUsageAndExit() { System.err.println("Usage: Master [opts] start|stop"); System.err.println(" start Start Master. 
If local mode, start Master and RegionServer in same JVM"); @@ -1042,7 +1041,8 @@ static class LocalHMaster extends HMaster { private MiniZooKeeperCluster zkcluster = null; - public LocalHMaster(Configuration conf) throws IOException { + public LocalHMaster(Configuration conf) + throws IOException, KeeperException { super(conf); } @@ -1175,7 +1175,7 @@ } @Override - public void abortServer() { + public void abort() { this.startShutdown(); } Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 0) @@ -0,0 +1,235 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.executor.RegionTransitionData; +import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler; +import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; +import org.apache.zookeeper.KeeperException; + +/** + * Manages region assignment. + * + *

    Monitors ZooKeeper for events related to regions in transition. + * + *

    Handles existing regions in transition during master failover. + */ +public class AssignmentManager extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(AssignmentManager.class); + + private MasterStatus status; + + private ServerManager serverManager; + + private RegionManager regionManager; + + private String serverName; + +// TODO: Eventually RIT will move here? +// private final Map regionsInTransition = +// new TreeMap(); + + /** + * Constructs a new assignment manager. + * + *

    This manager must be started with {@link #start()}. + * + * @param watcher zookeeper watcher + * @param status master status + */ + public AssignmentManager(ZooKeeperWatcher watcher, MasterStatus status, + ServerManager serverManager, RegionManager regionManager) { + super(watcher); + this.status = status; + this.serverManager = serverManager; + this.regionManager = regionManager; + serverName = status.getHServerAddress().toString(); + } + + /** + * Starts the assignment manager. + * + *

    This includes registering itself with ZooKeeper and handling + * the initial state of whatever unassigned nodes already exist. + * @throws KeeperException + */ + public void start() throws KeeperException { + watcher.registerListener(this); + if(status.isClusterStartup()) { + processStartup(); + } else { + processFailover(); + } + } + + public synchronized void processStartup() throws KeeperException { + ZKAssign.deleteAllNodes(watcher); + ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode); + } + + /** + * Handle failover. + * @throws KeeperException + */ + public synchronized void processFailover() throws KeeperException { + List nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher, + watcher.assignmentZNode); + if(nodes.isEmpty()) { + LOG.info("No regions in transition in ZK, nothing to do for failover"); + return; + } + LOG.info("Failed-over master needs to process " + nodes.size() + + " regions in transition"); + for(String regionName : nodes) { + RegionTransitionData data = ZKAssign.getData(watcher, regionName); + switch(data.getEventType()) { + case M2ZK_REGION_OFFLINE: + // TODO: Generate new assignment and send OPEN RPC + break; + case RS2ZK_REGION_CLOSING: + // TODO: Only need to deal with timeouts. + break; + case RS2ZK_REGION_CLOSED: + // TODO: Generate new assignment and send OPEN RPC + break; + case RS2ZK_REGION_OPENING: + // TODO: Only need to deal with timeouts. + break; + case RS2ZK_REGION_OPENED: + // TODO: Delete the node from ZK. Region successfully opened but not + // acknowledged. + break; + } + } + } + + private synchronized void handleRegion(RegionTransitionData data) { + switch(data.getEventType()) { + case RS2ZK_REGION_CLOSED: + new MasterCloseRegionHandler(data.getEventType(), serverManager, + serverName, data.getRegionName(), data.getBytes()) + .submit(); + break; + case RS2ZK_REGION_OPENED: + case RS2ZK_REGION_OPENING: + new MasterOpenRegionHandler(data.getEventType(), serverManager, + serverName, data.getRegionName(), data.getBytes()) + .submit(); + break; + } + } + + // ZooKeeper events + + /** + * New unassigned node has been created. + * + *

    This happens when an RS begins the OPENING or CLOSING of a region by + * creating an unassigned node. + * + *

    When this happens we must: + *

      + *
    1. Watch the node for further events
    2. Read and handle the state in the node
    + */ + @Override + public synchronized void nodeCreated(String path) { + if(path.startsWith(watcher.assignmentZNode)) { + try { + RegionTransitionData data = ZKAssign.getData(watcher, path); + if(data == null) { + return; + } + handleRegion(data); + } catch (KeeperException e) { + LOG.error("Unexpected ZK exception reading unassigned node data", e); + status.abort(); + } + } + } + + /** + * Existing unassigned node has had data changed. + * + *

    This happens when an RS transitions from OFFLINE to OPENING, or between + * OPENING/OPENED and CLOSING/CLOSED. + * + *

    When this happens we must: + *

      + *
    1. Watch the node for further events
    2. Read and handle the state in the node
    + */ + @Override + public synchronized void nodeDataChanged(String path) { + if(path.startsWith(watcher.assignmentZNode)) { + try { + RegionTransitionData data = ZKAssign.getData(watcher, path); + if(data == null) { + return; + } + handleRegion(data); + } catch (KeeperException e) { + LOG.error("Unexpected ZK exception reading unassigned node data", e); + status.abort(); + } + } + } + + /** + * New unassigned node has been created. + * + *

    This happens when an RS begins the OPENING or CLOSING of a region by + * creating an unassigned node. + * + *

    When this happens we must: + *

      + *
    1. Watch the node for further children changed events
    2. Watch all new children for changed events
    3. Read all children and handle them
    + */ + @Override + public synchronized void nodeChildrenChanged(String path) { + if(path.equals(watcher.assignmentZNode)) { + try { + List newNodes = ZKUtil.watchAndGetNewChildren(watcher, + watcher.assignmentZNode); + for(NodeAndData newNode : newNodes) { + LOG.debug("Handling new unassigned node: " + newNode); + handleRegion(RegionTransitionData.fromBytes(newNode.getData())); + } + } catch(KeeperException e) { + LOG.error("Unexpected ZK exception reading unassigned children", e); + status.abort(); + } + } + } +} Index: src/main/java/org/apache/hadoop/hbase/master/RegionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (working copy) @@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -57,7 +56,11 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; /** * Class to manage assigning regions to servers, state of root and meta, etc. @@ -65,6 +68,8 @@ public class RegionManager { protected static final Log LOG = LogFactory.getLog(RegionManager.class); + private static final boolean ABORT_ON_ZK_ERROR = false; + private AtomicReference rootRegionLocation = new AtomicReference(null); @@ -81,7 +86,7 @@ private static final byte[] OVERLOADED = Bytes.toBytes("Overloaded"); private static final byte [] META_REGION_PREFIX = Bytes.toBytes(".META.,"); - + private static int threadWakeFrequency; /** @@ -99,15 +104,16 @@ */ final SortedMap regionsInTransition = Collections.synchronizedSortedMap(new TreeMap()); - - // regions in transition are also recorded in ZK using the zk wrapper - final ZooKeeperWrapper zkWrapper; + // regions in transition are also recorded in ZK using the zk watcher + final ZooKeeperWatcher zooKeeper; + // How many regions to assign a server at a time. private final int maxAssignInOneGo; final MasterStatus masterStatus; private final LoadBalancer loadBalancer; + final RootRegionTracker rootRegionTracker; /** Set of regions to split. 
*/ private final SortedMap> @@ -132,15 +138,17 @@ private final int zooKeeperNumRetries; private final int zooKeeperPause; - RegionManager(MasterStatus masterStatus) throws IOException { + RegionManager(MasterStatus masterStatus, RootRegionTracker rootRegionTracker) + throws IOException { Configuration conf = masterStatus.getConfiguration(); this.masterStatus = masterStatus; - threadWakeFrequency = + this.rootRegionTracker = rootRegionTracker; + threadWakeFrequency = masterStatus.getConfiguration().getInt( - HConstants.THREAD_WAKE_FREQUENCY, + HConstants.THREAD_WAKE_FREQUENCY, HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); - this.zkWrapper = masterStatus.getZooKeeper(); + this.zooKeeper = masterStatus.getZooKeeper(); this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10); this.loadBalancer = new LoadBalancer(conf); @@ -177,13 +185,14 @@ if (!masterStatus.getShutdownRequested().get()) { synchronized (regionsInTransition) { String regionName = HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(); - byte[] data = null; try { - data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER)); - } catch (IOException e) { - LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e); + ZKAssign.createNodeOffline(zooKeeper, + HRegionInfo.ROOT_REGIONINFO.getEncodedName(), HMaster.MASTER); + } catch (KeeperException e) { + LOG.error("Unexpected ZK exception creating offline node when " + + "trying to reassign root region", e); + if(ABORT_ON_ZK_ERROR) masterStatus.abort(); } - zkWrapper.createUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data); LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE); RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO, RegionState.State.UNASSIGNED); regionsInTransition.put(regionName, s); @@ -244,8 +253,9 @@ final ArrayList returnMsgs) { boolean isMetaAssign = false; for (RegionState s : regionsToAssign) { - if (s.getRegionInfo().isMetaRegion()) + if (s.getRegionInfo().isMetaRegion()) { isMetaAssign = true; + } } int nRegionsToAssign = regionsToAssign.size(); int otherServersRegionsCount = @@ -341,13 +351,14 @@ LOG.info("Assigning region " + regionName + " to " + sinfo.getServerName()); rs.setPendingOpen(sinfo.getServerName()); synchronized (this.regionsInTransition) { - byte[] data = null; try { - data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER)); - } catch (IOException e) { - LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e); + ZKAssign.createNodeOffline(zooKeeper, + rs.getRegionInfo().getEncodedName(), HMaster.MASTER); + } catch (KeeperException e) { + LOG.error("Unexpected ZK exception creating offline node when " + + "trying to create offline node for region", e); + if(ABORT_ON_ZK_ERROR) masterStatus.abort(); } - zkWrapper.createUnassignedRegion(rs.getRegionInfo().getEncodedName(), data); LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE); this.regionsInTransition.put(regionName, rs); } @@ -565,7 +576,7 @@ RegionDirFilter rdf = new RegionDirFilter(); for(FileStatus tabledir : tableDirs) { if(tabledir.isDir()) { - regionDirs = + regionDirs = masterStatus.getFileSystemManager().getFileSystem().listStatus( tabledir.getPath(), rdf); regions += regionDirs.length; @@ -637,12 +648,11 @@ } catch(Exception iex) { LOG.warn("meta scanner", iex); } - // TODO: Why did we getInstance 
again? We should have it local? -// ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance( -// masterStatus.getConfiguration(), -// masterStatus.getHServerAddress().toString()); - zkWrapper.clearRSDirectory(); - zkWrapper.close(); + try { + ZKUtil.deleteChildrenRecursively(zooKeeper, zooKeeper.rsZNode); + } catch (KeeperException e) { + LOG.error("Unable to delete RS nodes during shutdown", e); + } } /** @@ -996,15 +1006,14 @@ synchronized(this.regionsInTransition) { s = regionsInTransition.get(info.getRegionNameAsString()); if (s == null) { - byte[] data = null; try { - data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER)); - } catch (IOException e) { - // TODO: Review what we should do here. If Writables work this - // should never happen - LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e); + ZKAssign.createNodeOffline(zooKeeper, + info.getEncodedName(), HMaster.MASTER); + } catch (KeeperException e) { + LOG.error("Unexpected ZK exception creating offline node when " + + "trying to reassign root region", e); + if(ABORT_ON_ZK_ERROR) masterStatus.abort(); } - zkWrapper.createUnassignedRegion(info.getEncodedName(), data); LOG.debug("Created UNASSIGNED zNode " + info.getRegionNameAsString() + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE); s = new RegionState(info, RegionState.State.UNASSIGNED); regionsInTransition.put(info.getRegionNameAsString(), s); @@ -1231,11 +1240,13 @@ private void writeRootRegionLocationToZooKeeper(HServerAddress address) { for (int attempt = 0; attempt < zooKeeperNumRetries; ++attempt) { - if (zkWrapper.writeRootRegionLocation(address)) { + try { + rootRegionTracker.setRootRegionLocation(address); return; + } catch (KeeperException e) { + LOG.info("ZK exception writing root region location", e); + sleep(attempt); } - - sleep(attempt); } LOG.error("Failed to write root region location to ZooKeeper after " + @@ -1252,7 +1263,13 @@ writeRootRegionLocationToZooKeeper(address); synchronized (rootRegionLocation) { // the root region has been assigned, remove it from transition in ZK - zkWrapper.deleteUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName()); + try { + ZKAssign.deleteOpenedNode(zooKeeper, + HRegionInfo.ROOT_REGIONINFO.getEncodedName()); + } catch (KeeperException e) { + LOG.error("Exception deleting root region unassigned node", e); + if(ABORT_ON_ZK_ERROR) masterStatus.abort(); + } rootRegionLocation.set(new HServerAddress(address)); rootRegionLocation.notifyAll(); } @@ -1388,7 +1405,9 @@ LoadBalancer(Configuration conf) { this.slop = conf.getFloat("hbase.regions.slop", (float)0.3); - if (this.slop <= 0) this.slop = 1; + if (this.slop <= 0) { + this.slop = 1; + } //maxRegToClose to constrain balance closing per one iteration // -1 to turn off // TODO: change default in HBASE-862, need a suggestion @@ -1462,16 +1481,18 @@ SortedMap> loadToServers = masterStatus.getServerManager().getLoadToServers(); // check if server most loaded - if (!loadToServers.get(loadToServers.lastKey()).contains(srvName)) + if (!loadToServers.get(loadToServers.lastKey()).contains(srvName)) { return 0; + } // this server is most loaded, we will try to unload it by lowest // loaded servers int avgLoadMinusSlop = (int)Math.floor(avgLoad * (1 - this.slop)) - 1; int lowestLoad = loadToServers.firstKey().getNumberOfRegions(); - if(lowestLoad >= avgLoadMinusSlop) + if(lowestLoad >= avgLoadMinusSlop) { return 0; // there is no low loaded servers + } int lowSrvCount = 
loadToServers.get(loadToServers.firstKey()).size(); int numRegionsToClose = 0; @@ -1496,7 +1517,9 @@ NavigableMap getRegionsInTransition() { NavigableMap result = new TreeMap(); synchronized (this.regionsInTransition) { - if (this.regionsInTransition.isEmpty()) return result; + if (this.regionsInTransition.isEmpty()) { + return result; + } for (Map.Entry e: this.regionsInTransition.entrySet()) { result.put(e.getKey(), e.getValue().toString()); } @@ -1511,7 +1534,9 @@ boolean clearFromInTransition(final byte [] regionname) { boolean result = false; synchronized (this.regionsInTransition) { - if (this.regionsInTransition.isEmpty()) return result; + if (this.regionsInTransition.isEmpty()) { + return result; + } for (Map.Entry e: this.regionsInTransition.entrySet()) { if (Bytes.equals(regionname, e.getValue().getRegionName())) { this.regionsInTransition.remove(e.getKey()); @@ -1695,7 +1720,7 @@ return Bytes.compareTo(getRegionName(), o.getRegionName()); } } - + /* * When we find rows in a meta region that has an empty HRegionInfo, we * clean them up here. @@ -1719,7 +1744,7 @@ } } } - + // TODO ryan rework this function /* * Get HRegionInfo from passed META map of row values. Index: src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java (working copy) @@ -26,25 +26,24 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType; -import org.apache.hadoop.hbase.master.ServerManager; /** - * Abstract base class for all HBase event handlers. Subclasses should - * implement the process() method where the actual handling of the event + * Abstract base class for all HBase event handlers. Subclasses should + * implement the process() method where the actual handling of the event * happens. - * - * HBaseEventType is a list of ALL events (which also corresponds to messages - - * either internal to one component or between components). The event type - * names specify the component from which the event originated, and the + * + * HBaseEventType is a list of ALL events (which also corresponds to messages - + * either internal to one component or between components). The event type + * names specify the component from which the event originated, and the * component which is supposed to handle it. - * - * Listeners can listen to all the events by implementing the interface - * HBaseEventHandlerListener, and by registering themselves as a listener. They + * + * Listeners can listen to all the events by implementing the interface + * HBaseEventHandlerListener, and by registering themselves as a listener. They * will be called back before and after the process of every event. - * - * TODO: Rename HBaseEvent and HBaseEventType to EventHandler and EventType - * after ZK refactor as it currently would clash with EventType from ZK and + * + * TODO: Rename HBaseEvent and HBaseEventType to EventHandler and EventType + * after ZK refactor as it currently would clash with EventType from ZK and * make the code very confusing. */ public abstract class HBaseEventHandler implements Runnable @@ -54,16 +53,16 @@ protected HBaseEventType eventType = HBaseEventType.NONE; // is this a region server or master? 
protected boolean isRegionServer; - // name of the server - this is needed for naming executors in case of tests + // name of the server - this is needed for naming executors in case of tests // where region servers may be co-located. protected String serverName; // listeners that are called before and after an event is processed - protected static List eventHandlerListeners = - Collections.synchronizedList(new ArrayList()); + protected static List eventHandlerListeners = + Collections.synchronizedList(new ArrayList()); /** - * This interface provides hooks to listen to various events received by the - * queue. A class implementing this can listen to the updates by calling + * This interface provides hooks to listen to various events received by the + * queue. A class implementing this can listen to the updates by calling * registerListener and stop receiving updates by calling unregisterListener */ public interface HBaseEventHandlerListener { @@ -83,29 +82,29 @@ */ public enum HBaseEventType { NONE (-1), - // Messages originating from RS (NOTE: there is NO direct communication from + // Messages originating from RS (NOTE: there is NO direct communication from // RS to Master). These are a result of RS updates into ZK. RS2ZK_REGION_CLOSING (1), // RS is in process of closing a region RS2ZK_REGION_CLOSED (2), // RS has finished closing a region RS2ZK_REGION_OPENING (3), // RS is in process of opening a region RS2ZK_REGION_OPENED (4), // RS has finished opening a region - - // Updates from master to ZK. This is done by the master and there is + + // Updates from master to ZK. This is done by the master and there is // nothing to process by either Master or RS M2ZK_REGION_OFFLINE (50); // Master adds this region as offline in ZK - + private final byte value; - + /** - * Called by the HMaster. Returns a name of the executor service given an - * event type. Every event type has en entry - if the event should not be + * Called by the HMaster. Returns a name of the executor service given an + * event type. Every event type has en entry - if the event should not be * handled just add the NONE executor. * @return name of the executor service */ public HBaseExecutorServiceType getMasterExecutorForEvent() { HBaseExecutorServiceType executorServiceType = null; switch(this) { - + case RS2ZK_REGION_CLOSING: case RS2ZK_REGION_CLOSED: executorServiceType = HBaseExecutorServiceType.MASTER_CLOSEREGION; @@ -115,31 +114,31 @@ case RS2ZK_REGION_OPENED: executorServiceType = HBaseExecutorServiceType.MASTER_OPENREGION; break; - + case M2ZK_REGION_OFFLINE: executorServiceType = HBaseExecutorServiceType.NONE; break; - + default: throw new RuntimeException("Unhandled event type in the master."); } - + return executorServiceType; } /** - * Called by the RegionServer. Returns a name of the executor service given an - * event type. Every event type has en entry - if the event should not be + * Called by the RegionServer. Returns a name of the executor service given an + * event type. Every event type has en entry - if the event should not be * handled just return a null executor name. * @return name of the event service */ public static String getRSExecutorForEvent(String serverName) { throw new RuntimeException("Unsupported operation."); } - + /** - * Start the executor service that handles the passed in event type. The - * server that starts these event executor services wants to handle these + * Start the executor service that handles the passed in event type. 
The + * server that starts these event executor services wants to handle these * event types. */ public void startMasterExecutorService(String serverName) { @@ -151,17 +150,29 @@ } public static void startRSExecutorService() { - + } HBaseEventType(int intValue) { this.value = (byte)intValue; } - + public byte getByteValue() { return value; } + @Override + public String toString() { + switch(this) { + case RS2ZK_REGION_CLOSED: return "CLOSED"; + case RS2ZK_REGION_CLOSING: return "CLOSING"; + case RS2ZK_REGION_OPENED: return "OPENED"; + case RS2ZK_REGION_OPENING: return "OPENING"; + case M2ZK_REGION_OFFLINE: return "OFFLINE"; + default: return this.name(); + } + } + public static HBaseEventType fromByte(byte value) { switch(value) { case -1: return HBaseEventType.NONE; @@ -176,12 +187,12 @@ } } } - + /** * Default base class constructor. - * - * TODO: isRegionServer and serverName will go away once we do the HMaster - * refactor. We will end up passing a ServerStatus which should tell us both + * + * TODO: isRegionServer and serverName will go away once we do the HMaster + * refactor. We will end up passing a ServerStatus which should tell us both * the name and if it is a RS or master. */ public HBaseEventHandler(boolean isRegionServer, String serverName, HBaseEventType eventType) { @@ -189,17 +200,17 @@ this.eventType = eventType; this.serverName = serverName; } - + /** - * This is a wrapper around process, used to update listeners before and after - * events are processed. + * This is a wrapper around process, used to update listeners before and after + * events are processed. */ public void run() { // fire all beforeProcess listeners for(HBaseEventHandlerListener listener : eventHandlerListeners) { listener.beforeProcess(this); } - + // call the main process function try { process(); @@ -209,32 +220,32 @@ // fire all afterProcess listeners for(HBaseEventHandlerListener listener : eventHandlerListeners) { - LOG.debug("Firing " + listener.getClass().getName() + + LOG.debug("Firing " + listener.getClass().getName() + ".afterProcess event listener for event " + eventType); listener.afterProcess(this); } } - + /** - * This method is the main processing loop to be implemented by the various + * This method is the main processing loop to be implemented by the various * subclasses. */ public abstract void process(); - + /** * Subscribe to updates before and after processing events */ public static void registerListener(HBaseEventHandlerListener listener) { eventHandlerListeners.add(listener); } - + /** * Stop receiving updates before and after processing events */ public static void unregisterListener(HBaseEventHandlerListener listener) { eventHandlerListeners.remove(listener); } - + public boolean isRegionServer() { return isRegionServer; } @@ -247,7 +258,7 @@ // TODO: check for isRegionServer here return eventType.getMasterExecutorForEvent(); } - + /** * Return the event type * @return @@ -267,9 +278,9 @@ } serviceType.getExecutor(serverName).submit(this); } - + /** - * Executes this event object in the caller's thread. This is a synchronous + * Executes this event object in the caller's thread. This is a synchronous * way of executing the event. 
*/ public void execute() { Index: src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java (working copy) @@ -88,6 +88,7 @@ LOG.debug("Executor service " + toString() + " already running on " + serverName); return; } + LOG.debug("Starting executor service [" + name + "]"); HBaseExecutorService.startExecutorService(name); } Index: src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java (revision 0) @@ -0,0 +1,245 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.executor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.Writable; + +/** + * Data serialized into ZooKeeper for region transitions. + */ +public class RegionTransitionData implements Writable { + /** + * Type of transition event (offline, opening, opened, closing, closed). + * Required. + */ + private HBaseEventType eventType; + + /** Region being transitioned. Required. */ + private String regionName; + + /** Server event originated from. Optional. */ + private String serverName; + + /** Time the event was created. Required but automatically set. */ + private long timeStamp; + + /** Temporary. Holds payload used doing transitions via heartbeats. */ + private HMsg hmsg; // to be removed shortly once we stop using heartbeats + + /** + * Writable constructor. Do not use directly. + */ + public RegionTransitionData() {} + + /** + * Construct data for a new region transition event with the specified event + * type and region name. + * + *
+   * <p>Used when the server name is not known (the master is setting it). This
+   * happens during cluster startup or during failure scenarios. When
+   * processing a failed regionserver, the master assigns the regions from that
+   * server to other servers though the region was never 'closed'. During
+   * master failover, the new master may have regions stuck in transition
+   * without a destination so may have to set regions offline and generate a new
+   * assignment.
+   *
+   * <p>Since only the master uses this constructor, the type should always be
+   * {@link HBaseEventType#M2ZK_REGION_OFFLINE}.
+   *
+   * @param eventType type of event
+   * @param regionName name of region
+   */
+  public RegionTransitionData(HBaseEventType eventType, String regionName) {
+    this(eventType, regionName, null);
+  }
+
+  /**
+   * Construct data for a new region transition event with the specified event
+   * type, region name, and server name.
+   *
+   * <p>Used when the server name is known (a regionserver is setting it).
+   *
+   * <p>Valid types for this constructor are {@link HBaseEventType#RS2ZK_REGION_CLOSING},
+   * {@link HBaseEventType#RS2ZK_REGION_CLOSED}, {@link HBaseEventType#RS2ZK_REGION_OPENING},
+   * and {@link HBaseEventType#RS2ZK_REGION_OPENED}.
+   *
+   * @param eventType type of event
+   * @param regionName name of region
+   * @param serverName name of server setting data
+   */
+  public RegionTransitionData(HBaseEventType eventType, String regionName,
+      String serverName) {
+    this(eventType, regionName, serverName, null);
+  }
+
+  /**
+   * Construct data for a fully-specified, old-format region transition event
+   * which uses HMsg/heartbeats.
+   *
+   * TODO: Remove this constructor once we stop using heartbeats.
+   *
+   * @param eventType
+   * @param regionName
+   * @param serverName
+   * @param hmsg
+   */
+  public RegionTransitionData(HBaseEventType eventType, String regionName,
+      String serverName, HMsg hmsg) {
+    this.eventType = eventType;
+    this.timeStamp = System.currentTimeMillis();
+    this.regionName = regionName;
+    this.serverName = serverName;
+    this.hmsg = hmsg;
+  }
+
+  /**
+   * Gets the type of region transition event.
+   *
+   * <p>One of:
+   * <ul>
+   * <li>{@link HBaseEventType#M2ZK_REGION_OFFLINE}
+   * <li>{@link HBaseEventType#RS2ZK_REGION_CLOSING}
+   * <li>{@link HBaseEventType#RS2ZK_REGION_CLOSED}
+   * <li>{@link HBaseEventType#RS2ZK_REGION_OPENING}
+   * <li>{@link HBaseEventType#RS2ZK_REGION_OPENED}
+   * </ul>
+   * @return type of region transition event
+   */
+  public HBaseEventType getEventType() {
+    return eventType;
+  }
+
+  /**
+   * Gets the encoded name of the region being transitioned.
+   *
+   * <p>
    Region name is required so this never returns null. + * @return region name + */ + public String getRegionName() { + return regionName; + } + + /** + * Gets the server the event originated from. If null, this event originated + * from the master. + * + * @return server name of originating regionserver, or null if from master + */ + public String getServerName() { + return serverName; + } + + /** + * Gets the timestamp when this event was created. + * + * @return time event was created + */ + public long getTimeStamp() { + return timeStamp; + } + + /** + * Gets the {@link HMsg} payload of this region transition event. + * @return heartbeat payload + */ + public HMsg getHmsg() { + return hmsg; + } + + @Override + public void readFields(DataInput in) throws IOException { + // the event type byte + eventType = HBaseEventType.fromByte(in.readByte()); + // the timestamp + timeStamp = in.readLong(); + // the encoded name of the region being transitioned + regionName = in.readUTF(); + // remaining fields are optional so prefixed with boolean + // the name of the regionserver sending the data + if(in.readBoolean()) { + serverName = in.readUTF(); + } + // hmsg + if(in.readBoolean()) { + hmsg = new HMsg(); + hmsg.readFields(in); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeByte(eventType.getByteValue()); + out.writeLong(System.currentTimeMillis()); + out.writeUTF(regionName); + // remaining fields are optional so prefixed with boolean + out.writeBoolean(serverName != null); + if(serverName != null) { + out.writeUTF(serverName); + } + out.writeBoolean(hmsg != null); + if(hmsg != null) { + hmsg.write(out); + } + } + + /** + * Get the bytes for this instance. Throws a {@link RuntimeException} if + * there is an error deserializing this instance because it represents a code + * bug. + * @return binary representation of this instance + */ + public byte [] getBytes() { + try { + return Writables.getBytes(this); + } catch(IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Get an instance from bytes. Throws a {@link RuntimeException} if + * there is an error serializing this instance from bytes because it + * represents a code bug. + * @param bytes binary representation of this instance + * @return instance of this class + */ + public static RegionTransitionData fromBytes(byte [] bytes) { + try { + RegionTransitionData data = new RegionTransitionData(); + Writables.getWritable(bytes, data); + return data; + } catch(IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String toString() { + return "region=" + regionName + ",server=" + serverName + ",state=" + + eventType; + } +} Index: src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java (working copy) @@ -1,92 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.executor; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; -import org.apache.hadoop.io.Writable; - -public class RegionTransitionEventData implements Writable { - private HBaseEventType hbEvent; - private String rsName; - private long timeStamp; - private HMsg hmsg; - - public RegionTransitionEventData() { - } - - public RegionTransitionEventData(HBaseEventType hbEvent, String rsName) { - this(hbEvent, rsName, null); - } - - public RegionTransitionEventData(HBaseEventType hbEvent, String rsName, HMsg hmsg) { - this.hbEvent = hbEvent; - this.rsName = rsName; - this.timeStamp = System.currentTimeMillis(); - this.hmsg = hmsg; - } - - public HBaseEventType getHbEvent() { - return hbEvent; - } - - public String getRsName() { - return rsName; - } - - public long getTimeStamp() { - return timeStamp; - } - - public HMsg getHmsg() { - return hmsg; - } - - @Override - public void readFields(DataInput in) throws IOException { - // the event type byte - hbEvent = HBaseEventType.fromByte(in.readByte()); - // the hostname of the RS sending the data - rsName = in.readUTF(); - // the timestamp - timeStamp = in.readLong(); - if(in.readBoolean()) { - // deserialized the HMsg from ZK - hmsg = new HMsg(); - hmsg.readFields(in); - } - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeByte(hbEvent.getByteValue()); - out.writeUTF(rsName); - out.writeLong(System.currentTimeMillis()); - out.writeBoolean((hmsg != null)); - if(hmsg != null) { - hmsg.write(out); - } - } - -} Index: src/main/java/org/apache/hadoop/hbase/Abortable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/Abortable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/Abortable.java (revision 0) @@ -0,0 +1,35 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +/** + * Interface to support the aborting of a given server or client. + * + *
+ * <p>This is used primarily for ZooKeeper usage when we could get an unexpected
+ * and fatal exception, requiring an abort.
+ *
+ * <p>
    Implemented by the Master, RegionServer, and TableServers (client). + */ +public interface Abortable { + /** + * Abort the server or client. + */ + public void abort(); +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 964617) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -1587,7 +1587,7 @@ // The only method that really makes no sense here is get address @Override - public void abortServer() { + public void abort() { if(zooKeeper != null) { zooKeeper.close(); zooKeeper = null; Index: src/main/resources/hbase-webapps/master/master.jsp =================================================================== --- src/main/resources/hbase-webapps/master/master.jsp (revision 964617) +++ src/main/resources/hbase-webapps/master/master.jsp (working copy) @@ -72,7 +72,7 @@ <% if (showFragmentation) { %> Fragmentation<%= frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %>Overall fragmentation of all tables, including .META. and -ROOT-. <% } %> -Zookeeper Quorum<%= master.getZooKeeperWrapper().getQuorumServers() %>Addresses of all registered ZK servers. For more, see zk dump. +Zookeeper Quorum<%= master.getZooKeeperWatcher().getQuorum() %>Addresses of all registered ZK servers. For more, see zk dump.
    Catalog Tables
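
The writeRootRegionLocationToZooKeeper() change above keeps the bounded retry-then-give-up behavior but now retries on KeeperException instead of a boolean return. A hedged sketch of that pattern, written against the plain ZooKeeper client rather than the tracker API in the patch; the znode path handling, backoff, and retry count here are illustrative only:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class RetryingZKWrite {
  /** Try to write data to znode, retrying on transient ZK errors. */
  public static boolean writeWithRetries(ZooKeeper zk, String znode,
      byte [] data, int retries) throws InterruptedException {
    for (int attempt = 0; attempt < retries; ++attempt) {
      try {
        if (zk.exists(znode, false) == null) {
          zk.create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
          zk.setData(znode, data, -1);
        }
        return true;
      } catch (KeeperException e) {
        // Transient ZK trouble: back off and try again, like sleep(attempt).
        Thread.sleep(1000 * (attempt + 1));
      }
    }
    return false; // caller logs the failure after exhausting retries
  }
}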
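HBaseEventHandler.run() fires every registered HBaseEventHandlerListener before and after process(). A minimal sketch of a listener, assuming only the patched handler class; the listener name and timing logic are invented for illustration:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventHandlerListener;

public class TimingEventListener implements HBaseEventHandlerListener {
  private static final Log LOG = LogFactory.getLog(TimingEventListener.class);
  // beforeProcess/afterProcess run in the same executor thread as process()
  private final ThreadLocal<Long> start = new ThreadLocal<Long>();

  @Override
  public void beforeProcess(HBaseEventHandler event) {
    start.set(System.currentTimeMillis());
  }

  @Override
  public void afterProcess(HBaseEventHandler event) {
    LOG.debug(event.getClass().getSimpleName() + " processed in " +
        (System.currentTimeMillis() - start.get()) + "ms");
  }
}

// Registration is static, so the hooks apply to every handler instance:
//   HBaseEventHandler.registerListener(new TimingEventListener());
// and later:
//   HBaseEventHandler.unregisterListener(theSameListenerInstance);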
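RegionTransitionData replaces RegionTransitionEventData and serializes as a Writable: an event-type byte, a timestamp, the encoded region name, then boolean-prefixed optional server name and HMsg payload. A round-trip sketch of that wire format, assuming the patched classes above are on the classpath; the region and server names are made up:

import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
import org.apache.hadoop.hbase.executor.RegionTransitionData;

public class RegionTransitionDataRoundTrip {
  public static void main(String[] args) {
    // A regionserver reporting an opened region; names are illustrative only.
    RegionTransitionData data = new RegionTransitionData(
        HBaseEventType.RS2ZK_REGION_OPENED,
        "1028785192",                       // hypothetical encoded region name
        "example-rs,60020,1279149000000");  // hypothetical server name

    // getBytes() serializes via Writables; this is the payload written to the
    // unassigned znode.
    byte [] bytes = data.getBytes();

    // fromBytes() is what a watcher would call after reading the znode back.
    RegionTransitionData copy = RegionTransitionData.fromBytes(bytes);

    // Prints: region=1028785192,server=example-rs,60020,1279149000000,state=OPENED
    System.out.println(copy);
  }
}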
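Abortable is the callback the ZooKeeper-facing code uses when it hits an unrecoverable KeeperException (see the ABORT_ON_ZK_ERROR handling in HMaster and the abortServer-to-abort rename in HConnectionManager above). A minimal implementor sketch, assuming only the patched interface; the class name and behavior are illustrative:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Abortable;

public class StubServer implements Abortable {
  private static final Log LOG = LogFactory.getLog(StubServer.class);
  private volatile boolean aborted = false;

  @Override
  public void abort() {
    // A real Master or RegionServer would close its ZooKeeper connection and
    // begin shutdown here; this stub only records that an abort was requested.
    LOG.error("Abort requested");
    aborted = true;
  }

  public boolean isAborted() {
    return aborted;
  }
}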