diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 233c3cc..f510b86 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Transaction; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; @@ -68,7 +70,7 @@ import org.apache.zookeeper.data.Stat; public class RecoverableZooKeeper { private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class); // the actual ZooKeeper client instance - private ZooKeeper zk; + ZooKeeper zk; private final RetryCounterFactory retryCounterFactory; // An identifier of this process in the cluster private final String identifier; @@ -90,12 +92,10 @@ public class RecoverableZooKeeper { private static final int ID_LENGTH_OFFSET = MAGIC_SIZE; private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT; - public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis) - throws IOException { + public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, + int maxRetries, int retryIntervalMillis) throws IOException { this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); - this.retryCounterFactory = - new RetryCounterFactory(maxRetries, retryIntervalMillis); + this.retryCounterFactory = new RetryCounterFactory(maxRetries, retryIntervalMillis); // the identifier = processID@hostName this.identifier = ManagementFactory.getRuntimeMXBean().getName(); @@ -160,6 +160,82 @@ public class RecoverableZooKeeper { } /** + * Transactionally delete a node and all the children under that node. + *
+ * Uses ZK to build up the list of nodes in the tree under the specified znode + * to delete and then tries multiple times to delete the nodes + * transactionally, up to the max retries for this class (based on + * configuration of "zookeeper.recovery.retry"). + *
+ * Idempotent operation - any and all nodes under the passed node, with
+ * matching versions, will be deleted. This method will not throw
+ * {@link org.apache.zookeeper.KeeperException.NoNodeException} if the node is
+ * not present.
+ * @param node top znode to delete
+ * @param version version of the node (and all children nodes)
+ */
+ public void deleteRecursively(String node, int version) throws InterruptedException,
+ KeeperException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ LOG.debug("Attempting to recursively delete:" + node + ", version:" + version);
+ boolean isRetry = false; // False for first attempt, true for all retries.
+ while (true) {
+ try {
+ Transaction transaction = zk.transaction();
+ addAllChildrenToDeleteTransaction(transaction, node, version);
+ transaction.commit();
+ return;
+ } catch (KeeperException e) {
+ switch (e.code()) {
+ case NONODE:
+ // if we are retrying, we did it before
+ if (isRetry) {
+ LOG.info("Node " + node + " already deleted. Assuming that a "
+ + "previous attempt succeeded.");
+ return;
+ }
+ LOG.warn("Node " + node + " already deleted, and this is not a " + "retry");
+ throw e;
+
+ case NOTEMPTY:
+ LOG.debug("Node " + node + " not empty, retrying recursive delete.");
+ case CONNECTIONLOSS:
+ case SESSIONEXPIRED:
+ case OPERATIONTIMEOUT:
+ retryOrThrow(retryCounter, e, "deleteRecurisvely");
+ break;
+
+ default:
+ throw e;
+ }
+ }
+ retryCounter.sleepUntilNextRetry();
+ retryCounter.useRetry();
+ isRetry = true;
+ }
+ }
+
+ /**
+ * Add deletion of the znode and all its children to the passed transaction
+ * @param trans {@link Transaction} to update
+ * @param path znode at the root of the tree to be deleted
+ * @param version version of the node (and all its children) to be deleted
+ */
+ private void addAllChildrenToDeleteTransaction(Transaction trans, String path, int version)
+ throws KeeperException, InterruptedException {
+ try {
+ List