diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 233c3cc..8d2773b 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Transaction; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; @@ -68,7 +69,7 @@ import org.apache.zookeeper.data.Stat; public class RecoverableZooKeeper { private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class); // the actual ZooKeeper client instance - private ZooKeeper zk; + ZooKeeper zk; private final RetryCounterFactory retryCounterFactory; // An identifier of this process in the cluster private final String identifier; @@ -90,12 +91,10 @@ public class RecoverableZooKeeper { private static final int ID_LENGTH_OFFSET = MAGIC_SIZE; private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT; - public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis) - throws IOException { + public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, + int maxRetries, int retryIntervalMillis) throws IOException { this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); - this.retryCounterFactory = - new RetryCounterFactory(maxRetries, retryIntervalMillis); + this.retryCounterFactory = new RetryCounterFactory(maxRetries, retryIntervalMillis); // the identifier = processID@hostName this.identifier = ManagementFactory.getRuntimeMXBean().getName(); @@ -160,6 +159,65 @@ public class RecoverableZooKeeper { } /** + * Recursively delete the path all children on that path. + * @param path znode to delete + * @param version version of the node (and all children nodes) + */ + public void deleteRecursively(String path, int version) throws InterruptedException, + KeeperException { + RetryCounter retryCounter = retryCounterFactory.create(); + LOG.debug("Attempting to recursively delete:" + path + ", version:" + version); + boolean isRetry = false; // False for first attempt, true for all retries. + while (true) { + try { + Transaction transaction = zk.transaction(); + addAllChildrenToDeleteTransaction(transaction, path, version); + transaction.commit(); + return; + } catch (KeeperException e) { + switch (e.code()) { + case NONODE: + // if we are retrying, we did it before + if (isRetry) { + LOG.info("Node " + path + " already deleted. Assuming that a " + + "previous attempt succeeded."); + return; + } + LOG.warn("Node " + path + " already deleted, and this is not a " + "retry"); + throw e; + + case NOTEMPTY: + LOG.debug("Node not empty, retrying."); + case CONNECTIONLOSS: + case SESSIONEXPIRED: + case OPERATIONTIMEOUT: + retryOrThrow(retryCounter, e, "delete"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + retryCounter.useRetry(); + isRetry = true; + } + } + + private void addAllChildrenToDeleteTransaction(Transaction trans, String path, int version) + throws KeeperException, InterruptedException { + try { + List children = zk.getChildren(path, false); + for (String child : children) { + addAllChildrenToDeleteTransaction(trans, ZKUtil.joinZNode(path, child), version); + } + trans.delete(path, version); + } catch (KeeperException.NoNodeException e) { + // noop - node already deleted + } + } + + /** * exists is an idempotent operation. Retry before throwing exception * @return A Stat instance */ diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 46a6fde..ca659be 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -1017,13 +1017,8 @@ public class ZKUtil { // the node is already deleted, so we just finish if (children == null) return; - if(!children.isEmpty()) { - for(String child : children) { - deleteNodeRecursively(zkw, joinZNode(node, child)); - } - } - zkw.getRecoverableZooKeeper().delete(node, -1); - } catch(InterruptedException ie) { + zkw.getRecoverableZooKeeper().deleteRecursively(node, -1); + } catch (InterruptedException ie) { zkw.interruptedException(ie); } } diff --git src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZookeeper.java src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZookeeper.java new file mode 100644 index 0000000..45d1b0f --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZookeeper.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TestZooKeeper; +import org.apache.zookeeper.ZooKeeper; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(MediumTests.class) +public class TestRecoverableZookeeper { + + private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @Test + public void testRecursiveDelete() throws Exception { + UTIL.startMiniZKCluster(); + try { + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(new Configuration(UTIL.getConfiguration()), + TestZooKeeper.class.getName(), null); + RecoverableZooKeeper rzk = zkw.getRecoverableZooKeeper(); + + // set the stored zk as a spy of the real one + ZooKeeper real = rzk.zk; + ZooKeeper zkSpy = Mockito.spy(real); + rzk.zk = zkSpy; + + // add a node to zk + final String path = "/root/parent/child"; + String parent = ZKUtil.getParent(path); + final String sibling = ZKUtil.joinZNode(parent, "sibling"); + String root = ZKUtil.getParent(parent); + ZKUtil.createWithParents(zkw, path); + ZKUtil.createWithParents(zkw, sibling); + assertTrue(ZKUtil.checkExists(zkw, path) >= 0); + + // and then recursively delete a parent; + rzk.deleteRecursively(root, -1); + + // expect the call to get the first child + Mockito.verify(zkSpy).getChildren(root, false); + // and then we get another call to not empty + // and then that has a child, so we get the sub-children + Mockito.verify(zkSpy).getChildren(parent, false); + + // at which point we will attempt to delete the child and sibling + // which should work fine + Mockito.verify(zkSpy).transaction(); + + assertEquals(-1, ZKUtil.checkExists(zkw, root)); + } finally { + UTIL.getZkCluster().shutdown(); + } + } +}