Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1171255) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -416,6 +416,7 @@ HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false), HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"), HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100), + HIVE_UNLOCK_NUMRETRIES("hive.unlock.numretries", 10), HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60), HIVE_LOCK_MAPRED_ONLY("hive.lock.mapred.only.operation", false), Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1171255) +++ conf/hive-default.xml (working copy) @@ -871,6 +871,12 @@ + hive.unlock.numretries + 10 + The number of times you want to retry to do one unlock + + + hive.lock.sleep.between.retries 60 The sleep time (in seconds) between various retries Index: ql/src/java/org/apache/hadoop/hive/ql/Context.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Context.java (revision 1171255) +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java (working copy) @@ -498,6 +498,9 @@ } public HiveLockManager getHiveLockMgr() { + if (hiveLockMgr != null) { + hiveLockMgr.refresh(); + } return hiveLockMgr; } Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1171255) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -767,9 +767,6 @@ perfLogger.PerfLogBegin(LOG, PerfLogger.ACQUIRE_READ_WRITE_LOCKS); try { - int sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; - int numRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); - boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); if (!supportConcurrency) { return 0; @@ -821,24 +818,7 @@ throw new SemanticException(e.getMessage()); } - List hiveLocks = null; - - int tryNum = 1; - do { - - ctx.getHiveLockMgr().prepareRetry(); - hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false); - - if (hiveLocks != null) { - break; - } - - tryNum++; - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - } - } while (tryNum < numRetries); + List hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false); if (hiveLocks == null) { throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 1171255) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (working copy) @@ -40,4 +40,10 @@ public List getLocks(HiveLockObject key, boolean verifyTablePartitions, boolean fetchData) throws LockException; public void close() throws LockException; public void prepareRetry() throws LockException; + + /** + * refresh to enable new configurations. + */ + public void refresh(); + } Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 1171255) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (working copy) @@ -32,6 +32,7 @@ // mode of the lock: EXPLICIT(lock command)/IMPLICIT(query) private String lockMode; private String queryStr; + private String clientIp; public HiveLockObjectData(String queryId, String lockTime, @@ -73,7 +74,16 @@ } public String toString() { - return queryId + ":" + lockTime + ":" + lockMode + ":" + queryStr; + return queryId + ":" + lockTime + ":" + lockMode + ":" + queryStr + ":" + + clientIp; + } + + public String getClientIp() { + return this.clientIp; + } + + public void setClientIp(String clientIp) { + this.clientIp = clientIp; } } Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 1171255) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (working copy) @@ -23,6 +23,8 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import java.io.IOException; +import java.net.InetAddress; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.List; @@ -66,6 +68,12 @@ private int sessionTimeout; private String quorumServers; + + private int sleepTime; + private int numRetriesForLock; + private int numRetriesForUnLock; + + private String clientIp; public ZooKeeperHiveLockManager() { } @@ -91,6 +99,16 @@ sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf); + sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; + numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); + numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES); + clientIp = "UNKNOWN"; + try { + InetAddress clientAddr = InetAddress.getLocalHost(); + clientIp = clientAddr.getHostAddress(); + } catch (Exception e1) { + } + try { renewZookeeperInstance(sessionTimeout, quorumServers); parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); @@ -101,17 +119,24 @@ // ignore if the parent already exists } - } catch (Exception e) { - LOG.error("Failed to create ZooKeeper object: " + e); + LOG.error("Failed to create ZooKeeper object: ", e); throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); } } + @Override + public void refresh() { + HiveConf conf = ctx.getConf(); + sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; + numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); + numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES); + } + private void renewZookeeperInstance(int sessionTimeout, String quorumServers) throws InterruptedException, IOException { if (zooKeeper != null) { - zooKeeper.close(); + return; } zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher()); @@ -194,7 +219,8 @@ try { lock = lock(lockObject.getObj(), lockObject.getMode(), false, true); } catch (LockException e) { - console.printError("Error in acquireLocks: "+ e.getLocalizedMessage()); + console.printError("Error in acquireLocks..." ); + LOG.error("Error in acquireLocks...", e); lock = null; } @@ -251,91 +277,122 @@ * The data for the zookeeper child * @param mode * The mode in which the child needs to be created + * @throws KeeperException + * @throws InterruptedException **/ - private String createChild(String name, byte[] data, CreateMode mode) throws LockException { - String res = null; - String msg = null; - try { - res = zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode); - } catch (KeeperException e) { - return null; - // nothing to do if the node already exists - } catch (Exception e) { - msg = e.getLocalizedMessage(); - } - - if (res == null) { - console.printInfo("Lock for " + name + " cannot be acquired in " + mode - + ", will retry again later..., more info: " + msg); - } - - return res; + private String createChild(String name, byte[] data, CreateMode mode) + throws KeeperException, InterruptedException { + return zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode); } private String getLockName(String parent, HiveLockMode mode) { return parent + "/" + "LOCK-" + mode + "-"; } + + private ZooKeeperHiveLock lock (HiveLockObject key, HiveLockMode mode, + boolean keepAlive, boolean parentCreated) throws LockException { + int tryNum = 1; + ZooKeeperHiveLock ret = null; + + do { + try { + if (tryNum > 1) { + Thread.sleep(sleepTime); + prepareRetry(); + } + ret = lockPrimitive(key, mode, keepAlive, parentCreated); + if (ret != null) { + break; + } + tryNum++; + } catch (Exception e1) { + if (e1 instanceof KeeperException) { + KeeperException e = (KeeperException) e1; + switch (e.code()) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + LOG.warn("Possibly transient ZooKeeper exception: ", e); + break; + default: + LOG.error("Serious Zookeeper exception: ", e); + break; + } + } + if (tryNum >= numRetriesForLock) { + throw new LockException(e1); + } + } + } while (tryNum < numRetriesForLock); - private ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, - boolean keepAlive, boolean parentCreated) - throws LockException { + return ret; + } + + private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, + HiveLockMode mode, boolean keepAlive, boolean parentCreated) + throws KeeperException, InterruptedException { String res; - try { - // If the parents have already been created, create the last child only - List names = new ArrayList(); - String lastName; + // If the parents have already been created, create the last child only + List names = new ArrayList(); + String lastName; - if (parentCreated) { - lastName = getLastObjectName(parent, key); - names.add(lastName); - } - else { - names = getObjectNames(key); - lastName = names.get(names.size()-1); - } + HiveLockObjectData lockData = key.getData(); + lockData.setClientIp(clientIp); + + if (parentCreated) { + lastName = getLastObjectName(parent, key); + names.add(lastName); + } else { + names = getObjectNames(key); + lastName = names.get(names.size() - 1); + } - // Create the parents first - for (String name : names) { - res = createChild(name, new byte[0], CreateMode.PERSISTENT); + // Create the parents first + for (String name : names) { + try { + res = createChild(name, new byte[0], CreateMode.PERSISTENT); + } catch (KeeperException e) { + if (e.code() != KeeperException.Code.NODEEXISTS) { + //if the exception is not 'NODEEXISTS', re-throw it + throw e; + } } + } - res = createChild(getLockName(lastName, mode), key.getData().toString().getBytes(), - keepAlive ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.EPHEMERAL_SEQUENTIAL); + res = createChild(getLockName(lastName, mode), key.getData().toString() + .getBytes(), keepAlive ? CreateMode.PERSISTENT_SEQUENTIAL + : CreateMode.EPHEMERAL_SEQUENTIAL); - int seqNo = getSequenceNumber(res, getLockName(lastName, mode)); - if (seqNo == -1) { - zooKeeper.delete(res, -1); - return null; - } + int seqNo = getSequenceNumber(res, getLockName(lastName, mode)); + if (seqNo == -1) { + zooKeeper.delete(res, -1); + return null; + } - List children = zooKeeper.getChildren(lastName, false); + List children = zooKeeper.getChildren(lastName, false); - String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE); - String shLock = getLockName(lastName, HiveLockMode.SHARED); + String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE); + String shLock = getLockName(lastName, HiveLockMode.SHARED); - for (String child : children) { - child = lastName + "/" + child; + for (String child : children) { + child = lastName + "/" + child; - // Is there a conflicting lock on the same object with a lower sequence number - int childSeq = seqNo; - if (child.startsWith(exLock)) { - childSeq = getSequenceNumber(child, exLock); - } - if ((mode == HiveLockMode.EXCLUSIVE) && child.startsWith(shLock)) { - childSeq = getSequenceNumber(child, shLock); - } + // Is there a conflicting lock on the same object with a lower sequence + // number + int childSeq = seqNo; + if (child.startsWith(exLock)) { + childSeq = getSequenceNumber(child, exLock); + } + if ((mode == HiveLockMode.EXCLUSIVE) && child.startsWith(shLock)) { + childSeq = getSequenceNumber(child, shLock); + } - if ((childSeq >= 0) && (childSeq < seqNo)) { - zooKeeper.delete(res, -1); - console.printError("conflicting lock present for " + key.getDisplayName() + - " mode " + mode); - return null; - } + if ((childSeq >= 0) && (childSeq < seqNo)) { + zooKeeper.delete(res, -1); + console.printError("conflicting lock present for " + + key.getDisplayName() + " mode " + mode); + return null; } - } catch (Exception e) { - LOG.error("Failed to get ZooKeeper lock: " + e); - throw new LockException(e); } return new ZooKeeperHiveLock(res, key, mode); @@ -343,11 +400,34 @@ /* Remove the lock specified */ public void unlock(HiveLock hiveLock) throws LockException { - unlock(ctx.getConf(), zooKeeper, hiveLock, parent); + unlockWithRetry(ctx.getConf(), zooKeeper, hiveLock, parent); + } + + private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient, + HiveLock hiveLock, String parent) throws LockException { + + int tryNum = 0; + do { + try { + tryNum++; + if (tryNum > 1) { + Thread.sleep(sleepTime); + prepareRetry(); + } + unlockPrimitive(conf, zkpClient, hiveLock, parent); + break; + } catch (Exception e) { + if (tryNum >= numRetriesForUnLock) { + throw new LockException(e); + } + } + } while (tryNum < numRetriesForUnLock); + + return; } /* Remove the lock specified */ - private static void unlock(HiveConf conf, ZooKeeper zkpClient, + private static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient, HiveLock hiveLock, String parent) throws LockException { ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock; try { @@ -363,7 +443,7 @@ zkpClient.delete(name, -1); } } catch (Exception e) { - LOG.error("Failed to release ZooKeeper lock: " + e); + LOG.error("Failed to release ZooKeeper lock: ", e); throw new LockException(e); } } @@ -374,17 +454,27 @@ try { int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); String quorumServers = getQuorumServers(conf); - zkpClient = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher()); + Watcher dummWatcher = new DummyWatcher(); + zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummWatcher); String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); List locks = getLocks(conf, zkpClient, null, parent, false, false); - + Exception lastExceptionGot = null; if (locks != null) { for (HiveLock lock : locks) { - unlock(conf, zkpClient, lock, parent); + try { + unlockPrimitive(conf, zkpClient, lock, parent); + } catch (Exception e) { + lastExceptionGot = e; + } } } + + // if we got exception during doing the unlock, rethrow it here + if(lastExceptionGot != null) { + throw lastExceptionGot; + } } catch (Exception e) { - LOG.error("Failed to release all locks: " + e.getMessage()); + LOG.error("Failed to release all locks: ", e); throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); } finally { if (zkpClient != null) { @@ -645,4 +735,5 @@ throw new LockException(e); } } + } Index: ql/src/test/queries/clientnegative/lockneg1.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg1.q (revision 1171255) +++ ql/src/test/queries/clientnegative/lockneg1.q (working copy) @@ -2,6 +2,9 @@ create table tstsrc like src; insert overwrite table tstsrc select key, value from src; +set hive.lock.numretries=0; +set hive.unlock.numretries=0; + LOCK TABLE tstsrc SHARED; LOCK TABLE tstsrc SHARED; LOCK TABLE tstsrc EXCLUSIVE; Index: ql/src/test/queries/clientnegative/lockneg2.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg2.q (revision 1171255) +++ ql/src/test/queries/clientnegative/lockneg2.q (working copy) @@ -2,4 +2,5 @@ create table tstsrc like src; insert overwrite table tstsrc select key, value from src; +set hive.unlock.numretries=0; UNLOCK TABLE tstsrc; Index: ql/src/test/queries/clientnegative/lockneg3.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg3.q (revision 1171255) +++ ql/src/test/queries/clientnegative/lockneg3.q (working copy) @@ -4,4 +4,6 @@ insert overwrite table tstsrcpart partition (ds='2008-04-08', hr='11') select key, value from srcpart where ds='2008-04-08' and hr='11'; +set hive.lock.numretries=0; +set hive.unlock.numretries=0; UNLOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11'); Index: ql/src/test/queries/clientnegative/lockneg4.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg4.q (revision 1171255) +++ ql/src/test/queries/clientnegative/lockneg4.q (working copy) @@ -4,6 +4,9 @@ insert overwrite table tstsrcpart partition (ds='2008-04-08', hr='11') select key, value from srcpart where ds='2008-04-08' and hr='11'; +set hive.lock.numretries=0; +set hive.unlock.numretries=0; + LOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11') EXCLUSIVE; SHOW LOCKS tstsrcpart PARTITION(ds='2008-04-08', hr='12');