Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1080354) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -657,7 +657,6 @@ **/ public int acquireReadWriteLocks() { try { - int tryNum = 1; int sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; int numRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); @@ -717,13 +716,31 @@ } ctx.setHiveLockMgr(hiveLockMgr); - List hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false, numRetries, sleepTime); + List hiveLocks = null; + + int tryNum = 1; + while (tryNum < numRetries) { + + ctx.getHiveLockMgr().prepareRetry(); + hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false); + if (hiveLocks != null) { + break; + } + + tryNum++; + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + } + if (hiveLocks == null) { throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); } else { ctx.setHiveLocks(hiveLocks); } + return (0); } catch (SemanticException e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 1080354) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy) @@ -1927,7 +1927,7 @@ "EXPLICIT"); if (partSpec == null) { - HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true, 0, 0); + HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true); if (lck == null) { return 1; } @@ -1938,7 +1938,7 @@ if (par == null) { throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist"); } - HiveLock lck = lockMgr.lock(new HiveLockObject(par, lockData), mode, true, 0, 0); + HiveLock lck = lockMgr.lock(new HiveLockObject(par, lockData), mode, true); if (lck == null) { return 1; } Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 1080354) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (working copy) @@ -28,17 +28,16 @@ * @param key object to be locked * @param mode mode of the lock (SHARED/EXCLUSIVE) * @param keepAlive if the lock needs to be persisted after the statement - * @param sleepTime - * @param numRetries */ public HiveLock lock(HiveLockObject key, HiveLockMode mode, - boolean keepAlive, int numRetries, int sleepTime) throws LockException; + boolean keepAlive) throws LockException; public List lock(List objs, - boolean keepAlive, int numRetries, int sleepTime) throws LockException; + boolean keepAlive) throws LockException; public void unlock(HiveLock hiveLock) throws LockException; public void releaseLocks(List hiveLocks); public List getLocks(boolean verifyTablePartitions, boolean fetchData) throws LockException; public List getLocks(HiveLockObject key, boolean verifyTablePartitions, boolean fetchData) throws LockException; public void close() throws LockException; + public void prepareRetry() throws LockException; } Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (revision 1080354) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (working copy) @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.lockmgr; -import org.apache.hadoop.hive.conf.HiveConf; - public enum HiveLockMode { SHARED, EXCLUSIVE; } Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java (revision 1080354) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java (working copy) @@ -18,9 +18,6 @@ package org.apache.hadoop.hive.ql.lockmgr; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.Table; - public class HiveLockObj { HiveLockObject obj; HiveLockMode mode; Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 1080354) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (working copy) @@ -27,21 +27,17 @@ import org.apache.commons.logging.LogFactory; import java.util.List; import java.util.ArrayList; -import java.util.Set; import java.util.Queue; import java.util.LinkedList; import java.util.Map; import java.util.HashMap; import java.util.Comparator; import java.util.Collections; -import java.util.LinkedHashSet; import java.util.regex.Pattern; import java.util.regex.Matcher; -import org.apache.commons.lang.StringEscapeUtils; import org.apache.zookeeper.KeeperException; import org.apache.hadoop.hive.ql.parse.ErrorMsg; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; @@ -100,7 +96,7 @@ parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); try { - String par = zooKeeper.create("/" + parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create("/" + parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException e) { // ignore if the parent already exists } @@ -152,14 +148,12 @@ /** * @param lockObjects List of objects and the modes of the locks requested * @param keepAlive Whether the lock is to be persisted after the statement - * @param numRetries number of retries when the lock can not be acquired - * @param sleepTime sleep time between retries * * Acuire all the locks. Release all the locks and return null if any lock * could not be acquired. **/ public List lock(List lockObjects, - boolean keepAlive, int numRetries, int sleepTime) throws LockException + boolean keepAlive) throws LockException { // Sort the objects first. You are guaranteed that if a partition is being locked, // the table has already been locked @@ -198,8 +192,7 @@ HiveLock lock = null; try { - lock = lock(lockObject.getObj(), lockObject.getMode(), false, - numRetries, sleepTime, true); + lock = lock(lockObject.getObj(), lockObject.getMode(), false, true); } catch (LockException e) { console.printError("Error in acquireLocks: "+ e.getLocalizedMessage()); lock = null; @@ -238,71 +231,45 @@ } /** - * @param key The object to be locked - * @param mode The mode of the lock - * @param keepAlive Whether the lock is to be persisted after the statement - * @param numRetries number of retries when the lock can not be acquired - * @param sleepTime sleep time between retries - * - * Acuire the lock. Return null if a conflicting lock is present. + * @param key + * The object to be locked + * @param mode + * The mode of the lock + * @param keepAlive + * Whether the lock is to be persisted after the statement Acuire the + * lock. Return null if a conflicting lock is present. **/ public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, - boolean keepAlive, int numRetries, int sleepTime) - throws LockException { - return lock(key, mode, keepAlive, numRetries, sleepTime, false); + boolean keepAlive) throws LockException { + return lock(key, mode, keepAlive, false); } /** - * @param name The name of the zookeeper child - * @param data The data for the zookeeper child - * @param mode The mode in which the child needs to be created - * @param numRetries number of retries if the child cannot be created - * @param sleepTime sleep time between retries + * @param name + * The name of the zookeeper child + * @param data + * The data for the zookeeper child + * @param mode + * The mode in which the child needs to be created **/ - private String createChild(String name, byte[] data, CreateMode mode, - int numRetries, int sleepTime) throws LockException { + private String createChild(String name, byte[] data, CreateMode mode) throws LockException { String res = null; - int tryNum = 0; - while (true) { - String msg = null; - try { - res = zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode); - } catch (KeeperException e) { - return null; - // nothing to do if the node already exists - } catch (Exception e) { - msg = e.getLocalizedMessage(); - } - - if (res != null) { - return res; - } - - try { - renewZookeeperInstance(sessionTimeout, quorumServers); - } catch (Exception e) { - console.printError("Lock for " + name - + " cannot be acquired in " + mode); - throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); - } - - if (tryNum == numRetries) { - console.printError("Lock for " + name - + " cannot be acquired in " + mode); - throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); - } - - tryNum++; - - console.printInfo("Lock for " + name - + " cannot be acquired in " + mode + - ", will retry again later..., more info: " + msg); + String msg = null; + try { + res = zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode); + } catch (KeeperException e) { + return null; + // nothing to do if the node already exists + } catch (Exception e) { + msg = e.getLocalizedMessage(); + } - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - } + if (res == null) { + console.printInfo("Lock for " + name + " cannot be acquired in " + mode + + ", will retry again later..., more info: " + msg); } + + return res; } private String getLockName(String parent, HiveLockMode mode) { @@ -310,8 +277,7 @@ } private ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, - boolean keepAlive, int numRetries, int sleepTime, - boolean parentCreated) + boolean keepAlive, boolean parentCreated) throws LockException { String res; @@ -331,12 +297,11 @@ // Create the parents first for (String name : names) { - res = createChild(name, new byte[0], CreateMode.PERSISTENT, numRetries, sleepTime); + res = createChild(name, new byte[0], CreateMode.PERSISTENT); } res = createChild(getLockName(lastName, mode), key.getData().toString().getBytes(), - keepAlive ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.EPHEMERAL_SEQUENTIAL, - numRetries, sleepTime); + keepAlive ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.EPHEMERAL_SEQUENTIAL); int seqNo = getSequenceNumber(res, getLockName(lastName, mode)); if (seqNo == -1) { @@ -662,4 +627,13 @@ public void process(org.apache.zookeeper.WatchedEvent event) { } } + + @Override + public void prepareRetry() throws LockException { + try { + renewZookeeperInstance(sessionTimeout, quorumServers); + } catch (Exception e) { + throw new LockException(e); + } + } }