From 9f1994b7565440fd922114c1ee5f333d1f3bd666 Mon Sep 17 00:00:00 2001 From: Na Yang Date: Mon, 5 Jan 2015 10:36:21 -0800 Subject: [PATCH] HIVE-9119 --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 11 +- .../java/org/apache/hadoop/hive/ql/QTestUtil.java | 8 +- ql/pom.xml | 17 +++ .../zookeeper/CuratorFrameworkSingleton.java | 76 +++++++++++ .../zookeeper/ZooKeeperHiveLockManager.java | 141 +++++++-------------- .../zookeeper/TestZookeeperLockManager.java | 95 ++++++++------ 6 files changed, 205 insertions(+), 143 deletions(-) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2e51518..af3b046 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1302,13 +1302,20 @@ "The port of ZooKeeper servers to talk to.\n" + "If the list of Zookeeper servers specified in hive.zookeeper.quorum\n" + "does not contain port numbers, this value is used."), - HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000, - "ZooKeeper client's session timeout. The client is disconnected, and as a result, all locks released, \n" + + HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", "600000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" + "if a heartbeat is not sent in the timeout."), HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace", "The parent node under which all ZooKeeper nodes are created."), HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false, "Clean extra nodes at the end of the session."), + HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES("hive.zookeeper.connection.max.retries", 3, + "Max number of times to retry when connecting to the ZooKeeper server."), + HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME("hive.zookeeper.connection.basesleeptime", "1000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Initial amount of time (in milliseconds) to wait between retries\n" + + "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."), // Transactions HIVE_TXN_MANAGER("hive.txn.manager", diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 878202a..d3fd06c 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -75,6 +76,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton; import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; @@ -124,7 +126,7 @@ private static MiniClusterType clusterType = MiniClusterType.none; private ParseDriver pd; protected Hive db; - protected HiveConf conf; + static protected HiveConf conf; private Driver drv; private BaseSemanticAnalyzer sem; protected final boolean overWrite; @@ -1464,7 +1466,7 @@ public void preTest(HiveConf conf) throws Exception { zooKeeper.close(); } - int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent arg0) { @@ -1489,6 +1491,8 @@ public void postTest(HiveConf conf) throws Exception { } public void tearDown() throws Exception { + CuratorFrameworkSingleton.closeAndReleaseInstance(); + if (zooKeeperCluster != null) { zooKeeperCluster.shutdown(); zooKeeperCluster = null; diff --git ql/pom.xml ql/pom.xml index 84e912e..a1fd46f 100644 --- ql/pom.xml +++ ql/pom.xml @@ -162,6 +162,23 @@ ${zookeeper.version} + org.apache.curator + curator-framework + ${curator.version} + + + org.apache.curator + apache-curator + ${curator.version} + pom + + + org.apache.curator + curator-test + ${curator.version} + test + + org.codehaus.groovy groovy-all ${groovy.version} diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java new file mode 100644 index 0000000..fbf2a01 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr.zookeeper; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; + +public class CuratorFrameworkSingleton { + private static HiveConf conf = null; + private static CuratorFramework sharedClient = null; + static final Log LOG = LogFactory.getLog("CuratorFrameworkSingleton"); + static { + // Add shutdown hook. + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + closeAndReleaseInstance(); + } + }); + } + + public static synchronized CuratorFramework getInstance(HiveConf hiveConf) { + if (sharedClient == null) { + // Create a client instance + if (hiveConf == null) { + conf = new HiveConf(); + } else { + conf = hiveConf; + } + int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); + int baseSleepTime = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); + int maxRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); + String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf); + + sharedClient = CuratorFrameworkFactory.builder().connectString(quorumServers) + .sessionTimeoutMs(sessionTimeout) + .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) + .build(); + sharedClient.start(); + } + + return sharedClient; + } + + public static synchronized void closeAndReleaseInstance() { + if (sharedClient != null) { + sharedClient.close(); + sharedClient = null; + String shutdownMsg = "Closing ZooKeeper client."; + LOG.info(shutdownMsg); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 1334a91..4f86dd9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -27,15 +27,10 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.metadata.*; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; +import org.apache.curator.framework.CuratorFramework; -import java.io.IOException; import java.net.InetAddress; import java.util.*; import java.util.concurrent.TimeUnit; @@ -47,14 +42,11 @@ public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager"); static final private LogHelper console = new LogHelper(LOG); - private ZooKeeper zooKeeper; + private static CuratorFramework curatorFramework; // All the locks are created under this parent private String parent; - private int sessionTimeout; - private String quorumServers; - private long sleepTime; private int numRetriesForLock; private int numRetriesForUnLock; @@ -80,8 +72,6 @@ public ZooKeeperHiveLockManager() { public void setContext(HiveLockManagerCtx ctx) throws LockException { this.ctx = ctx; HiveConf conf = ctx.getConf(); - sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); - quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf); sleepTime = conf.getTimeVar( HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS); @@ -89,20 +79,18 @@ public void setContext(HiveLockManagerCtx ctx) throws LockException { numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES); try { - renewZookeeperInstance(sessionTimeout, quorumServers); + curatorFramework = CuratorFrameworkSingleton.getInstance(conf); parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); - - try { - zooKeeper.create("/" + parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (KeeperException e) { + try{ + curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/" + parent, new byte[0]); + } catch (Exception e) { // ignore if the parent already exists - if (e.code() != KeeperException.Code.NODEEXISTS) { + if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) { LOG.warn("Unexpected ZK exception when creating parent node /" + parent, e); } } - } catch (Exception e) { - LOG.error("Failed to create ZooKeeper object: ", e); + LOG.error("Failed to create curatorFramework object: ", e); throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); } } @@ -116,15 +104,6 @@ public void refresh() { numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES); } - private void renewZookeeperInstance(int sessionTimeout, String quorumServers) - throws InterruptedException, IOException { - if (zooKeeper != null) { - return; - } - - zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new ZooKeeperHiveHelper.DummyWatcher()); - } - /** * @param key object to be locked * Get the name of the last string. For eg. if you need to lock db/T/ds=1=/hr=1, @@ -266,8 +245,8 @@ public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, * @throws InterruptedException **/ private String createChild(String name, byte[] data, CreateMode mode) - throws KeeperException, InterruptedException { - return zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode); + throws Exception { + return curatorFramework.create().withMode(mode).forPath(name, data); } private String getLockName(String parent, HiveLockMode mode) { @@ -347,7 +326,7 @@ private void printConflictingLocks(HiveLockObject key, HiveLockMode mode, private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, HiveLockMode mode, boolean keepAlive, boolean parentCreated, Set conflictingLocks) - throws KeeperException, InterruptedException { + throws Exception { String res; // If the parents have already been created, create the last child only @@ -369,8 +348,8 @@ private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, for (String name : names) { try { res = createChild(name, new byte[0], CreateMode.PERSISTENT); - } catch (KeeperException e) { - if (e.code() != KeeperException.Code.NODEEXISTS) { + } catch (Exception e) { + if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) { //if the exception is not 'NODEEXISTS', re-throw it throw e; } @@ -383,11 +362,11 @@ private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, int seqNo = getSequenceNumber(res, getLockName(lastName, mode)); if (seqNo == -1) { - zooKeeper.delete(res, -1); + curatorFramework.delete().forPath(res); return null; } - List children = zooKeeper.getChildren(lastName, false); + List children = curatorFramework.getChildren().forPath(lastName); String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE); String shLock = getLockName(lastName, HiveLockMode.SHARED); @@ -407,12 +386,11 @@ private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, if ((childSeq >= 0) && (childSeq < seqNo)) { try { - zooKeeper.delete(res, -1); + curatorFramework.delete().forPath(res); } finally { if (LOG.isDebugEnabled()) { - Stat stat = new Stat(); try { - String data = new String(zooKeeper.getData(child, false, stat)); + String data = new String(curatorFramework.getData().forPath(child)); conflictingLocks.add(data); } catch (Exception e) { //ignored @@ -428,11 +406,10 @@ private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, /* Remove the lock specified */ public void unlock(HiveLock hiveLock) throws LockException { - unlockWithRetry(ctx.getConf(), zooKeeper, hiveLock, parent); + unlockWithRetry(hiveLock, parent); } - private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient, - HiveLock hiveLock, String parent) throws LockException { + private void unlockWithRetry(HiveLock hiveLock, String parent) throws LockException { int tryNum = 0; do { @@ -440,14 +417,13 @@ private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient, tryNum++; if (tryNum > 1) { Thread.sleep(sleepTime); - prepareRetry(); } - unlockPrimitive(conf, zkpClient, hiveLock, parent); + unlockPrimitive(hiveLock, parent, curatorFramework); break; } catch (Exception e) { if (tryNum >= numRetriesForUnLock) { String name = ((ZooKeeperHiveLock)hiveLock).getPath(); - LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts."); + LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts."); throw new LockException(e); } } @@ -458,21 +434,20 @@ private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient, /* Remove the lock specified */ @VisibleForTesting - static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient, - HiveLock hiveLock, String parent) throws LockException { + static void unlockPrimitive(HiveLock hiveLock, String parent, CuratorFramework curatorFramework) throws LockException { ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock; HiveLockObject obj = zLock.getHiveLockObject(); String name = getLastObjectName(parent, obj); try { - zkpClient.delete(zLock.getPath(), -1); + curatorFramework.delete().forPath(zLock.getPath()); // Delete the parent node if all the children have been deleted - List children = zkpClient.getChildren(name, false); + List children = curatorFramework.getChildren().forPath(name); if (children == null || children.isEmpty()) { - zkpClient.delete(name, -1); + curatorFramework.delete().forPath(name); } } catch (KeeperException.NoNodeException nne) { - //can happen in retrying deleting the zLock after exceptions like InterruptedException + //can happen in retrying deleting the zLock after exceptions like InterruptedException //or in a race condition where parent has already been deleted by other process when it //is to be deleted. Both cases should not raise error LOG.debug("Node " + zLock.getPath() + " or its parent has already been deleted."); @@ -480,7 +455,7 @@ static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient, //can happen in a race condition where another process adds a zLock under this parent //just before it is about to be deleted. It should not be a problem since this parent //can eventually be deleted by the process which hold its last child zLock - LOG.debug("Node " + name + " to be deleted is not empty."); + LOG.debug("Node " + name + " to be deleted is not empty."); } catch (Exception e) { //exceptions including InterruptException and other KeeperException LOG.error("Failed to release ZooKeeper lock: ", e); @@ -490,19 +465,14 @@ static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient, /* Release all locks - including PERSISTENT locks */ public static void releaseAllLocks(HiveConf conf) throws Exception { - ZooKeeper zkpClient = null; try { - int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); - String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf); - Watcher dummyWatcher = new ZooKeeperHiveHelper.DummyWatcher(); - zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher); String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); - List locks = getLocks(conf, zkpClient, null, parent, false, false); + List locks = getLocks(conf, null, parent, false, false); Exception lastExceptionGot = null; if (locks != null) { for (HiveLock lock : locks) { try { - unlockPrimitive(conf, zkpClient, lock, parent); + unlockPrimitive(lock, parent, curatorFramework); } catch (Exception e) { lastExceptionGot = e; } @@ -516,24 +486,19 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { } catch (Exception e) { LOG.error("Failed to release all locks: ", e); throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); - } finally { - if (zkpClient != null) { - zkpClient.close(); - zkpClient = null; - } } } /* Get all locks */ public List getLocks(boolean verifyTablePartition, boolean fetchData) throws LockException { - return getLocks(ctx.getConf(), zooKeeper, null, parent, verifyTablePartition, fetchData); + return getLocks(ctx.getConf(), null, parent, verifyTablePartition, fetchData); } /* Get all locks for a particular object */ public List getLocks(HiveLockObject key, boolean verifyTablePartitions, boolean fetchData) throws LockException { - return getLocks(ctx.getConf(), zooKeeper, key, parent, verifyTablePartitions, fetchData); + return getLocks(ctx.getConf(), key, parent, verifyTablePartitions, fetchData); } /** @@ -541,7 +506,7 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { * @param zkpClient The ZooKeeper client * @param key The object to be compared against - if key is null, then get all locks **/ - private static List getLocks(HiveConf conf, ZooKeeper zkpClient, + private static List getLocks(HiveConf conf, HiveLockObject key, String parent, boolean verifyTablePartition, boolean fetchData) throws LockException { List locks = new ArrayList(); @@ -552,12 +517,12 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { try { if (key != null) { commonParent = "/" + parent + "/" + key.getName(); - children = zkpClient.getChildren(commonParent, false); + children = curatorFramework.getChildren().forPath(commonParent); recurse = false; } else { commonParent = "/" + parent; - children = zkpClient.getChildren(commonParent, false); + children = curatorFramework.getChildren().forPath(commonParent); } } catch (Exception e) { // no locks present @@ -579,7 +544,7 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { if (recurse) { try { - children = zkpClient.getChildren(curChild, false); + children = curatorFramework.getChildren().forPath(curChild); for (String child : children) { childn.add(curChild + "/" + child); } @@ -588,7 +553,7 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { } } - HiveLockMode mode = getLockMode(conf, curChild); + HiveLockMode mode = getLockMode(curChild); if (mode == null) { continue; } @@ -605,8 +570,7 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { if (fetchData) { try { - data = new HiveLockObjectData(new String(zkpClient.getData(curChild, - new ZooKeeperHiveHelper.DummyWatcher(), null))); + data = new HiveLockObjectData(new String(curatorFramework.getData().watched().forPath(curChild))); data.setClientIp(clientIp); } catch (Exception e) { LOG.error("Error in getting data for " + curChild, e); @@ -623,12 +587,7 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { /** Remove all redundant nodes **/ private void removeAllRedundantNodes() { try { - renewZookeeperInstance(sessionTimeout, quorumServers); checkRedundantNode("/" + parent); - if (zooKeeper != null) { - zooKeeper.close(); - zooKeeper = null; - } } catch (Exception e) { LOG.warn("Exception while removing all redundant nodes", e); } @@ -637,19 +596,19 @@ private void removeAllRedundantNodes() { private void checkRedundantNode(String node) { try { // Nothing to do if it is a lock mode - if (getLockMode(ctx.getConf(), node) != null) { + if (getLockMode(node) != null) { return; } - List children = zooKeeper.getChildren(node, false); + List children = curatorFramework.getChildren().forPath(node); for (String child : children) { checkRedundantNode(node + "/" + child); } - children = zooKeeper.getChildren(node, false); + children = curatorFramework.getChildren().forPath(node); if ((children == null) || (children.isEmpty())) { - zooKeeper.delete(node, -1); + curatorFramework.delete().forPath(node); } } catch (Exception e) { LOG.warn("Error in checkRedundantNode for node " + node, e); @@ -658,12 +617,7 @@ private void checkRedundantNode(String node) { /* Release all transient locks, by simply closing the client */ public void close() throws LockException { - try { - - if (zooKeeper != null) { - zooKeeper.close(); - zooKeeper = null; - } + try { if (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) { removeAllRedundantNodes(); @@ -750,7 +704,7 @@ private static HiveLockObject getLockObject(HiveConf conf, String path, private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$"); /* Get the mode of the lock encoded in the path */ - private static HiveLockMode getLockMode(HiveConf conf, String path) { + private static HiveLockMode getLockMode(String path) { Matcher shMatcher = shMode.matcher(path); Matcher exMatcher = exMode.matcher(path); @@ -768,15 +722,6 @@ private static HiveLockMode getLockMode(HiveConf conf, String path) { @Override public void prepareRetry() throws LockException { - try { - if (zooKeeper != null && zooKeeper.getState() == ZooKeeper.States.CLOSED) { - // Reconnect if the connection is closed. - zooKeeper = null; - } - renewZookeeperInstance(sessionTimeout, quorumServers); - } catch (Exception e) { - throw new LockException(e); - } } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java index aacb73f..4a1ef2e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java @@ -18,70 +18,82 @@ package org.apache.hadoop.hive.ql.lockmgr.zookeeper; -import static org.mockito.Mockito.*; - -import java.util.Collections; - import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingServer; import org.junit.Assert; import org.junit.Before; +import org.junit.After; import org.junit.Test; -import com.google.common.base.Joiner; - public class TestZookeeperLockManager { - private static final Joiner SLASH = Joiner.on("/"); - private static final String PARENT = "hive"; - private static final String TABLE = "t1"; - private static final String PARENT_LOCK_PATH = SLASH.join("", PARENT, TABLE); - private static final String TABLE_LOCK_PATH = SLASH.join("", PARENT, TABLE, "00001"); private HiveConf conf; - private ZooKeeper zooKeeper; + private TestingServer server; + private CuratorFramework client; private HiveLockObject hiveLock; private ZooKeeperHiveLock zLock; + private HiveLockObjectData lockObjData; + private static final String PARENT = "hive"; + private static final String TABLE = "t1"; + private static final String PARENT_LOCK_PATH = "/hive/t1"; + private static final String TABLE_LOCK_PATH = "/hive/t1/00001"; @Before public void setup() { conf = new HiveConf(); - zooKeeper = mock(ZooKeeper.class); - hiveLock = mock(HiveLockObject.class); - when(hiveLock.getName()).thenReturn(TABLE); + lockObjData = new HiveLockObjectData("1", "10", "SHARED", "show tables"); + hiveLock = new HiveLockObject(TABLE, lockObjData); zLock = new ZooKeeperHiveLock(TABLE_LOCK_PATH, hiveLock, HiveLockMode.SHARED); - } - @Test - public void testDeleteNoChildren() throws Exception { - ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT); - verify(zooKeeper).delete(TABLE_LOCK_PATH, -1); - verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false); - verify(zooKeeper).delete(PARENT_LOCK_PATH, -1); - verifyNoMoreInteractions(zooKeeper); + while (server == null) + { + try { + server = new TestingServer(); + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + } catch (Exception e) { + System.err.println("Getting bind exception - retrying to allocate server"); + server = null; + } + } } - /** - * Tests two threads racing to delete PARENT_LOCK_PATH - */ - @Test - public void testDeleteNoChildrenNodeDoesNotExist() throws Exception { - doThrow(new KeeperException.NoNodeException()).when(zooKeeper).delete(PARENT_LOCK_PATH, -1); - ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT); - verify(zooKeeper).delete(TABLE_LOCK_PATH, -1); - verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false); - verify(zooKeeper).delete(PARENT_LOCK_PATH, -1); - verifyNoMoreInteractions(zooKeeper); + + @After + public void teardown() throws Exception + { + client.close(); + server.close(); + server = null; } + @Test - public void testDeleteWithChildren() throws Exception { - when(zooKeeper.getChildren(PARENT_LOCK_PATH, false)).thenReturn(Collections.singletonList("somechild")); - ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT); - verify(zooKeeper).delete(TABLE_LOCK_PATH, -1); - verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false); - verifyNoMoreInteractions(zooKeeper); + public void testDeleteNoChildren() throws Exception + { + client.create().creatingParentsIfNeeded().forPath(TABLE_LOCK_PATH, lockObjData.toString().getBytes()); + byte[] data = client.getData().forPath(TABLE_LOCK_PATH); + Assert.assertArrayEquals(lockObjData.toString().getBytes(), data); + ZooKeeperHiveLockManager.unlockPrimitive(zLock, PARENT, client); + try { + data = client.getData().forPath(TABLE_LOCK_PATH); + Assert.fail(); + } catch (Exception e) { + Assert.assertEquals( e instanceof KeeperException.NoNodeException, true); + } + try { + data = client.getData().forPath(PARENT_LOCK_PATH); + Assert.fail(); + } catch (Exception e) { + Assert.assertEquals( e instanceof KeeperException.NoNodeException, true); + } } @Test @@ -99,3 +111,4 @@ public void testGetQuorumServers() { Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf)); } } + -- 1.8.5.2 (Apple Git-48)