Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1056098)
+++ conf/hive-default.xml (working copy)
@@ -764,6 +764,12 @@
+<property>
+  <name>hive.zookeeper.clean.extra.nodes</name>
+  <value>false</value>
+  <description>Clean extra nodes at the end of the session.</description>
+</property>
+
  <name>fs.har.impl</name>
  <value>org.apache.hadoop.hive.shims.HiveHarFileSystem</value>
  <description>The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop vers less than 0.20</description>
@@ -815,8 +821,8 @@
  <name>hive.auto.progress.timeout</name>
  <value>0</value>
-  <description>
-    How long to run autoprogressor for the script/UDTF operators (in seconds).
+  <description>
+    How long to run autoprogressor for the script/UDTF operators (in seconds).
    Set to 0 for forever.
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1056098)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -313,6 +313,7 @@
HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", ""),
HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000),
HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace"),
+ HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false),
// For HBase storage handler
HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true),
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 1056098)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy)
@@ -1411,7 +1411,7 @@
while (locksIter.hasNext()) {
HiveLock lock = locksIter.next();
- outStream.writeBytes(lock.getHiveLockObject().getName());
+ outStream.writeBytes(lock.getHiveLockObject().getDisplayName());
outStream.write(separator);
outStream.writeBytes(lock.getHiveLockMode().toString());
if (isExt) {
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 1056098)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (working copy)
@@ -33,7 +33,10 @@
*/
public HiveLock lock(HiveLockObject key, HiveLockMode mode,
boolean keepAlive, int numRetries, int sleepTime) throws LockException;
+ public List<HiveLock> lock(List<HiveLockObj> objs,
+ boolean keepAlive, int numRetries, int sleepTime) throws LockException;
public void unlock(HiveLock hiveLock) throws LockException;
+ public void releaseLocks(List<HiveLock> hiveLocks);
public List<HiveLock> getLocks(boolean verifyTablePartitions, boolean fetchData) throws LockException;
public List<HiveLock> getLocks(HiveLockObject key, boolean verifyTablePartitions, boolean fetchData) throws LockException;
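(A rough usage sketch of the two new interface methods, mirroring how Driver.acquireReadWriteLocks uses them later in this patch; lockMgr, lockObjs, numRetries and sleepTime are placeholders, not part of the change:)

    // lock() sorts the objects, skips duplicates, and acquires them in order;
    // it returns null, after releasing any partial locks, if one cannot be acquired.
    List<HiveLock> locks = lockMgr.lock(lockObjs, false, numRetries, sleepTime);
    if (locks == null) {
      throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
    }
    // ... run the statement ...
    // Locks taken without keepAlive are released once the statement finishes.
    lockMgr.releaseLocks(locks);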
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java (revision 0)
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+public class HiveLockObj {
+ HiveLockObject obj;
+ HiveLockMode mode;
+
+ public HiveLockObj(HiveLockObject obj, HiveLockMode mode) {
+ this.obj = obj;
+ this.mode = mode;
+ }
+
+ public HiveLockObject getObj() {
+ return obj;
+ }
+
+ public void setObj(HiveLockObject obj) {
+ this.obj = obj;
+ }
+
+ public HiveLockMode getMode() {
+ return mode;
+ }
+
+ public void setMode(HiveLockMode mode) {
+ this.mode = mode;
+ }
+
+ public String getName() {
+ return obj.getName();
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 1056098)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (working copy)
@@ -28,8 +28,12 @@
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
+import java.util.Queue;
+import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;
+import java.util.Comparator;
+import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
@@ -41,6 +45,7 @@
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
@@ -52,7 +57,6 @@
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
public class ZooKeeperHiveLockManager implements HiveLockManager {
HiveLockManagerCtx ctx;
@@ -118,16 +122,122 @@
}
/**
- * Since partition names can contain "/", which need all the parent directories to be created by ZooKeeper,
- * replace "/" by a dummy name to ensure a single hierarchy.
+ * @param key object to be locked
+ * Get the name of the last object. For example, if you need to lock db/T/ds=1/hr=1,
+ * the last name would be db/T/ds=1/hr=1
**/
- private String getObjectName(HiveLockObject key, HiveLockMode mode) {
- return "/" + parent + "/" +
- key.getName().replaceAll("/", ctx.getConf().getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)) +
- "-" + mode + "-";
+ private static String getLastObjectName(String parent, HiveLockObject key) {
+ return "/" + parent + "/" + key.getName();
}
/**
+ * @param key object to be locked
+ * Get the list of names for all the parents.
+ * For example, if you need to lock db/T/ds=1/hr=1, the following list will be returned:
+ * {db, db/T, db/T/ds=1, db/T/ds=1/hr=1}
+ **/
+ private List<String> getObjectNames(HiveLockObject key) {
+ List<String> parents = new ArrayList<String>();
+ String curParent = "/" + parent + "/";
+ String[] names = key.getName().split("/");
+
+ for (String name : names) {
+ curParent = curParent + name;
+ parents.add(curParent);
+ curParent = curParent + "/";
+ }
+ return parents;
+ }
+
+ /**
+ * @param lockObjects List of objects and the modes of the locks requested
+ * @param keepAlive Whether the lock is to be persisted after the statement
+ * @param numRetries number of retries when the lock can not be acquired
+ * @param sleepTime sleep time between retries
+ *
+ * Acquire all the locks. Release all the locks and return null if any lock
+ * could not be acquired.
+ **/
+ public List<HiveLock> lock(List<HiveLockObj> lockObjects,
+ boolean keepAlive, int numRetries, int sleepTime) throws LockException
+ {
+ // Sort the objects first. You are guaranteed that if a partition is being locked,
+ // the table has already been locked
+
+ Collections.sort(lockObjects, new Comparator<HiveLockObj>() {
+
+ @Override
+ public int compare(HiveLockObj o1, HiveLockObj o2) {
+ int cmp = o1.getName().compareTo(o2.getName());
+ if (cmp == 0) {
+ if (o1.getMode() == o2.getMode()) {
+ return cmp;
+ }
+ // EXCLUSIVE locks occur before SHARED locks
+ if (o1.getMode() == HiveLockMode.EXCLUSIVE) {
+ return -1;
+ }
+ return +1;
+ }
+ return cmp;
+ }
+ });
+
+ // walk the list and acquire the locks - if any lock can't be acquired, release all locks, sleep
+ // and retry
+ HiveLockObj prevLockObj = null;
+ List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
+
+ for (HiveLockObj lockObject : lockObjects) {
+ // No need to acquire a lock twice on the same object
+ // It is ensured that EXCLUSIVE locks occur before SHARED locks on the same object
+ if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) {
+ prevLockObj = lockObject;
+ continue;
+ }
+
+ HiveLock lock = null;
+ try {
+ lock = lock(lockObject.getObj(), lockObject.getMode(), false,
+ numRetries, sleepTime, true);
+ } catch (LockException e) {
+ console.printError("Error in acquireLocks: "+ e.getLocalizedMessage());
+ lock = null;
+ }
+
+ if (lock == null) {
+ releaseLocks(hiveLocks);
+ return null;
+ }
+
+ hiveLocks.add(lock);
+ prevLockObj = lockObject;
+ }
+
+ return hiveLocks;
+
+ }
+
+ /**
+ * @param hiveLocks list of hive locks to be released.
+ * Release all the locks specified. If some of the locks have already been
+ * released, ignore them.
+ **/
+ public void releaseLocks(List<HiveLock> hiveLocks) {
+ if (hiveLocks != null) {
+ int len = hiveLocks.size();
+ for (int pos = len-1; pos >= 0; pos--) {
+ HiveLock hiveLock = hiveLocks.get(pos);
+ try {
+ unlock(hiveLock);
+ } catch (LockException e) {
+ // The lock may have been released. Ignore and continue
+ }
+ }
+ }
+ }
+
+ /**
* @param key The object to be locked
* @param mode The mode of the lock
* @param keepAlive Whether the lock is to be persisted after the statement
@@ -139,61 +249,108 @@
public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode,
boolean keepAlive, int numRetries, int sleepTime)
throws LockException {
- String name = getObjectName(key, mode);
+ return lock(key, mode, keepAlive, numRetries, sleepTime, false);
+ }
+
+ /**
+ * @param name The name of the zookeeper child
+ * @param data The data for the zookeeper child
+ * @param mode The mode in which the child needs to be created
+ * @param numRetries number of retries if the child cannot be created
+ * @param sleepTime sleep time between retries
+ **/
+ private String createChild(String name, byte[] data, CreateMode mode,
+ int numRetries, int sleepTime) throws LockException {
String res = null;
+ int tryNum = 0;
+ while (true) {
+ String msg = null;
+ try {
+ res = zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode);
+ } catch (KeeperException e) {
+ // nothing to do if the node already exists
+ return null;
+ } catch (Exception e) {
+ msg = e.getLocalizedMessage();
+ }
- try {
- int tryNum = 0;
- while (true) {
- String msg = null;
- try {
- if (keepAlive) {
- res = zooKeeper.create(name, key.getData().toString().getBytes(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
- } else {
- res = zooKeeper.create(name, key.getData().toString().getBytes(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- }
- } catch (Exception e) {
- msg = e.getLocalizedMessage();
- }
+ if (res != null) {
+ return res;
+ }
- if (res != null) {
- break;
- }
-
+ try {
renewZookeeperInstance(sessionTimeout, quorumServers);
+ } catch (Exception e) {
+ console.printError("Lock for " + name
+ + " cannot be acquired in " + mode);
+ throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
+ }
- if (tryNum == numRetries) {
- console.printError("Lock for " + key.getName()
- + " cannot be acquired in " + mode);
- throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
- }
+ if (tryNum == numRetries) {
+ console.printError("Lock for " + name
+ + " cannot be acquired in " + mode);
+ throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
+ }
- tryNum++;
+ tryNum++;
- console.printInfo("Lock for " + key.getName()
- + " cannot be acquired in " + mode +", will retry again later..., more info: " + msg);
+ console.printInfo("Lock for " + name
+ + " cannot be acquired in " + mode +
+ ", will retry again later..., more info: " + msg);
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- }
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
}
+ }
+ }
- int seqNo = getSequenceNumber(res, name);
+ private String getLockName(String parent, HiveLockMode mode) {
+ return parent + "/" + "LOCK-" + mode + "-";
+ }
+
+ private ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode,
+ boolean keepAlive, int numRetries, int sleepTime,
+ boolean parentCreated)
+ throws LockException {
+ String res;
+
+ try {
+ // If the parents have already been created, create the last child only
+ List<String> names = new ArrayList<String>();
+ String lastName;
+
+ if (parentCreated) {
+ lastName = getLastObjectName(parent, key);
+ names.add(lastName);
+ }
+ else {
+ names = getObjectNames(key);
+ lastName = names.get(names.size()-1);
+ }
+
+ // Create the parents first
+ for (String name : names) {
+ res = createChild(name, new byte[0], CreateMode.PERSISTENT, numRetries, sleepTime);
+ }
+
+ res = createChild(getLockName(lastName, mode), key.getData().toString().getBytes(),
+ keepAlive ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.EPHEMERAL_SEQUENTIAL,
+ numRetries, sleepTime);
+
+ int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
if (seqNo == -1) {
zooKeeper.delete(res, -1);
return null;
}
- List<String> children = zooKeeper.getChildren("/" + parent, false);
+ List<String> children = zooKeeper.getChildren(lastName, false);
- String exLock = getObjectName(key, HiveLockMode.EXCLUSIVE);
- String shLock = getObjectName(key, HiveLockMode.SHARED);
+ String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE);
+ String shLock = getLockName(lastName, HiveLockMode.SHARED);
for (String child : children) {
- child = "/" + parent + "/" + child;
+ child = lastName + "/" + child;
// Is there a conflicting lock on the same object with a lower sequence number
int childSeq = seqNo;
@@ -206,12 +363,11 @@
if ((childSeq >= 0) && (childSeq < seqNo)) {
zooKeeper.delete(res, -1);
- console.printError("conflicting lock present for " + key.getName() +
+ console.printError("conflicting lock present for " + key.getDisplayName() +
" mode " + mode);
return null;
}
}
-
} catch (Exception e) {
LOG.error("Failed to get ZooKeeper lock: " + e);
throw new LockException(e);
@@ -222,14 +378,25 @@
/* Remove the lock specified */
public void unlock(HiveLock hiveLock) throws LockException {
- unlock(ctx.getConf(), zooKeeper, hiveLock);
+ unlock(ctx.getConf(), zooKeeper, hiveLock, parent);
}
/* Remove the lock specified */
- private static void unlock(HiveConf conf, ZooKeeper zkpClient, HiveLock hiveLock) throws LockException {
+ private static void unlock(HiveConf conf, ZooKeeper zkpClient,
+ HiveLock hiveLock, String parent) throws LockException {
ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock;
try {
zkpClient.delete(zLock.getPath(), -1);
+
+ // Delete the parent node if all the children have been deleted
+ HiveLockObject obj = zLock.getHiveLockObject();
+ String name = getLastObjectName(parent, obj);
+
+ List<String> children = zkpClient.getChildren(name, false);
+ if ((children == null) || (children.isEmpty()))
+ {
+ zkpClient.delete(name, -1);
+ }
} catch (Exception e) {
LOG.error("Failed to release ZooKeeper lock: " + e);
throw new LockException(e);
@@ -247,7 +414,7 @@
if (locks != null) {
for (HiveLock lock : locks) {
- unlock(conf, zkpClient, lock);
+ unlock(conf, zkpClient, lock, parent);
}
}
@@ -281,24 +448,56 @@
throws LockException {
List<HiveLock> locks = new ArrayList<HiveLock>();
List<String> children;
+ boolean recurse = true;
+ String commonParent;
try {
- children = zkpClient.getChildren("/" + parent, false);
+ if (key != null) {
+ commonParent = "/" + parent + "/" + key.getName();
+ children = zkpClient.getChildren(commonParent, false);
+ recurse = false;
+ }
+ else {
+ commonParent = "/" + parent;
+ children = zkpClient.getChildren(commonParent, false);
+ }
} catch (Exception e) {
// no locks present
return locks;
}
- for (String child : children) {
- child = "/" + parent + "/" + child;
- HiveLockMode mode = getLockMode(conf, child);
+ Queue<String> childn = new LinkedList<String>();
+ if (children != null && !children.isEmpty()) {
+ for (String child : children) {
+ childn.add(commonParent + "/" + child);
+ }
+ }
+
+ while (true) {
+ String curChild = childn.poll();
+ if (curChild == null) {
+ return locks;
+ }
+
+ if (recurse) {
+ try {
+ children = zkpClient.getChildren(curChild, false);
+ for (String child : children) {
+ childn.add(curChild + "/" + child);
+ }
+ } catch (Exception e) {
+ // nothing to do
+ }
+ }
+
+ HiveLockMode mode = getLockMode(conf, curChild);
if (mode == null) {
continue;
}
HiveLockObjectData data = null;
- //set the lock object with a dummy data, and then do a set if needed.
- HiveLockObject obj = getLockObject(conf, child, mode, data, verifyTablePartition);
+ // set the lock object with a dummy data, and then do a set if needed.
+ HiveLockObject obj = getLockObject(conf, curChild, mode, data, parent, verifyTablePartition);
if (obj == null) {
continue;
}
@@ -308,21 +507,50 @@
if (fetchData) {
try {
- data = new HiveLockObjectData(new String(zkpClient.getData(child, new DummyWatcher(), null)));
+ data = new HiveLockObjectData(new String(zkpClient.getData(curChild, new DummyWatcher(), null)));
} catch (Exception e) {
- LOG.error("Error in getting data for " + child + " " + e);
+ LOG.error("Error in getting data for " + curChild + " " + e);
// ignore error
}
}
obj.setData(data);
- HiveLock lck = (HiveLock)(new ZooKeeperHiveLock(child, obj, mode));
+ HiveLock lck = (HiveLock)(new ZooKeeperHiveLock(curChild, obj, mode));
locks.add(lck);
}
}
+ }
- return locks;
+ /** Remove all redundant nodes **/
+ private void removeAllRedundantNodes() {
+ try {
+ renewZookeeperInstance(sessionTimeout, quorumServers);
+ checkRedundantNode("/" + parent);
+ } catch (Exception e) {
+ // ignore all errors
+ }
}
+ private void checkRedundantNode(String node) {
+ try {
+ // Nothing to do if it is a lock mode
+ if (getLockMode(ctx.getConf(), node) != null)
+ return;
+
+ List<String> children = zooKeeper.getChildren(node, false);
+ for (String child : children) {
+ checkRedundantNode(node + "/" + child);
+ }
+
+ children = zooKeeper.getChildren(node, false);
+ if ((children == null) || (children.isEmpty()))
+ {
+ zooKeeper.delete(node, -1);
+ }
+ } catch (Exception e) {
+ // ignore all errors
+ }
+ }
+
/* Release all transient locks, by simply closing the client */
public void close() throws LockException {
try {
@@ -330,6 +558,10 @@
zooKeeper.close();
zooKeeper = null;
}
+
+ if (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) {
+ removeAllRedundantNodes();
+ }
} catch (Exception e) {
LOG.error("Failed to close zooKeeper client: " + e);
throw new LockException(e);
@@ -356,30 +588,25 @@
* @param verifyTablePartition
**/
private static HiveLockObject getLockObject(HiveConf conf, String path,
- HiveLockMode mode, HiveLockObjectData data, boolean verifyTablePartition)
+ HiveLockMode mode, HiveLockObjectData data,
+ String parent, boolean verifyTablePartition)
throws LockException {
try {
Hive db = Hive.get(conf);
- int indx = path.lastIndexOf(mode.toString());
- String objName = path.substring(1, indx-1);
- String[] names = objName.split("/")[1].split("@");
+ int indx = path.lastIndexOf("LOCK-" + mode.toString());
+ String objName = path.substring(("/" + parent + "/").length(), indx-1);
+ String[] names = objName.split("/");
if (names.length < 2) {
return null;
}
if (!verifyTablePartition) {
- if (names.length == 2) {
- return new HiveLockObject(names, data);
- } else {
- return new HiveLockObject(objName.split("/")[1].replaceAll(
- conf.getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME),
- "/").split("@"), data);
- }
+ return new HiveLockObject(names, data);
}
- Table tab = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, //need to change to names[0]
- names[1], false); // do not throw exception if table does not exist
+ // do not throw exception if table does not exist
+ Table tab = db.getTable(names[0], names[1], false);
if (tab == null) {
return null;
}
@@ -388,11 +615,9 @@
return new HiveLockObject(tab, data);
}
- String[] parts = names[2].split(conf.getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME));
-
Map<String, String> partSpec = new HashMap<String, String>();
- for (indx = 0; indx < parts.length; indx++) {
- String[] partVals = parts[indx].split("=");
+ for (indx = 2; indx < names.length; indx++) {
+ String[] partVals = names[indx].split("=");
partSpec.put(partVals[0], partVals[1]);
}
@@ -400,13 +625,11 @@
try {
partn = db.getPartition(tab, partSpec, false);
} catch (HiveException e) {
- partn =null;
+ partn = null;
}
if (partn == null) {
- return new HiveLockObject(new DummyPartition(tab,
- objName.split("/")[1].replaceAll(conf.getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME), "/")),
- data);
+ return new HiveLockObject(new DummyPartition(tab, path), data);
}
return new HiveLockObject(partn, data);
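(Illustration only: with this change the manager creates one persistent znode per path component and hangs the actual locks as sequential children of the deepest node, instead of encoding the whole object name into a single child of the namespace. The fragment below reproduces the paths that getObjectNames/getLockName would compute for a SHARED lock on default/T/ds=1/hr=1 under the default namespace; it is a sketch, not part of the patch, and assumes the usual java.util imports:)

    String parent = "hive_zookeeper_namespace";   // default namespace from HiveConf
    String objName = "default/T/ds=1/hr=1";

    List<String> nodes = new ArrayList<String>();
    String cur = "/" + parent + "/";
    for (String comp : objName.split("/")) {
      cur = cur + comp;
      nodes.add(cur);   // /ns/default, /ns/default/T, /ns/default/T/ds=1, ...
      cur = cur + "/";
    }
    // The lock itself is a sequential child of the last node, e.g.
    // /hive_zookeeper_namespace/default/T/ds=1/hr=1/LOCK-SHARED-0000000000
    String lockPrefix = nodes.get(nodes.size() - 1) + "/" + "LOCK-" + HiveLockMode.SHARED + "-";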
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 1056098)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (working copy)
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.Table;
public class HiveLockObject {
@@ -75,6 +76,12 @@
this.data = null;
}
+ public HiveLockObject(String path, HiveLockObjectData lockData) {
+ this.pathNames = new String[1];
+ this.pathNames[0] = path;
+ this.data = lockData;
+ }
+
public HiveLockObject(String[] paths, HiveLockObjectData lockData) {
this.pathNames = paths;
this.data = lockData;
@@ -89,6 +96,10 @@
par.getTable().getTableName(), par.getName() }, lockData);
}
+ public HiveLockObject(DummyPartition par, HiveLockObjectData lockData) {
+ this(new String[] { par.getName() }, lockData);
+ }
+
public String getName() {
if (this.pathNames == null) {
return null;
@@ -97,7 +108,7 @@
boolean first = true;
for (int i = 0; i < pathNames.length; i++) {
if (!first) {
- ret = ret + "@";
+ ret = ret + "/";
} else {
first = false;
}
@@ -106,6 +117,30 @@
return ret;
}
+ public String getDisplayName() {
+ if (this.pathNames == null) {
+ return null;
+ }
+ if (pathNames.length == 1) {
+ return pathNames[0];
+ }
+ else if (pathNames.length == 2) {
+ return pathNames[0] + "@" + pathNames[1];
+ }
+
+ String ret = pathNames[0] + "@" + pathNames[1] + "@";
+ boolean first = true;
+ for (int i = 2; i < pathNames.length; i++) {
+ if (!first) {
+ ret = ret + "/";
+ } else {
+ first = false;
+ }
+ ret = ret + pathNames[i];
+ }
+ return ret;
+ }
+
public HiveLockObjectData getData() {
return data;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1056098)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -3575,9 +3575,9 @@
try {
String ppath = dpCtx.getSPPath();
ppath = ppath.substring(0, ppath.length()-1);
- DummyPartition p = new DummyPartition(dest_tab,
- dest_tab.getDbName() + "@" + dest_tab.getTableName() + "@" + ppath);
-
+ DummyPartition p =
+ new DummyPartition(dest_tab,
+ dest_tab.getDbName() + "@" + dest_tab.getTableName() + "@" + ppath);
outputs.add(new WriteEntity(p, false));
} catch (HiveException e) {
throw new SemanticException(e.getMessage());
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1056098)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -24,8 +24,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -66,6 +64,7 @@
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
@@ -97,6 +96,7 @@
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
public class Driver implements CommandProcessor {
@@ -405,49 +405,6 @@
return plan;
}
- public static class LockObject {
- HiveLockObject obj;
- HiveLockMode mode;
-
- public LockObject(HiveLockObject obj, HiveLockMode mode) {
- this.obj = obj;
- this.mode = mode;
- }
-
- public HiveLockObject getObj() {
- return obj;
- }
-
- public HiveLockMode getMode() {
- return mode;
- }
-
- public String getName() {
- return obj.getName();
- }
- }
-
- public static class LockObjectContainer {
- LockObject lck;
-
- public LockObjectContainer() {
- this.lck = null;
- }
-
- public LockObjectContainer(LockObject lck) {
- this.lck = lck;
- }
-
- public LockObject getLck() {
- return lck;
- }
-
- public void setLck(LockObject lck) {
- this.lck = lck;
- }
-
- }
-
/**
* @param t
* The table to be locked
@@ -458,9 +415,9 @@
* partition needs to be locked (in any mode), all its parents should also be locked in
* SHARED mode.
**/
- private List<LockObject> getLockObjects(Table t, Partition p, HiveLockMode mode)
+ private List<HiveLockObj> getLockObjects(Table t, Partition p, HiveLockMode mode)
throws SemanticException {
- List<LockObject> locks = new LinkedList<LockObject>();
+ List<HiveLockObj> locks = new LinkedList<HiveLockObj>();
HiveLockObjectData lockData =
new HiveLockObjectData(plan.getQueryId(),
@@ -468,18 +425,21 @@
"IMPLICIT");
if (t != null) {
- locks.add(new LockObject(new HiveLockObject(t, lockData), mode));
+ locks.add(new HiveLockObj(new HiveLockObject(t, lockData), mode));
return locks;
}
if (p != null) {
- locks.add(new LockObject(new HiveLockObject(p, lockData), mode));
+ if (!(p instanceof DummyPartition)) {
+ locks.add(new HiveLockObj(new HiveLockObject(p, lockData), mode));
+ }
// All the parents are locked in shared mode
mode = HiveLockMode.SHARED;
- // For summy partitions, only partition name is needed
+ // For dummy partitions, only partition name is needed
String name = p.getName();
+
if (p instanceof DummyPartition) {
name = p.getName().split("@")[2];
}
@@ -487,20 +447,22 @@
String partName = name;
String partialName = "";
String[] partns = name.split("/");
- for (int idx = 0; idx < partns.length - 1; idx++) {
+ int len = p instanceof DummyPartition ? partns.length : partns.length - 1;
+ for (int idx = 0; idx < len; idx++) {
String partn = partns[idx];
- partialName += partialName + partn;
+ partialName += partn;
try {
- locks.add(new LockObject(new HiveLockObject(new DummyPartition(p.getTable(), p.getTable()
- .getDbName()
- + "@" + p.getTable().getTableName() + "@" + partialName), lockData), mode));
+ locks.add(new HiveLockObj(
+ new HiveLockObject(new DummyPartition(p.getTable(), p.getTable().getDbName()
+ + "/" + p.getTable().getTableName()
+ + "/" + partialName), lockData), mode));
partialName += "/";
} catch (HiveException e) {
throw new SemanticException(e.getMessage());
}
}
- locks.add(new LockObject(new HiveLockObject(p.getTable(), lockData), mode));
+ locks.add(new HiveLockObj(new HiveLockObject(p.getTable(), lockData), mode));
}
return locks;
}
@@ -522,7 +484,7 @@
return 0;
}
- List<LockObject> lockObjects = new ArrayList<LockObject>();
+ List<HiveLockObj> lockObjects = new ArrayList<HiveLockObj>();
// Sort all the inputs, outputs.
// If a lock needs to be acquired on any partition, a read lock needs to be acquired on all
@@ -557,36 +519,24 @@
return ret;
}
+ HiveLockObjectData lockData =
+ new HiveLockObjectData(plan.getQueryId(),
+ String.valueOf(System.currentTimeMillis()),
+ "IMPLICIT");
+ // Lock the database also
+ try {
+ Hive db = Hive.get(conf);
+ lockObjects.add(new HiveLockObj(
+ new HiveLockObject(db.getCurrentDatabase(), lockData),
+ HiveLockMode.SHARED));
+ } catch (HiveException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
ctx.setHiveLockMgr(hiveLockMgr);
+ List<HiveLock> hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false, numRetries, sleepTime);
- Collections.sort(lockObjects, new Comparator<LockObject>() {
-
- @Override
- public int compare(LockObject o1, LockObject o2) {
- int cmp = o1.getName().compareTo(o2.getName());
- if (cmp == 0) {
- if (o1.getMode() == o2.getMode()) {
- return cmp;
- }
- // EXCLUSIVE locks occur before SHARED locks
- if (o1.getMode() == HiveLockMode.EXCLUSIVE) {
- return -1;
- }
- return +1;
- }
- return cmp;
- }
-
- });
-
- // walk the list and acquire the locks - if any lock cant be acquired, release all locks,
- // sleep and retry
- LockObjectContainer notFound = new LockObjectContainer();
- notFound.setLck(null);
- List<HiveLock> hiveLocks = acquireLocks(lockObjects, notFound,
- numRetries, sleepTime);
-
if (hiveLocks == null) {
throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
} else {
@@ -599,55 +549,16 @@
console.printError(errorMessage, "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return (10);
+ } catch (LockException e) {
+ errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
+ SQLState = ErrorMsg.findSQLState(e.getMessage());
+ console.printError(errorMessage, "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ return (10);
}
}
/**
- * @param lockObjects
- * The list of objects to be locked Lock the objects specified in the list. The same
- * object is not locked twice, and the list passed is sorted such that EXCLUSIVE locks
- * occur before SHARED locks.
- * @param sleepTime
- * @param numRetries
- **/
- private List<HiveLock> acquireLocks(List<LockObject> lockObjects,
- LockObjectContainer notFound, int numRetries, int sleepTime)
- throws SemanticException {
- // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep
- // and retry
- LockObject prevLockObj = null;
- List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
-
- for (LockObject lockObject : lockObjects) {
- // No need to acquire a lock twice on the same object
- // It is ensured that EXCLUSIVE locks occur before SHARED locks on the same object
- if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) {
- prevLockObj = lockObject;
- continue;
- }
-
- HiveLock lock = null;
- try {
- lock = ctx.getHiveLockMgr().lock(lockObject.getObj(), lockObject.getMode(), false, numRetries, sleepTime);
- } catch (LockException e) {
- console.printError("Error in acquireLocks: "+ e.getLocalizedMessage());
- lock = null;
- }
-
- if (lock == null) {
- notFound.setLck(lockObject);
- releaseLocks(hiveLocks);
- return null;
- }
-
- hiveLocks.add(lock);
- prevLockObj = lockObject;
- }
-
- return hiveLocks;
- }
-
- /**
* Release all the locks acquired implicitly by the statement. Note that the locks acquired with
* 'keepAlive' set to True are not released.
**/
@@ -668,15 +579,9 @@
**/
private void releaseLocks(List<HiveLock> hiveLocks) {
if (hiveLocks != null) {
- for (HiveLock hiveLock : hiveLocks) {
- try {
- ctx.getHiveLockMgr().unlock(hiveLock);
- } catch (LockException e) {
- // The lock may have been released. Ignore and continue
- }
- }
- ctx.setHiveLocks(null);
+ ctx.getHiveLockMgr().releaseLocks(hiveLocks);
}
+ ctx.setHiveLocks(null);
}
public CommandProcessorResponse run(String command) {
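(For context: besides delegating sorting, acquisition and release to the lock manager, acquireReadWriteLocks now also takes a SHARED lock on the current database. A simplified sketch of the lock set it would build for an INSERT into partition ds=1 of table T in database default; tbl, partn, lockData and the retry parameters are placeholders, and the real getLockObjects also adds the intermediate dummy-partition parents:)

    List<HiveLockObj> lockObjects = new ArrayList<HiveLockObj>();
    lockObjects.add(new HiveLockObj(new HiveLockObject("default", lockData), HiveLockMode.SHARED));   // the database
    lockObjects.add(new HiveLockObj(new HiveLockObject(tbl, lockData), HiveLockMode.SHARED));         // parent table
    lockObjects.add(new HiveLockObj(new HiveLockObject(partn, lockData), HiveLockMode.EXCLUSIVE));    // partition being written
    // One call acquires everything or nothing.
    List<HiveLock> hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false, numRetries, sleepTime);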