Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java	(revision 1423757)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java	(working copy)
@@ -41,6 +41,7 @@
     this.path = path;
   }
 
+  @Override
   public HiveLockObject getHiveLockObject() {
     return obj;
   }
@@ -49,6 +50,7 @@
     this.obj = obj;
   }
 
+  @Override
   public HiveLockMode getHiveLockMode() {
     return mode;
   }
@@ -56,4 +58,16 @@
   public void setHiveLockMode(HiveLockMode mode) {
     this.mode = mode;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ZooKeeperHiveLock)) {
+      return false;
+    }
+
+    ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)o;
+    return path.equals(zLock.getPath()) &&
+        obj.equals(zLock.getHiveLockObject()) &&
+        mode == zLock.getHiveLockMode();
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java	(revision 1423757)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java	(working copy)
@@ -18,27 +18,27 @@
 
 package org.apache.hadoop.hive.ql.lockmgr;
 
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 
 public class HiveLockObject {
-  String [] pathNames = null;
+  String[] pathNames = null;
 
   public static class HiveLockObjectData {
 
-    private String queryId; // queryId of the command
+    private String queryId;  // queryId of the command
     private String lockTime; // time at which lock was acquired
     // mode of the lock: EXPLICIT(lock command)/IMPLICIT(query)
     private String lockMode;
     private String queryStr;
-    private String clientIp;
+    private String clientIp;
 
     public HiveLockObjectData(String queryId,
-                              String lockTime,
-                              String lockMode,
-                              String queryStr) {
-      this.queryId = queryId;
+        String lockTime,
+        String lockMode,
+        String queryStr) {
+      this.queryId = queryId;
       this.lockTime = lockTime;
       this.lockMode = lockMode;
       this.queryStr = queryStr.trim();
@@ -51,7 +51,7 @@
       }
 
       String[] elem = data.split(":");
-      queryId = elem[0];
+      queryId = elem[0];
       lockTime = elem[1];
       lockMode = elem[2];
       queryStr = elem[3];
@@ -73,18 +73,40 @@
       return queryStr;
     }
 
+    @Override
     public String toString() {
       return queryId + ":" + lockTime + ":" + lockMode + ":" + queryStr + ":" +
          clientIp;
     }
-
+
     public String getClientIp() {
       return this.clientIp;
     }
-
+
     public void setClientIp(String clientIp) {
       this.clientIp = clientIp;
    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof HiveLockObjectData)) {
+        return false;
+      }
+
+      HiveLockObjectData target = (HiveLockObjectData) o;
+      boolean ret = (queryId == null) ? target.getQueryId() == null :
+          queryId.equals(target.getQueryId());
+      ret = ret && ((lockTime == null) ? target.getLockTime() == null :
+          lockTime.equals(target.getLockTime()));
+      ret = ret && ((lockMode == null) ? target.getLockMode() == null :
+          lockMode.equals(target.getLockMode()));
+      ret = ret && ((queryStr == null) ? target.getQueryStr() == null :
+          queryStr.equals(target.getQueryStr()));
+      ret = ret && ((clientIp == null) ? target.getClientIp() == null :
+          clientIp.equals(target.getClientIp()));
+
+      return ret;
+    }
   }
 
   /* user supplied data for that object */
@@ -110,12 +132,12 @@
   }
 
   public HiveLockObject(Partition par, HiveLockObjectData lockData) {
-    this(new String[] { par.getTable().getDbName(),
-        par.getTable().getTableName(), par.getName() }, lockData);
+    this(new String[] {par.getTable().getDbName(),
+        par.getTable().getTableName(), par.getName()}, lockData);
   }
 
   public HiveLockObject(DummyPartition par, HiveLockObjectData lockData) {
-    this(new String[] { par.getName() }, lockData);
+    this(new String[] {par.getName()}, lockData);
   }
 
   public String[] getPaths() {
@@ -171,4 +193,14 @@
     this.data = data;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof HiveLockObject)) {
+      return false;
+    }
+
+    HiveLockObject tgt = (HiveLockObject) o;
+    return getName().equals(tgt.getName()) &&
+        (data == null ? tgt.getData() == null : data.equals(tgt.getData()));
+  }
 }
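The new equals() implementations give the lock metadata value semantics. The snippet below is an illustrative, standalone sketch, not part of the patch; the query id, timestamp, lock mode and query text are made-up values. It shows that two HiveLockObjectData instances built from the same fields now compare equal even though they are distinct objects.

import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;

public class HiveLockObjectDataEqualsSketch {
  public static void main(String[] args) {
    // Two distinct instances carrying the same lock metadata (values are invented).
    HiveLockObjectData d1 = new HiveLockObjectData(
        "hive_20121218_0001", "1355854800", "IMPLICIT", "INSERT OVERWRITE TABLE t SELECT ...");
    HiveLockObjectData d2 = new HiveLockObjectData(
        "hive_20121218_0001", "1355854800", "IMPLICIT", "INSERT OVERWRITE TABLE t SELECT ...");

    // Without the patch this fell back to reference equality and printed false;
    // with the value-based equals() it prints true.
    System.out.println(d1.equals(d2));
  }
}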
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java	(revision 1423757)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java	(working copy)
@@ -438,10 +438,12 @@
       this.lockMode = lockMode;
     }
 
+    @Override
     public HiveLockObject getHiveLockObject() {
       return lockObj;
     }
 
+    @Override
     public HiveLockMode getHiveLockMode() {
       return lockMode;
     }
@@ -450,5 +452,16 @@
     public String toString() {
       return lockMode + "=" + lockObj.getDisplayName() + "(" + lockObj.getData() + ")";
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof SimpleHiveLock)) {
+        return false;
+      }
+
+      SimpleHiveLock simpleLock = (SimpleHiveLock) o;
+      return lockObj.equals(simpleLock.getHiveLockObject()) &&
+          lockMode == simpleLock.getHiveLockMode();
+    }
   }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java	(revision 1423757)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java	(working copy)
@@ -51,6 +51,9 @@
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 
 /**
  * MoveTask implementation.
@@ -146,6 +149,40 @@
     return deletePath;
   }
 
+  // Release all the locks acquired for this output.
+  // This becomes important for multi-table inserts, where one branch may take much
+  // more time than the others; it is better to release the locks for that particular
+  // insert as soon as it finishes. The alternative is to wait for all the branches to
+  // finish, or to set hive.multi.insert.move.tasks.share.dependencies to true, in which
+  // case the results of the first insert only become available once all branches of the
+  // multi-table insert are done.
+  private void releaseLocks(LoadTableDesc ltd) throws HiveException {
+    // nothing needs to be done
+    if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY)) {
+      return;
+    }
+
+    Context ctx = driverContext.getCtx();
+    HiveLockManager lockMgr = ctx.getHiveLockMgr();
+    WriteEntity output = ctx.getLoadTableOutputMap().get(ltd);
+    List<HiveLockObj> lockObjects = ctx.getOutputLockObjects().get(output);
+    if (lockObjects == null) {
+      return;
+    }
+
+    for (HiveLockObj lockObj : lockObjects) {
+      List<HiveLock> locks = lockMgr.getLocks(lockObj.getObj(), false, false);
+      for (HiveLock lock : locks) {
+        if (lock.getHiveLockMode() == lockObj.getMode()) {
+          LOG.info("about to release lock for " + lock.getHiveLockObject().getName());
+          lockMgr.unlock(lock);
+          ctx.getHiveLocks().remove(lock);
+        }
+      }
+    }
+  }
+
+  @Override
   public int execute(DriverContext driverContext) {
@@ -317,6 +354,7 @@
             SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
                 table.getCols());
           }
+          releaseLocks(tbd);
         }
 
         return 0;
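One subtlety worth spelling out: getLocks() typically hands back freshly constructed HiveLock instances (for example ZooKeeperHiveLock objects rebuilt from the znodes it finds), not the instances already stored in ctx.getHiveLocks(), so the remove() call in releaseLocks() only does anything because of the value-based equals() overrides above. Below is a minimal compilable sketch of that per-branch release; the class and method names are illustrative, and it assumes the same lock-manager API the patch itself calls.

import java.util.List;

import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class BranchLockRelease {
  /** Unlock every lock requested for one insert branch and drop it from the session's lock list. */
  public static void release(Context ctx, HiveLockManager lockMgr, List<HiveLockObj> requested)
      throws HiveException {
    for (HiveLockObj lockObj : requested) {
      for (HiveLock lock : lockMgr.getLocks(lockObj.getObj(), false, false)) {
        if (lock.getHiveLockMode() == lockObj.getMode()) {
          lockMgr.unlock(lock);
          // remove() compares by value; without the new equals() overrides this would be a no-op.
          ctx.getHiveLocks().remove(lock);
        }
      }
    }
  }
}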
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(revision 1423757)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(working copy)
@@ -4646,18 +4646,23 @@
         loadTableWork.add(ltd);
       }
 
+      WriteEntity output = null;
+
       // Here only register the whole table for post-exec hook if no DP present
       // in the case of DP, we will register WriteEntity in MoveTask when the
       // list of dynamically created partitions are known.
-      if ((dpCtx == null || dpCtx.getNumDPCols() == 0) &&
-          !outputs.add(new WriteEntity(dest_tab))) {
-        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
-            .getMsg(dest_tab.getTableName()));
+      if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
+        output = new WriteEntity(dest_tab);
+        if (!outputs.add(output)) {
+          throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
+              .getMsg(dest_tab.getTableName()));
+        }
       }
 
       if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
         // No static partition specified
         if (dpCtx.getNumSPCols() == 0) {
-          outputs.add(new WriteEntity(dest_tab, false));
+          output = new WriteEntity(dest_tab, false);
+          outputs.add(output);
         }
         // part of the partition specified
         // Create a DummyPartition in this case. Since, the metastore does not store partial
@@ -4670,13 +4675,15 @@
               new DummyPartition(dest_tab, dest_tab.getDbName()
                   + "@" + dest_tab.getTableName() + "@" + ppath,
                   partSpec);
-          outputs.add(new WriteEntity(p, false));
+          output = new WriteEntity(p, false);
+          outputs.add(output);
         } catch (HiveException e) {
           throw new SemanticException(e.getMessage(), e);
         }
       }
     }
 
+    ctx.getLoadTableOutputMap().put(ltd, output);
     break;
   }
   case QBMetaData.DEST_PARTITION: {
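Taken together with the Driver change below, the two maps added to Context form a small chain: SemanticAnalyzer records LoadTableDesc -> WriteEntity, Driver records WriteEntity -> requested HiveLockObj list, and MoveTask follows both to find exactly the locks that belong to its own insert branch. A hedged sketch of that lookup follows; the helper class and method name are illustrative and not part of the patch.

import java.util.List;

import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;

public class BranchLockLookup {
  /** Locks requested for the table or partition this LoadTableDesc writes to, or null if none were recorded. */
  public static List<HiveLockObj> locksFor(Context ctx, LoadTableDesc ltd) {
    WriteEntity output = ctx.getLoadTableOutputMap().get(ltd);              // recorded by SemanticAnalyzer
    return output == null ? null : ctx.getOutputLockObjects().get(output);  // recorded by Driver
  }
}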
Index: ql/src/java/org/apache/hadoop/hive/ql/Context.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Context.java	(revision 1423757)
+++ ql/src/java/org/apache/hadoop/hive/ql/Context.java	(working copy)
@@ -42,8 +42,11 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
@@ -88,6 +91,12 @@
 
   private boolean needLockMgr;
 
+  // Keep track of the mapping from load table desc to the output and the lock
+  private final Map<LoadTableDesc, WriteEntity> loadTableOutputMap =
+      new HashMap<LoadTableDesc, WriteEntity>();
+  private final Map<WriteEntity, List<HiveLockObj>> outputLockObjects =
+      new HashMap<WriteEntity, List<HiveLockObj>>();
+
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
   }
@@ -109,6 +118,15 @@
         executionId).toUri().getPath();
   }
 
+
+  public Map<LoadTableDesc, WriteEntity> getLoadTableOutputMap() {
+    return loadTableOutputMap;
+  }
+
+  public Map<WriteEntity, List<HiveLockObj>> getOutputLockObjects() {
+    return outputLockObjects;
+  }
+
   /**
    * Set the context on whether the current query is an explain query.
    * @param value true if the query is an explain query, false if not
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(revision 1423757)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(working copy)
@@ -786,16 +786,22 @@
     }
 
     for (WriteEntity output : plan.getOutputs()) {
+      List<HiveLockObj> lockObj = null;
       if (output.getTyp() == WriteEntity.Type.TABLE) {
-        lockObjects.addAll(getLockObjects(output.getTable(), null,
-            output.isComplete() ? HiveLockMode.EXCLUSIVE : HiveLockMode.SHARED));
+        lockObj = getLockObjects(output.getTable(), null,
+            output.isComplete() ? HiveLockMode.EXCLUSIVE : HiveLockMode.SHARED);
       } else if (output.getTyp() == WriteEntity.Type.PARTITION) {
-        lockObjects.addAll(getLockObjects(null, output.getPartition(), HiveLockMode.EXCLUSIVE));
+        lockObj = getLockObjects(null, output.getPartition(), HiveLockMode.EXCLUSIVE);
       }
       // In case of dynamic queries, it is possible to have incomplete dummy partitions
       else if (output.getTyp() == WriteEntity.Type.DUMMYPARTITION) {
-        lockObjects.addAll(getLockObjects(null, output.getPartition(), HiveLockMode.SHARED));
+        lockObj = getLockObjects(null, output.getPartition(), HiveLockMode.SHARED);
      }
+
+      if (lockObj != null) {
+        lockObjects.addAll(lockObj);
+        ctx.getOutputLockObjects().put(output, lockObj);
+      }
     }
 
     if (lockObjects.isEmpty() && !ctx.isNeedLockMgr()) {