Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1056098) +++ conf/hive-default.xml (working copy) @@ -764,6 +764,12 @@ + hive.zookeeper.clean.extra.nodes + false + Clean extra nodes at the end of the session. + + + fs.har.impl org.apache.hadoop.hive.shims.HiveHarFileSystem The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop vers less than 0.20 @@ -815,8 +821,8 @@ hive.auto.progress.timeout 0 - - How long to run autoprogressor for the script/UDTF operators (in seconds). + + How long to run autoprogressor for the script/UDTF operators (in seconds). Set to 0 for forever. Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1056098) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -313,6 +313,7 @@ HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", ""), HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000), HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace"), + HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false), // For HBase storage handler HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true), Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 1056098) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy) @@ -1411,7 +1411,7 @@ while (locksIter.hasNext()) { HiveLock lock = locksIter.next(); - outStream.writeBytes(lock.getHiveLockObject().getName()); + outStream.writeBytes(lock.getHiveLockObject().getDisplayName()); outStream.write(separator); outStream.writeBytes(lock.getHiveLockMode().toString()); if (isExt) { Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 1056098) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (working copy) @@ -33,7 +33,10 @@ */ public HiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive, int numRetries, int sleepTime) throws LockException; + public List lock(List objs, + boolean keepAlive, int numRetries, int sleepTime) throws LockException; public void unlock(HiveLock hiveLock) throws LockException; + public void releaseLocks(List hiveLocks); public List getLocks(boolean verifyTablePartitions, boolean fetchData) throws LockException; public List getLocks(HiveLockObject key, boolean verifyTablePartitions, boolean fetchData) throws LockException; Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java (revision 0) @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; + +public class HiveLockObj { + HiveLockObject obj; + HiveLockMode mode; + + public HiveLockObj(HiveLockObject obj, HiveLockMode mode) { + this.obj = obj; + this.mode = mode; + } + + public HiveLockObject getObj() { + return obj; + } + + public void setObj(HiveLockObject obj) { + this.obj = obj; + } + + public HiveLockMode getMode() { + return mode; + } + + public void setMode(HiveLockMode mode) { + this.mode = mode; + } + + public String getName() { + return obj.getName(); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 1056098) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (working copy) @@ -28,8 +28,12 @@ import java.util.List; import java.util.ArrayList; import java.util.Set; +import java.util.Queue; +import java.util.LinkedList; import java.util.Map; import java.util.HashMap; +import java.util.Comparator; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.regex.Pattern; import java.util.regex.Matcher; @@ -41,6 +45,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; @@ -52,7 +57,6 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; public class ZooKeeperHiveLockManager implements HiveLockManager { HiveLockManagerCtx ctx; @@ -118,16 +122,122 @@ } /** - * Since partition names can contain "/", which need all the parent directories to be created by ZooKeeper, - * replace "/" by a dummy name to ensure a single hierarchy. + * @param key object to be locked + * Get the name of the last string. For eg. if you need to lock db/T/ds=1=/hr=1, + * the last name would be db/T/ds=1/hr=1 **/ - private String getObjectName(HiveLockObject key, HiveLockMode mode) { - return "/" + parent + "/" + - key.getName().replaceAll("/", ctx.getConf().getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)) + - "-" + mode + "-"; + private static String getLastObjectName(String parent, HiveLockObject key) { + return "/" + parent + "/" + key.getName(); } /** + * @param key object to be locked + * Get the list of names for all the parents. + * For eg: if you need to lock db/T/ds=1/hr=1, the following list will be returned: + * {db, db/T, db/T/ds=1, db/T/ds=1/hr=1} + **/ + private List getObjectNames(HiveLockObject key) { + List parents = new ArrayList(); + String curParent = "/" + parent + "/"; + String[] names = key.getName().split("/"); + + for (String name : names) { + curParent = curParent + name; + parents.add(curParent); + curParent = curParent + "/"; + } + return parents; + } + + /** + * @param lockObjects List of objects and the modes of the locks requested + * @param keepAlive Whether the lock is to be persisted after the statement + * @param numRetries number of retries when the lock can not be acquired + * @param sleepTime sleep time between retries + * + * Acuire all the locks. Release all the locks and return null if any lock + * could not be acquired. + **/ + public List lock(List lockObjects, + boolean keepAlive, int numRetries, int sleepTime) throws LockException + { + // Sort the objects first. You are guaranteed that if a partition is being locked, + // the table has already been locked + + Collections.sort(lockObjects, new Comparator() { + + @Override + public int compare(HiveLockObj o1, HiveLockObj o2) { + int cmp = o1.getName().compareTo(o2.getName()); + if (cmp == 0) { + if (o1.getMode() == o2.getMode()) { + return cmp; + } + // EXCLUSIVE locks occur before SHARED locks + if (o1.getMode() == HiveLockMode.EXCLUSIVE) { + return -1; + } + return +1; + } + return cmp; + } + }); + + // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep + // and retry + HiveLockObj prevLockObj = null; + List hiveLocks = new ArrayList(); + + for (HiveLockObj lockObject : lockObjects) { + // No need to acquire a lock twice on the same object + // It is ensured that EXCLUSIVE locks occur before SHARED locks on the same object + if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) { + prevLockObj = lockObject; + continue; + } + + HiveLock lock = null; + try { + lock = lock(lockObject.getObj(), lockObject.getMode(), false, + numRetries, sleepTime, true); + } catch (LockException e) { + console.printError("Error in acquireLocks: "+ e.getLocalizedMessage()); + lock = null; + } + + if (lock == null) { + releaseLocks(hiveLocks); + return null; + } + + hiveLocks.add(lock); + prevLockObj = lockObject; + } + + return hiveLocks; + + } + + /** + * @param hiveLocks + * list of hive locks to be released Release all the locks specified. If some of the + * locks have already been released, ignore them + **/ + public void releaseLocks(List hiveLocks) { + if (hiveLocks != null) { + int len = hiveLocks.size(); + for (int pos = len-1; pos >= 0; pos--) { + HiveLock hiveLock = hiveLocks.get(pos); + try { + unlock(hiveLock); + } catch (LockException e) { + // The lock may have been released. Ignore and continue + } + } + } + } + + /** * @param key The object to be locked * @param mode The mode of the lock * @param keepAlive Whether the lock is to be persisted after the statement @@ -139,61 +249,108 @@ public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive, int numRetries, int sleepTime) throws LockException { - String name = getObjectName(key, mode); + return lock(key, mode, keepAlive, numRetries, sleepTime, false); + } + + /** + * @param name The name of the zookeeper child + * @param data The data for the zookeeper child + * @param mode The mode in which the child needs to be created + * @param numRetries number of retries if the child cannot be created + * @param sleepTime sleep time between retries + **/ + private String createChild(String name, byte[] data, CreateMode mode, + int numRetries, int sleepTime) throws LockException { String res = null; + int tryNum = 0; + while (true) { + String msg = null; + try { + res = zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode); + } catch (KeeperException e) { + return null; + // nothing to do if the node already exists + } catch (Exception e) { + msg = e.getLocalizedMessage(); + } - try { - int tryNum = 0; - while (true) { - String msg = null; - try { - if (keepAlive) { - res = zooKeeper.create(name, key.getData().toString().getBytes(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); - } else { - res = zooKeeper.create(name, key.getData().toString().getBytes(), - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - } - } catch (Exception e) { - msg = e.getLocalizedMessage(); - } + if (res != null) { + return res; + } - if (res != null) { - break; - } - + try { renewZookeeperInstance(sessionTimeout, quorumServers); + } catch (Exception e) { + console.printError("Lock for " + name + + " cannot be acquired in " + mode); + throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); + } - if (tryNum == numRetries) { - console.printError("Lock for " + key.getName() - + " cannot be acquired in " + mode); - throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); - } + if (tryNum == numRetries) { + console.printError("Lock for " + name + + " cannot be acquired in " + mode); + throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); + } - tryNum++; + tryNum++; - console.printInfo("Lock for " + key.getName() - + " cannot be acquired in " + mode +", will retry again later..., more info: " + msg); + console.printInfo("Lock for " + name + + " cannot be acquired in " + mode + + ", will retry again later..., more info: " + msg); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - } + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + } + } - int seqNo = getSequenceNumber(res, name); + private String getLockName(String parent, HiveLockMode mode) { + return parent + "/" + "LOCK-" + mode + "-"; + } + + private ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, + boolean keepAlive, int numRetries, int sleepTime, + boolean parentCreated) + throws LockException { + String res; + + try { + // If the parents have already been created, create the last child only + List names = new ArrayList(); + String lastName; + + if (parentCreated) { + lastName = getLastObjectName(parent, key); + names.add(lastName); + } + else { + names = getObjectNames(key); + lastName = names.get(names.size()-1); + } + + // Create the parents first + for (String name : names) { + res = createChild(name, new byte[0], CreateMode.PERSISTENT, numRetries, sleepTime); + } + + res = createChild(getLockName(lastName, mode), key.getData().toString().getBytes(), + keepAlive ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.EPHEMERAL_SEQUENTIAL, + numRetries, sleepTime); + + int seqNo = getSequenceNumber(res, getLockName(lastName, mode)); if (seqNo == -1) { zooKeeper.delete(res, -1); return null; } - List children = zooKeeper.getChildren("/" + parent, false); + List children = zooKeeper.getChildren(lastName, false); - String exLock = getObjectName(key, HiveLockMode.EXCLUSIVE); - String shLock = getObjectName(key, HiveLockMode.SHARED); + String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE); + String shLock = getLockName(lastName, HiveLockMode.SHARED); for (String child : children) { - child = "/" + parent + "/" + child; + child = lastName + "/" + child; // Is there a conflicting lock on the same object with a lower sequence number int childSeq = seqNo; @@ -206,12 +363,11 @@ if ((childSeq >= 0) && (childSeq < seqNo)) { zooKeeper.delete(res, -1); - console.printError("conflicting lock present for " + key.getName() + + console.printError("conflicting lock present for " + key.getDisplayName() + " mode " + mode); return null; } } - } catch (Exception e) { LOG.error("Failed to get ZooKeeper lock: " + e); throw new LockException(e); @@ -222,14 +378,25 @@ /* Remove the lock specified */ public void unlock(HiveLock hiveLock) throws LockException { - unlock(ctx.getConf(), zooKeeper, hiveLock); + unlock(ctx.getConf(), zooKeeper, hiveLock, parent); } /* Remove the lock specified */ - private static void unlock(HiveConf conf, ZooKeeper zkpClient, HiveLock hiveLock) throws LockException { + private static void unlock(HiveConf conf, ZooKeeper zkpClient, + HiveLock hiveLock, String parent) throws LockException { ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock; try { zkpClient.delete(zLock.getPath(), -1); + + // Delete the parent node if all the children have been deleted + HiveLockObject obj = zLock.getHiveLockObject(); + String name = getLastObjectName(parent, obj); + + List children = zkpClient.getChildren(name, false); + if ((children == null) || (children.isEmpty())) + { + zkpClient.delete(name, -1); + } } catch (Exception e) { LOG.error("Failed to release ZooKeeper lock: " + e); throw new LockException(e); @@ -247,7 +414,7 @@ if (locks != null) { for (HiveLock lock : locks) { - unlock(conf, zkpClient, lock); + unlock(conf, zkpClient, lock, parent); } } @@ -281,24 +448,56 @@ throws LockException { List locks = new ArrayList(); List children; + boolean recurse = true; + String commonParent; try { - children = zkpClient.getChildren("/" + parent, false); + if (key != null) { + commonParent = "/" + parent + "/" + key.getName(); + children = zkpClient.getChildren(commonParent, false); + recurse = false; + } + else { + commonParent = "/" + parent; + children = zkpClient.getChildren(commonParent, false); + } } catch (Exception e) { // no locks present return locks; } - for (String child : children) { - child = "/" + parent + "/" + child; - HiveLockMode mode = getLockMode(conf, child); + Queue childn = new LinkedList(); + if (children != null && !children.isEmpty()) { + for (String child : children) { + childn.add(commonParent + "/" + child); + } + } + + while (true) { + String curChild = childn.poll(); + if (curChild == null) { + return locks; + } + + if (recurse) { + try { + children = zkpClient.getChildren(curChild, false); + for (String child : children) { + childn.add(curChild + "/" + child); + } + } catch (Exception e) { + // nothing to do + } + } + + HiveLockMode mode = getLockMode(conf, curChild); if (mode == null) { continue; } HiveLockObjectData data = null; - //set the lock object with a dummy data, and then do a set if needed. - HiveLockObject obj = getLockObject(conf, child, mode, data, verifyTablePartition); + // set the lock object with a dummy data, and then do a set if needed. + HiveLockObject obj = getLockObject(conf, curChild, mode, data, parent, verifyTablePartition); if (obj == null) { continue; } @@ -308,21 +507,50 @@ if (fetchData) { try { - data = new HiveLockObjectData(new String(zkpClient.getData(child, new DummyWatcher(), null))); + data = new HiveLockObjectData(new String(zkpClient.getData(curChild, new DummyWatcher(), null))); } catch (Exception e) { - LOG.error("Error in getting data for " + child + " " + e); + LOG.error("Error in getting data for " + curChild + " " + e); // ignore error } } obj.setData(data); - HiveLock lck = (HiveLock)(new ZooKeeperHiveLock(child, obj, mode)); + HiveLock lck = (HiveLock)(new ZooKeeperHiveLock(curChild, obj, mode)); locks.add(lck); } } + } - return locks; + /** Remove all redundant nodes **/ + private void removeAllRedundantNodes() { + try { + renewZookeeperInstance(sessionTimeout, quorumServers); + checkRedundantNode("/" + parent); + } catch (Exception e) { + // ignore all errors + } } + private void checkRedundantNode(String node) { + try { + // Nothing to do if it is a lock mode + if (getLockMode(ctx.getConf(), node) != null) + return; + + List children = zooKeeper.getChildren(node, false); + for (String child : children) { + checkRedundantNode(node + "/" + child); + } + + children = zooKeeper.getChildren(node, false); + if ((children == null) || (children.isEmpty())) + { + zooKeeper.delete(node, -1); + } + } catch (Exception e) { + // ignore all errors + } + } + /* Release all transient locks, by simply closing the client */ public void close() throws LockException { try { @@ -330,6 +558,10 @@ zooKeeper.close(); zooKeeper = null; } + + if (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) { + removeAllRedundantNodes(); + } } catch (Exception e) { LOG.error("Failed to close zooKeeper client: " + e); throw new LockException(e); @@ -356,30 +588,25 @@ * @param verifyTablePartition **/ private static HiveLockObject getLockObject(HiveConf conf, String path, - HiveLockMode mode, HiveLockObjectData data, boolean verifyTablePartition) + HiveLockMode mode, HiveLockObjectData data, + String parent, boolean verifyTablePartition) throws LockException { try { Hive db = Hive.get(conf); - int indx = path.lastIndexOf(mode.toString()); - String objName = path.substring(1, indx-1); - String[] names = objName.split("/")[1].split("@"); + int indx = path.lastIndexOf("LOCK-" + mode.toString()); + String objName = path.substring(("/" + parent + "/").length(), indx-1); + String[] names = objName.split("/"); if (names.length < 2) { return null; } if (!verifyTablePartition) { - if (names.length == 2) { - return new HiveLockObject(names, data); - } else { - return new HiveLockObject(objName.split("/")[1].replaceAll( - conf.getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME), - "/").split("@"), data); - } + return new HiveLockObject(names, data); } - Table tab = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, //need to change to names[0] - names[1], false); // do not throw exception if table does not exist + // do not throw exception if table does not exist + Table tab = db.getTable(names[0], names[1], false); if (tab == null) { return null; } @@ -388,11 +615,9 @@ return new HiveLockObject(tab, data); } - String[] parts = names[2].split(conf.getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)); - Map partSpec = new HashMap(); - for (indx = 0; indx < parts.length; indx++) { - String[] partVals = parts[indx].split("="); + for (indx = 2; indx < names.length; indx++) { + String[] partVals = names[indx].split("="); partSpec.put(partVals[0], partVals[1]); } @@ -400,13 +625,11 @@ try { partn = db.getPartition(tab, partSpec, false); } catch (HiveException e) { - partn =null; + partn = null; } if (partn == null) { - return new HiveLockObject(new DummyPartition(tab, - objName.split("/")[1].replaceAll(conf.getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME), "/")), - data); + return new HiveLockObject(new DummyPartition(tab, path), data); } return new HiveLockObject(partn, data); Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 1056098) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Table; public class HiveLockObject { @@ -75,6 +76,12 @@ this.data = null; } + public HiveLockObject(String path, HiveLockObjectData lockData) { + this.pathNames = new String[1]; + this.pathNames[0] = path; + this.data = lockData; + } + public HiveLockObject(String[] paths, HiveLockObjectData lockData) { this.pathNames = paths; this.data = lockData; @@ -89,6 +96,10 @@ par.getTable().getTableName(), par.getName() }, lockData); } + public HiveLockObject(DummyPartition par, HiveLockObjectData lockData) { + this(new String[] { par.getName() }, lockData); + } + public String getName() { if (this.pathNames == null) { return null; @@ -97,7 +108,7 @@ boolean first = true; for (int i = 0; i < pathNames.length; i++) { if (!first) { - ret = ret + "@"; + ret = ret + "/"; } else { first = false; } @@ -106,6 +117,30 @@ return ret; } + public String getDisplayName() { + if (this.pathNames == null) { + return null; + } + if (pathNames.length == 1) { + return pathNames[0]; + } + else if (pathNames.length == 2) { + return pathNames[0] + "@" + pathNames[1]; + } + + String ret = pathNames[0] + "@" + pathNames[1] + "@"; + boolean first = true; + for (int i = 2; i < pathNames.length; i++) { + if (!first) { + ret = ret + "/"; + } else { + first = false; + } + ret = ret + pathNames[i]; + } + return ret; + } + public HiveLockObjectData getData() { return data; } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1056098) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -3575,9 +3575,9 @@ try { String ppath = dpCtx.getSPPath(); ppath = ppath.substring(0, ppath.length()-1); - DummyPartition p = new DummyPartition(dest_tab, - dest_tab.getDbName() + "@" + dest_tab.getTableName() + "@" + ppath); - + DummyPartition p = + new DummyPartition(dest_tab, + dest_tab.getDbName() + "@" + dest_tab.getTableName() + "@" + ppath); outputs.add(new WriteEntity(p, false)); } catch (HiveException e) { throw new SemanticException(e.getMessage()); Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1056098) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -24,8 +24,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -66,6 +64,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.lockmgr.LockException; @@ -97,6 +96,7 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; public class Driver implements CommandProcessor { @@ -405,49 +405,6 @@ return plan; } - public static class LockObject { - HiveLockObject obj; - HiveLockMode mode; - - public LockObject(HiveLockObject obj, HiveLockMode mode) { - this.obj = obj; - this.mode = mode; - } - - public HiveLockObject getObj() { - return obj; - } - - public HiveLockMode getMode() { - return mode; - } - - public String getName() { - return obj.getName(); - } - } - - public static class LockObjectContainer { - LockObject lck; - - public LockObjectContainer() { - this.lck = null; - } - - public LockObjectContainer(LockObject lck) { - this.lck = lck; - } - - public LockObject getLck() { - return lck; - } - - public void setLck(LockObject lck) { - this.lck = lck; - } - - } - /** * @param t * The table to be locked @@ -458,9 +415,9 @@ * partition needs to be locked (in any mode), all its parents should also be locked in * SHARED mode. **/ - private List getLockObjects(Table t, Partition p, HiveLockMode mode) + private List getLockObjects(Table t, Partition p, HiveLockMode mode) throws SemanticException { - List locks = new LinkedList(); + List locks = new LinkedList(); HiveLockObjectData lockData = new HiveLockObjectData(plan.getQueryId(), @@ -468,18 +425,21 @@ "IMPLICIT"); if (t != null) { - locks.add(new LockObject(new HiveLockObject(t, lockData), mode)); + locks.add(new HiveLockObj(new HiveLockObject(t, lockData), mode)); return locks; } if (p != null) { - locks.add(new LockObject(new HiveLockObject(p, lockData), mode)); + if (!(p instanceof DummyPartition)) { + locks.add(new HiveLockObj(new HiveLockObject(p, lockData), mode)); + } // All the parents are locked in shared mode mode = HiveLockMode.SHARED; - // For summy partitions, only partition name is needed + // For dummy partitions, only partition name is needed String name = p.getName(); + if (p instanceof DummyPartition) { name = p.getName().split("@")[2]; } @@ -487,20 +447,22 @@ String partName = name; String partialName = ""; String[] partns = name.split("/"); - for (int idx = 0; idx < partns.length - 1; idx++) { + int len = p instanceof DummyPartition ? partns.length : partns.length - 1; + for (int idx = 0; idx < len; idx++) { String partn = partns[idx]; - partialName += partialName + partn; + partialName += partn; try { - locks.add(new LockObject(new HiveLockObject(new DummyPartition(p.getTable(), p.getTable() - .getDbName() - + "@" + p.getTable().getTableName() + "@" + partialName), lockData), mode)); + locks.add(new HiveLockObj( + new HiveLockObject(new DummyPartition(p.getTable(), p.getTable().getDbName() + + "/" + p.getTable().getTableName() + + "/" + partialName), lockData), mode)); partialName += "/"; } catch (HiveException e) { throw new SemanticException(e.getMessage()); } } - locks.add(new LockObject(new HiveLockObject(p.getTable(), lockData), mode)); + locks.add(new HiveLockObj(new HiveLockObject(p.getTable(), lockData), mode)); } return locks; } @@ -522,7 +484,7 @@ return 0; } - List lockObjects = new ArrayList(); + List lockObjects = new ArrayList(); // Sort all the inputs, outputs. // If a lock needs to be acquired on any partition, a read lock needs to be acquired on all @@ -557,36 +519,24 @@ return ret; } + HiveLockObjectData lockData = + new HiveLockObjectData(plan.getQueryId(), + String.valueOf(System.currentTimeMillis()), + "IMPLICIT"); + // Lock the database also + try { + Hive db = Hive.get(conf); + lockObjects.add(new HiveLockObj( + new HiveLockObject(db.getCurrentDatabase(), lockData), + HiveLockMode.SHARED)); + } catch (HiveException e) { + throw new SemanticException(e.getMessage()); + } + ctx.setHiveLockMgr(hiveLockMgr); + List hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false, numRetries, sleepTime); - Collections.sort(lockObjects, new Comparator() { - - @Override - public int compare(LockObject o1, LockObject o2) { - int cmp = o1.getName().compareTo(o2.getName()); - if (cmp == 0) { - if (o1.getMode() == o2.getMode()) { - return cmp; - } - // EXCLUSIVE locks occur before SHARED locks - if (o1.getMode() == HiveLockMode.EXCLUSIVE) { - return -1; - } - return +1; - } - return cmp; - } - - }); - - // walk the list and acquire the locks - if any lock cant be acquired, release all locks, - // sleep and retry - LockObjectContainer notFound = new LockObjectContainer(); - notFound.setLck(null); - List hiveLocks = acquireLocks(lockObjects, notFound, - numRetries, sleepTime); - if (hiveLocks == null) { throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); } else { @@ -599,55 +549,16 @@ console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); return (10); + } catch (LockException e) { + errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + console.printError(errorMessage, "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return (10); } } /** - * @param lockObjects - * The list of objects to be locked Lock the objects specified in the list. The same - * object is not locked twice, and the list passed is sorted such that EXCLUSIVE locks - * occur before SHARED locks. - * @param sleepTime - * @param numRetries - **/ - private List acquireLocks(List lockObjects, - LockObjectContainer notFound, int numRetries, int sleepTime) - throws SemanticException { - // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep - // and retry - LockObject prevLockObj = null; - List hiveLocks = new ArrayList(); - - for (LockObject lockObject : lockObjects) { - // No need to acquire a lock twice on the same object - // It is ensured that EXCLUSIVE locks occur before SHARED locks on the same object - if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) { - prevLockObj = lockObject; - continue; - } - - HiveLock lock = null; - try { - lock = ctx.getHiveLockMgr().lock(lockObject.getObj(), lockObject.getMode(), false, numRetries, sleepTime); - } catch (LockException e) { - console.printError("Error in acquireLocks: "+ e.getLocalizedMessage()); - lock = null; - } - - if (lock == null) { - notFound.setLck(lockObject); - releaseLocks(hiveLocks); - return null; - } - - hiveLocks.add(lock); - prevLockObj = lockObject; - } - - return hiveLocks; - } - - /** * Release all the locks acquired implicitly by the statement. Note that the locks acquired with * 'keepAlive' set to True are not released. **/ @@ -668,15 +579,9 @@ **/ private void releaseLocks(List hiveLocks) { if (hiveLocks != null) { - for (HiveLock hiveLock : hiveLocks) { - try { - ctx.getHiveLockMgr().unlock(hiveLock); - } catch (LockException e) { - // The lock may have been released. Ignore and continue - } - } - ctx.setHiveLocks(null); + ctx.getHiveLockMgr().releaseLocks(hiveLocks); } + ctx.setHiveLocks(null); } public CommandProcessorResponse run(String command) {