Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1088475) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hdfs.DFSClient; @@ -1001,7 +1002,7 @@ Configuration c = new Configuration(this.conf); String quorumServers = ZKConfig.getZKQuorumServersString(c); int sessionTimeout = 5 * 1000; // 5 seconds - ZooKeeper zk = nodeZK.getZooKeeper(); + RecoverableZooKeeper zk = nodeZK.getRecoverableZooKeeper(); byte[] password = zk.getSessionPasswd(); long sessionID = zk.getSessionId(); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java (revision 1088475) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java (working copy) @@ -135,7 +135,8 @@ hri.getEncodedName()); Stat stat = null; for (int i = 0; i < 10; i++) { - stat = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false); + stat = t.getConnection().getZooKeeperWatcher(). 
+ getRecoverableZooKeeper().exists(path, false); LOG.info("Stat for znode path=" + path + ": " + stat); if (stat == null) break; org.apache.hadoop.hbase.util.Threads.sleep(100); @@ -195,7 +196,8 @@ String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(), hri.getEncodedName()); Stat stats = - t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false); + t.getConnection().getZooKeeperWatcher(). + getRecoverableZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats); RegionTransitionData rtd = ZKAssign.getData(t.getConnection().getZooKeeperWatcher(), @@ -218,7 +220,8 @@ assertTrue(daughters.contains(r)); } // Finally assert that the ephemeral SPLIT znode was cleaned up. - stats = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false); + stats = t.getConnection().getZooKeeperWatcher(). + getRecoverableZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats); assertTrue(stats == null); } finally { Index: src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (revision 1088475) +++ src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (working copy) @@ -99,8 +99,8 @@ int sessionTimeout = 5 * 1000; // 5 seconds HConnection connection = HConnectionManager.getConnection(c); ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher(); - long sessionID = connectionZK.getZooKeeper().getSessionId(); - byte[] password = connectionZK.getZooKeeper().getSessionPasswd(); + long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId(); + byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd(); ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout, EmptyWatcher.instance, sessionID, password); zk.close(); @@ -112,14 +112,14 @@ // Check that the old 
ZK conenction is closed, means we did expire System.err.println("ZooKeeper should have timed out"); - LOG.info("state=" + connectionZK.getZooKeeper().getState()); - Assert.assertTrue(connectionZK.getZooKeeper().getState().equals( + LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState()); + Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState().equals( States.CLOSED)); // Check that the client recovered ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher(); - LOG.info("state=" + newConnectionZK.getZooKeeper().getState()); - Assert.assertTrue(newConnectionZK.getZooKeeper().getState().equals( + LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState()); + Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals( States.CONNECTED)); } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (revision 0) @@ -0,0 +1,676 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.log4j.Logger; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.AsyncCallback.StringCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.ZooKeeper.States; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; + + +/** + * http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling + * To handle recoverable errors, developers need to realize that there are two + * classes of requests: idempotent and non-idempotent requests. Read requests + * and unconditional sets and deletes are examples of idempotent requests, they + * can be reissued with the same results. + * (Although, the delete may throw a NoNodeException on reissue its effect on + * the ZooKeeper state is the same.) Non-idempotent requests need special + * handling, application and library writers need to keep in mind that they may + * need to encode information in the data or name of znodes to detect when a + * retries. A simple example is a create that uses a sequence flag. + * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection + * loss exception, that process will reissue another + * create("/x-", ..., SEQUENCE) and get back x-111. 
When the process does a
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
+ * that x-109 was the result of the previous create, so the process actually
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
+ * "x-352-109", "x-333-110". The process will know that the original create
+ * succeeded and the znode it created is "x-352-109".
+ *
+ * This class wraps a ZooKeeper client and re-issues synchronous operations
+ * that fail with CONNECTIONLOSS or OPERATIONTIMEOUT, up to the limit of the
+ * configured retry counter. Asynchronous operations are passed through
+ * without retry.
+ */
+
+public class RecoverableZooKeeper {
+  private static final Logger LOG = Logger.getLogger(RecoverableZooKeeper.class);
+  // the actual ZooKeeper client instance
+  private ZooKeeper zk;
+  // creates one fresh RetryCounter per synchronous operation
+  private final RetryCounterFactory retryCounterFactory;
+  // An identifier of this process in the cluster
+  private final String identifier;
+  // byte form of the identifier, embedded in every payload written by
+  // appendMetaData
+  private final byte[] id;
+  // only quoted in the retry log messages of this class
+  private int retryIntervalMillis;
+
+  // width of the int that stores the identifier length in a payload header
+  private static final int ID_OFFSET = Bytes.SIZEOF_INT;
+  // the magic number is to be backward compatible
+  private static final byte MAGIC =(byte) 0XFF;
+  // width of the magic byte that starts a payload header
+  private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE;
+
+  /**
+   * Opens the ZooKeeper connection and sets up the retry policy.
+   *
+   * @param quorumServers connect string handed to the ZooKeeper constructor
+   * @param seesionTimeout session timeout handed to the ZooKeeper constructor
+   * @param watcher watcher handed to the ZooKeeper constructor
+   * @param maxRetries handed to the RetryCounterFactory
+   * @param retryIntervalMillis handed to the RetryCounterFactory and quoted
+   *        in the retry log messages
+   * @throws IOException if the ZooKeeper client cannot be constructed
+   */
+  public RecoverableZooKeeper(String quorumServers,int seesionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis)
+  throws IOException {
+    this.zk = new ZooKeeper(quorumServers, seesionTimeout, watcher);
+    this.retryCounterFactory =
+      new RetryCounterFactory(maxRetries, retryIntervalMillis);
+    this.retryIntervalMillis = retryIntervalMillis;
+
+    // the identifier = processID@hostName
+    this.identifier = ManagementFactory.getRuntimeMXBean().getName();
+    LOG.info("The identifier of this process is "+identifier);
+    this.id = Bytes.toBytes(identifier);
+  }
+
+  /**
+   * Deletes a znode, retrying on CONNECTIONLOSS and OPERATIONTIMEOUT.
+   * A NONODE answer counts as success only when it arrives on a retry: in
+   * that case an earlier attempt whose reply was lost may already have
+   * removed the node. On the first attempt NONODE is rethrown, so callers
+   * that catch NoNodeException still see it.
+   *
+   * @param path znode to delete
+   * @param version expected version, handed to ZooKeeper
+   * @throws InterruptedException not handled here; propagated to the caller
+   * @throws KeeperException any non-transient ZooKeeper error, or a transient
+   *         one once the retry counter refuses another attempt
+   */
+  public void delete(String path, int version)
+  throws InterruptedException, KeeperException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    // false for the first attempt, true once an attempt has failed
+    boolean isRetry = false;
+    while (true) {
+      try {
+        zk.delete(path, version);
+        return;
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NONODE:
+            if (isRetry) {
+              LOG.info("Node " + path + " already deleted. Assuming that a "
+                + "previous attempt succeeded.");
+              return;
+            }
+            // first attempt: the node really was not there
+            throw e;
+
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper delete failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  /**
+   * exists is an idempotent operation.
Retry before throw out exception
+   * @param path znode path handed to ZooKeeper
+   * @param watcher watcher handed to ZooKeeper
+   * @return the Stat produced by the underlying ZooKeeper exists call
+   * @throws KeeperException any non-transient ZooKeeper error, or a transient
+   *         one once the retry counter refuses another attempt
+   * @throws InterruptedException not handled here; propagated to the caller
+   */
+  public Stat exists(String path, Watcher watcher)
+  throws KeeperException, InterruptedException {
+    RetryCounter attempts = retryCounterFactory.create();
+    for (;;) {
+      try {
+        return zk.exists(path, watcher);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            // transient: leave the switch and consider another attempt
+            break;
+
+          default:
+            throw ke;
+        }
+        LOG.warn("Possibly transient ZooKeeper exception: " + ke);
+        if (!attempts.shouldRetry()) {
+          LOG.error("ZooKeeper exists failed after "
+            + attempts.getMaxRetries() + " retries");
+          throw ke;
+        }
+      }
+      LOG.info("The "+attempts.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      attempts.sleepUntilNextRetry();
+      attempts.useRetry();
+    }
+  }
+
+  /**
+   * Asynchronous exists; handed straight to the ZooKeeper client, no retry.
+   * @param path znode path handed to ZooKeeper
+   * @param watcher watcher handed to ZooKeeper
+   * @param cb callback handed to ZooKeeper
+   * @param ctx context object handed to ZooKeeper
+   */
+  public void exists(String path, Watcher watcher,
+      StatCallback cb, Object ctx) {
+    zk.exists(path, watcher, cb, ctx);
+  }
+
+  /**
+   * Asynchronous exists; handed straight to the ZooKeeper client, no retry.
+   * @param path znode path handed to ZooKeeper
+   * @param watch watch flag handed to ZooKeeper
+   * @param cb callback handed to ZooKeeper
+   * @param ctx context object handed to ZooKeeper
+   */
+  public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
+    zk.exists(path, watch, cb, ctx);
+  }
+
+  /**
+   * exists is an idempotent operation.
Retry before throw out exception
+   * @param path znode path handed to ZooKeeper
+   * @param watch watch flag handed to ZooKeeper
+   * @return the Stat produced by the underlying ZooKeeper exists call
+   * @throws KeeperException any non-transient ZooKeeper error, or a transient
+   *         one once the retry counter refuses another attempt
+   * @throws InterruptedException not handled here; propagated to the caller
+   */
+  public Stat exists(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.exists(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper exists failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation. Retry before throw out exception
+   * @param path znode whose children are listed
+   * @param watcher watcher handed to ZooKeeper
+   * @return the child names returned by the underlying ZooKeeper call
+   * @throws KeeperException any non-transient ZooKeeper error, or a transient
+   *         one once the retry counter refuses another attempt
+   * @throws InterruptedException not handled here; propagated to the caller
+   */
+  public List<String> getChildren(String path, Watcher watcher)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watcher);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getChildren failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation.
Retry before throw out exception
+   * @param path znode whose children are listed
+   * @param watch watch flag handed to ZooKeeper
+   * @return the child names returned by the underlying ZooKeeper call
+   * @throws KeeperException any non-transient ZooKeeper error, or a transient
+   *         one once the retry counter refuses another attempt
+   * @throws InterruptedException not handled here; propagated to the caller
+   */
+  public List<String> getChildren(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getChildren failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getData is an idempotent operation. Retry before throw out exception
+   * @param path znode to read
+   * @param watcher watcher handed to ZooKeeper
+   * @param stat Stat object handed to ZooKeeper
+   * @return the znode payload after removeMetaData has stripped the header
+   *         written by this class, if one is present
+   * @throws KeeperException any non-transient ZooKeeper error, or a transient
+   *         one once the retry counter refuses another attempt
+   * @throws InterruptedException not handled here; propagated to the caller
+   */
+  public byte[] getData(String path, Watcher watcher, Stat stat)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        byte[] revData = zk.getData(path, watcher, stat);
+        return this.removeMetaData(revData);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getData failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getData is an
idempotent operation. Retry before throw out exception
+   * @param path znode to read
+   * @param watch watch flag handed to ZooKeeper
+   * @param stat Stat object handed to ZooKeeper
+   * @return the znode payload after removeMetaData has stripped the header
+   *         written by this class, if one is present
+   * @throws KeeperException any non-transient ZooKeeper error, or a transient
+   *         one once the retry counter refuses another attempt
+   * @throws InterruptedException not handled here; propagated to the caller
+   */
+  public byte[] getData(String path, boolean watch, Stat stat)
+  throws KeeperException, InterruptedException {
+    RetryCounter attempts = retryCounterFactory.create();
+    for (;;) {
+      try {
+        byte[] raw = zk.getData(path, watch, stat);
+        return removeMetaData(raw);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            // transient: leave the switch and consider another attempt
+            break;
+
+          default:
+            throw ke;
+        }
+        LOG.warn("Possibly transient ZooKeeper exception: " + ke);
+        if (!attempts.shouldRetry()) {
+          LOG.error("ZooKeeper getData failed after "
+            + attempts.getMaxRetries() + " retries");
+          throw ke;
+        }
+      }
+      LOG.info("The "+attempts.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      attempts.sleepUntilNextRetry();
+      attempts.useRetry();
+    }
+  }
+
+  /**
+   * setData is NOT an idempotent operation.
Retry may cause BadVersion Exception
+   * when an earlier attempt was applied but its reply was lost. The payload
+   * written here is prefixed with this process's identifier (see
+   * appendMetaData), so a BADVERSION seen on a retry is double-checked by
+   * reading the znode back: if it holds exactly the bytes being written, the
+   * earlier attempt is taken to have succeeded. A BADVERSION on the first
+   * attempt is always rethrown.
+   * @param path znode to update
+   * @param data new payload; the identifier header is added before writing
+   * @param version expected version, handed to ZooKeeper
+   * @return the Stat from ZooKeeper setData, or the Stat read back when a
+   *         retried write is found to be already applied
+   * @throws KeeperException any non-transient ZooKeeper error, or a transient
+   *         one once the retry counter refuses another attempt
+   * @throws InterruptedException not handled here; propagated to the caller
+   */
+  public Stat setData(String path, byte[] data, int version)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    byte[] newData = appendMetaData(data);
+    // false for the first attempt, true once an attempt has failed
+    boolean isRetry = false;
+    while (true) {
+      try {
+        return zk.setData(path, newData, version);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper setData failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          case BADVERSION:
+            if (isRetry) {
+              // verify whether a previous attempt actually went through;
+              // a KeeperException from this read propagates to the caller
+              Stat stat = new Stat();
+              byte[] revData = zk.getData(path, false, stat);
+              if (java.util.Arrays.equals(revData, newData)) {
+                // the bad version is caused by our own earlier setData
+                return stat;
+              }
+            }
+            // genuine version conflict
+            throw e;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  /**
+   * Asynchronous create; handed straight to the ZooKeeper client, no retry.
+   * The data is passed through unchanged: no identifier header is added.
+   * @param path znode path handed to ZooKeeper
+   * @param data payload handed to ZooKeeper as-is
+   * @param acl ACL list handed to ZooKeeper
+   * @param createMode create mode handed to ZooKeeper
+   * @param cb callback handed to ZooKeeper
+   * @param ctx context object handed to ZooKeeper
+   * @throws KeeperException declared for callers; nothing in this method
+   *         body throws it
+   */
+  public void create(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode,StringCallback cb,Object ctx)
+  throws KeeperException {
+    zk.create(path, data, acl, createMode, cb, ctx);
+  }
+  /**
+   *

+   * NONSEQUENTIAL create is retried on CONNECTIONLOSS and OPERATIONTIMEOUT.
+   * A NodeExists answer on the first attempt is rethrown to the application.
+   * On a retry it is accepted as success only if the znode already holds
+   * exactly the payload being written, which carries this process's
+   * identifier; otherwise it is rethrown as well.
+   *
+   * SEQUENTIAL create is NOT idempotent. The process identifier is appended
+   * to the path so that a retry can look for a node left behind by an
+   * earlier attempt before creating another one.
+   *
+   * @param path znode path; for sequential modes the identifier is appended
+   * @param data payload; the identifier header is added before writing
+   * @param acl ACL list handed to ZooKeeper
+   * @param createMode one of the four modes handled below
+   * @return the path reported for the created (or previously created) znode
+   * @throws KeeperException any non-transient ZooKeeper error, or a transient
+   *         one once the retry counter refuses another attempt
+   * @throws InterruptedException not handled here; propagated to the caller
+   */
+  public String create(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    byte[] newData = appendMetaData(data);
+    switch (createMode) {
+      case EPHEMERAL:
+      case PERSISTENT:
+        return createNonSequential(path, newData, acl, createMode);
+
+      case EPHEMERAL_SEQUENTIAL:
+      case PERSISTENT_SEQUENTIAL:
+        return createSequential(path, newData, acl, createMode);
+
+      default:
+        throw new IllegalArgumentException("Unrecognized CreateMode: "
+          + createMode);
+    }
+  }
+
+  /**
+   * Retry loop for EPHEMERAL and PERSISTENT creates; see create for how
+   * NODEEXISTS is handled. The data passed in already carries the header.
+   */
+  private String createNonSequential(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    // false for the first attempt, true once an attempt has failed
+    boolean isRetry = false;
+    while (true) {
+      try {
+        return zk.create(path, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NODEEXISTS:
+            if (isRetry) {
+              // The reply to an earlier attempt may have been lost after the
+              // node was created. Read it back and compare payloads.
+              byte[] currentData = zk.getData(path, false, null);
+              if (currentData != null
+                  && java.util.Arrays.equals(currentData, data)) {
+                // Non-sequential node was successfully created
+                return path;
+              }
+              LOG.error("Node " + path + " already exists with different "
+                + "data; it was not created by an earlier attempt");
+            }
+            throw e;
+
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper create failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  /**
+   * Retry loop for the two SEQUENTIAL modes. The identifier is appended to
+   * the path, and every attempt after the first looks for an existing node
+   * with that prefix before creating a new one.
+   */
+  private String createSequential(String path, byte[] data,
+      List<ACL> acl, CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean first = true;
+    String newPath = path+this.identifier;
+    while (true) {
+      try {
+        if (!first) {
+          // Check if we succeeded on a previous attempt
+          String previousResult =
findPreviousSequentialNode(newPath);
+          if (previousResult != null) {
+            return previousResult;
+          }
+        }
+        first = false;
+        return zk.create(newPath, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper create failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry "
+        + "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * Looks under the parent of the given path for a child whose name starts
+   * with the path's last component and that still exists.
+   *
+   * @param path full path prefix of the sequential node being created
+   * @return the full path of the first such child, or null if none is found
+   */
+  private String findPreviousSequentialNode(String path)
+  throws KeeperException, InterruptedException {
+    int lastSlashIdx = path.lastIndexOf('/');
+    assert(lastSlashIdx != -1);
+    String parent = path.substring(0, lastSlashIdx);
+    String nodePrefix = path.substring(lastSlashIdx+1);
+
+    List<String> nodes = zk.getChildren(parent, false);
+    List<String> matching = filterByPrefix(nodes, nodePrefix);
+    for (String node : matching) {
+      String nodePath = parent + "/" + node;
+      Stat stat = zk.exists(nodePath, false);
+      if (stat != null) {
+        return nodePath;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Strips the header written by appendMetaData (magic byte, identifier
+   * length, identifier) and returns the remaining payload. Data that is
+   * null, empty, does not start with the magic byte, or whose header is
+   * truncated or carries an impossible identifier length is returned
+   * unchanged, so payloads not written by this class pass through.
+   *
+   * @param data raw znode content
+   * @return the payload without the header, or data itself when no valid
+   *         header is present
+   */
+  public byte[] removeMetaData(byte[] data) {
+    if(data == null || data.length == 0) {
+      return data;
+    }
+    // check the magic data; to be backward compatible
+    byte magic = data[0];
+    if(magic != MAGIC) {
+      return data;
+    }
+
+    int headerLength = MAGIC_OFFSET+ID_OFFSET;
+    if(data.length < headerLength) {
+      // starts with the magic byte by coincidence; too short for a header
+      return data;
+    }
+    int idLength = Bytes.toInt(data, MAGIC_OFFSET);
+    if(idLength < 0 || idLength > data.length-headerLength) {
+      // the length field cannot be ours; treat as foreign data
+      return data;
+    }
+    int dataOffset = headerLength+idLength;
+    int dataLength = data.length-dataOffset;
+
+    byte[] newData = new byte[dataLength];
+    System.arraycopy(data, dataOffset, newData, 0, dataLength);
+
+    return newData;
+
+  }
+
+  /**
+   * Prefixes the payload with the magic byte, the identifier length and the
+   * identifier bytes. A null payload is returned as null.
+   */
+  private byte[] appendMetaData(byte[] data) {
+    if(data == null){
+      return null;
+    }
+
+    byte[] newData = new byte[MAGIC_OFFSET+ID_OFFSET+id.length+data.length];
+    int pos = 0;
+
pos = Bytes.putByte(newData, pos, MAGIC);
+    pos = Bytes.putInt(newData, pos, id.length);
+    pos = Bytes.putBytes(newData, pos, id, 0, id.length);
+    pos = Bytes.putBytes(newData, pos, data, 0, data.length);
+
+    return newData;
+  }
+
+  /** @return the session id reported by the wrapped ZooKeeper client */
+  public long getSessionId() {
+    return zk.getSessionId();
+  }
+
+  /**
+   * Closes the wrapped ZooKeeper client.
+   * @throws InterruptedException propagated from the ZooKeeper close call
+   */
+  public void close() throws InterruptedException {
+    zk.close();
+  }
+
+  /** @return the connection state reported by the wrapped ZooKeeper client */
+  public States getState() {
+    return zk.getState();
+  }
+
+  /**
+   * @return the wrapped ZooKeeper client itself; calls made on it directly
+   *         do not go through the retry logic of this class
+   */
+  public ZooKeeper getZooKeeper() {
+    return zk;
+  }
+
+  /** @return the session password reported by the wrapped ZooKeeper client */
+  public byte[] getSessionPasswd() {
+    return zk.getSessionPasswd();
+  }
+
+  /**
+   * Asynchronous sync; handed straight to the ZooKeeper client, no retry.
+   * @param path znode path handed to ZooKeeper
+   * @param cb callback handed to ZooKeeper; may be null
+   * @param ctx context object handed to ZooKeeper; may be null
+   */
+  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
+    this.zk.sync(path, cb, ctx);
+  }
+
+
+  /**
+   * Filters the given node list by the given prefixes.
+   * This method is all-inclusive--if any element in the node list starts
+   * with any of the given prefixes, then it is included in the result.
+   *
+   * @param nodes the nodes to filter
+   * @param prefixes the prefixes to include in the result
+   * @return list of every element that starts with one of the prefixes
+   */
+  private static List<String> filterByPrefix(List<String> nodes,
+      String...
prefixes) { + List lockChildren = new ArrayList(); + for (String child : nodes){ + for (String prefix : prefixes){ + if (child.startsWith(prefix)){ + lockChildren.add(child); + break; + } + } + } + return lockChildren; + } + +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1088475) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -59,7 +59,7 @@ private String quorum; // zookeeper connection - private ZooKeeper zooKeeper; + private RecoverableZooKeeper recoverableZooKeeper; // abortable in case of zk failure private Abortable abortable; @@ -116,7 +116,11 @@ this.identifier = descriptor; this.abortable = abortable; setNodeNames(conf); - this.zooKeeper = ZKUtil.connect(conf, quorum, this, descriptor); + int retryNum = conf.getInt("zookeeper.connection.retry.num",3); + int retryFreq = conf.getInt("zookeeper.connection.retry.freq",1000); + + this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this,descriptor, + retryNum,retryFreq); try { // Create all the necessary "directories" of znodes // TODO: Move this to an init method somewhere so not everyone calls it? @@ -214,8 +218,8 @@ * Get the connection to ZooKeeper. 
* @return connection reference to zookeeper */ - public ZooKeeper getZooKeeper() { - return zooKeeper; + public RecoverableZooKeeper getRecoverableZooKeeper() { + return recoverableZooKeeper; } /** @@ -300,16 +304,16 @@ this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000); while (System.currentTimeMillis() < finished) { Threads.sleep(1); - if (this.zooKeeper != null) break; + if (this.recoverableZooKeeper != null) break; } - if (this.zooKeeper == null) { + if (this.recoverableZooKeeper == null) { LOG.error("ZK is null on connection event -- see stack trace " + "for the stack trace when constructor was called on this zkw", this.constructorCaller); throw new NullPointerException("ZK is null"); } this.identifier = this.identifier + "-0x" + - Long.toHexString(this.zooKeeper.getSessionId()); + Long.toHexString(this.recoverableZooKeeper.getSessionId()); // Update our identifier. Otherwise ignore. LOG.debug(this.identifier + " connected"); break; @@ -344,7 +348,7 @@ * is up-to-date from when we begin the operation. */ public void sync(String path) { - this.zooKeeper.sync(path, null, null); + this.recoverableZooKeeper.sync(path, null, null); } /** @@ -395,8 +399,8 @@ */ public void close() { try { - if (zooKeeper != null) { - zooKeeper.close(); + if (recoverableZooKeeper != null) { + recoverableZooKeeper.close(); // super.close(); } } catch (InterruptedException e) { Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1088475) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -68,33 +68,25 @@ * Sets the connection status monitoring watcher to the specified watcher. 
* * @param conf configuration to pull ensemble and other settings from + * @param ensemble * @param watcher watcher to monitor connection changes + * @param descriptor + * @param retryNum + * @param retryFreq * @return connection to zookeeper * @throws IOException if unable to connect to zk or config problem */ - public static ZooKeeper connect(Configuration conf, Watcher watcher) + public static RecoverableZooKeeper connect(Configuration conf, String ensemble, + Watcher watcher, final String descriptor,int retryNum, int retryFreq) throws IOException { - Properties properties = ZKConfig.makeZKProps(conf); - String ensemble = ZKConfig.getZKQuorumServersString(properties); - return connect(conf, ensemble, watcher); - } - - public static ZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher) - throws IOException { - return connect(conf, ensemble, watcher, ""); - } - - public static ZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher, final String descriptor) - throws IOException { if(ensemble == null) { throw new IOException("Unable to determine ZooKeeper ensemble"); } int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000); LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" + ensemble + ")"); - return new ZooKeeper(ensemble, timeout, watcher); + return new RecoverableZooKeeper(ensemble, timeout, watcher, + retryNum, retryFreq); } // @@ -229,7 +221,7 @@ public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat s = zkw.getZooKeeper().exists(znode, zkw); + Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw); LOG.debug(zkw.prefix("Set watcher on existing znode " + znode)); return s != null ? 
true : false; } catch (KeeperException e) { @@ -257,7 +249,7 @@ public static int checkExists(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat s = zkw.getZooKeeper().exists(znode, null); + Stat s = zkw.getRecoverableZooKeeper().exists(znode, null); return s != null ? s.getVersion() : -1; } catch (KeeperException e) { LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e); @@ -294,7 +286,7 @@ ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - List children = zkw.getZooKeeper().getChildren(znode, zkw); + List children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw); return children; } catch(KeeperException.NoNodeException ke) { LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " + @@ -386,7 +378,7 @@ List children = null; try { // List the children without watching - children = zkw.getZooKeeper().getChildren(znode, null); + children = zkw.getRecoverableZooKeeper().getChildren(znode, null); } catch(KeeperException.NoNodeException nne) { return null; } catch(InterruptedException ie) { @@ -460,7 +452,7 @@ public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - return !zkw.getZooKeeper().getChildren(znode, null).isEmpty(); + return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty(); } catch(KeeperException.NoNodeException ke) { LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " + "because node does not exist (not an error)")); @@ -492,7 +484,7 @@ public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat stat = zkw.getZooKeeper().exists(znode, null); + Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null); return stat == null ? 
0 : stat.getNumChildren(); } catch(KeeperException e) { LOG.warn(zkw.prefix("Unable to get children of node " + znode)); @@ -514,7 +506,7 @@ public static byte [] getData(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, null, null); + byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, null); logRetrievedMsg(zkw, znode, data, false); return data; } catch (KeeperException.NoNodeException e) { @@ -546,7 +538,7 @@ public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, zkw, null); + byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, null); logRetrievedMsg(zkw, znode, data, true); return data; } catch (KeeperException.NoNodeException e) { @@ -583,7 +575,7 @@ Stat stat) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, zkw, stat); + byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, stat); logRetrievedMsg(zkw, znode, data, false); return data; } catch (KeeperException.NoNodeException e) { @@ -646,7 +638,7 @@ byte [] data, int expectedVersion) throws KeeperException { try { - zkw.getZooKeeper().setData(znode, data, expectedVersion); + zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion); } catch(InterruptedException ie) { zkw.interruptedException(ie); } @@ -705,7 +697,7 @@ byte [] data, int expectedVersion) throws KeeperException, KeeperException.NoNodeException { try { - return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null; + return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null; } catch (InterruptedException e) { zkw.interruptedException(e); return false; @@ -777,7 +769,7 @@ String znode, byte [] data) throws KeeperException { try { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL); } catch (KeeperException.NodeExistsException nee) { if(!watchAndCheckExists(zkw, znode)) { @@ -816,11 +808,11 @@ ZooKeeperWatcher zkw, String znode, byte [] data) throws KeeperException { try { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException.NodeExistsException nee) { try { - zkw.getZooKeeper().exists(znode, zkw); + zkw.getRecoverableZooKeeper().exists(znode, zkw); } catch (InterruptedException e) { zkw.interruptedException(e); return false; @@ -853,9 +845,9 @@ String znode, byte [] data) throws KeeperException, KeeperException.NodeExistsException { try { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - return zkw.getZooKeeper().exists(znode, zkw).getVersion(); + return zkw.getRecoverableZooKeeper().exists(znode, zkw).getVersion(); } catch (InterruptedException e) { zkw.interruptedException(e); return -1; @@ -881,7 +873,7 @@ String znode, byte [] data, final AsyncCallback.StringCallback cb, final Object ctx) throws KeeperException, KeeperException.NodeExistsException { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, cb, ctx); } @@ -899,12 +891,12 @@ String znode) throws KeeperException { try { - zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch(KeeperException.NodeExistsException nee) { } catch(KeeperException.NoAuthException nee){ try { - if (null == zkw.getZooKeeper().exists(znode, false)) { + if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) { // If we failed to create the file and it does not already exist. 
throw(nee); } @@ -934,7 +926,7 @@ if(znode == null) { return; } - zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch(KeeperException.NodeExistsException nee) { return; @@ -966,7 +958,7 @@ int version) throws KeeperException { try { - zkw.getZooKeeper().delete(node, version); + zkw.getRecoverableZooKeeper().delete(node, version); return true; } catch(KeeperException.BadVersionException bve) { return false; @@ -985,7 +977,7 @@ public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node) throws KeeperException { try { - zkw.getZooKeeper().delete(node, -1); + zkw.getRecoverableZooKeeper().delete(node, -1); } catch(KeeperException.NoNodeException nne) { } catch(InterruptedException ie) { zkw.interruptedException(ie); @@ -1007,7 +999,7 @@ deleteNodeRecursively(zkw, joinZNode(node, child)); } } - zkw.getZooKeeper().delete(node, -1); + zkw.getRecoverableZooKeeper().delete(node, -1); } catch(InterruptedException ie) { zkw.interruptedException(ie); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1088475) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -873,7 +873,7 @@ LOG.info("Serving as " + this.serverInfo.getServerName() + ", RPC listening on " + this.server.getListenerAddress() + ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); isOnline = true; } catch (Throwable e) { this.isOnline = false; Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- 
src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1088475) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -981,7 +981,7 @@ LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.getServerName()); // Async exists to set a watcher so we'll get triggered when // unassigned node changes. - this.zkw.getZooKeeper().exists(path, this.zkw, + this.zkw.getRecoverableZooKeeper().exists(path, this.zkw, new ExistsUnassignedAsyncCallback(this.counter), ctx); } } Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1088475) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -374,8 +374,8 @@ LOG.info("Server active/primary master; " + this.address + ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + - ", cluster-up flag was=" + wasUp); + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + + ", cluster-up flag was=" + wasUp); // initialize master side coprocessors before we start handling requests this.cpHost = new MasterCoprocessorHost(this, this.conf); Index: src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java (revision 0) @@ -0,0 +1,39 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.TimeUnit; + +/** Factory that builds {@link RetryCounter} instances which all share one retry limit and one retry interval. */ public class RetryCounterFactory { + /** Retry limit handed to every counter created by this factory. */ private final int maxRetries; + /** Interval handed to every counter; create() pairs it with TimeUnit.MILLISECONDS. */ private final int retryIntervalMillis; + + /** Stores the retry limit and the retry interval (milliseconds) that create() will pass on. */ public RetryCounterFactory(int maxRetries, int retryIntervalMillis) { + this.maxRetries = maxRetries; + this.retryIntervalMillis = retryIntervalMillis; + } + + /** Returns a new RetryCounter on every call, built from the stored limit and interval with TimeUnit.MILLISECONDS. */ public RetryCounter create() { + return + new RetryCounter( + maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS + ); + } +} Index: src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java (revision 0) @@ -0,0 +1,57 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.TimeUnit; + +/** Tracks how many retries remain out of a fixed limit and sleeps a fixed interval between attempts. */ public class RetryCounter { + /** Retry limit given at construction; never changes. */ private final int maxRetries; + /** Retries still available; starts at maxRetries and is decremented by useRetry(). */ private int retriesRemaining; + /** Sleep length between retries, measured in timeUnit (not necessarily milliseconds, despite the name). */ private final int retryIntervalMillis; + /** Unit in which retryIntervalMillis is interpreted by sleepUntilNextRetry(). */ private final TimeUnit timeUnit; + + /** Creates a counter with all maxRetries retries still available and the given sleep interval and unit. */ public RetryCounter(int maxRetries, + int retryIntervalMillis, TimeUnit timeUnit) { + this.maxRetries = maxRetries; + this.retriesRemaining = maxRetries; + this.retryIntervalMillis = retryIntervalMillis; + this.timeUnit = timeUnit; + } + + /** Returns the retry limit this counter was constructed with. */ public int getMaxRetries() { + return maxRetries; + } + + /** Sleeps the calling thread for retryIntervalMillis units of timeUnit; throws InterruptedException if TimeUnit.sleep is interrupted. */ public void sleepUntilNextRetry() throws InterruptedException { + timeUnit.sleep(retryIntervalMillis); + } + + /** Returns true while at least one retry remains (retriesRemaining > 0). */ public boolean shouldRetry() { + return retriesRemaining > 0; + } + + /** Consumes one retry by decrementing the remaining count; there is no lower-bound check, so the count goes negative if this is called more than maxRetries times. */ public void useRetry() { + retriesRemaining--; + } + + /** Returns maxRetries - retriesRemaining + 1: 1 before any retry is used, then one higher after each useRetry() call. */ public int getAttemptTimes() { + return maxRetries-retriesRemaining+1; + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1088475) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -1051,7 +1051,7 @@ } if (this.zooKeeper != null) { LOG.info("Closed zookeeper sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); this.zooKeeper.close(); this.zooKeeper = null; }