diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 233c3cc..f510b86 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Transaction; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; @@ -68,7 +70,7 @@ import org.apache.zookeeper.data.Stat; public class RecoverableZooKeeper { private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class); // the actual ZooKeeper client instance - private ZooKeeper zk; + ZooKeeper zk; private final RetryCounterFactory retryCounterFactory; // An identifier of this process in the cluster private final String identifier; @@ -90,12 +92,10 @@ public class RecoverableZooKeeper { private static final int ID_LENGTH_OFFSET = MAGIC_SIZE; private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT; - public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis) - throws IOException { + public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, + int maxRetries, int retryIntervalMillis) throws IOException { this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); - this.retryCounterFactory = - new RetryCounterFactory(maxRetries, retryIntervalMillis); + this.retryCounterFactory = new RetryCounterFactory(maxRetries, retryIntervalMillis); // the identifier = processID@hostName this.identifier = ManagementFactory.getRuntimeMXBean().getName(); @@ -160,6 +160,82 @@ public class RecoverableZooKeeper { } /** + * Transactionally delete a node and all the children under that node. + *

+ * Uses ZK to build up the list of nodes in the tree under the specified znode + * to delete and then tries multiple times to delete the nodes + * transactionally, up to the max retries for this class (based on + * configuration of "zookeeper.recovery.retry"). + *

+ * Idempotent operation - any and all nodes under the passed node, with + * matching versions, will be deleted. This method will not throw + * {@link org.apache.zookeeper.KeeperException.NoNodeException} if the node is + * not present. + * @param node top znode to delete + * @param version version of the node (and all children nodes) + */ + public void deleteRecursively(String node, int version) throws InterruptedException, + KeeperException { + RetryCounter retryCounter = retryCounterFactory.create(); + LOG.debug("Attempting to recursively delete:" + node + ", version:" + version); + boolean isRetry = false; // False for first attempt, true for all retries. + while (true) { + try { + Transaction transaction = zk.transaction(); + addAllChildrenToDeleteTransaction(transaction, node, version); + transaction.commit(); + return; + } catch (KeeperException e) { + switch (e.code()) { + case NONODE: + // if we are retrying, we did it before + if (isRetry) { + LOG.info("Node " + node + " already deleted. Assuming that a " + + "previous attempt succeeded."); + return; + } + LOG.warn("Node " + node + " already deleted, and this is not a " + "retry"); + throw e; + + case NOTEMPTY: + LOG.debug("Node " + node + " not empty, retrying recursive delete."); + case CONNECTIONLOSS: + case SESSIONEXPIRED: + case OPERATIONTIMEOUT: + retryOrThrow(retryCounter, e, "deleteRecurisvely"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + retryCounter.useRetry(); + isRetry = true; + } + } + + /** + * Add deletion of the znode and all its children to the passed transaction + * @param trans {@link Transaction} to update + * @param path znode at the root of the tree to be deleted + * @param version version of the node (and all its children) to be deleted + */ + private void addAllChildrenToDeleteTransaction(Transaction trans, String path, int version) + throws KeeperException, InterruptedException { + try { + List children = zk.getChildren(path, false); + for (String child : children) { + addAllChildrenToDeleteTransaction(trans, ZKUtil.joinZNode(path, child), version); + } + trans.delete(path, version); + } catch (KeeperException.NoNodeException e) { + LOG.debug("Node:" + path + + " has already been deleted, excluding it from the children to transactionally delete."); + } + } + + /** * exists is an idempotent operation. Retry before throwing exception * @return A Stat instance */ diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 46a6fde..ca659be 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -1017,13 +1017,8 @@ public class ZKUtil { // the node is already deleted, so we just finish if (children == null) return; - if(!children.isEmpty()) { - for(String child : children) { - deleteNodeRecursively(zkw, joinZNode(node, child)); - } - } - zkw.getRecoverableZooKeeper().delete(node, -1); - } catch(InterruptedException ie) { + zkw.getRecoverableZooKeeper().deleteRecursively(node, -1); + } catch (InterruptedException ie) { zkw.interruptedException(ie); } } diff --git src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZookeeper.java src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZookeeper.java new file mode 100644 index 0000000..208db3e --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZookeeper.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TestZooKeeper; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(MediumTests.class) +public class TestRecoverableZookeeper { + + private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @Test + public void testRecursiveDelete() throws Exception { + UTIL.startMiniZKCluster(); + try { + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(new Configuration(UTIL.getConfiguration()), + TestZooKeeper.class.getName(), null); + RecoverableZooKeeper rzk = zkw.getRecoverableZooKeeper(); + + // set the stored zk as a spy of the real one + ZooKeeper real = rzk.zk; + ZooKeeper zkSpy = Mockito.spy(real); + rzk.zk = zkSpy; + + // nodes to put into zk + final String child = "/root/parent/child"; + final String parent = ZKUtil.getParent(child); + final String sibling = ZKUtil.joinZNode(parent, "sibling"); + final String root = ZKUtil.getParent(parent); + + // add a node to zk + ZKUtil.createWithParents(zkw, child); + ZKUtil.createWithParents(zkw, sibling); + assertTrue(ZKUtil.checkExists(zkw, child) >= 0); + + // and then recursively delete a parent; + rzk.deleteRecursively(root, -1); + + // expect the call to get the first child + Mockito.verify(zkSpy).getChildren(root, false); + // and then that has a child, and we also get the sub-children + Mockito.verify(zkSpy).getChildren(parent, false); + Mockito.verify(zkSpy).getChildren(child, false); + Mockito.verify(zkSpy).getChildren(sibling, false); + + // after which everything should be deleted + assertEquals(-1, ZKUtil.checkExists(zkw, root)); + + // and then check to make sure we don't throw an exception if the node + // doesn't exist + rzk.deleteRecursively(root, -1); + + // make sure we do fail if we have the wrong version + ZKUtil.createAndFailSilent(zkw, root); + // verify that we call attempt the bad transaction once + try { + rzk.deleteRecursively(root, 2); + fail("Delete recursively didn't throw an error for wrong version"); + } catch (KeeperException e) { + // NOOP - it works correctly + } + // ensure the total number of transactions created for this test + Mockito.verify(zkSpy, Mockito.times(3)).transaction(); + + } finally { + UTIL.getZkCluster().shutdown(); + } + } +}