Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	(revision 1098149)
+++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	(working copy)
@@ -1028,7 +1028,7 @@
     Configuration c = new Configuration(this.conf);
     String quorumServers = ZKConfig.getZKQuorumServersString(c);
     int sessionTimeout = 5 * 1000; // 5 seconds
-    ZooKeeper zk = nodeZK.getZooKeeper();
+    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
     byte[] password = zk.getSessionPasswd();
     long sessionID = zk.getSessionId();
 
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java	(revision 1098149)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java	(working copy)
@@ -136,7 +136,7 @@
       hri.getEncodedName());
     Stat stat = null;
     for (int i = 0; i < 10; i++) {
-      stat = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+      stat = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
       LOG.info("Stat for znode path=" + path + ": " + stat);
       if (stat == null) break;
       org.apache.hadoop.hbase.util.Threads.sleep(100);
@@ -196,7 +196,7 @@
       String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
         hri.getEncodedName());
       Stat stats =
-        t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+        t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
       LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
       RegionTransitionData rtd =
         ZKAssign.getData(t.getConnection().getZooKeeperWatcher(),
@@ -221,7 +221,7 @@
         assertTrue(daughters.contains(r));
       }
       // Finally assert that the ephemeral SPLIT znode was cleaned up.
-      stats = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+      stats = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
       LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
       assertTrue(stats == null);
     } finally {
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java	(revision 1098149)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java	(working copy)
@@ -129,7 +129,7 @@
     LOG.info("testAcquireTaskAtStartup");
     ZKSplitLog.Counters.resetCounters();
 
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
         TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -161,7 +161,7 @@
     LOG.info("testRaceForTask");
     ZKSplitLog.Counters.resetCounters();
 
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -200,7 +200,7 @@
     Thread.sleep(100);
 
     // this time create a task node after starting the splitLogWorker
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -228,7 +228,7 @@
     Thread.yield(); // let the worker start
     Thread.sleep(100);
 
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -236,7 +236,7 @@
     // now the worker is busy doing the above task
 
     // create another task
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -264,7 +264,7 @@
     Thread.yield(); // let the worker start
     Thread.sleep(100);
 
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -277,7 +277,7 @@
     waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
 
     // create a RESCAN node
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT_SEQUENTIAL);
 
Index: src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java	(revision 1098149)
+++ src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java	(working copy)
@@ -99,8 +99,8 @@
     int sessionTimeout = 5 * 1000; // 5 seconds
     HConnection connection = HConnectionManager.getConnection(c);
     ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
-    long sessionID = connectionZK.getZooKeeper().getSessionId();
-    byte[] password = connectionZK.getZooKeeper().getSessionPasswd();
+    long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId();
+    byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd();
     ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout,
         EmptyWatcher.instance, sessionID, password);
     zk.close();
@@ -112,14 +112,14 @@
 
     // Check that the old ZK conenction is closed, means we did expire
     System.err.println("ZooKeeper should have timed out");
-    LOG.info("state=" + connectionZK.getZooKeeper().getState());
-    Assert.assertTrue(connectionZK.getZooKeeper().getState().equals(
+    LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState());
+    Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState().equals(
         States.CLOSED));
 
     // Check that the client recovered
     ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
-    LOG.info("state=" + newConnectionZK.getZooKeeper().getState());
-    Assert.assertTrue(newConnectionZK.getZooKeeper().getState().equals(
+    LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState());
+    Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals(
         States.CONNECTED));
   }
 
