Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1171255) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -416,6 +416,7 @@ HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false), HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"), HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100), + HIVE_UNLOCK_NUMRETRIES("hive.unlock.numretries", 10), HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60), HIVE_LOCK_MAPRED_ONLY("hive.lock.mapred.only.operation", false), Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1171255) +++ conf/hive-default.xml (working copy) @@ -871,6 +871,12 @@ + hive.unlock.numretries + 10 + The number of times you want to retry to do one unlock + + + hive.lock.sleep.between.retries 60 The sleep time (in seconds) between various retries Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1171255) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -767,9 +767,6 @@ perfLogger.PerfLogBegin(LOG, PerfLogger.ACQUIRE_READ_WRITE_LOCKS); try { - int sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; - int numRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); - boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); if (!supportConcurrency) { return 0; @@ -821,24 +818,7 @@ throw new SemanticException(e.getMessage()); } - List hiveLocks = null; - - int tryNum = 1; - do { - - ctx.getHiveLockMgr().prepareRetry(); - hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false); - - if 
(hiveLocks != null) { - break; - } - - tryNum++; - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - } - } while (tryNum < numRetries); + List hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false); if (hiveLocks == null) { throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 1171255) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (working copy) @@ -32,6 +32,7 @@ // mode of the lock: EXPLICIT(lock command)/IMPLICIT(query) private String lockMode; private String queryStr; + private String clientIp; public HiveLockObjectData(String queryId, String lockTime, @@ -73,7 +74,16 @@ } public String toString() { - return queryId + ":" + lockTime + ":" + lockMode + ":" + queryStr; + return queryId + ":" + lockTime + ":" + lockMode + ":" + queryStr + ":" + + clientIp; + } + + public String getClientIp() { + return this.clientIp; + } + + public void setClientIp(String clientIp) { + this.clientIp = clientIp; } } Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 1171255) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (working copy) @@ -23,6 +23,8 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import java.io.IOException; +import java.net.InetAddress; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.List; @@ -66,6 +68,12 @@ private int sessionTimeout; private String quorumServers; + + private int sleepTime; + private int numRetriesForLock; + private int 
numRetriesForUnLock; + + private String clientIp; public ZooKeeperHiveLockManager() { } @@ -91,6 +99,17 @@ sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf); + sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; + numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); + numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES); + + clientIp = "UNKNOWN"; + try { + InetAddress clientAddr = InetAddress.getLocalHost(); + clientIp = clientAddr.getHostAddress(); + } catch (Exception e1) { + } + try { renewZookeeperInstance(sessionTimeout, quorumServers); parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); @@ -101,9 +120,8 @@ // ignore if the parent already exists } - } catch (Exception e) { - LOG.error("Failed to create ZooKeeper object: " + e); + LOG.error("Failed to create ZooKeeper object: ", e); throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); } } @@ -275,8 +293,34 @@ private String getLockName(String parent, HiveLockMode mode) { return parent + "/" + "LOCK-" + mode + "-"; } + + private ZooKeeperHiveLock lock (HiveLockObject key, HiveLockMode mode, + boolean keepAlive, boolean parentCreated) throws LockException { /* retries lockPrimitive until it returns a lock or tryNum reaches numRetriesForLock */ + int tryNum = 1; + ZooKeeperHiveLock ret = null; + do { + try { + if (tryNum > 1) { + Thread.sleep(sleepTime); /* back off before every retry, after a null result or an exception alike */ + prepareRetry(); + } + ret = lockPrimitive(key, mode, keepAlive, parentCreated); + if (ret != null) { + break; + } - private ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, + tryNum++; + } catch (Exception e) { + tryNum++; /* a failed attempt must consume a retry, otherwise a persistent error loops forever */ + if(tryNum >= numRetriesForLock) { + throw new LockException(e); + } + } + } while (tryNum < numRetriesForLock); + return ret; /* null when the last attempt returned null */ + } + + private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, HiveLockMode mode, boolean keepAlive, boolean parentCreated) throws LockException { String res; @@ -285,6 
+329,9 @@ // If the parents have already been created, create the last child only List names = new ArrayList(); String lastName; + + HiveLockObjectData lockData = key.getData(); + lockData.setClientIp(clientIp); if (parentCreated) { lastName = getLastObjectName(parent, key); @@ -334,7 +381,7 @@ } } } catch (Exception e) { - LOG.error("Failed to get ZooKeeper lock: " + e); + LOG.error("Failed to get ZooKeeper lock: ", e); throw new LockException(e); } @@ -343,11 +390,34 @@ /* Remove the lock specified */ public void unlock(HiveLock hiveLock) throws LockException { - unlock(ctx.getConf(), zooKeeper, hiveLock, parent); + unlockWithRetry(ctx.getConf(), zooKeeper, hiveLock, parent); + } + + private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient, + HiveLock hiveLock, String parent) throws LockException { + + int tryNum = 0; + do { + try { + tryNum++; + if (tryNum > 1) { + Thread.sleep(sleepTime); + prepareRetry(); + } + unlockPrimitive(conf, zkpClient, hiveLock, parent); + break; + } catch (Exception e) { + if (tryNum >= numRetriesForUnLock) { + throw new LockException(e); + } + } + } while (tryNum < numRetriesForUnLock); + + return; } /* Remove the lock specified */ - private static void unlock(HiveConf conf, ZooKeeper zkpClient, + private static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient, HiveLock hiveLock, String parent) throws LockException { ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock; try { @@ -363,7 +433,7 @@ zkpClient.delete(name, -1); } } catch (Exception e) { - LOG.error("Failed to release ZooKeeper lock: " + e); + LOG.error("Failed to release ZooKeeper lock: ", e); throw new LockException(e); } } @@ -374,15 +444,25 @@ try { int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); String quorumServers = getQuorumServers(conf); - zkpClient = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher()); + Watcher dummWatcher = new DummyWatcher(); + zkpClient = new ZooKeeper(quorumServers, 
sessionTimeout, dummWatcher); String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); List locks = getLocks(conf, zkpClient, null, parent, false, false); - + Exception lastExceptionGot = null; if (locks != null) { for (HiveLock lock : locks) { - unlock(conf, zkpClient, lock, parent); + try { + unlockPrimitive(conf, zkpClient, lock, parent); + } catch (Exception e) { + lastExceptionGot = e; + } } } + + // if we got exception during doing the unlock, rethrow it here + if(lastExceptionGot != null) { + throw lastExceptionGot; + } } catch (Exception e) { LOG.error("Failed to release all locks: " + e.getMessage()); throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); @@ -645,4 +725,5 @@ throw new LockException(e); } } + }