Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1171255)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -416,6 +416,7 @@
HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false),
HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"),
HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100),
+ HIVE_UNLOCK_NUMRETRIES("hive.unlock.numretries", 10),
HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60),
HIVE_LOCK_MAPRED_ONLY("hive.lock.mapred.only.operation", false),
Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1171255)
+++ conf/hive-default.xml (working copy)
@@ -871,6 +871,12 @@
+ hive.unlock.numretries
+ 10
+ The maximum number of attempts made to release a single lock before giving up
+
+
+
hive.lock.sleep.between.retries
60
The sleep time (in seconds) between various retries
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1171255)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -767,9 +767,6 @@
perfLogger.PerfLogBegin(LOG, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
try {
- int sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
- int numRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
-
boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
if (!supportConcurrency) {
return 0;
@@ -821,24 +818,7 @@
throw new SemanticException(e.getMessage());
}
- List hiveLocks = null;
-
- int tryNum = 1;
- do {
-
- ctx.getHiveLockMgr().prepareRetry();
- hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false);
-
- if (hiveLocks != null) {
- break;
- }
-
- tryNum++;
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- }
- } while (tryNum < numRetries);
+ List hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false);
if (hiveLocks == null) {
throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 1171255)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (working copy)
@@ -32,6 +32,7 @@
// mode of the lock: EXPLICIT(lock command)/IMPLICIT(query)
private String lockMode;
private String queryStr;
+ private String clientIp;
public HiveLockObjectData(String queryId,
String lockTime,
@@ -73,7 +74,16 @@
}
public String toString() {
- return queryId + ":" + lockTime + ":" + lockMode + ":" + queryStr;
+ return queryId + ":" + lockTime + ":" + lockMode + ":" + queryStr + ":"
+ + clientIp; // clientIp is appended as a fifth ':'-separated field
+ }
+
+ public String getClientIp() { // NOTE(review): constructor not visible here; may be null until setClientIp runs
+ return this.clientIp;
+ }
+
+ public void setClientIp(String clientIp) { // stored as given, no validation; toString() appends it
+ this.clientIp = clientIp;
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 1171255)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (working copy)
@@ -23,6 +23,8 @@
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import java.io.IOException;
+import java.net.InetAddress;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
@@ -66,6 +68,12 @@
private int sessionTimeout;
private String quorumServers;
+
+ private int sleepTime;
+ private int numRetriesForLock;
+ private int numRetriesForUnLock;
+
+ private String clientIp;
public ZooKeeperHiveLockManager() {
}
@@ -91,6 +99,17 @@
sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf);
+ sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
+ numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
+ numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
+
+ clientIp = "UNKNOWN";
+ try {
+ InetAddress clientAddr = InetAddress.getLocalHost();
+ clientIp = clientAddr.getHostAddress();
+ } catch (Exception e1) {
+ }
+
try {
renewZookeeperInstance(sessionTimeout, quorumServers);
parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
@@ -101,9 +120,8 @@
// ignore if the parent already exists
}
-
} catch (Exception e) {
- LOG.error("Failed to create ZooKeeper object: " + e);
+ LOG.error("Failed to create ZooKeeper object: ", e);
throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
}
}
@@ -275,8 +293,34 @@
private String getLockName(String parent, HiveLockMode mode) {
return parent + "/" + "LOCK-" + mode + "-";
}
+
+  /**
+   * Takes one lock, making at most numRetriesForLock attempts and sleeping
+   * sleepTime ms before each retry; returns null if no attempt was granted.
+   */
+  private ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode,
+      boolean keepAlive, boolean parentCreated) throws LockException {
+    ZooKeeperHiveLock ret = null;
+    int tryNum = 0;
+    do {
+      try {
+        tryNum++; // counted before the attempt so a throwing attempt also uses up a retry
+        if (tryNum > 1) {
+          Thread.sleep(sleepTime);
+          prepareRetry();
+        }
+        ret = lockPrimitive(key, mode, keepAlive, parentCreated);
+      } catch (Exception e) {
+        if (tryNum >= numRetriesForLock) {
+          throw new LockException(e); // out of attempts: surface the last failure
+        }
+      }
+    } while (ret == null && tryNum < numRetriesForLock);
+    return ret;
+  }
- private ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode,
+
+ private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, HiveLockMode mode,
boolean keepAlive, boolean parentCreated)
throws LockException {
String res;
@@ -285,6 +329,9 @@
// If the parents have already been created, create the last child only
List names = new ArrayList();
String lastName;
+
+ HiveLockObjectData lockData = key.getData();
+ lockData.setClientIp(clientIp);
if (parentCreated) {
lastName = getLastObjectName(parent, key);
@@ -334,7 +381,7 @@
}
}
} catch (Exception e) {
- LOG.error("Failed to get ZooKeeper lock: " + e);
+ LOG.error("Failed to get ZooKeeper lock: ", e);
throw new LockException(e);
}
@@ -343,11 +390,34 @@
/* Remove the lock specified */
public void unlock(HiveLock hiveLock) throws LockException {
- unlock(ctx.getConf(), zooKeeper, hiveLock, parent);
+ unlockWithRetry(ctx.getConf(), zooKeeper, hiveLock, parent);
+ }
+
+  /** Deletes one lock, retrying on failure up to numRetriesForUnLock attempts. */
+  private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient,
+      HiveLock hiveLock, String parent) throws LockException {
+    int attempt = 0;
+    boolean released = false;
+    while (!released) {
+      attempt++;
+      try {
+        if (attempt > 1) {
+          // wait sleepTime ms, then let prepareRetry() run before the next attempt
+          Thread.sleep(sleepTime);
+          prepareRetry();
+        }
+        unlockPrimitive(conf, zkpClient, hiveLock, parent);
+        released = true;
+      } catch (Exception e) {
+        if (attempt >= numRetriesForUnLock) {
+          throw new LockException(e);
+        }
+      }
+    }
}
/* Remove the lock specified */
- private static void unlock(HiveConf conf, ZooKeeper zkpClient,
+ private static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient,
HiveLock hiveLock, String parent) throws LockException {
ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock;
try {
@@ -363,7 +433,7 @@
zkpClient.delete(name, -1);
}
} catch (Exception e) {
- LOG.error("Failed to release ZooKeeper lock: " + e);
+ LOG.error("Failed to release ZooKeeper lock: ", e);
throw new LockException(e);
}
}
@@ -374,15 +444,25 @@
try {
int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
String quorumServers = getQuorumServers(conf);
- zkpClient = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher());
+ Watcher dummWatcher = new DummyWatcher();
+ zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummWatcher);
String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
List locks = getLocks(conf, zkpClient, null, parent, false, false);
-
+ Exception lastExceptionGot = null;
if (locks != null) {
for (HiveLock lock : locks) {
- unlock(conf, zkpClient, lock, parent);
+ try {
+ unlockPrimitive(conf, zkpClient, lock, parent);
+ } catch (Exception e) {
+ lastExceptionGot = e;
+ }
}
}
+
+ // if any unlock above threw, rethrow the last exception caught
+ if(lastExceptionGot != null) {
+ throw lastExceptionGot;
+ }
} catch (Exception e) {
LOG.error("Failed to release all locks: " + e.getMessage());
throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
@@ -645,4 +725,5 @@
throw new LockException(e);
}
}
+
}