Index: src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java	(revision 1098149)
+++ src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java	(working copy)
@@ -198,7 +198,7 @@
     LOG.info("TestOrphanTaskAcquisition");
 
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
-    zkw.getZooKeeper().create(tasknode,
+    zkw.getRecoverableZooKeeper().create(tasknode,
         TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -231,7 +231,7 @@
         " startup");
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
     //create an unassigned orphan task
-    zkw.getZooKeeper().create(tasknode,
+    zkw.getRecoverableZooKeeper().create(tasknode,
         TaskState.TASK_UNASSIGNED.get("dummy-worker"),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
@@ -391,7 +391,7 @@
 
     // create an orphan task in OWNED state
     String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
-    zkw.getZooKeeper().create(tasknode1,
+    zkw.getRecoverableZooKeeper().create(tasknode1,
         TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java	(revision 0)
@@ -0,0 +1,691 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.ZooKeeper.States;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A zookeeper that can handle 'recoverable' errors.
+ * To handle recoverable errors, developers need to realize that there are two
+ * classes of requests: idempotent and non-idempotent requests. Read requests
+ * and unconditional sets and deletes are examples of idempotent requests, they
+ * can be reissued with the same results.
+ * (Although, the delete may throw a NoNodeException on reissue its effect on
+ * the ZooKeeper state is the same.) Non-idempotent requests need special
+ * handling, application and library writers need to keep in mind that they may
+ * need to encode information in the data or name of znodes to detect
+ * retries. A simple example is a create that uses a sequence flag.
+ * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
+ * loss exception, that process will reissue another
+ * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
+ * that x-109 was the result of the previous create, so the process actually
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
+ * "x-352-109", "x-333-110". The process will know that the original create
+ * succeeded and the znode it created is "x-352-109".
+ * <p>
+ * Data written through this class is prefixed with metadata so that a retried
+ * write can be recognised: one {@code MAGIC} byte, a 4-byte length, this
+ * process' identifier (typically {@code pid@hostname}), then the caller's
+ * bytes. The read methods strip that prefix again; data that does not start
+ * with the magic byte is returned unchanged, for backward compatibility.
+ * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
+ */
+public class RecoverableZooKeeper {
+  private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
+  // the actual ZooKeeper client instance
+  private final ZooKeeper zk;
+  private final RetryCounterFactory retryCounterFactory;
+  // An identifier of this process in the cluster
+  private final String identifier;
+  private final byte[] id;
+  private final int retryIntervalMillis;
+
+  // size of the identifier-length field that follows the magic byte
+  private static final int ID_OFFSET = Bytes.SIZEOF_INT;
+  // the magic number is to be backward compatible
+  private static final byte MAGIC =(byte) 0XFF;
+  // size of the magic-byte field at the start of the metadata
+  private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE;
+
+  /**
+   * Opens a ZooKeeper session and prepares the per-operation retry policy.
+   * @param quorumServers ZooKeeper connect string
+   * @param sessionTimeout session timeout handed to ZooKeeper, in ms
+   * @param watcher default watcher of the ZooKeeper session
+   * @param maxRetries passed to RetryCounterFactory; bounds retries per call
+   * @param retryIntervalMillis passed to RetryCounterFactory; pause per retry
+   * @throws IOException if the ZooKeeper client cannot be created
+   */
+  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis)
+  throws IOException {
+    this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
+    this.retryCounterFactory =
+      new RetryCounterFactory(maxRetries, retryIntervalMillis);
+    this.retryIntervalMillis = retryIntervalMillis;
+
+    // the identifier = processID@hostName
+    this.identifier = ManagementFactory.getRuntimeMXBean().getName();
+    LOG.info("The identifier of this process is "+identifier);
+    this.id = Bytes.toBytes(identifier);
+  }
+
+  /**
+   * Handles a possibly transient failure (connection loss or operation
+   * timeout): returns normally when another attempt is allowed, otherwise
+   * logs the final failure and rethrows {@code e}.
+   * @param retryCounter retry state of the current operation
+   * @param e the exception that was caught
+   * @param opName operation name used in the error message
+   * @throws KeeperException {@code e}, once no retry is left
+   */
+  private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
+      String opName) throws KeeperException {
+    LOG.warn("Possibly transient ZooKeeper exception: " + e);
+    if (!retryCounter.shouldRetry()) {
+      LOG.error("ZooKeeper " + opName + " failed after " +
+          retryCounter.getMaxRetries() + " retries");
+      throw e;
+    }
+  }
+
+  /**
+   * Logs the coming retry, then calls RetryCounter#sleepUntilNextRetry()
+   * and RetryCounter#useRetry() on {@code retryCounter}.
+   * @param retryCounter retry state of the current operation
+   * @throws InterruptedException propagated from sleepUntilNextRetry()
+   */
+  private void sleepBeforeRetry(RetryCounter retryCounter)
+  throws InterruptedException {
+    LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+        "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+    retryCounter.sleepUntilNextRetry();
+    retryCounter.useRetry();
+  }
+
+  /**
+   * delete is an idempotent operation. Retry before throwing exception.
+   * A NoNodeException is swallowed only when it comes from a retry, because
+   * an earlier attempt that lost its connection may already have deleted
+   * the node; on the first attempt it is rethrown to the caller.
+   * @param path the znode to delete
+   * @param version expected version, passed to ZooKeeper#delete
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public void delete(String path, int version)
+  throws InterruptedException, KeeperException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        zk.delete(path, version);
+        return;
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NONODE:
+            if (isRetry) {
+              LOG.info("Node " + path + " already deleted. Assuming that a " +
+                  "previous attempt succeeded.");
+              return;
+            }
+            LOG.warn("Node " + path + " already deleted, and this is not a " +
+                     "retry");
+            throw e;
+
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "delete");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+      isRetry = true;
+    }
+  }
+
+  /**
+   * exists is an idempotent operation. Retry before throwing exception.
+   * @param path the znode to check
+   * @param watcher watcher to leave on the znode, passed to ZooKeeper#exists
+   * @return the znode's Stat, or null if the znode does not exist
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public Stat exists(String path, Watcher watcher)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.exists(path, watcher);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "exists");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+    }
+  }
+
+  /**
+   * exists is an idempotent operation. Retry before throwing exception.
+   * @param path the znode to check
+   * @param watch whether to leave the session's default watcher on the znode
+   * @return the znode's Stat, or null if the znode does not exist
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public Stat exists(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.exists(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "exists");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation. Retry before throwing exception.
+   * @param path the znode whose children are listed
+   * @param watcher watcher to leave on the znode, passed to ZooKeeper
+   * @return the names of the children of {@code path}
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public List<String> getChildren(String path, Watcher watcher)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watcher);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "getChildren");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation. Retry before throwing exception.
+   * @param path the znode whose children are listed
+   * @param watch whether to leave the session's default watcher on the znode
+   * @return the names of the children of {@code path}
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public List<String> getChildren(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "getChildren");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+    }
+  }
+
+  /**
+   * getData is an idempotent operation. Retry before throwing exception.
+   * The metadata prefix added by this class is stripped from the result.
+   * @param path the znode to read
+   * @param watcher watcher to leave on the znode, passed to ZooKeeper#getData
+   * @param stat filled in by ZooKeeper#getData with the znode's Stat
+   * @return the znode's data without the metadata prefix
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] getData(String path, Watcher watcher, Stat stat)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        byte[] revData = zk.getData(path, watcher, stat);
+        return this.removeMetaData(revData);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "getData");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+    }
+  }
+
+  /**
+   * getData is an idempotent operation. Retry before throwing exception.
+   * The metadata prefix added by this class is stripped from the result.
+   * @param path the znode to read
+   * @param watch whether to leave the session's default watcher on the znode
+   * @param stat filled in by ZooKeeper#getData with the znode's Stat
+   * @return the znode's data without the metadata prefix
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] getData(String path, boolean watch, Stat stat)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        byte[] revData = zk.getData(path, watch, stat);
+        return this.removeMetaData(revData);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "getData");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+    }
+  }
+
+  /**
+   * setData is NOT an idempotent operation: a retry may fail with BADVERSION
+   * because an earlier attempt was applied. The identifier stored in the data
+   * lets us tell that case apart from a real version conflict.
+   * <p>
+   * When a retry gets BADVERSION, the node is read back; if it holds exactly
+   * the bytes this call tried to write (identifier and payload), the earlier
+   * attempt is taken to have succeeded and the node's current Stat is returned.
+   * @param path the znode to write
+   * @param data the caller's payload; the metadata prefix is added here
+   * @param version expected version, passed to ZooKeeper#setData
+   * @return the Stat of the znode after the write
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public Stat setData(String path, byte[] data, int version)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    byte[] newData = appendMetaData(data);
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        return zk.setData(path, newData, version);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "setData");
+            break;
+
+          case BADVERSION:
+            if (isRetry) {
+              // A previous attempt may have been applied before the connection
+              // was lost: read the node back and compare it with the bytes we
+              // tried to write. A KeeperException raised here means ZK is
+              // still unreliable, so it is passed on to the caller.
+              Stat stat = new Stat();
+              byte[] revData = zk.getData(path, false, stat);
+              if (Arrays.equals(revData, newData)) {
+                // the bad version is caused by previous successful setData
+                return stat;
+              }
+            }
+            // a genuine version conflict: fall through and rethrow
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+      isRetry = true;
+    }
+  }
+
+  /**
+   * <p>
+   * NONSEQUENTIAL create is idempotent operation.
+   * Retry before throwing exceptions.
+   * A NodeExists exception raised by a retry is not passed back to the
+   * application if the existing node holds exactly the data this call tried
+   * to write; otherwise, and always on the first attempt, it is rethrown.
+   * </p>
+   * <p>
+   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
+   * identifier to the path to verify, whether the previous one is successful
+   * or not. The identifier of this process is therefore appended to
+   * {@code path} before ZooKeeper adds the sequence number.
+   * </p>
+   *
+   * @param path the znode path (the name prefix for sequential modes)
+   * @param data the caller's payload; the metadata prefix is added here
+   * @param acl ACL of the new znode
+   * @param createMode the ZooKeeper create mode
+   * @return the path of the created znode
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public String create(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    byte[] newData = appendMetaData(data);
+    switch (createMode) {
+      case EPHEMERAL:
+      case PERSISTENT:
+        return createNonSequential(path, newData, acl, createMode);
+
+      case EPHEMERAL_SEQUENTIAL:
+      case PERSISTENT_SEQUENTIAL:
+        return createSequential(path, newData, acl, createMode);
+
+      default:
+        throw new IllegalArgumentException("Unrecognized CreateMode: " +
+            createMode);
+    }
+  }
+
+  /**
+   * Creates a non-sequential znode, retrying on connection loss or timeout.
+   * A NODEEXISTS raised by a retry is accepted when the node's raw bytes equal
+   * {@code data}, i.e. when an earlier attempt of this call succeeded.
+   */
+  private String createNonSequential(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        return zk.create(path, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NODEEXISTS:
+            if (isRetry) {
+              // If the connection was lost, there is still a possibility that
+              // we have successfully created the node at our previous attempt,
+              // so we read the node and compare.
+              byte[] currentData = zk.getData(path, false, null);
+              if (currentData != null &&
+                  Bytes.compareTo(currentData, data) == 0) {
+                // We successfully created a non-sequential node
+                return path;
+              }
+              LOG.error("Node " + path + " already exists with " +
+                  Bytes.toStringBinarySafe(currentData) + ", could not write " +
+                  Bytes.toStringBinarySafe(data));
+              throw e;
+            }
+            LOG.error("Node " + path + " already exists and this is not a " +
+                "retry");
+            throw e;
+
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "create");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+      isRetry = true;
+    }
+  }
+
+  /**
+   * Creates a sequential znode named {@code path + identifier + sequence}.
+   * Before each retry it looks for an existing node with that prefix, in case
+   * an earlier attempt succeeded before the connection was lost.
+   * NOTE(review): any existing node with the same prefix matches, including one
+   * left by an earlier, already completed call from this process, so a retry
+   * may return an old node. Confirm this is acceptable for all callers.
+   */
+  private String createSequential(String path, byte[] data,
+      List<ACL> acl, CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean first = true;
+    String newPath = path+this.identifier;
+    while (true) {
+      try {
+        if (!first) {
+          // Check if we succeeded on a previous attempt
+          String previousResult = findPreviousSequentialNode(newPath);
+          if (previousResult != null) {
+            return previousResult;
+          }
+        }
+        first = false;
+        return zk.create(newPath, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "create");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      sleepBeforeRetry(retryCounter);
+    }
+  }
+
+  /**
+   * Returns the full path of an existing sibling whose name starts with the
+   * last segment of {@code path}, or null if there is none.
+   */
+  private String findPreviousSequentialNode(String path)
+    throws KeeperException, InterruptedException {
+    int lastSlashIdx = path.lastIndexOf('/');
+    assert(lastSlashIdx != -1);
+    String parent = path.substring(0, lastSlashIdx);
+    String nodePrefix = path.substring(lastSlashIdx+1);
+
+    List<String> nodes = zk.getChildren(parent, false);
+    List<String> matching = filterByPrefix(nodes, nodePrefix);
+    for (String node : matching) {
+      String nodePath = parent + "/" + node;
+      Stat stat = zk.exists(nodePath, false);
+      if (stat != null) {
+        return nodePath;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Strips the metadata prefix added by appendMetaData. Data that is null,
+   * empty, does not start with the magic byte, or is too short for the prefix
+   * it announces is returned unchanged.
+   * @param data the bytes as stored in ZooKeeper
+   * @return the caller's payload
+   */
+  public byte[] removeMetaData(byte[] data) {
+    if(data == null || data.length == 0) {
+      return data;
+    }
+    // check the magic data; to be backward compatible
+    byte magic = data[0];
+    if(magic != MAGIC) {
+      return data;
+    }
+    if (data.length < MAGIC_OFFSET + ID_OFFSET) {
+      // too short to hold the id length: presumably not written by this class
+      return data;
+    }
+
+    int idLength = Bytes.toInt(data, MAGIC_OFFSET);
+    int dataLength = data.length-MAGIC_OFFSET-ID_OFFSET-idLength;
+    int dataOffset = MAGIC_OFFSET+ID_OFFSET+idLength;
+    if (idLength < 0 || dataLength < 0) {
+      // the announced id does not fit: presumably not written by this class
+      return data;
+    }
+
+    byte[] newData = new byte[dataLength];
+    System.arraycopy(data, dataOffset, newData, 0, dataLength);
+    return newData;
+  }
+
+  /**
+   * Prefixes {@code data} with the magic byte, the length of this process'
+   * identifier and the identifier itself.
+   * @param data the caller's payload, may be null
+   * @return the prefixed bytes, or null if {@code data} is null
+   */
+  private byte[] appendMetaData(byte[] data) {
+    if(data == null){
+      return null;
+    }
+
+    byte[] newData = new byte[MAGIC_OFFSET+ID_OFFSET+id.length+data.length];
+    int pos = 0;
+    pos = Bytes.putByte(newData, pos, MAGIC);
+    pos = Bytes.putInt(newData, pos, id.length);
+    pos = Bytes.putBytes(newData, pos, id, 0, id.length);
+    pos = Bytes.putBytes(newData, pos, data, 0, data.length);
+
+    return newData;
+  }
+
+  /** @return the session id of the wrapped ZooKeeper client */
+  public long getSessionId() {
+    return zk.getSessionId();
+  }
+
+  /** Closes the wrapped ZooKeeper client. */
+  public void close() throws InterruptedException {
+    zk.close();
+  }
+
+  /** @return the state of the wrapped ZooKeeper client */
+  public States getState() {
+    return zk.getState();
+  }
+
+  /** @return the wrapped client; calls made on it directly are not retried */
+  public ZooKeeper getZooKeeper() {
+    return zk;
+  }
+
+  /** @return the session password of the wrapped ZooKeeper client */
+  public byte[] getSessionPasswd() {
+    return zk.getSessionPasswd();
+  }
+
+  /**
+   * Asynchronously syncs {@code path}. Unlike the other operations this call
+   * is not retried.
+   * @param path the znode path to sync
+   * @param cb callback that ZooKeeper invokes when the sync completes
+   * @param ctx context object passed back to {@code cb}
+   */
+  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
+    this.zk.sync(path, cb, ctx);
+  }
+
+  /**
+   * Filters the given node list by the given prefixes.
+   * This method is all-inclusive--if any element in the node list starts
+   * with any of the given prefixes, then it is included in the result.
+   *
+   * @param nodes the nodes to filter
+   * @param prefixes the prefixes to include in the result
+   * @return list of every element that starts with one of the prefixes
+   */
+  private static List<String> filterByPrefix(List<String> nodes,
+      String... prefixes) {
+    List<String> lockChildren = new ArrayList<String>();
+    for (String child : nodes){
+      for (String prefix : prefixes){
+        if (child.startsWith(prefix)){
+          lockChildren.add(child);
+          break;
+        }
+      }
+    }
+    return lockChildren;
+  }
+
+}
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java	(revision 1098149)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java	(working copy)
@@ -59,7 +59,7 @@
   private String quorum;
 
   // zookeeper connection
-  private ZooKeeper zooKeeper;
+  private RecoverableZooKeeper recoverableZooKeeper;
 
   // abortable in case of zk failure
   private Abortable abortable;
@@ -120,7 +120,7 @@
     this.identifier = descriptor;
     this.abortable = abortable;
     setNodeNames(conf);
-    this.zooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
+    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
     try {
       // Create all the necessary "directories" of znodes
       // TODO: Move this to an init method somewhere so not everyone calls it?
@@ -129,40 +129,28 @@
       // Apparently this is recoverable.  Retry a while.
       // See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
       // TODO: Generalize out in ZKUtil.
-      long wait = conf.getLong("hbase.zookeeper.recoverable.waittime", 10000);
-      long finished = System.currentTimeMillis() + wait;
-      KeeperException ke = null;
-      do {
-        try {
-          ZKUtil.createAndFailSilent(this, baseZNode);
-          ke = null;
-          break;
-        } catch (KeeperException.ConnectionLossException e) {
-          if (LOG.isDebugEnabled() && (isFinishedRetryingRecoverable(finished))) {
-            LOG.debug("Retrying zk create for another " +
-              (finished - System.currentTimeMillis()) +
-              "ms; set 'hbase.zookeeper.recoverable.waittime' to change " +
-              "wait time); " + e.getMessage());
-          }
-          ke = e;
+      try {
+        ZKUtil.createAndFailSilent(this, baseZNode);
+      } catch (KeeperException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Zookeeper Create Failed because "+e.getMessage());
         }
-      } while (isFinishedRetryingRecoverable(finished));
-      // Convert connectionloss exception to ZKCE.
-      if (ke != null) {
-        try {
-          // If we don't close it, the zk connection managers won't be killed
-          this.zooKeeper.close();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          LOG.warn("Interrupted while closing", e);
-        }
+        // Close only on failure: on success this watcher keeps using the
+        // connection, so it must not be closed in a finally block.
+        try {
+          // If we don't close it, the zk connection managers won't be killed
+          this.recoverableZooKeeper.close();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          LOG.warn("Interrupted while closing", ie);
+        }
         throw new ZooKeeperConnectionException("HBase is able to connect to" +
             " ZooKeeper but the connection closes immediately. This could be" +
             " a sign that the server has too many connections (30 is the" +
             " default). Consider inspecting your ZK server logs for that" +
             " error and then make sure you are reusing HBaseConfiguration" +
             " as often as you can. See HTable's javadoc for more information.",
-            ke);
+            e);
       }
       ZKUtil.createAndFailSilent(this, assignmentZNode);
       ZKUtil.createAndFailSilent(this, rsZNode);
@@ -238,8 +226,8 @@
    * Get the connection to ZooKeeper.
    * @return connection reference to zookeeper
    */
-  public ZooKeeper getZooKeeper() {
-    return zooKeeper;
+  public RecoverableZooKeeper getRecoverableZooKeeper() {
+    return recoverableZooKeeper;
   }
 
   /**
@@ -324,16 +312,16 @@
           this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
         while (System.currentTimeMillis() < finished) {
           Threads.sleep(1);
-          if (this.zooKeeper != null) break;
+          if (this.recoverableZooKeeper != null) break;
         }
-        if (this.zooKeeper == null) {
+        if (this.recoverableZooKeeper == null) {
           LOG.error("ZK is null on connection event -- see stack trace " +
             "for the stack trace when constructor was called on this zkw",
             this.constructorCaller);
           throw new NullPointerException("ZK is null");
         }
         this.identifier = this.identifier + "-0x" +
-          Long.toHexString(this.zooKeeper.getSessionId());
+          Long.toHexString(this.recoverableZooKeeper.getSessionId());
         // Update our identifier.  Otherwise ignore.
         LOG.debug(this.identifier + " connected");
         break;
@@ -368,7 +356,7 @@
    * is up-to-date from when we begin the operation.
    */
   public void sync(String path) {
-    this.zooKeeper.sync(path, null, null);
+    this.recoverableZooKeeper.sync(path, null, null);
   }
 
   /**
@@ -419,8 +407,8 @@
    */
   public void close() {
     try {
-      if (zooKeeper != null) {
-        zooKeeper.close();
+      if (recoverableZooKeeper != null) {
+        recoverableZooKeeper.close();
 //        super.close();
       }
     } catch (InterruptedException e) {
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java	(revision 1098149)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java	(working copy)
@@ -70,20 +70,20 @@
    * @return connection to zookeeper
    * @throws IOException if unable to connect to zk or config problem
    */
-  public static ZooKeeper connect(Configuration conf, Watcher watcher)
+  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
   throws IOException {
     Properties properties = ZKConfig.makeZKProps(conf);
     String ensemble = ZKConfig.getZKQuorumServersString(properties);
     return connect(conf, ensemble, watcher);
   }
 
-  public static ZooKeeper connect(Configuration conf, String ensemble,
+  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher)
   throws IOException {
     return connect(conf, ensemble, watcher, "");
   }
 
-  public static ZooKeeper connect(Configuration conf, String ensemble,
+  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher, final String descriptor)
   throws IOException {
     if(ensemble == null) {
@@ -92,7 +92,11 @@
     int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000);
     LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" +
         ensemble + ")");
-    return new ZooKeeper(ensemble, timeout, watcher);
+    int retry = conf.getInt("zookeeper.recovery.retry", 3);
+    int retryIntervalMillis =
+      conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
+    return new RecoverableZooKeeper(ensemble, timeout, watcher,
+        retry, retryIntervalMillis);
   }
 
   //
@@ -214,7 +218,7 @@
   public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      Stat s = zkw.getZooKeeper().exists(znode, zkw);
+      Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
       LOG.debug(zkw.prefix("Set watcher on existing znode " + znode));
       return s != null ? true : false;
     } catch (KeeperException e) {
@@ -242,7 +246,7 @@
   public static int checkExists(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      Stat s = zkw.getZooKeeper().exists(znode, null);
+      Stat s = zkw.getRecoverableZooKeeper().exists(znode, null);
       return s != null ? s.getVersion() : -1;
     } catch (KeeperException e) {
       LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
@@ -279,7 +283,7 @@
       ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      List<String> children = zkw.getZooKeeper().getChildren(znode, zkw);
+      List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
       return children;
     } catch(KeeperException.NoNodeException ke) {
       LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
@@ -339,7 +343,7 @@
     List<String> children = null;
     try {
       // List the children without watching
-      children = zkw.getZooKeeper().getChildren(znode, null);
+      children = zkw.getRecoverableZooKeeper().getChildren(znode, null);
     } catch(KeeperException.NoNodeException nne) {
       return null;
     } catch(InterruptedException ie) {
@@ -413,7 +417,7 @@
   public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      return !zkw.getZooKeeper().getChildren(znode, null).isEmpty();
+      return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty();
     } catch(KeeperException.NoNodeException ke) {
       LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
           "because node does not exist (not an error)"));
@@ -445,7 +449,7 @@
   public static int
getNumberOfChildren(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat stat = zkw.getZooKeeper().exists(znode, null); + Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null); return stat == null ? 0 : stat.getNumChildren(); } catch(KeeperException e) { LOG.warn(zkw.prefix("Unable to get children of node " + znode)); @@ -467,7 +471,7 @@ public static byte [] getData(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, null, null); + byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, null); logRetrievedMsg(zkw, znode, data, false); return data; } catch (KeeperException.NoNodeException e) { @@ -499,7 +503,7 @@ public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, zkw, null); + byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, null); logRetrievedMsg(zkw, znode, data, true); return data; } catch (KeeperException.NoNodeException e) { @@ -536,7 +540,7 @@ Stat stat) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, null, stat); + byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat); logRetrievedMsg(zkw, znode, data, false); return data; } catch (KeeperException.NoNodeException e) { @@ -573,7 +577,7 @@ byte [] data, int expectedVersion) throws KeeperException { try { - zkw.getZooKeeper().setData(znode, data, expectedVersion); + zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion); } catch(InterruptedException ie) { zkw.interruptedException(ie); } @@ -607,7 +611,7 @@ byte [] data, int expectedVersion) throws KeeperException, KeeperException.NoNodeException { try { - return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null; + return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null; } catch (InterruptedException e) { zkw.interruptedException(e); 
return false; @@ -678,7 +682,7 @@ String znode, byte [] data) throws KeeperException { try { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (KeeperException.NodeExistsException nee) { if(!watchAndCheckExists(zkw, znode)) { @@ -717,11 +721,11 @@ ZooKeeperWatcher zkw, String znode, byte [] data) throws KeeperException { try { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException.NodeExistsException nee) { try { - zkw.getZooKeeper().exists(znode, zkw); + zkw.getRecoverableZooKeeper().exists(znode, zkw); } catch (InterruptedException e) { zkw.interruptedException(e); return false; @@ -754,9 +758,9 @@ String znode, byte [] data) throws KeeperException, KeeperException.NodeExistsException { try { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - return zkw.getZooKeeper().exists(znode, zkw).getVersion(); + return zkw.getRecoverableZooKeeper().exists(znode, zkw).getVersion(); } catch (InterruptedException e) { zkw.interruptedException(e); return -1; @@ -781,7 +785,7 @@ public static void asyncCreate(ZooKeeperWatcher zkw, String znode, byte [] data, final AsyncCallback.StringCallback cb, final Object ctx) { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, cb, ctx); } @@ -799,7 +803,7 @@ String znode) throws KeeperException { try { - ZooKeeper zk = zkw.getZooKeeper(); + RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper(); if (zk.exists(znode, false) == null) { zk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -807,7 +811,7 @@ } catch(KeeperException.NodeExistsException 
nee) { } catch(KeeperException.NoAuthException nee){ try { - if (null == zkw.getZooKeeper().exists(znode, false)) { + if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) { // If we failed to create the file and it does not already exist. throw(nee); } @@ -837,7 +841,7 @@ if(znode == null) { return; } - zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch(KeeperException.NodeExistsException nee) { return; @@ -869,7 +873,7 @@ int version) throws KeeperException { try { - zkw.getZooKeeper().delete(node, version); + zkw.getRecoverableZooKeeper().delete(node, version); return true; } catch(KeeperException.BadVersionException bve) { return false; @@ -888,7 +892,7 @@ public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node) throws KeeperException { try { - zkw.getZooKeeper().delete(node, -1); + zkw.getRecoverableZooKeeper().delete(node, -1); } catch(KeeperException.NoNodeException nne) { } catch(InterruptedException ie) { zkw.interruptedException(ie); @@ -910,7 +914,7 @@ deleteNodeRecursively(zkw, joinZNode(node, child)); } } - zkw.getZooKeeper().delete(node, -1); + zkw.getRecoverableZooKeeper().delete(node, -1); } catch(InterruptedException ie) { zkw.interruptedException(ie); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1098149) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -877,7 +877,7 @@ LOG.info("Serving as " + this.serverNameFromMasterPOV + ", RPC listening on " + this.isa + ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); isOnline = true; } catch (Throwable e) { 
this.isOnline = false; Index: src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (revision 1098149) +++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (working copy) @@ -323,7 +323,7 @@ */ private boolean ownTask(boolean isFirstTime) { try { - Stat stat = this.watcher.getZooKeeper().setData(currentTask, + Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask, TaskState.TASK_OWNED.get(serverName), currentVersion); if (stat == null) { LOG.warn("zk.setData() returned null for path " + currentTask); @@ -386,8 +386,9 @@ } void getDataSetWatchAsync() { - this.watcher.getZooKeeper().getData(currentTask, this.watcher, - new GetDataAsyncCallback(), null); + this.watcher.getRecoverableZooKeeper().getZooKeeper(). + getData(currentTask, this.watcher, + new GetDataAsyncCallback(), null); tot_wkr_get_data_queued.incrementAndGet(); } Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1098149) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -1027,7 +1027,7 @@ LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString()); // Async exists to set a watcher so we'll get triggered when // unassigned node changes. 
- this.zkw.getZooKeeper().exists(path, this.zkw, + this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw, new ExistsUnassignedAsyncCallback(this.counter), ctx); } } Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1098149) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -359,7 +359,7 @@ LOG.info("Server active/primary master; " + this.serverName + ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + ", cluster-up flag was=" + wasUp); } Index: src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (revision 1098149) +++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (working copy) @@ -176,6 +176,7 @@ * @throws IOException * if there was an error while splitting any log file * @return cumulative size of the logfiles split + * @throws KeeperException */ public long splitLogDistributed(final Path logDir) throws IOException { this.fs = logDir.getFileSystem(conf); @@ -326,7 +327,8 @@ private void getDataSetWatch(String path, Long retry_count) { - this.watcher.getZooKeeper().getData(path, this.watcher, + this.watcher.getRecoverableZooKeeper().getZooKeeper(). + getData(path, this.watcher, new GetDataAsyncCallback(), retry_count); tot_mgr_get_data_queued.incrementAndGet(); } @@ -480,7 +482,8 @@ private void deleteNode(String path, Long retries) { tot_mgr_node_delete_queued.incrementAndGet(); - this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(), + this.watcher.getRecoverableZooKeeper().getZooKeeper(). 
+ delete(path, -1, new DeleteAsyncCallback(), retries); } @@ -507,9 +510,11 @@ /** * signal the workers that a task was resubmitted by creating the * RESCAN node. + * @throws KeeperException */ private void createRescanNode(long retries) { - watcher.getZooKeeper().create(ZKSplitLog.getRescanNode(watcher), + this.watcher.getRecoverableZooKeeper().getZooKeeper(). + create(ZKSplitLog.getRescanNode(watcher), TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new CreateRescanAsyncCallback(), new Long(retries)); Index: src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java (revision 0) @@ -0,0 +1,39 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.TimeUnit; + +public class RetryCounterFactory { + private final int maxRetries; + private final int retryIntervalMillis; + + public RetryCounterFactory(int maxRetries, int retryIntervalMillis) { + this.maxRetries = maxRetries; + this.retryIntervalMillis = retryIntervalMillis; + } + + public RetryCounter create() { + return + new RetryCounter( + maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS + ); + } +} Index: src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java (revision 0) @@ -0,0 +1,57 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.TimeUnit; + +public class RetryCounter { + private final int maxRetries; + private int retriesRemaining; + private final int retryIntervalMillis; + private final TimeUnit timeUnit; + + public RetryCounter(int maxRetries, + int retryIntervalMillis, TimeUnit timeUnit) { + this.maxRetries = maxRetries; + this.retriesRemaining = maxRetries; + this.retryIntervalMillis = retryIntervalMillis; + this.timeUnit = timeUnit; + } + + public int getMaxRetries() { + return maxRetries; + } + + public void sleepUntilNextRetry() throws InterruptedException { + timeUnit.sleep(retryIntervalMillis); + } + + public boolean shouldRetry() { + return retriesRemaining > 0; + } + + public void useRetry() { + retriesRemaining--; + } + + public int getAttemptTimes() { + return maxRetries-retriesRemaining+1; + } +} Index: src/main/java/org/apache/hadoop/hbase/util/Bytes.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/Bytes.java (revision 1098149) +++ src/main/java/org/apache/hadoop/hbase/util/Bytes.java (working copy) @@ -300,8 +300,17 @@ public static String toStringBinary(final byte [] b) { return toStringBinary(b, 0, b.length); } - + /** + * The same as {@link #toStringBinary(byte[])}, but returns a string "null" + * if given a null argument. + */ + public static String toStringBinarySafe(final byte [] b) { + if (b == null) + return "null"; + return toStringBinary(b, 0, b.length); + } + /** * Write a printable representation of a byte array. 
Non-printable * characters are hex escaped in the format \\x%02X, eg: * \x00 \x05 etc Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1098149) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -1113,7 +1113,7 @@ } if (this.zooKeeper != null) { LOG.info("Closed zookeeper sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); this.zooKeeper.close(); this.zooKeeper = null; }