Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java (revision 960252) +++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java (working copy) @@ -64,7 +64,7 @@ /** */ public void testMakeZKProps() { - Properties properties = HQuorumPeer.makeZKProps(conf); + Properties properties = ZKConfig.makeZKProps(conf); assertEquals(dataDir.toString(), properties.get("dataDir")); assertEquals(Integer.valueOf(21810), Integer.valueOf(properties.getProperty("clientPort"))); assertEquals("localhost:2888:3888", properties.get("server.0")); @@ -72,7 +72,7 @@ String oldValue = conf.get(HConstants.ZOOKEEPER_QUORUM); conf.set(HConstants.ZOOKEEPER_QUORUM, "a.foo.bar,b.foo.bar,c.foo.bar"); - properties = HQuorumPeer.makeZKProps(conf); + properties = ZKConfig.makeZKProps(conf); assertEquals(dataDir.toString(), properties.get("dataDir")); assertEquals(Integer.valueOf(21810), Integer.valueOf(properties.getProperty("clientPort"))); assertEquals("a.foo.bar:2888:3888", properties.get("server.0")); @@ -91,7 +91,7 @@ System.setProperty("hbase.master.hostname", "localhost"); InputStream is = new ByteArrayInputStream(s.getBytes()); - Properties properties = HQuorumPeer.parseZooCfg(conf, is); + Properties properties = ZKConfig.parseZooCfg(conf, is); assertEquals(dataDir.toString(), properties.get("dataDir")); assertEquals(Integer.valueOf(2181), Integer.valueOf(properties.getProperty("clientPort"))); @@ -111,7 +111,7 @@ // Override with system property. System.setProperty("hbase.master.hostname", "foo.bar"); is = new ByteArrayInputStream(s.getBytes()); - properties = HQuorumPeer.parseZooCfg(conf, is); + properties = ZKConfig.parseZooCfg(conf, is); assertEquals("foo.bar:2888:3888", properties.get("server.0")); config.parseProperties(properties); @@ -127,7 +127,7 @@ public void testShouldAssignDefaultZookeeperClientPort() { Configuration config = HBaseConfiguration.create(); config.clear(); - Properties p = HQuorumPeer.makeZKProps(config); + Properties p = ZKConfig.makeZKProps(config); assertNotNull(p); assertEquals(2181, p.get("hbase.zookeeper.property.clientPort")); } Index: src/main/java/org/apache/hadoop/hbase/ServerStatus.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ServerStatus.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/ServerStatus.java (working copy) @@ -37,4 +37,9 @@ * Get the configuration object for this server. */ public Configuration getConfiguration(); + + /** + * Stub method into ServerStatus to move forward with ZK cleanup. + */ + public void abortServer(); } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 0) @@ -0,0 +1,299 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerStatus; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +/** + * Acts as the single ZooKeeper Watcher. One instance of this is instantiated + * for each Master, RegionServer, and client process. + * + * This is the only class that implements {@link Watcher}. Other internal + * classes which need to be notified of ZooKeeper events must register with + * the local instance of this watcher via {@link #registerListener}. + * + * This class also holds and manages the connection to ZooKeeper. Code to deal + * with connection related events and exceptions are handled here. + */ +public class ZooKeeperWatcher extends ZooKeeperWrapper implements Watcher { + private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class); + + // name of this watcher (for logging only) + private String name; + + // zookeeper connection + private ZooKeeper zooKeeper; + + // server controller + private ServerStatus server; + + // listeners to be notified + private final Set listeners = + new CopyOnWriteArraySet(); + + // node names + + // base znode for this cluster + public String baseZNode; + // znode containing location of server hosting root region + public String rootServerZNode; + // znode containing ephemeral nodes of the regionservers + public String rsZNode; + // znode of currently active master + public String masterAddressZNode; + // znode containing the current cluster state + public String clusterStateZNode; + // znode used for region transitioning and assignment + public String assignmentZNode; + + /** + * Instantiate a ZooKeeper connection and watcher. + * @param name name of this watcher, for logging/debug purposes only + * @throws IOException + */ + public ZooKeeperWatcher(Configuration conf, String name, ServerStatus server) + throws IOException { + super(conf, name); + this.name = name; + this.zooKeeper = ZKUtil.connect(conf, this); + this.server = server; + info("Connected to ZooKeeper"); + setNodeNames(conf); + } + + /** + * Set the local variable node names using the specified configuration. + */ + private void setNodeNames(Configuration conf) { + baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + rootServerZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.rootserver", "root-region-server")); + rsZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.rs", "rs")); + masterAddressZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.master", "master")); + clusterStateZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.state", "shutdown")); + assignmentZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.regionInTransition", "unassigned")); + } + + /** + * Register the specified listener to receive ZooKeeper events. + * @param listener + */ + public void registerListener(ZooKeeperListener listener) { + listeners.add(listener); + } + + /** + * Get the connection to ZooKeeper. + * @return connection reference to zookeeper + */ + public ZooKeeper getZooKeeper() { + return zooKeeper; + } + + /** + * Method called from ZooKeeper for events and connection status. + * + * Valid events are passed along to listeners. Connection status changes + * are dealt with locally. + */ + @Override + public void process(WatchedEvent event) { + LOG.debug("<" + name + "> Received ZooKeeper Event, " + + "type: " + event.getType() + ", " + + "state:" + event.getState() + ", " + + "path: " + event.getPath()); + + switch(event.getType()) { + + // If event type is NONE, this is a connection status change + case None: { + connectionEvent(event); + break; + } + + // Otherwise pass along to the listeners + + case NodeCreated: { + for(ZooKeeperListener listener : listeners) { + listener.nodeCreated(event.getPath()); + } + break; + } + + case NodeDeleted: { + for(ZooKeeperListener listener : listeners) { + listener.nodeDeleted(event.getPath()); + } + break; + } + + case NodeDataChanged: { + for(ZooKeeperListener listener : listeners) { + listener.nodeDataChanged(event.getPath()); + } + break; + } + + case NodeChildrenChanged: { + for(ZooKeeperListener listener : listeners) { + listener.nodeChildrenChanged(event.getPath()); + } + break; + } + } + } + + // Connection management + + /** + * Called when there is a connection-related event via the Watcher callback. + * + * If Disconnected or Expired, this should shutdown the cluster. + * + * @param event + */ + private void connectionEvent(WatchedEvent event) { + switch(event.getState()) { + // Unknown and NoSyncConnected are deprecated, ignore + // We can probably drop this, from ZK code: "Unused, this state is never generated by the server" for both +// case Unknown: +// case NoSyncConnected: + // SyncConnected is normal, ignore + case SyncConnected: + break; + + // Throw a RuntimException if Disconnected or Expired + // TODO: What is the difference between the two states? + // TODO: Do we need to pass these to any listeners? + // TODO: Should probably pass in ServerStatus and flip close bit? What's best way to handle emergency shutdown? + case Disconnected: + case Expired: + throw new RuntimeException("Connection closed to ZooKeeper"); + } + } + + /** + * Handles KeeperExceptions in client calls. + * + * This may be temporary but for now this gives one place to deal with these. + * + * TODO: Currently this method aborts the server. + * + * @param ke + */ + public void keeperException(KeeperException ke) { + error("Aborting Server", ke); + server.abortServer(); + } + + /** + * Handles InterruptedExceptions in client calls. + * + * This may be temporary but for now this gives one place to deal with these. + * + * TODO: Currently, this method does nothing. + * Is this ever expected to happen? Do we abort or can we let it run? + * + * @param ie + */ + public void interruptedException(InterruptedException ie) { + // no-op + } + + // Logging methods + + /** + * Exposed info logging method so our zookeeper output is named. + * @param string log line + */ + public void info(String string) { + LOG.info("<" + name + "> " + string); + } + + /** + * Exposed debug logging method so our zookeeper output is named. + * @param string log line + */ + public void debug(String string) { + LOG.debug("<" + name + "> " + string); + } + + /** + * Exposed warn logging method so our zookeeper output is named. + * @param string log line + */ + public void warn(String string) { + LOG.warn("<" + name + "> " + string); + } + + /** + * Exposed warn logging method so our zookeeper output is named. + * @param string log line + * @param t exception + */ + public void warn(String string, Throwable t) { + LOG.warn("<" + name + "> " + string, t); + } + + /** + * Exposed error logging method so our zookeeper output is named. + * @param string log line + */ + public void error(String string) { + LOG.error("<" + name + "> " + string); + } + + /** + * Exposed error logging method so our zookeeper output is named. + * @param string log line + * @param t exception + */ + public void error(String string, Throwable t) { + LOG.error("<" + name + "> " + string, t); + } + + /** + * Close the connection to ZooKeeper. + * @throws InterruptedException + */ + public void close() { + try { + zooKeeper.close(); + } catch (InterruptedException e) { + } + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 0) @@ -0,0 +1,142 @@ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +/** + * Internal HBase utility class for ZooKeeper. + * + * Contains only static methods and constants. + */ +public class ZKUtil { + private static final Log LOG = LogFactory.getLog(ZKUtil.class); + + // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved. + private static final char ZNODE_PATH_SEPARATOR = '/'; + + /** + * Creates a new connection to ZooKeeper, pulling settings and quorum config + * from the specified configuration object using methods from {@link ZKConfig}. + * + * Sets the connection status monitoring watcher to the specified watcher. + * + * @param conf configuration to pull quorum and other settings from + * @param watcher watcher to monitor connection changes + * @return connection to zookeeper + * @throws IOException if unable to connect to zk or config problem + */ + public static ZooKeeper connect(Configuration conf, Watcher watcher) + throws IOException { + Properties properties = ZKConfig.makeZKProps(conf); + String quorum = ZKConfig.getZKQuorumServersString(properties); + if(quorum == null) { + throw new IOException("Unable to determine ZooKeeper quorum"); + } + int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000); + LOG.debug("Opening connection to ZooKeeper with quorum (" + quorum + ")"); + return new ZooKeeper(quorum, timeout, watcher); + } + + /** + * Join the prefix znode name with the suffix znode name to generate a proper + * full znode name. + * + * Assumes prefix does not end with slash and suffix does not begin with it. + * + * @param prefix beginning of znode name + * @param suffix ending of znode name + * @return result of properly joining prefix with suffix + */ + public static String joinZNode(String prefix, String suffix) { + return prefix + ZNODE_PATH_SEPARATOR + suffix; + } + + /** + * Watch the specified znode for delete/create/change events. The watcher is + * set whether or not the node exists. If the node already exists, the method + * returns true. If the node does not exist, the method returns false. + * + * @param zkw zk reference + * @param znode path of node to watch + * @return true if znode exists, false if does not exist or error + */ + public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) { + try { + Stat s = zkw.getZooKeeper().exists(znode, zkw); + zkw.debug("Set watcher on existing znode (" + znode + ")"); + return s != null ? true : false; + } catch (KeeperException e) { + zkw.warn("Unable to set watcher on znode (" + znode + ")", e); + zkw.keeperException(e); + return false; + } catch (InterruptedException e) { + zkw.warn("Unable to set watcher on znode (" + znode + ")", e); + zkw.interruptedException(e); + return false; + } + } + + /** + * Get the data at the specified znode and set a watch. + * + * Returns the data and sets a watch if the node exists. Returns null and no + * watch is set if the node does not exist or there is an exception. + * + * @param zkw zk reference + * @param znode path of node + * @return data of the specified znode, or null + */ + public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) { + try { + byte [] data = zkw.getZooKeeper().getData(znode, zkw, null); + zkw.debug("Retrieved " + data.length + " bytes of data from znode (" + + znode + ") and set a watcher"); + return data; + } catch (KeeperException.NoNodeException e) { + zkw.debug("Unable to get data of znode (" + znode + ") " + + "because node does not exist (not an error)"); + return null; + } catch (KeeperException e) { + zkw.warn("Unable to get data of znode (" + znode + ")", e); + zkw.keeperException(e); + return null; + } catch (InterruptedException e) { + zkw.warn("Unable to get data of znode (" + znode + ")", e); + zkw.interruptedException(e); + return null; + } + } + + /** + * Get the data at the specified znode, deserialize it as an HServerAddress, + * and set a watch. + * + * Returns the data as a server address and sets a watch if the node exists. + * Returns null and no watch is set if the node does not exist or there is an + * exception. + * + * @param zkw zk reference + * @param znode path of node + * @return data of the specified node as a server address, or null + */ + public static HServerAddress getDataAsAddress(ZooKeeperWatcher zkw, + String znode) { + byte [] data = getDataAndWatch(zkw, znode); + if(data == null) { + return null; + } + String addrString = Bytes.toString(data); + zkw.debug("Read server address from znode (" + znode + "): " + addrString); + return new HServerAddress(addrString); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (working copy) @@ -174,11 +174,11 @@ * @param conf HBaseConfiguration to read settings from. * @throws IOException If a connection error occurs. */ - private ZooKeeperWrapper(Configuration conf, String instanceName) + protected ZooKeeperWrapper(Configuration conf, String instanceName) throws IOException { this.instanceName = instanceName; - Properties properties = HQuorumPeer.makeZKProps(conf); - quorumServers = HQuorumPeer.getZKQuorumServersString(properties); + Properties properties = ZKConfig.makeZKProps(conf); + quorumServers = ZKConfig.getZKQuorumServersString(properties); if (quorumServers == null) { throw new IOException("Could not read quorum servers from " + HConstants.ZOOKEEPER_CONFIG_NAME); @@ -243,7 +243,6 @@ } /** @return String dump of everything in ZooKeeper. */ - @SuppressWarnings({"ConstantConditions"}) public String dump() { StringBuilder sb = new StringBuilder(); sb.append("\nHBase tree in ZooKeeper is rooted at ").append(parentZNode); Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java (working copy) @@ -20,13 +20,12 @@ package org.apache.hadoop.hbase.zookeeper; +import java.util.Properties; +import java.util.Map.Entry; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import java.util.Map.Entry; -import java.util.Properties; - /** * Tool for reading ZooKeeper servers from HBase XML configuation and producing * a line-by-line list for use by bash scripts. @@ -41,7 +40,7 @@ // Note that we do not simply grab the property // HConstants.ZOOKEEPER_QUORUM from the HBaseConfiguration because the // user may be using a zoo.cfg file. - Properties zkProps = HQuorumPeer.makeZKProps(conf); + Properties zkProps = ZKConfig.makeZKProps(conf); for (Entry entry : zkProps.entrySet()) { String key = entry.getKey().toString().trim(); String value = entry.getValue().toString().trim(); Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java (revision 0) @@ -0,0 +1,60 @@ +package org.apache.hadoop.hbase.zookeeper; + + +/** + * Base class for internal listeners of ZooKeeper events. + * + * The {@link ZooKeeperWatcher} for a process will execute the appropriate + * methods of implementations of this class. In order to receive events from + * the watcher, every listener must register itself via {@link ZooKeeperWatcher#registerListener}. + * + * Subclasses need only override those methods in which they are interested. + * + * Note that the watcher will be blocked when invoking methods in listeners so + * they must not be long-running. + */ +public class ZooKeeperListener { + + // Reference to the zk watcher which also contains configuration and constants + protected ZooKeeperWatcher watcher; + + /** + * Construct a ZooKeeper event listener. + * TODO: This should take ServerStatus which will contain ZKWatcher ref? + */ + public ZooKeeperListener(ZooKeeperWatcher watcher) { + this.watcher = watcher; + } + + /** + * Called when a new node has been created. + * @param path full path of the new node + */ + public void nodeCreated(String path) { + // no-op + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + // no-op + } + + /** + * Called when an existing node has changed data. + * @param path full path of the updated node + */ + public void nodeDataChanged(String path) { + // no-op + } + + /** + * Called when an existing node has a child node added or removed. + * @param path full path of the node whose children have changed + */ + public void nodeChildrenChanged(String path) { + // no-op + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java (revision 0) @@ -0,0 +1,224 @@ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.util.StringUtils; + +/** + * Utility methods for reading, parsing, and building zookeeper configuration. + */ +public class ZKConfig { + private static final Log LOG = LogFactory.getLog(ZKConfig.class); + + private static final String VARIABLE_START = "${"; + private static final int VARIABLE_START_LENGTH = VARIABLE_START.length(); + private static final String VARIABLE_END = "}"; + private static final int VARIABLE_END_LENGTH = VARIABLE_END.length(); + + private static final String ZK_CFG_PROPERTY = "hbase.zookeeper.property."; + private static final int ZK_CFG_PROPERTY_SIZE = ZK_CFG_PROPERTY.length(); + private static final String ZK_CLIENT_PORT_KEY = ZK_CFG_PROPERTY + + "clientPort"; + + /** + * Make a Properties object holding ZooKeeper config equivalent to zoo.cfg. + * If there is a zoo.cfg in the classpath, simply read it in. Otherwise parse + * the corresponding config options from the HBase XML configs and generate + * the appropriate ZooKeeper properties. + * @param conf Configuration to read from. + * @return Properties holding mappings representing ZooKeeper zoo.cfg file. + */ + public static Properties makeZKProps(Configuration conf) { + // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read + // it and grab its configuration properties. + ClassLoader cl = HQuorumPeer.class.getClassLoader(); + final InputStream inputStream = + cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME); + if (inputStream != null) { + try { + return parseZooCfg(conf, inputStream); + } catch (IOException e) { + LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME + + ", loading from XML files", e); + } + } + + // Otherwise, use the configuration options from HBase's XML files. + Properties zkProperties = new Properties(); + + // Directly map all of the hbase.zookeeper.property.KEY properties. + for (Entry entry : conf) { + String key = entry.getKey(); + if (key.startsWith(ZK_CFG_PROPERTY)) { + String zkKey = key.substring(ZK_CFG_PROPERTY_SIZE); + String value = entry.getValue(); + // If the value has variables substitutions, need to do a get. + if (value.contains(VARIABLE_START)) { + value = conf.get(key); + } + zkProperties.put(zkKey, value); + } + } + + // If clientPort is not set, assign the default + if (zkProperties.getProperty(ZK_CLIENT_PORT_KEY) == null) { + zkProperties.put(ZK_CLIENT_PORT_KEY, + HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); + } + + // Create the server.X properties. + int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888); + int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888); + + final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, + "localhost"); + for (int i = 0; i < serverHosts.length; ++i) { + String serverHost = serverHosts[i]; + String address = serverHost + ":" + peerPort + ":" + leaderPort; + String key = "server." + i; + zkProperties.put(key, address); + } + + return zkProperties; + } + + /** + * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in. + * This method is used for testing so we can pass our own InputStream. + * @param conf HBaseConfiguration to use for injecting variables. + * @param inputStream InputStream to read from. + * @return Properties parsed from config stream with variables substituted. + * @throws IOException if anything goes wrong parsing config + */ + public static Properties parseZooCfg(Configuration conf, + InputStream inputStream) throws IOException { + Properties properties = new Properties(); + try { + properties.load(inputStream); + } catch (IOException e) { + final String msg = "fail to read properties from " + + HConstants.ZOOKEEPER_CONFIG_NAME; + LOG.fatal(msg); + throw new IOException(msg, e); + } + for (Entry entry : properties.entrySet()) { + String value = entry.getValue().toString().trim(); + String key = entry.getKey().toString().trim(); + StringBuilder newValue = new StringBuilder(); + int varStart = value.indexOf(VARIABLE_START); + int varEnd = 0; + while (varStart != -1) { + varEnd = value.indexOf(VARIABLE_END, varStart); + if (varEnd == -1) { + String msg = "variable at " + varStart + " has no end marker"; + LOG.fatal(msg); + throw new IOException(msg); + } + String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd); + + String substituteValue = System.getProperty(variable); + if (substituteValue == null) { + substituteValue = conf.get(variable); + } + if (substituteValue == null) { + String msg = "variable " + variable + " not set in system property " + + "or hbase configs"; + LOG.fatal(msg); + throw new IOException(msg); + } + + newValue.append(substituteValue); + + varEnd += VARIABLE_END_LENGTH; + varStart = value.indexOf(VARIABLE_START, varEnd); + } + // Special case for 'hbase.cluster.distributed' property being 'true' + if (key.startsWith("server.")) { + if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED) + && value.startsWith("localhost")) { + String msg = "The server in zoo.cfg cannot be set to localhost " + + "in a fully-distributed setup because it won't be reachable. " + + "See \"Getting Started\" for more information."; + LOG.fatal(msg); + throw new IOException(msg); + } + } + newValue.append(value.substring(varEnd)); + properties.setProperty(key, newValue.toString()); + } + return properties; + } + + /** + * Return the ZK Quorum servers string given zk properties returned by + * makeZKProps + * @param properties + * @return + */ + public static String getZKQuorumServersString(Properties properties) { + String clientPort = null; + List servers = new ArrayList(); + + // The clientPort option may come after the server.X hosts, so we need to + // grab everything and then create the final host:port comma separated list. + boolean anyValid = false; + for (Entry property : properties.entrySet()) { + String key = property.getKey().toString().trim(); + String value = property.getValue().toString().trim(); + if (key.equals("clientPort")) { + clientPort = value; + } + else if (key.startsWith("server.")) { + String host = value.substring(0, value.indexOf(':')); + servers.add(host); + try { + //noinspection ResultOfMethodCallIgnored + InetAddress.getByName(host); + anyValid = true; + } catch (UnknownHostException e) { + LOG.warn(StringUtils.stringifyException(e)); + } + } + } + + if (!anyValid) { + LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (clientPort == null) { + LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (servers.isEmpty()) { + LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " + + "ZooKeeper cluster configured for its operation."); + return null; + } + + StringBuilder hostPortBuilder = new StringBuilder(); + for (int i = 0; i < servers.size(); ++i) { + String host = servers.get(i); + if (i > 0) { + hostPortBuilder.append(','); + } + hostPortBuilder.append(host); + hostPortBuilder.append(':'); + hostPortBuilder.append(clientPort); + } + + return hostPortBuilder.toString(); + } +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java (working copy) @@ -19,21 +19,8 @@ */ package org.apache.hadoop.hbase.zookeeper; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServerMain; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.apache.zookeeper.server.quorum.QuorumPeerMain; - import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.PrintWriter; import java.net.InetAddress; import java.net.NetworkInterface; @@ -41,9 +28,18 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.List; -import java.util.Map.Entry; import java.util.Properties; +import java.util.Map.Entry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; + /** * HBase's version of ZooKeeper's QuorumPeer. When HBase is set to manage * ZooKeeper, this class is used to start up QuorumPeer instances. By doing @@ -52,18 +48,7 @@ * zoo.cfg and inject variables from HBase's site.xml configuration in. */ public class HQuorumPeer { - private static final Log LOG = LogFactory.getLog(HQuorumPeer.class); - - private static final String VARIABLE_START = "${"; - private static final int VARIABLE_START_LENGTH = VARIABLE_START.length(); - private static final String VARIABLE_END = "}"; - private static final int VARIABLE_END_LENGTH = VARIABLE_END.length(); - - private static final String ZK_CFG_PROPERTY = "hbase.zookeeper.property."; - private static final int ZK_CFG_PROPERTY_SIZE = ZK_CFG_PROPERTY.length(); - private static final String ZK_CLIENT_PORT_KEY = ZK_CFG_PROPERTY - + "clientPort"; - + /** * Parse ZooKeeper configuration from HBase XML config and run a QuorumPeer. * @param args String[] of command line arguments. Not used. @@ -71,7 +56,7 @@ public static void main(String[] args) { Configuration conf = HBaseConfiguration.create(); try { - Properties zkProperties = makeZKProps(conf); + Properties zkProperties = ZKConfig.makeZKProps(conf); writeMyID(zkProperties); QuorumPeerConfig zkConfig = new QuorumPeerConfig(); zkConfig.parseProperties(zkProperties); @@ -158,195 +143,4 @@ w.println(myId); w.close(); } - - /** - * Make a Properties object holding ZooKeeper config equivalent to zoo.cfg. - * If there is a zoo.cfg in the classpath, simply read it in. Otherwise parse - * the corresponding config options from the HBase XML configs and generate - * the appropriate ZooKeeper properties. - * @param conf Configuration to read from. - * @return Properties holding mappings representing ZooKeeper zoo.cfg file. - */ - public static Properties makeZKProps(Configuration conf) { - // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read - // it and grab its configuration properties. - ClassLoader cl = HQuorumPeer.class.getClassLoader(); - final InputStream inputStream = - cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME); - if (inputStream != null) { - try { - return parseZooCfg(conf, inputStream); - } catch (IOException e) { - LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME + - ", loading from XML files", e); - } - } - - // Otherwise, use the configuration options from HBase's XML files. - Properties zkProperties = new Properties(); - - // Directly map all of the hbase.zookeeper.property.KEY properties. - for (Entry entry : conf) { - String key = entry.getKey(); - if (key.startsWith(ZK_CFG_PROPERTY)) { - String zkKey = key.substring(ZK_CFG_PROPERTY_SIZE); - String value = entry.getValue(); - // If the value has variables substitutions, need to do a get. - if (value.contains(VARIABLE_START)) { - value = conf.get(key); - } - zkProperties.put(zkKey, value); - } - } - - // If clientPort is not set, assign the default - if (zkProperties.getProperty(ZK_CLIENT_PORT_KEY) == null) { - zkProperties.put(ZK_CLIENT_PORT_KEY, - HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); - } - - // Create the server.X properties. - int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888); - int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888); - - final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, - "localhost"); - for (int i = 0; i < serverHosts.length; ++i) { - String serverHost = serverHosts[i]; - String address = serverHost + ":" + peerPort + ":" + leaderPort; - String key = "server." + i; - zkProperties.put(key, address); - } - - return zkProperties; - } - - /** - * Return the ZK Quorum servers string given zk properties returned by - * makeZKProps - * @param properties - * @return - */ - public static String getZKQuorumServersString(Properties properties) { - String clientPort = null; - List servers = new ArrayList(); - - // The clientPort option may come after the server.X hosts, so we need to - // grab everything and then create the final host:port comma separated list. - boolean anyValid = false; - for (Entry property : properties.entrySet()) { - String key = property.getKey().toString().trim(); - String value = property.getValue().toString().trim(); - if (key.equals("clientPort")) { - clientPort = value; - } - else if (key.startsWith("server.")) { - String host = value.substring(0, value.indexOf(':')); - servers.add(host); - try { - //noinspection ResultOfMethodCallIgnored - InetAddress.getByName(host); - anyValid = true; - } catch (UnknownHostException e) { - LOG.warn(StringUtils.stringifyException(e)); - } - } - } - - if (!anyValid) { - LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME); - return null; - } - - if (clientPort == null) { - LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME); - return null; - } - - if (servers.isEmpty()) { - LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " + - "ZooKeeper cluster configured for its operation."); - return null; - } - - StringBuilder hostPortBuilder = new StringBuilder(); - for (int i = 0; i < servers.size(); ++i) { - String host = servers.get(i); - if (i > 0) { - hostPortBuilder.append(','); - } - hostPortBuilder.append(host); - hostPortBuilder.append(':'); - hostPortBuilder.append(clientPort); - } - - return hostPortBuilder.toString(); - } - - /** - * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in. - * This method is used for testing so we can pass our own InputStream. - * @param conf HBaseConfiguration to use for injecting variables. - * @param inputStream InputStream to read from. - * @return Properties parsed from config stream with variables substituted. - * @throws IOException if anything goes wrong parsing config - */ - public static Properties parseZooCfg(Configuration conf, - InputStream inputStream) throws IOException { - Properties properties = new Properties(); - try { - properties.load(inputStream); - } catch (IOException e) { - final String msg = "fail to read properties from " - + HConstants.ZOOKEEPER_CONFIG_NAME; - LOG.fatal(msg); - throw new IOException(msg, e); - } - for (Entry entry : properties.entrySet()) { - String value = entry.getValue().toString().trim(); - String key = entry.getKey().toString().trim(); - StringBuilder newValue = new StringBuilder(); - int varStart = value.indexOf(VARIABLE_START); - int varEnd = 0; - while (varStart != -1) { - varEnd = value.indexOf(VARIABLE_END, varStart); - if (varEnd == -1) { - String msg = "variable at " + varStart + " has no end marker"; - LOG.fatal(msg); - throw new IOException(msg); - } - String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd); - - String substituteValue = System.getProperty(variable); - if (substituteValue == null) { - substituteValue = conf.get(variable); - } - if (substituteValue == null) { - String msg = "variable " + variable + " not set in system property " - + "or hbase configs"; - LOG.fatal(msg); - throw new IOException(msg); - } - - newValue.append(substituteValue); - - varEnd += VARIABLE_END_LENGTH; - varStart = value.indexOf(VARIABLE_START, varEnd); - } - // Special case for 'hbase.cluster.distributed' property being 'true' - if (key.startsWith("server.")) { - if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED) - && value.startsWith("localhost")) { - String msg = "The server in zoo.cfg cannot be set to localhost " + - "in a fully-distributed setup because it won't be reachable. " + - "See \"Getting Started\" for more information."; - LOG.fatal(msg); - throw new IOException(msg); - } - } - newValue.append(value.substring(varEnd)); - properties.setProperty(key, newValue.toString()); - } - return properties; - } } Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 960252) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -1102,4 +1102,9 @@ public static void main(String [] args) { doMain(args, HMaster.class); } + + @Override + public void abortServer() { + this.startShutdown(); + } }