Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 960252) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -52,15 +52,17 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.mapred.MiniMRCluster; -import org.apache.zookeeper.ZooKeeper; -import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.zookeeper.ZooKeeper; import com.google.common.base.Preconditions; @@ -693,9 +695,7 @@ */ public void expireMasterSession() throws Exception { HMaster master = hbaseCluster.getMaster(); - ZooKeeperWrapper zkw = - ZooKeeperWrapper.getInstance(conf, master.getHServerAddress().toString()); - expireSession(zkw); + expireSession(master.getZooKeeper(), master); } /** @@ -705,15 +705,15 @@ */ public void expireRegionServerSession(int index) throws Exception { HRegionServer rs = hbaseCluster.getRegionServer(index); - expireSession(rs.getZooKeeperWrapper()); + expireSession(rs.getZooKeeper(), rs); } - public void expireSession(ZooKeeperWrapper nodeZK) throws Exception{ - ZooKeeperWrapper zkw = - ZooKeeperWrapper.createInstance(conf, - ZooKeeperWrapper.class.getName()); + public void 
expireSession(ZooKeeperWatcher nodeZK, ServerStatus server) + throws Exception { + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, + ZooKeeperWatcher.class.getName(), server); zkw.registerListener(EmptyWatcher.instance); - String quorumServers = zkw.getQuorumServers(); + String quorumServers = ZKConfig.getZKQuorumServersString(conf); int sessionTimeout = 5 * 1000; // 5 seconds byte[] password = nodeZK.getSessionPassword(); Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java (revision 960252) +++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java (working copy) @@ -64,7 +64,7 @@ /** */ public void testMakeZKProps() { - Properties properties = HQuorumPeer.makeZKProps(conf); + Properties properties = ZKConfig.makeZKProps(conf); assertEquals(dataDir.toString(), properties.get("dataDir")); assertEquals(Integer.valueOf(21810), Integer.valueOf(properties.getProperty("clientPort"))); assertEquals("localhost:2888:3888", properties.get("server.0")); @@ -72,7 +72,7 @@ String oldValue = conf.get(HConstants.ZOOKEEPER_QUORUM); conf.set(HConstants.ZOOKEEPER_QUORUM, "a.foo.bar,b.foo.bar,c.foo.bar"); - properties = HQuorumPeer.makeZKProps(conf); + properties = ZKConfig.makeZKProps(conf); assertEquals(dataDir.toString(), properties.get("dataDir")); assertEquals(Integer.valueOf(21810), Integer.valueOf(properties.getProperty("clientPort"))); assertEquals("a.foo.bar:2888:3888", properties.get("server.0")); @@ -91,7 +91,7 @@ System.setProperty("hbase.master.hostname", "localhost"); InputStream is = new ByteArrayInputStream(s.getBytes()); - Properties properties = HQuorumPeer.parseZooCfg(conf, is); + Properties properties = ZKConfig.parseZooCfg(conf, is); assertEquals(dataDir.toString(), properties.get("dataDir")); assertEquals(Integer.valueOf(2181), Integer.valueOf(properties.getProperty("clientPort"))); 
@@ -111,7 +111,7 @@ // Override with system property. System.setProperty("hbase.master.hostname", "foo.bar"); is = new ByteArrayInputStream(s.getBytes()); - properties = HQuorumPeer.parseZooCfg(conf, is); + properties = ZKConfig.parseZooCfg(conf, is); assertEquals("foo.bar:2888:3888", properties.get("server.0")); config.parseProperties(properties); @@ -127,7 +127,7 @@ public void testShouldAssignDefaultZookeeperClientPort() { Configuration config = HBaseConfiguration.create(); config.clear(); - Properties p = HQuorumPeer.makeZKProps(config); + Properties p = ZKConfig.makeZKProps(config); assertNotNull(p); assertEquals(2181, p.get("hbase.zookeeper.property.clientPort")); } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (revision 0) @@ -0,0 +1,98 @@ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.Semaphore; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.CreateMode; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMasterAddressManager { + private static final Log LOG = LogFactory.getLog(TestMasterAddressManager.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + 
TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + /** + * Unit tests that uses ZooKeeper but does not use the master-side methods + * but rather acts directly on ZK. + * @throws Exception + */ + @Test + public void testMasterAddressManagerFromZK() throws Exception { + + ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + "testMasterAddressManagerFromZK", null); + zk.createZNodeIfNotExists(zk.baseZNode); + + // Should not have a master yet + MasterAddressManager addressManager = new MasterAddressManager(zk); + addressManager.monitorMaster(); + assertFalse(addressManager.hasMaster()); + zk.registerListener(addressManager); + + // Use a listener to capture when the node is actually created + NodeCreationListener listener = new NodeCreationListener(zk, zk.masterAddressZNode); + zk.registerListener(listener); + + // Create the master node with a dummy address + String host = "hostname"; + int port = 1234; + HServerAddress dummyAddress = new HServerAddress(host, port); + LOG.info("Creating master node"); + zk.createZNodeIfNotExists(zk.masterAddressZNode, + Bytes.toBytes(dummyAddress.toString()), CreateMode.EPHEMERAL, false); + + // Wait for the node to be created + LOG.info("Waiting for master address manager to be notified"); + listener.waitForCreation(); + LOG.info("Master node created"); + assertTrue(addressManager.hasMaster()); + HServerAddress pulledAddress = addressManager.getMasterAddress(); + assertTrue(pulledAddress.equals(dummyAddress)); + + } + + public static class NodeCreationListener extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(NodeCreationListener.class); + + private Semaphore lock; + private String node; + + public NodeCreationListener(ZooKeeperWatcher watcher, String node) { + super(watcher); + lock = new Semaphore(0); + this.node = node; + } + + @Override + public void nodeCreated(String path) 
{ + if(path.equals(node)) { + LOG.debug("nodeCreated(" + path + ")"); + lock.release(); + } + } + + public void waitForCreation() throws InterruptedException { + lock.acquire(); + } + } +} Index: src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (revision 960252) +++ src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (working copy) @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; @@ -91,8 +92,8 @@ throws IOException, InterruptedException { new HTable(conf, HConstants.META_TABLE_NAME); - ZooKeeperWrapper zkw = - ZooKeeperWrapper.createInstance(conf, TestZooKeeper.class.getName()); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, + TestZooKeeper.class.getName(), null); zkw.registerListener(EmptyWatcher.instance); String quorumServers = zkw.getQuorumServers(); int sessionTimeout = 5 * 1000; // 5 seconds @@ -110,15 +111,17 @@ System.err.println("ZooKeeper should have timed out"); connection.relocateRegion(HConstants.ROOT_TABLE_NAME, HConstants.EMPTY_BYTE_ARRAY); } - @Test - public void testRegionServerSessionExpired() throws Exception{ + +// @Test Disabled, we don't expect these to restart anymore + public void disabledTestRegionServerSessionExpired() throws Exception{ LOG.info("Starting testRegionServerSessionExpired"); new HTable(conf, HConstants.META_TABLE_NAME); TEST_UTIL.expireRegionServerSession(0); testSanity(); } - @Test - public void testMasterSessionExpired() throws Exception { + +//@Test Disabled, we don't expect these to restart anymore + public void disabledTestMasterSessionExpired() throws Exception { 
LOG.info("Starting testMasterSessionExpired"); new HTable(conf, HConstants.META_TABLE_NAME); TEST_UTIL.expireMasterSession(); @@ -179,8 +182,8 @@ */ @Test public void testZNodeDeletes() throws Exception { - ZooKeeperWrapper zkw = - ZooKeeperWrapper.createInstance(conf, TestZooKeeper.class.getName()); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, + TestZooKeeper.class.getName(), null); zkw.registerListener(EmptyWatcher.instance); zkw.ensureExists("/l1/l2/l3/l4"); try { Index: src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java (revision 960252) +++ src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java (working copy) @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -41,7 +41,7 @@ private static final Log LOG = LogFactory.getLog(TestRestartCluster.class); private static Configuration conf; private static HBaseTestingUtility utility; - private static ZooKeeperWrapper zkWrapper; + private static ZooKeeperWatcher zooKeeper; private static final byte[] TABLENAME = Bytes.toBytes("master_transitions"); private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a")}; @@ -59,11 +59,11 @@ @Test (timeout=300000) public void testRestartClusterAfterKill()throws Exception { utility.startMiniZKCluster(); - zkWrapper = ZooKeeperWrapper.createInstance(conf, "cluster1"); + zooKeeper = new ZooKeeperWatcher(conf, "cluster1", null); // create the unassigned region, throw up a region opened state for META - String unassignedZNode = 
zkWrapper.getRegionInTransitionZNode(); - zkWrapper.createZNodeIfNotExists(unassignedZNode); + String unassignedZNode = zooKeeper.getRegionInTransitionZNode(); + zooKeeper.createZNodeIfNotExists(unassignedZNode); byte[] data = null; HBaseEventType hbEventType = HBaseEventType.RS2ZK_REGION_OPENED; try { @@ -71,8 +71,8 @@ } catch (IOException e) { LOG.error("Error creating event data for " + hbEventType, e); } - zkWrapper.createUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data); - zkWrapper.createUnassignedRegion(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), data); + zooKeeper.createUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data); + zooKeeper.createUnassignedRegion(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), data); LOG.debug("Created UNASSIGNED zNode for ROOT and META regions in state " + HBaseEventType.M2ZK_REGION_OFFLINE); // start the HB cluster Index: src/main/java/org/apache/hadoop/hbase/ServerStatus.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ServerStatus.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/ServerStatus.java (working copy) @@ -19,9 +19,8 @@ */ package org.apache.hadoop.hbase; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; /** * Set of functions that are exposed by any HBase server (implemented by the @@ -37,4 +36,14 @@ * Get the configuration object for this server. */ public Configuration getConfiguration(); + + /** + * Get the ZooKeeper instance for this server. + */ + public ZooKeeperWatcher getZooKeeper(); + + /** + * Stub method into ServerStatus to move forward with ZK cleanup. 
+ */ + public void abortServer(); } Index: src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java (working copy) @@ -104,7 +104,7 @@ standaloneServerFactory = new NIOServerCnxn.Factory(new InetSocketAddress(clientPort)); } catch (BindException e) { - LOG.info("Faild binding ZK Server to client port: " + clientPort); + LOG.info("Failed binding ZK Server to client port: " + clientPort); //this port is already in use. try to use another clientPort++; continue; @@ -118,7 +118,7 @@ } started = true; - + LOG.info("Started MiniZK Server on client port: " + clientPort); return clientPort; } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 0) @@ -0,0 +1,313 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerStatus; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +/** + * Acts as the single ZooKeeper Watcher. One instance of this is instantiated + * for each Master, RegionServer, and client process. + * + * This is the only class that implements {@link Watcher}. Other internal + * classes which need to be notified of ZooKeeper events must register with + * the local instance of this watcher via {@link #registerListener}. + * + * This class also holds and manages the connection to ZooKeeper. Code to deal + * with connection related events and exceptions are handled here. 
+ */ +public class ZooKeeperWatcher extends ZooKeeperWrapper implements Watcher { + private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class); + + // name of this watcher (for logging only) + private String name; + + // zookeeper connection + private ZooKeeper zooKeeper; + + // server controller; null when no server is attached (e.g. in tests) + private ServerStatus server; + + // listeners to be notified + private final Set<ZooKeeperListener> listeners = + new CopyOnWriteArraySet<ZooKeeperListener>(); + + // node names + + // base znode for this cluster + public String baseZNode; + // znode containing location of server hosting root region + public String rootServerZNode; + // znode containing ephemeral nodes of the regionservers + public String rsZNode; + // znode of currently active master + public String masterAddressZNode; + // znode containing the current cluster state + public String clusterStateZNode; + // znode used for region transitioning and assignment + public String assignmentZNode; + + /** + * Instantiate a ZooKeeper connection and watcher. + * @param name name of this watcher, for logging/debug purposes only + * @throws IOException if the ZooKeeper connection cannot be set up + */ + public ZooKeeperWatcher(Configuration conf, String name, ServerStatus server) + throws IOException { + super(conf, name); + this.name = name; + this.server = server; // set before connecting so early connection events can abort it + this.zooKeeper = ZKUtil.connect(conf, this); + info("Connected to ZooKeeper"); + setNodeNames(conf); + } + + /** + * Set the local variable node names using the specified configuration. 
+ */ + private void setNodeNames(Configuration conf) { + baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + rootServerZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.rootserver", "root-region-server")); + rsZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.rs", "rs")); + masterAddressZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.master", "master")); + clusterStateZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.state", "shutdown")); + assignmentZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.regionInTransition", "unassigned")); + } + + /** + * Register the specified listener to receive ZooKeeper events. + * @param listener listener to add to the set of listeners notified of events + */ + public void registerListener(ZooKeeperListener listener) { + listeners.add(listener); + } + + /** + * Get the connection to ZooKeeper. + * @return connection reference to zookeeper + */ + public ZooKeeper getZooKeeper() { + return zooKeeper; + } + + /** + * Method called from ZooKeeper for events and connection status. + * + * Valid events are passed along to listeners. Connection status changes + * are dealt with locally. 
+ */ + @Override + public void process(WatchedEvent event) { + LOG.debug("<" + name + "> Received ZooKeeper Event, " + + "type: " + event.getType() + ", " + + "state:" + event.getState() + ", " + + "path: " + event.getPath()); + + // While we are still using both ZKWs, need to call parent process() + super.process(event); + + switch(event.getType()) { + + // If event type is NONE, this is a connection status change + case None: { + connectionEvent(event); + break; + } + + // Otherwise pass along to the listeners + + case NodeCreated: { + for(ZooKeeperListener listener : listeners) { + listener.nodeCreated(event.getPath()); + } + break; + } + + case NodeDeleted: { + for(ZooKeeperListener listener : listeners) { + listener.nodeDeleted(event.getPath()); + } + break; + } + + case NodeDataChanged: { + for(ZooKeeperListener listener : listeners) { + listener.nodeDataChanged(event.getPath()); + } + break; + } + + case NodeChildrenChanged: { + for(ZooKeeperListener listener : listeners) { + listener.nodeChildrenChanged(event.getPath()); + } + break; + } + } + } + + // Connection management + + /** + * Called when there is a connection-related event via the Watcher callback. + * + * If Disconnected or Expired, the attached server (if any) is aborted. + * + * @param event connection status event whose state is inspected + */ + private void connectionEvent(WatchedEvent event) { + switch(event.getState()) { + // SyncConnected is normal, ignore + case SyncConnected: + break; + + // Abort the server if Disconnected or Expired + // TODO: Any reason to handle these two differently? + case Disconnected: + case Expired: + error("Received Disconnected/Expired [" + event.getState() + "] " + + "from ZooKeeper, aborting server"); + if(server != null) { + server.abortServer(); + } + break; + } + } + + /** + * Handles KeeperExceptions in client calls. + * + * This may be temporary but for now this gives one place to deal with these. + * + * TODO: Currently this method aborts the server. 
+ * + * @param ke the unexpected exception; it is logged, and the server is aborted only if one is attached + */ + public void keeperException(KeeperException ke) { + error("Received unexpected KeeperException, aborting server", ke); + if(server != null) { server.abortServer(); } + } + + /** + * Handles InterruptedExceptions in client calls. + * + * This may be temporary but for now this gives one place to deal with these. + * + * TODO: Currently, this method does nothing. + * Is this ever expected to happen? Do we abort or can we let it run? + * + * @param ie the interruption to record at debug level + */ + public void interruptedException(InterruptedException ie) { + debug("Received InterruptedException, doing nothing here", ie); + // no-op + } + + // Logging methods + + /** + * Exposed info logging method so our zookeeper output is named. + * @param string log line + */ + public void info(String string) { + LOG.info("<" + name + "> " + string); + } + + /** + * Exposed debug logging method so our zookeeper output is named. + * @param string log line + */ + public void debug(String string) { + LOG.debug("<" + name + "> " + string); + } + + /** + * Exposed debug logging method so our zookeeper output is named. + * @param string log line, logged together with the throwable t + */ + public void debug(String string, Throwable t) { + LOG.debug("<" + name + "> " + string, t); + } + + /** + * Exposed warn logging method so our zookeeper output is named. + * @param string log line + */ + public void warn(String string) { + LOG.warn("<" + name + "> " + string); + } + + /** + * Exposed warn logging method so our zookeeper output is named. + * @param string log line + * @param t exception + */ + public void warn(String string, Throwable t) { + LOG.warn("<" + name + "> " + string, t); + } + + /** + * Exposed error logging method so our zookeeper output is named. + * @param string log line + */ + public void error(String string) { + LOG.error("<" + name + "> " + string); + } + + /** + * Exposed error logging method so our zookeeper output is named. 
+ * @param string log line + * @param t exception + */ + public void error(String string, Throwable t) { + LOG.error("<" + name + "> " + string, t); + } + + /** + * Close the connection to ZooKeeper, restoring the thread's interrupt status if interrupted. + */ + public void close() { + try { + if(zooKeeper != null) { + zooKeeper.close(); + super.close(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 0) @@ -0,0 +1,142 @@ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +/** + * Internal HBase utility class for ZooKeeper. + * + * Contains only static methods and constants. + */ +public class ZKUtil { + private static final Log LOG = LogFactory.getLog(ZKUtil.class); + + // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved. + private static final char ZNODE_PATH_SEPARATOR = '/'; + + /** + * Creates a new connection to ZooKeeper, pulling settings and quorum config + * from the specified configuration object using methods from {@link ZKConfig}. + * + * Sets the connection status monitoring watcher to the specified watcher. 
+ * + * @param conf configuration to pull quorum and other settings from + * @param watcher watcher to monitor connection changes + * @return connection to zookeeper + * @throws IOException if unable to connect to zk or config problem + */ + public static ZooKeeper connect(Configuration conf, Watcher watcher) + throws IOException { + Properties properties = ZKConfig.makeZKProps(conf); + String quorum = ZKConfig.getZKQuorumServersString(properties); + if(quorum == null) { + throw new IOException("Unable to determine ZooKeeper quorum"); + } + int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000); + LOG.debug("Opening connection to ZooKeeper with quorum (" + quorum + ")"); + return new ZooKeeper(quorum, timeout, watcher); + } + + /** + * Join the prefix znode name with the suffix znode name to generate a proper + * full znode name. + * + * Assumes prefix does not end with slash and suffix does not begin with it. + * + * @param prefix beginning of znode name + * @param suffix ending of znode name + * @return result of properly joining prefix with suffix + */ + public static String joinZNode(String prefix, String suffix) { + return prefix + ZNODE_PATH_SEPARATOR + suffix; + } + + /** + * Watch the specified znode for delete/create/change events. The watcher is + * set whether or not the node exists. If the node already exists, the method + * returns true. If the node does not exist, the method returns false. + * + * @param zkw zk reference + * @param znode path of node to watch + * @return true if znode exists, false if does not exist or error + */ + public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) { + try { + Stat s = zkw.getZooKeeper().exists(znode, zkw); + zkw.debug("Set watcher on znode (" + znode + "), exists=" + (s != null)); + return s != null;
+ } catch (KeeperException e) { + zkw.warn("Unable to set watcher on znode (" + znode + ")", e); + zkw.keeperException(e); + return false; + } catch (InterruptedException e) { + zkw.warn("Unable to set watcher on znode (" + znode + ")", e); + zkw.interruptedException(e); + return false; + } + } + + /** + * Get the data at the specified znode and set a watch. + * + * Returns the data and sets a watch if the node exists. Returns null and no + * watch is set if the node does not exist or there is an exception. + * + * @param zkw zk reference + * @param znode path of node + * @return data of the specified znode, or null + */ + public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) { + try { + byte [] data = zkw.getZooKeeper().getData(znode, zkw, null); + zkw.debug("Retrieved " + (data == null ? 0 : data.length) + " bytes of data from znode (" + + znode + ") and set a watcher"); + return data; + } catch (KeeperException.NoNodeException e) { + zkw.debug("Unable to get data of znode (" + znode + ") " + + "because node does not exist (not an error)"); + return null; + } catch (KeeperException e) { + zkw.warn("Unable to get data of znode (" + znode + ")", e); + zkw.keeperException(e); + return null; + } catch (InterruptedException e) { + zkw.warn("Unable to get data of znode (" + znode + ")", e); + zkw.interruptedException(e); + return null; + } + } + + /** + * Get the data at the specified znode, deserialize it as an HServerAddress, + * and set a watch. + * + * Returns the data as a server address and sets a watch if the node exists. + * Returns null and no watch is set if the node does not exist or there is an + * exception. 
+ * + * @param zkw zk reference + * @param znode path of node + * @return data of the specified node as a server address, or null + */ + public static HServerAddress getDataAsAddress(ZooKeeperWatcher zkw, + String znode) { + byte [] data = getDataAndWatch(zkw, znode); + if(data == null) { + return null; + } + String addrString = Bytes.toString(data); + zkw.debug("Read server address from znode (" + znode + "): " + addrString); + return new HServerAddress(addrString); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (working copy) @@ -136,34 +136,34 @@ private List listeners = Collections.synchronizedList(new ArrayList()); - // return the singleton given the name of the instance - public static ZooKeeperWrapper getInstance(Configuration conf, String name) { - name = getZookeeperClusterKey(conf, name); - return INSTANCES.get(name); - } - // creates only one instance - public static ZooKeeperWrapper createInstance(Configuration conf, String name) { - if (getInstance(conf, name) != null) { - return getInstance(conf, name); - } - ZooKeeperWrapper.createLock.lock(); - try { - if (getInstance(conf, name) == null) { - try { - String fullname = getZookeeperClusterKey(conf, name); - ZooKeeperWrapper instance = new ZooKeeperWrapper(conf, fullname); - INSTANCES.put(fullname, instance); - } - catch (Exception e) { - LOG.error("<" + name + ">" + "Error creating a ZooKeeperWrapper " + e); - } - } - } - finally { - createLock.unlock(); - } - return getInstance(conf, name); - } +// // return the singleton given the name of the instance +// public static ZooKeeperWrapper getInstance(Configuration conf, String name) { +// name = getZookeeperClusterKey(conf, name); +// return 
INSTANCES.get(name); +// } +// // creates only one instance +// public static ZooKeeperWrapper createInstance(Configuration conf, String name) { +// if (getInstance(conf, name) != null) { +// return getInstance(conf, name); +// } +// ZooKeeperWrapper.createLock.lock(); +// try { +// if (getInstance(conf, name) == null) { +// try { +// String fullname = getZookeeperClusterKey(conf, name); +// ZooKeeperWrapper instance = new ZooKeeperWrapper(conf, fullname); +// INSTANCES.put(fullname, instance); +// } +// catch (Exception e) { +// LOG.error("<" + name + ">" + "Error creating a ZooKeeperWrapper " + e); +// } +// } +// } +// finally { +// createLock.unlock(); +// } +// return getInstance(conf, name); +// } /** * Create a ZooKeeperWrapper. The Zookeeper wrapper listens to all messages @@ -174,11 +174,11 @@ * @param conf HBaseConfiguration to read settings from. * @throws IOException If a connection error occurs. */ - private ZooKeeperWrapper(Configuration conf, String instanceName) + protected ZooKeeperWrapper(Configuration conf, String instanceName) throws IOException { this.instanceName = instanceName; - Properties properties = HQuorumPeer.makeZKProps(conf); - quorumServers = HQuorumPeer.getZKQuorumServersString(properties); + Properties properties = ZKConfig.makeZKProps(conf); + quorumServers = ZKConfig.getZKQuorumServersString(properties); if (quorumServers == null) { throw new IOException("Could not read quorum servers from " + HConstants.ZOOKEEPER_CONFIG_NAME); @@ -192,7 +192,7 @@ String rsZNodeName = conf.get("zookeeper.znode.rs", "rs"); String masterAddressZNodeName = conf.get("zookeeper.znode.master", "master"); String stateZNodeName = conf.get("zookeeper.znode.state", "shutdown"); - String regionsInTransitZNodeName = conf.get("zookeeper.znode.regionInTransition", "UNASSIGNED"); + String regionsInTransitZNodeName = conf.get("zookeeper.znode.regionInTransition", "unassigned"); rootRegionZNode = getZNode(parentZNode, rootServerZNodeName); rsZNode = 
getZNode(parentZNode, rsZNodeName); @@ -243,7 +243,6 @@ } /** @return String dump of everything in ZooKeeper. */ - @SuppressWarnings({"ConstantConditions"}) public String dump() { StringBuilder sb = new StringBuilder(); sb.append("\nHBase tree in ZooKeeper is rooted at ").append(parentZNode); Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java (working copy) @@ -20,13 +20,12 @@ package org.apache.hadoop.hbase.zookeeper; +import java.util.Properties; +import java.util.Map.Entry; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import java.util.Map.Entry; -import java.util.Properties; - /** * Tool for reading ZooKeeper servers from HBase XML configuation and producing * a line-by-line list for use by bash scripts. @@ -41,7 +40,7 @@ // Note that we do not simply grab the property // HConstants.ZOOKEEPER_QUORUM from the HBaseConfiguration because the // user may be using a zoo.cfg file. - Properties zkProps = HQuorumPeer.makeZKProps(conf); + Properties zkProps = ZKConfig.makeZKProps(conf); for (Entry entry : zkProps.entrySet()) { String key = entry.getKey().toString().trim(); String value = entry.getValue().toString().trim(); Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java (revision 0) @@ -0,0 +1,60 @@ +package org.apache.hadoop.hbase.zookeeper; + + +/** + * Base class for internal listeners of ZooKeeper events. 
+ * + * The {@link ZooKeeperWatcher} for a process will execute the appropriate + * methods of implementations of this class. In order to receive events from + * the watcher, every listener must register itself via {@link ZooKeeperWatcher#registerListener}. + * + * Subclasses need only override those methods in which they are interested. + * + * Note that the watcher will be blocked when invoking methods in listeners so + * they must not be long-running. + */ +public class ZooKeeperListener { + + // Reference to the zk watcher which also contains configuration and constants + protected ZooKeeperWatcher watcher; + + /** + * Construct a ZooKeeper event listener. + * TODO: This should take ServerStatus which will contain ZKWatcher ref? + */ + public ZooKeeperListener(ZooKeeperWatcher watcher) { + this.watcher = watcher; + } + + /** + * Called when a new node has been created. + * @param path full path of the new node + */ + public void nodeCreated(String path) { + // no-op + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + // no-op + } + + /** + * Called when an existing node has changed data. + * @param path full path of the updated node + */ + public void nodeDataChanged(String path) { + // no-op + } + + /** + * Called when an existing node has a child node added or removed. 
+ * @param path full path of the node whose children have changed + */ + public void nodeChildrenChanged(String path) { + // no-op + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java (revision 0) @@ -0,0 +1,233 @@ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.util.StringUtils; + +/** + * Utility methods for reading, parsing, and building zookeeper configuration. + */ +public class ZKConfig { + private static final Log LOG = LogFactory.getLog(ZKConfig.class); + + private static final String VARIABLE_START = "${"; + private static final int VARIABLE_START_LENGTH = VARIABLE_START.length(); + private static final String VARIABLE_END = "}"; + private static final int VARIABLE_END_LENGTH = VARIABLE_END.length(); + + private static final String ZK_CFG_PROPERTY = "hbase.zookeeper.property."; + private static final int ZK_CFG_PROPERTY_SIZE = ZK_CFG_PROPERTY.length(); + private static final String ZK_CLIENT_PORT_KEY = ZK_CFG_PROPERTY + + "clientPort"; + + /** + * Make a Properties object holding ZooKeeper config equivalent to zoo.cfg. + * If there is a zoo.cfg in the classpath, simply read it in. Otherwise parse + * the corresponding config options from the HBase XML configs and generate + * the appropriate ZooKeeper properties. + * @param conf Configuration to read from. 
+ * @return Properties holding mappings representing ZooKeeper zoo.cfg file. + */ + public static Properties makeZKProps(Configuration conf) { + // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read + // it and grab its configuration properties. + ClassLoader cl = HQuorumPeer.class.getClassLoader(); + final InputStream inputStream = + cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME); + if (inputStream != null) { + try { + return parseZooCfg(conf, inputStream); + } catch (IOException e) { + LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME + + ", loading from XML files", e); + } + } + + // Otherwise, use the configuration options from HBase's XML files. + Properties zkProperties = new Properties(); + + // Directly map all of the hbase.zookeeper.property.KEY properties. + for (Entry entry : conf) { + String key = entry.getKey(); + if (key.startsWith(ZK_CFG_PROPERTY)) { + String zkKey = key.substring(ZK_CFG_PROPERTY_SIZE); + String value = entry.getValue(); + // If the value has variables substitutions, need to do a get. + if (value.contains(VARIABLE_START)) { + value = conf.get(key); + } + zkProperties.put(zkKey, value); + } + } + + // If clientPort is not set, assign the default + if (zkProperties.getProperty(ZK_CLIENT_PORT_KEY) == null) { + zkProperties.put(ZK_CLIENT_PORT_KEY, + HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); + } + + // Create the server.X properties. + int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888); + int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888); + + final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, + "localhost"); + for (int i = 0; i < serverHosts.length; ++i) { + String serverHost = serverHosts[i]; + String address = serverHost + ":" + peerPort + ":" + leaderPort; + String key = "server." + i; + zkProperties.put(key, address); + } + + return zkProperties; + } + + /** + * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in. 
+ * This method is used for testing so we can pass our own InputStream. + * @param conf HBaseConfiguration to use for injecting variables. + * @param inputStream InputStream to read from. + * @return Properties parsed from config stream with variables substituted. + * @throws IOException if anything goes wrong parsing config + */ + public static Properties parseZooCfg(Configuration conf, + InputStream inputStream) throws IOException { + Properties properties = new Properties(); + try { + properties.load(inputStream); + } catch (IOException e) { + final String msg = "fail to read properties from " + + HConstants.ZOOKEEPER_CONFIG_NAME; + LOG.fatal(msg); + throw new IOException(msg, e); + } + for (Entry entry : properties.entrySet()) { + String value = entry.getValue().toString().trim(); + String key = entry.getKey().toString().trim(); + StringBuilder newValue = new StringBuilder(); + int varStart = value.indexOf(VARIABLE_START); + int varEnd = 0; + while (varStart != -1) { + varEnd = value.indexOf(VARIABLE_END, varStart); + if (varEnd == -1) { + String msg = "variable at " + varStart + " has no end marker"; + LOG.fatal(msg); + throw new IOException(msg); + } + String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd); + + String substituteValue = System.getProperty(variable); + if (substituteValue == null) { + substituteValue = conf.get(variable); + } + if (substituteValue == null) { + String msg = "variable " + variable + " not set in system property " + + "or hbase configs"; + LOG.fatal(msg); + throw new IOException(msg); + } + + newValue.append(substituteValue); + + varEnd += VARIABLE_END_LENGTH; + varStart = value.indexOf(VARIABLE_START, varEnd); + } + // Special case for 'hbase.cluster.distributed' property being 'true' + if (key.startsWith("server.")) { + if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED) + && value.startsWith("localhost")) { + String msg = "The server in zoo.cfg cannot be set to localhost " 
+ + "in a fully-distributed setup because it won't be reachable. " + + "See \"Getting Started\" for more information."; + LOG.fatal(msg); + throw new IOException(msg); + } + } + newValue.append(value.substring(varEnd)); + properties.setProperty(key, newValue.toString()); + } + return properties; + } + + /** + * Return the ZK Quorum servers string given zk properties returned by + * makeZKProps + * @param properties + * @return + */ + public static String getZKQuorumServersString(Properties properties) { + String clientPort = null; + List servers = new ArrayList(); + + // The clientPort option may come after the server.X hosts, so we need to + // grab everything and then create the final host:port comma separated list. + boolean anyValid = false; + for (Entry property : properties.entrySet()) { + String key = property.getKey().toString().trim(); + String value = property.getValue().toString().trim(); + if (key.equals("clientPort")) { + clientPort = value; + } + else if (key.startsWith("server.")) { + String host = value.substring(0, value.indexOf(':')); + servers.add(host); + try { + //noinspection ResultOfMethodCallIgnored + InetAddress.getByName(host); + anyValid = true; + } catch (UnknownHostException e) { + LOG.warn(StringUtils.stringifyException(e)); + } + } + } + + if (!anyValid) { + LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (clientPort == null) { + LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (servers.isEmpty()) { + LOG.fatal("No server.X lines found in conf/zoo.cfg. 
HBase must have a " + + "ZooKeeper cluster configured for its operation."); + return null; + } + + StringBuilder hostPortBuilder = new StringBuilder(); + for (int i = 0; i < servers.size(); ++i) { + String host = servers.get(i); + if (i > 0) { + hostPortBuilder.append(','); + } + hostPortBuilder.append(host); + hostPortBuilder.append(':'); + hostPortBuilder.append(clientPort); + } + + return hostPortBuilder.toString(); + } + + /** + * Return the ZK Quorum servers string given the specified configuration. + * @param conf configuration from which the ZooKeeper properties are built + * @return comma-separated host:port quorum string, or null if it could not be built + */ + public static String getZKQuorumServersString(Configuration conf) { + return getZKQuorumServersString(makeZKProps(conf)); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java (working copy) @@ -19,21 +19,8 @@ */ package org.apache.hadoop.hbase.zookeeper; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServerMain; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.apache.zookeeper.server.quorum.QuorumPeerMain; - import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.PrintWriter; import java.net.InetAddress; import java.net.NetworkInterface; @@ -41,9 +28,18 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.List; -import java.util.Map.Entry; import java.util.Properties; +import java.util.Map.Entry; +import
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; + /** * HBase's version of ZooKeeper's QuorumPeer. When HBase is set to manage * ZooKeeper, this class is used to start up QuorumPeer instances. By doing @@ -52,18 +48,7 @@ * zoo.cfg and inject variables from HBase's site.xml configuration in. */ public class HQuorumPeer { - private static final Log LOG = LogFactory.getLog(HQuorumPeer.class); - - private static final String VARIABLE_START = "${"; - private static final int VARIABLE_START_LENGTH = VARIABLE_START.length(); - private static final String VARIABLE_END = "}"; - private static final int VARIABLE_END_LENGTH = VARIABLE_END.length(); - - private static final String ZK_CFG_PROPERTY = "hbase.zookeeper.property."; - private static final int ZK_CFG_PROPERTY_SIZE = ZK_CFG_PROPERTY.length(); - private static final String ZK_CLIENT_PORT_KEY = ZK_CFG_PROPERTY - + "clientPort"; - + /** * Parse ZooKeeper configuration from HBase XML config and run a QuorumPeer. * @param args String[] of command line arguments. Not used. @@ -71,7 +56,7 @@ public static void main(String[] args) { Configuration conf = HBaseConfiguration.create(); try { - Properties zkProperties = makeZKProps(conf); + Properties zkProperties = ZKConfig.makeZKProps(conf); writeMyID(zkProperties); QuorumPeerConfig zkConfig = new QuorumPeerConfig(); zkConfig.parseProperties(zkProperties); @@ -158,195 +143,4 @@ w.println(myId); w.close(); } - - /** - * Make a Properties object holding ZooKeeper config equivalent to zoo.cfg. - * If there is a zoo.cfg in the classpath, simply read it in. 
Otherwise parse - * the corresponding config options from the HBase XML configs and generate - * the appropriate ZooKeeper properties. - * @param conf Configuration to read from. - * @return Properties holding mappings representing ZooKeeper zoo.cfg file. - */ - public static Properties makeZKProps(Configuration conf) { - // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read - // it and grab its configuration properties. - ClassLoader cl = HQuorumPeer.class.getClassLoader(); - final InputStream inputStream = - cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME); - if (inputStream != null) { - try { - return parseZooCfg(conf, inputStream); - } catch (IOException e) { - LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME + - ", loading from XML files", e); - } - } - - // Otherwise, use the configuration options from HBase's XML files. - Properties zkProperties = new Properties(); - - // Directly map all of the hbase.zookeeper.property.KEY properties. - for (Entry entry : conf) { - String key = entry.getKey(); - if (key.startsWith(ZK_CFG_PROPERTY)) { - String zkKey = key.substring(ZK_CFG_PROPERTY_SIZE); - String value = entry.getValue(); - // If the value has variables substitutions, need to do a get. - if (value.contains(VARIABLE_START)) { - value = conf.get(key); - } - zkProperties.put(zkKey, value); - } - } - - // If clientPort is not set, assign the default - if (zkProperties.getProperty(ZK_CLIENT_PORT_KEY) == null) { - zkProperties.put(ZK_CLIENT_PORT_KEY, - HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); - } - - // Create the server.X properties. 
- int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888); - int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888); - - final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, - "localhost"); - for (int i = 0; i < serverHosts.length; ++i) { - String serverHost = serverHosts[i]; - String address = serverHost + ":" + peerPort + ":" + leaderPort; - String key = "server." + i; - zkProperties.put(key, address); - } - - return zkProperties; - } - - /** - * Return the ZK Quorum servers string given zk properties returned by - * makeZKProps - * @param properties - * @return - */ - public static String getZKQuorumServersString(Properties properties) { - String clientPort = null; - List servers = new ArrayList(); - - // The clientPort option may come after the server.X hosts, so we need to - // grab everything and then create the final host:port comma separated list. - boolean anyValid = false; - for (Entry property : properties.entrySet()) { - String key = property.getKey().toString().trim(); - String value = property.getValue().toString().trim(); - if (key.equals("clientPort")) { - clientPort = value; - } - else if (key.startsWith("server.")) { - String host = value.substring(0, value.indexOf(':')); - servers.add(host); - try { - //noinspection ResultOfMethodCallIgnored - InetAddress.getByName(host); - anyValid = true; - } catch (UnknownHostException e) { - LOG.warn(StringUtils.stringifyException(e)); - } - } - } - - if (!anyValid) { - LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME); - return null; - } - - if (clientPort == null) { - LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME); - return null; - } - - if (servers.isEmpty()) { - LOG.fatal("No server.X lines found in conf/zoo.cfg. 
HBase must have a " + - "ZooKeeper cluster configured for its operation."); - return null; - } - - StringBuilder hostPortBuilder = new StringBuilder(); - for (int i = 0; i < servers.size(); ++i) { - String host = servers.get(i); - if (i > 0) { - hostPortBuilder.append(','); - } - hostPortBuilder.append(host); - hostPortBuilder.append(':'); - hostPortBuilder.append(clientPort); - } - - return hostPortBuilder.toString(); - } - - /** - * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in. - * This method is used for testing so we can pass our own InputStream. - * @param conf HBaseConfiguration to use for injecting variables. - * @param inputStream InputStream to read from. - * @return Properties parsed from config stream with variables substituted. - * @throws IOException if anything goes wrong parsing config - */ - public static Properties parseZooCfg(Configuration conf, - InputStream inputStream) throws IOException { - Properties properties = new Properties(); - try { - properties.load(inputStream); - } catch (IOException e) { - final String msg = "fail to read properties from " - + HConstants.ZOOKEEPER_CONFIG_NAME; - LOG.fatal(msg); - throw new IOException(msg, e); - } - for (Entry entry : properties.entrySet()) { - String value = entry.getValue().toString().trim(); - String key = entry.getKey().toString().trim(); - StringBuilder newValue = new StringBuilder(); - int varStart = value.indexOf(VARIABLE_START); - int varEnd = 0; - while (varStart != -1) { - varEnd = value.indexOf(VARIABLE_END, varStart); - if (varEnd == -1) { - String msg = "variable at " + varStart + " has no end marker"; - LOG.fatal(msg); - throw new IOException(msg); - } - String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd); - - String substituteValue = System.getProperty(variable); - if (substituteValue == null) { - substituteValue = conf.get(variable); - } - if (substituteValue == null) { - String msg = "variable " + variable + " not set in system 
property " - + "or hbase configs"; - LOG.fatal(msg); - throw new IOException(msg); - } - - newValue.append(substituteValue); - - varEnd += VARIABLE_END_LENGTH; - varStart = value.indexOf(VARIABLE_START, varEnd); - } - // Special case for 'hbase.cluster.distributed' property being 'true' - if (key.startsWith("server.")) { - if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED) - && value.startsWith("localhost")) { - String msg = "The server in zoo.cfg cannot be set to localhost " + - "in a fully-distributed setup because it won't be reachable. " + - "See \"Getting Started\" for more information."; - LOG.fatal(msg); - throw new IOException(msg); - } - } - newValue.append(value.substring(varEnd)); - properties.setProperty(key, newValue.toString()); - } - return properties; - } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.ServerStatus; import org.apache.hadoop.hbase.UnknownRowLockException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.YouAreDeadException; @@ -101,23 +102,20 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.net.DNS; import 
org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; /** * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. */ public class HRegionServer implements HRegionInterface, - HBaseRPCErrorHandler, Runnable, Watcher, Stoppable { + HBaseRPCErrorHandler, Runnable, Stoppable, ServerStatus { public static final Log LOG = LogFactory.getLog(HRegionServer.class); private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING); private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED); @@ -219,7 +217,11 @@ final Map scanners = new ConcurrentHashMap(); - private ZooKeeperWrapper zooKeeperWrapper; + // zookeeper connection and watcher + private ZooKeeperWatcher zooKeeper; + + // master address manager and watcher + private MasterAddressManager masterAddressManager; // A sleeper that sleeps for msgInterval. private final Sleeper sleeper; @@ -284,7 +286,7 @@ conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); - reinitialize(); + initialize(); } /** @@ -293,7 +295,7 @@ * Both call it.
* @throws IOException */ - private void reinitialize() throws IOException { + private void initialize() throws IOException { this.abortRequested = false; this.stopRequested.set(false); @@ -312,22 +314,25 @@ throw new NullPointerException("Server address cannot be null; " + "hbase-958 debugging"); } - reinitializeThreads(); - reinitializeZooKeeper(); + initializeThreads(); + initializeZooKeeper(); int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4); for(int i = 0; i < nbBlocks; i++) { reservedSpace.add(new byte[HConstants.DEFAULT_SIZE_RESERVATION_BLOCK]); } } - private void reinitializeZooKeeper() throws IOException { - zooKeeperWrapper = - ZooKeeperWrapper.createInstance(conf, serverInfo.getServerName()); - zooKeeperWrapper.registerListener(this); - watchMasterAddress(); + private void initializeZooKeeper() throws IOException { + // open connection to zookeeper and set primary watcher + zooKeeper = new ZooKeeperWatcher(conf, serverInfo.getServerName(), this); + + // create the master address manager, register with zk, and start it + masterAddressManager = new MasterAddressManager(zooKeeper); + zooKeeper.registerListener(masterAddressManager); + masterAddressManager.monitorMaster(); } - private void reinitializeThreads() { + private void initializeThreads() { this.workerThread = new Thread(worker); // Cache flushing thread. @@ -353,74 +358,6 @@ } /** - * We register ourselves as a watcher on the master address ZNode. This is - * called by ZooKeeper when we get an event on that ZNode. When this method - * is called it means either our master has died, or a new one has come up. - * Either way we need to update our knowledge of the master. - * @param event WatchedEvent from ZooKeeper. 
- */ - public void process(WatchedEvent event) { - EventType type = event.getType(); - KeeperState state = event.getState(); - LOG.info("Got ZooKeeper event, state: " + state + ", type: " + - type + ", path: " + event.getPath()); - - // Ignore events if we're shutting down. - if (this.stopRequested.get()) { - LOG.debug("Ignoring ZooKeeper event while shutting down"); - return; - } - - if (state == KeeperState.Expired) { - LOG.error("ZooKeeper session expired"); - boolean restart = - this.conf.getBoolean("hbase.regionserver.restart.on.zk.expire", false); - if (restart) { - restart(); - } else { - abort("ZooKeeper session expired"); - } - } else if (type == EventType.NodeDeleted) { - watchMasterAddress(); - } else if (type == EventType.NodeCreated) { - getMaster(); - - // ZooKeeper watches are one time only, so we need to re-register our watch. - watchMasterAddress(); - } - } - - private void watchMasterAddress() { - while (!stopRequested.get() && !zooKeeperWrapper.watchMasterAddress(this)) { - LOG.warn("Unable to set watcher on ZooKeeper master address. Retrying."); - sleeper.sleep(); - } - } - - private void restart() { - abort("Restarting region server"); - Threads.shutdown(regionServerThread); - boolean done = false; - while (!done) { - try { - reinitialize(); - done = true; - } catch (IOException e) { - LOG.debug("Error trying to reinitialize ZooKeeper", e); - } - } - Thread t = new Thread(this); - String name = regionServerThread.getName(); - t.setName(name); - t.start(); - } - - /** @return ZooKeeperWrapper used by RegionServer. */ - public ZooKeeperWrapper getZooKeeperWrapper() { - return zooKeeperWrapper; - } - - /** * The HRegionServer sticks in this loop until closed. It repeatedly checks * in with the HMaster, sending heartbeats & reports, and receiving HRegion * load/unload instructions. @@ -446,7 +383,8 @@ for (int tries = 0; !stopRequested.get() && isHealthy();) { // Try to get the root region location from the master. 
if (!haveRootRegion.get()) { - HServerAddress rootServer = zooKeeperWrapper.readRootRegionLocation(); + HServerAddress rootServer = + ZKUtil.getDataAsAddress(zooKeeper, zooKeeper.rootServerZNode); if (rootServer != null) { // By setting the root region location, we bypass the wait imposed on // HTable for all regions being assigned. @@ -646,8 +584,9 @@ this.hbaseMaster = null; } + this.zooKeeper.close(); + if (!killed) { - this.zooKeeperWrapper.close(); join(); } LOG.info(Thread.currentThread().getName() + " exiting"); @@ -1174,21 +1113,23 @@ Threads.shutdown(this.hlogRoller); } + /** + * Get the current master from ZooKeeper and open the RPC connection to it. + * + * Method will block until a master is available. You can break from this + * block by requesting the server stop. + * + * @return + */ private boolean getMaster() { HServerAddress masterAddress = null; - while (masterAddress == null) { - if (stopRequested.get()) { + while((masterAddress = masterAddressManager.getMasterAddress()) == null) { + if(stopRequested.get()) { return false; } - try { - masterAddress = zooKeeperWrapper.readMasterAddressOrThrow(); - } catch (IOException e) { - LOG.warn("Unable to read master address from ZooKeeper. Retrying." 
+ - " Error was:", e); - sleeper.sleep(); - } + LOG.debug("No master found, will retry"); + sleeper.sleep(); } - LOG.info("Telling master at " + masterAddress + " that we are up"); HMasterRegionInterface master = null; while (!stopRequested.get() && master == null) { @@ -1230,7 +1171,7 @@ if (LOG.isDebugEnabled()) LOG.debug("sending initial server load: " + hsl); lastMsg = System.currentTimeMillis(); - zooKeeperWrapper.writeRSLocation(this.serverInfo); + zooKeeper.writeRSLocation(this.serverInfo); result = this.hbaseMaster.regionServerStartup(this.serverInfo); break; } catch (IOException e) { @@ -1406,7 +1347,7 @@ Integer mapKey = Bytes.mapKey(regionInfo.getRegionName()); HRegion region = this.onlineRegions.get(mapKey); RSZookeeperUpdater zkUpdater = - new RSZookeeperUpdater(conf, serverInfo.getServerName(), + new RSZookeeperUpdater(zooKeeper, serverInfo.getServerName(), regionInfo.getEncodedName()); if (region == null) { try { @@ -1491,7 +1432,7 @@ throws IOException { RSZookeeperUpdater zkUpdater = null; if(reportWhenCompleted) { - zkUpdater = new RSZookeeperUpdater(conf, + zkUpdater = new RSZookeeperUpdater(zooKeeper, serverInfo.getServerName(), hri.getEncodedName()); zkUpdater.startRegionCloseEvent(null, false); } @@ -2380,6 +2321,23 @@ return threadWakeFrequency; } + // ServerStatus + + @Override + public void abortServer() { + this.abort("Received abortServer call"); + } + + @Override + public HServerAddress getHServerAddress() { + return this.address; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zooKeeper; + } + // // Main program and support routines // Index: src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java (revision 0) @@ -0,0 +1,185 @@ +/** + * Copyright 2010 The 
Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Manages the location of the current active Master for this RegionServer. + * + * Listens for ZooKeeper events related to the master address. The node /master + * will contain the address of the current master. This listener is interested + * in NodeDeleted and NodeCreated events on /master. + * + * This class is thread-safe and takes care of re-setting all watchers to + * ensure it always knows the up-to-date master. To kick it off, instantiate + * the class and run the {@link #monitorMaster()} method. + * + * You can get the current master via {@link #getMasterAddress()} or the + * blocking method {@link #waitForMaster()}.
+ */ +public class MasterAddressManager extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(MasterAddressManager.class); + + // Address of the current primary master, null if no primary master + private HServerAddress masterAddress; + + /** + * Construct a master address listener with the specified zookeeper reference. + * + * This constructor does not trigger any actions, you must call methods + * explicitly. Normally you will just want to execute {@link #monitorMaster()}, + * which sets the current master if one exists and watches for later changes. + * + * @param watcher zk reference and watcher + */ + public MasterAddressManager(ZooKeeperWatcher watcher) { + super(watcher); + masterAddress = null; + } + + /** + * Get the address of the current master if one is available. Returns null + * if no current master. + * + * Use {@link #waitForMaster} if you want to block until the master is + * available. + * @return server address of current active master, or null if none available + */ + public synchronized HServerAddress getMasterAddress() { + return masterAddress; + } + + /** + * Check if there is a master available. + * @return true if there is a master set, false if not. + */ + public synchronized boolean hasMaster() { + return masterAddress != null; + } + + /** + * Get the address of the current master. If no master is available, method + * will block until one is available, the thread is interrupted, or timeout + * has passed. + * + * TODO: Make this work, currently unused, kept with existing retry semantics. + * + * @return server address of current active master, null if timed out + * @throws InterruptedException if the thread is interrupted while waiting + */ + public synchronized HServerAddress waitForMaster() + throws InterruptedException { + return masterAddress; + } + + /** + * Setup to watch for the primary master of the cluster.
+ * + * If the master is already available in ZooKeeper, this method will ensure + * it gets set and that any further changes are also watched for. + * + * If no master is available, this method ensures we become aware of it and + * will take care of setting it. + */ + public void monitorMaster() { + if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) { + handleNewMaster(); + } + } + + @Override + public void nodeCreated(String path) { + LOG.info("nodeCreated(" + path + ")"); + if(path.equals(watcher.masterAddressZNode)) { + handleNewMaster(); + } + monitorMaster(); + } + + @Override + public void nodeDeleted(String path) { + if(path.equals(watcher.masterAddressZNode)) { + handleDeadMaster(); + } + monitorMaster(); + } + + /** + * Set the master address to the specified address. This operation is + * idempotent, a master will only be set if there is currently no master set. + */ + private synchronized void setMasterAddress(HServerAddress address) { + if(masterAddress == null) { + LOG.info("Found and set master address: " + address); + masterAddress = address; + } + } + + /** + * Unsets the master address. Used when the master goes offline so none is + * available. + */ + private synchronized void unsetMasterAddress() { + if(masterAddress != null) { + LOG.info("Master has been unset. There is no current master available"); + masterAddress = null; + } + } + + /** + * Handle a new master being set. + * + * This method should be called to check if there is a new master. If there + * is already a master set, this method returns immediately. If none is set, + * this will attempt to grab the master location from ZooKeeper and will set + * it. + * + * This method uses an atomic operation to ensure a new master is only set + * once. 
+ */ + private void handleNewMaster() { + if(hasMaster()) { + return; + } + HServerAddress address = + ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode); + if(address != null) { + setMasterAddress(address); + } + } + + /** + * Handle a master failure. + * + * Triggered when a master node is deleted. + * + * TODO: Other ways we figure master is "dead"? What do we do if set in ZK + * but we can't communicate with TCP? + */ + private void handleDeadMaster() { + unsetMasterAddress(); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (working copy) @@ -1,19 +1,14 @@ package org.apache.hadoop.hbase.regionserver; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HMsg; import org.apache.hadoop.hbase.executor.RegionTransitionEventData; -import org.apache.hadoop.hbase.executor.HBaseEventHandler; import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; @@ -33,22 +28,22 @@ private final String regionServerName; private String regionName = null; private String regionZNode = null; - private ZooKeeperWrapper zkWrapper = null; + private ZooKeeperWatcher zooKeeper = null; private int zkVersion = 0; HBaseEventType lastUpdatedState; - public RSZookeeperUpdater(Configuration 
conf, - String regionServerName, String regionName) { - this(conf, regionServerName, regionName, 0); + public RSZookeeperUpdater(ZooKeeperWatcher zooKeeper, String regionServerName, + String regionName) { + this(zooKeeper, regionServerName, regionName, 0); } - public RSZookeeperUpdater(Configuration conf, String regionServerName, - String regionName, int zkVersion) { - this.zkWrapper = ZooKeeperWrapper.getInstance(conf, regionServerName); + public RSZookeeperUpdater(ZooKeeperWatcher zooKeeper, String regionServerName, + String regionName, int zkVersion) { + this.zooKeeper = zooKeeper; this.regionServerName = regionServerName; this.regionName = regionName; // get the region ZNode we have to create - this.regionZNode = zkWrapper.getZNode(zkWrapper.getRegionInTransitionZNode(), regionName); + this.regionZNode = zooKeeper.getZNode(zooKeeper.assignmentZNode, regionName); this.zkVersion = zkVersion; } @@ -59,14 +54,14 @@ */ public void startRegionCloseEvent(HMsg hmsg, boolean updatePeriodically) throws IOException { // if this ZNode already exists, something is wrong - if(zkWrapper.exists(regionZNode, true)) { + if(zooKeeper.exists(regionZNode, true)) { String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region."; LOG.error(msg); throw new IOException(msg); } // create the region node in the unassigned directory first - zkWrapper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true); + zooKeeper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true); // update the data for "regionName" ZNode in unassigned to CLOSING updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg); @@ -93,7 +88,7 @@ */ public void startRegionOpenEvent(HMsg hmsg, boolean updatePeriodically) throws IOException { Stat stat = new Stat(); - byte[] data = zkWrapper.readZNode(regionZNode, stat); + byte[] data = zooKeeper.readZNode(regionZNode, stat); // if there is no ZNode for this region, something is wrong if(data == null) { 
String msg = "ZNode " + regionZNode + " does not exist in ZooKeeper, will NOT open region."; @@ -158,7 +153,7 @@ " with [" + hbEventType + "]" + " expected version = " + zkVersion); lastUpdatedState = hbEventType; - zkWrapper.writeZNode(regionZNode, data, zkVersion, true); + zooKeeper.writeZNode(regionZNode, data, zkVersion, true); zkVersion++; } } Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -246,10 +246,7 @@ // We must set this watcher here because it can be set on a fresh start // or on a failover Watcher watcher = new ServerExpirer(new HServerInfo(info)); - ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance( - masterStatus.getConfiguration(), - masterStatus.getHServerAddress().toString()); - zkw.updateRSLocationGetWatch(info, watcher); + masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher); this.serversToServerInfo.put(serverName, info); this.serversToLoad.put(serverName, load); synchronized (this.loadToServers) { Index: src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (working copy) @@ -19,6 +19,8 @@ */ package org.apache.hadoop.hbase.master; +import java.io.IOException; + import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; @@ -26,10 +28,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import 
org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import java.io.IOException; - /** * ProcessRegionOpen is instantiated when a region server reports that it is * serving a region. This applies to all meta and user regions except the @@ -115,10 +115,8 @@ } else { masterStatus.getRegionManager().removeRegion(regionInfo); } - ZooKeeperWrapper zkWrapper = - ZooKeeperWrapper.getInstance(masterStatus.getConfiguration(), - masterStatus.getHServerAddress().toString()); - zkWrapper.deleteUnassignedRegion(regionInfo.getEncodedName()); + masterStatus.getZooKeeper().deleteUnassignedRegion( + regionInfo.getEncodedName()); return true; } } Index: src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (working copy) @@ -56,7 +56,7 @@ throws IOException { this.serverName = masterStatus.getHServerAddress().toString(); this.serverManager = masterStatus.getServerManager(); - zkWrapper = ZooKeeperWrapper.getInstance(conf, masterStatus.getHServerAddress().toString()); + zkWrapper = masterStatus.getZooKeeper(); String unassignedZNode = zkWrapper.getRegionInTransitionZNode(); // If the UNASSIGNED ZNode exists and this is a fresh cluster start, then Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import 
org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -134,7 +135,7 @@ private final MasterMetrics metrics; // Our zk client. - private ZooKeeperWrapper zooKeeperWrapper; + private ZooKeeperWatcher zooKeeperWrapper; // Watcher for master address and for cluster shutdown. private final ZKMasterAddressWatcher zkMasterAddressWatcher; // A Sleeper that sleeps for threadWakeFrequency; sleep if nothing todo. @@ -182,7 +183,8 @@ // number of RS ephemeral nodes. RS ephemeral nodes are created only after // the primary master has written the address to ZK. So this has to be done // before we race to write our address to zookeeper. - zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, getHServerAddress().toString()); + zooKeeperWrapper = + new ZooKeeperWatcher(conf, getHServerAddress().toString(), this); isClusterStartup = (zooKeeperWrapper.scanRSDirectory().size() == 0); // Create the filesystem manager, which in turn does the following: @@ -919,8 +921,9 @@ zooKeeperWrapper.close(); try { + // TODO: this is broken, we should just shutdown now not restart zooKeeperWrapper = - ZooKeeperWrapper.createInstance(conf, HMaster.class.getName()); + new ZooKeeperWatcher(conf, HMaster.class.getName(), this); zooKeeperWrapper.registerListener(this); this.zkMasterAddressWatcher.setZookeeper(zooKeeperWrapper); if(!this.zkMasterAddressWatcher. 
@@ -1102,4 +1105,17 @@ public static void main(String [] args) { doMain(args, HMaster.class); } + + @Override + public void abortServer() { + this.startShutdown(); + } + + /** + * TODO: Implement once migrated to new ZK stuff + */ + @Override + public ZooKeeperWatcher getZooKeeper() { + return zooKeeperWrapper; + } } Index: src/main/java/org/apache/hadoop/hbase/master/RegionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (working copy) @@ -140,8 +140,7 @@ masterStatus.getConfiguration().getInt( HConstants.THREAD_WAKE_FREQUENCY, HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); - this.zkWrapper = - ZooKeeperWrapper.getInstance(conf, masterStatus.getHServerAddress().toString()); + this.zkWrapper = masterStatus.getZooKeeper(); this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10); this.loadBalancer = new LoadBalancer(conf); @@ -640,11 +639,12 @@ } catch(Exception iex) { LOG.warn("meta scanner", iex); } - ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance( - masterStatus.getConfiguration(), - masterStatus.getHServerAddress().toString()); - zkw.clearRSDirectory(); - zkw.close(); + // TODO: Why did we getInstance again? We should have it local? 
+// ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance( +// masterStatus.getConfiguration(), +// masterStatus.getHServerAddress().toString()); + zkWrapper.clearRSDirectory(); + zkWrapper.close(); } /** @@ -1233,10 +1233,7 @@ private void writeRootRegionLocationToZooKeeper(HServerAddress address) { for (int attempt = 0; attempt < zooKeeperNumRetries; ++attempt) { - ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance( - masterStatus.getConfiguration(), - masterStatus.getHServerAddress().toString()); - if (zkw.writeRootRegionLocation(address)) { + if (zkWrapper.writeRootRegionLocation(address)) { return; } Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -19,6 +19,24 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,30 +60,13 @@ import org.apache.hadoop.hbase.util.MetaUtils; import org.apache.hadoop.hbase.util.SoftValueSortedMap; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import 
org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.CopyOnWriteArraySet; - /** * A non-instantiable class that manages connections to multiple tables in * multiple HBase instances. @@ -79,7 +80,10 @@ Runtime.getRuntime().addShutdownHook(new Thread("HCM.shutdownHook") { @Override public void run() { - HConnectionManager.deleteAllConnections(true); + try { + HConnectionManager.deleteAllConnections(true); + } catch (IOException e) { + } } }); } @@ -147,8 +151,9 @@ /** * Delete information for all connections. 
* @param stopProxy stop the proxy as well + * @throws IOException */ - public static void deleteAllConnections(boolean stopProxy) { + public static void deleteAllConnections(boolean stopProxy) throws IOException { synchronized (HBASE_INSTANCES) { for (TableServers t : HBASE_INSTANCES.values()) { if (t != null) { @@ -228,8 +233,8 @@ */ public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException { if(zooKeeperWrapper == null) { - zooKeeperWrapper = - ZooKeeperWrapper.createInstance(conf, HConnectionManager.class.getName()); + zooKeeperWrapper = new ZooKeeperWatcher(conf, + HConnectionManager.class.getName(), null); zooKeeperWrapper.registerListener(this); } return zooKeeperWrapper; @@ -237,6 +242,7 @@ /** * Clear this connection to zookeeper. + * @throws IOException */ private synchronized void resetZooKeeper() { if (zooKeeperWrapper != null) { Index: src/main/resources/hbase-webapps/regionserver/regionserver.jsp =================================================================== --- src/main/resources/hbase-webapps/regionserver/regionserver.jsp (revision 960252) +++ src/main/resources/hbase-webapps/regionserver/regionserver.jsp (working copy) @@ -42,7 +42,7 @@ HBase Version<%= org.apache.hadoop.hbase.util.VersionInfo.getVersion() %>, r<%= org.apache.hadoop.hbase.util.VersionInfo.getRevision() %>HBase version and svn revision HBase Compiled<%= org.apache.hadoop.hbase.util.VersionInfo.getDate() %>, <%= org.apache.hadoop.hbase.util.VersionInfo.getUser() %>When HBase version was compiled and by whom Metrics<%= metrics.toString() %>RegionServer Metrics; file and heap sizes are in megabytes -Zookeeper Quorum<%= regionServer.getZooKeeperWrapper().getQuorumServers() %>Addresses of all registered ZK servers +Zookeeper Quorum<%= regionServer.getZooKeeper().getQuorumServers() %>Addresses of all registered ZK servers

Online Regions