diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index f02c191..d3abaff 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; @@ -35,7 +36,9 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; import org.apache.zookeeper.Transaction; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; @@ -673,4 +676,41 @@ public class RecoverableZooKeeper { } return lockChildren; } + + /** + * Run list of operations on ZooKeeper transactionally. Either all operations + * commit or none commit (in which case any applicable KeeperException will be + * thrown). + * @param ops {@link Op Ops} to apply to the zk server + * @return the results from the Ops, if successfull + * @throws KeeperException if one of the ops fails in an expected way (e.g. + * create a node where the node already exists will throw a + * {@link KeeperException.NodeExistsException}). + * @throws InterruptedException + */ + public List transact(Iterable ops) throws KeeperException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + return zk.multi(ops); + } catch (KeeperException e) { + switch (e.code()) { + case CONNECTIONLOSS: + case SESSIONEXPIRED: + case OPERATIONTIMEOUT: + retryOrThrow(retryCounter, e, "transact"); + break; + + default: + throw e; + } + } catch (InterruptedException e) { + LOG.info("Recieved interrupted exception error, relaying as connection loss.", e); + retryOrThrow(retryCounter, KeeperException.create(Code.CONNECTIONLOSS, null), "transact"); + } + retryCounter.sleepUntilNextRetry(); + retryCounter.useRetry(); + } + + } } diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index ca659be..740829a 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -26,7 +26,7 @@ import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; -import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Properties; @@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.BadArgumentsException; +import org.apache.zookeeper.Op; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -937,25 +939,53 @@ public class ZKUtil { * @param znode path of node * @throws KeeperException if unexpected zookeeper exception */ - public static void createWithParents(ZooKeeperWatcher zkw, String znode) - throws KeeperException { + public static void createWithParents(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - if(znode == null) { + if (znode == null) { return; } waitForZKConnectionIfAuthenticating(zkw); zkw.getRecoverableZooKeeper().create(znode, new byte[0], createACL(zkw, znode), - CreateMode.PERSISTENT); - } catch(KeeperException.NodeExistsException nee) { + CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nee) { return; - } catch(KeeperException.NoNodeException nne) { - createWithParents(zkw, getParent(znode)); - createWithParents(zkw, znode); - } catch(InterruptedException ie) { + } catch (KeeperException.NoNodeException nne) { + LOG.debug("Failed to create:" + znode + ", attempting to create parents."); + // the parent doesn't exist, so create up the list + List treeCreation = new LinkedList(); + // push on the leaf node to the creation stack + treeCreation.add(Op.create(znode, new byte[0], createACL(zkw, znode), CreateMode.PERSISTENT)); + // try adding up the tree until we succeed or hit root + try { + transactionallyCreateWithParents(zkw, treeCreation, znode); + } catch (InterruptedException e) { + zkw.interruptedException(e); + } catch (KeeperException.NodeExistsException e) { + // one of the parent nodes exists, so just try doing it all again + createWithParents(zkw, znode); + } + } catch (InterruptedException ie) { zkw.interruptedException(ie); } } + private static void transactionallyCreateWithParents(ZooKeeperWatcher zkw, List parents, + String node) throws KeeperException, InterruptedException { + String parent = ZKUtil.getParent(node); + // if there aren't any more nodes, then throw an exception + if (parent == null) { + throw new KeeperException.BadArgumentsException(node); + } + LOG.debug("Attempting to create parent znode:" + parent + ", as well as all children."); + parents.add(0, Op.create(parent, new byte[0], createACL(zkw, parent), CreateMode.PERSISTENT)); + try { + zkw.getRecoverableZooKeeper().transact(parents); + } catch (KeeperException.NoNodeException e) { + // one of the parent nodes doesn't, so try this again with the next parent + transactionallyCreateWithParents(zkw, parents, parent); + } + } + // // Deletes // diff --git src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index d496d48..3271147 100644 --- src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -23,11 +23,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,7 +48,10 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.Transaction; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; import org.junit.AfterClass; @@ -362,6 +368,30 @@ public class TestZooKeeper { ZKUtil.getChildDataAndWatchForNewChildren(zkw, "/wrongNode"); } + @Test + public void testCreateWithParents() throws Exception { + // try a general create + ZKUtil.createWithParents(TEST_UTIL.getZooKeeperWatcher(), "/path/to/thing"); + assertTrue(ZKUtil.checkExists(TEST_UTIL.getZooKeeperWatcher(), "/path/to/thing") > -1); + LOG.debug("Created '/path/to/thing'"); + + // then try creating a sibling + ZKUtil.createWithParents(TEST_UTIL.getZooKeeperWatcher(), "/path/to/other"); + assertTrue(ZKUtil.checkExists(TEST_UTIL.getZooKeeperWatcher(), "/path/to/other") > -1); + LOG.debug("Created '/path/to/other'"); + + ZKUtil.createWithParents(TEST_UTIL.getZooKeeperWatcher(), "/"); + LOG.debug("Created '/'"); + + // try a bad creation + try { + ZKUtil.createWithParents(TEST_UTIL.getZooKeeperWatcher(), "bad/node/creation"); + fail("Attempted to create 'bad/node/creation', which is not a valid node"); + } catch (IllegalArgumentException e) { + // NOOP, this is ok + } + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();