Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1040689) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -575,26 +575,14 @@ // walk the list and acquire the locks - if any lock cant be acquired, release all locks, // sleep and retry LockObjectContainer notFound = new LockObjectContainer(); - while (true) { - notFound.setLck(null); - List hiveLocks = acquireLocks(lockObjects, notFound); - - if (hiveLocks == null) { - if (tryNum == numRetries) { - console.printError("Lock for " + notFound.getLck().getObj().getName() - + " cannot be acquired in " + notFound.getLck().getMode()); - throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); - } - tryNum++; + notFound.setLck(null); + List hiveLocks = acquireLocks(lockObjects, notFound, + numRetries, sleepTime); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - } - } else { - ctx.setHiveLocks(hiveLocks); - break; - } + if (hiveLocks == null) { + throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); + } else { + ctx.setHiveLocks(hiveLocks); } return (0); } catch (SemanticException e) { @@ -611,8 +599,11 @@ * The list of objects to be locked Lock the objects specified in the list. The same * object is not locked twice, and the list passed is sorted such that EXCLUSIVE locks * occur before SHARED locks. + * @param sleepTime + * @param numRetries **/ - private List acquireLocks(List lockObjects, LockObjectContainer notFound) + private List acquireLocks(List lockObjects, + LockObjectContainer notFound, int numRetries, int sleepTime) throws SemanticException { // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep // and retry @@ -629,8 +620,9 @@ HiveLock lock = null; try { - lock = ctx.getHiveLockMgr().lock(lockObject.getObj(), lockObject.getMode(), false); + lock = ctx.getHiveLockMgr().lock(lockObject.getObj(), lockObject.getMode(), false, numRetries, sleepTime); } catch (LockException e) { + console.printError("Error in acquireLocks: "+ e.getLocalizedMessage()); lock = null; } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 1040689) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy) @@ -1436,7 +1436,7 @@ String lockData = lockTbl.getQueryId() + ":" + String.valueOf(System.currentTimeMillis()); if (partSpec == null) { - HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true); + HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true, 0, 0); if (lck == null) { return 1; } @@ -1447,7 +1447,7 @@ if (par == null) { throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist"); } - HiveLock lck = lockMgr.lock(new HiveLockObject(par, lockData), mode, true); + HiveLock lck = lockMgr.lock(new HiveLockObject(par, lockData), mode, true, 0, 0); if (lck == null) { return 1; } Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 1040689) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (working copy) @@ -28,9 +28,11 @@ * @param key object to be locked * @param mode mode of the lock (SHARED/EXCLUSIVE) * @param keepAlive if the lock needs to be persisted after the statement + * @param sleepTime + * @param numRetries */ - public HiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive) - throws LockException; + public HiveLock lock(HiveLockObject key, HiveLockMode mode, + boolean keepAlive, int numRetries, int sleepTime) throws LockException; public void unlock(HiveLock hiveLock) throws LockException; public List getLocks() throws LockException; Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 1040689) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (working copy) @@ -37,6 +37,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.hadoop.hive.ql.parse.ErrorMsg; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; @@ -61,6 +62,9 @@ // All the locks are created under this parent private String parent; + + private int sessionTimeout; + private String quorumServers; public ZooKeeperHiveLockManager() { } @@ -83,15 +87,11 @@ public void setContext(HiveLockManagerCtx ctx) throws LockException { this.ctx = ctx; HiveConf conf = ctx.getConf(); - int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); - String quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf); + sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf); try { - if (zooKeeper != null) { - zooKeeper.close(); - } - - zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher()); + renewZookeeperInstance(sessionTimeout, quorumServers); parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); try { @@ -107,6 +107,15 @@ } } + private void renewZookeeperInstance(int sessionTimeout, String quorumServers) + throws InterruptedException, IOException { + if (zooKeeper != null) { + zooKeeper.close(); + } + + zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher()); + } + /** * Since partition names can contain "/", which need all the parent directories to be created by ZooKeeper, * replace "/" by a dummy name to ensure a single hierarchy. @@ -121,19 +130,54 @@ * @param key The object to be locked * @param mode The mode of the lock * @param keepAlive Whether the lock is to be persisted after the statement + * @param numRetries number of retries when the lock can not be acquired + * @param sleepTime sleep time between retries + * * Acuire the lock. Return null if a conflicting lock is present. **/ - public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive) + public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, + boolean keepAlive, int numRetries, int sleepTime) throws LockException { String name = getObjectName(key, mode); - String res; + String res = null; try { - if (keepAlive) { - res = zooKeeper.create(name, key.getData().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); - } - else { - res = zooKeeper.create(name, key.getData().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + int tryNum = 0; + while (true) { + String msg = null; + try { + if (keepAlive) { + res = zooKeeper.create(name, key.getData().getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + } else { + res = zooKeeper.create(name, key.getData().getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + } + } catch (Exception e) { + msg = e.getLocalizedMessage(); + } + + if (res != null) { + break; + } + + renewZookeeperInstance(sessionTimeout, quorumServers); + + if (tryNum == numRetries) { + console.printError("Lock for " + key.getName() + + " cannot be acquired in " + mode); + throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); + } + + tryNum++; + + console.printInfo("Lock for " + key.getName() + + " cannot be acquired in " + mode +", will retry again later..., more info: " + msg); + + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } } int seqNo = getSequenceNumber(res, name); @@ -258,6 +302,9 @@ } HiveLockObject obj = getLockObject(conf, child, mode, data); + if (obj == null) { + continue; + } if ((key == null) || (obj.getName().equals(key.getName()))) { HiveLock lck = (HiveLock)(new ZooKeeperHiveLock(child, obj, mode)); @@ -306,10 +353,16 @@ int indx = path.lastIndexOf(mode.toString()); String objName = path.substring(1, indx-1); String[] names = objName.split("/")[1].split("@"); + + if (names.length < 2) { + return null; + } Table tab = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, names[1], false); // do not throw exception if table does not exist - assert (tab != null); + if (tab == null) { + return null; + } if (names.length == 2) { return new HiveLockObject(tab, data);