diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 6d3eea0..cc8eed7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -120,12 +120,14 @@ import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; +import org.apache.hive.common.util.ShutdownHookManager; public class Driver implements CommandProcessor { static final private String CLASS_NAME = Driver.class.getName(); private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private LogHelper console = new LogHelper(LOG); + static final int SHUTDOWN_HOOK_PRIORITY = 0; private int maxRows = 100; ByteStream.Output bos = new ByteStream.Output(); @@ -290,6 +292,23 @@ public Driver(HiveConf conf) { this.conf = conf; isParallelEnabled = (conf != null) && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION); + + SessionState ss = SessionState.get(); + final HiveTxnManager txnMgr = ss.getTxnMgr(); + ShutdownHookManager.addShutdownHook( + new Runnable() { + @Override + public void run() { + try { + if (txnMgr != null) { + releaseLocksAndCommitOrRollback(false, txnMgr); + } + } catch (LockException e) { + LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " + + e.getMessage()); + } + } + }, SHUTDOWN_HOOK_PRIORITY); } public Driver(HiveConf conf, String userName) { @@ -536,7 +555,7 @@ private void dumpMetaCallTimingWithoutEx(String phase) { * * @param sem semantic analyzer for analyzed query * @param plan query plan - * @param astStringTree AST tree dump + * @param astTree AST tree dump * @throws java.io.IOException */ private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan, @@ -1048,15 +1067,21 @@ private boolean haveAcidWrite() { /** * @param commit if there is an open transaction and if true, commit, * if false rollback. If there is no open transaction this parameter is ignored. + * @param txnManager an optional existing transaction manager retrieved earlier from the session * **/ - private void releaseLocksAndCommitOrRollback(boolean commit) + private void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager) throws LockException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS); - SessionState ss = SessionState.get(); - HiveTxnManager txnMgr = ss.getTxnMgr(); + HiveTxnManager txnMgr; + if (txnManager == null) { + SessionState ss = SessionState.get(); + txnMgr = ss.getTxnMgr(); + } else { + txnMgr = txnManager; + } // If we've opened a transaction we need to commit or rollback rather than explicitly // releasing the locks. if (txnMgr.isTxnOpen()) { @@ -1205,7 +1230,7 @@ private int compileInternal(String command) { } if (ret != 0) { try { - releaseLocksAndCommitOrRollback(false); + releaseLocksAndCommitOrRollback(false, null); } catch (LockException e) { LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -1290,7 +1315,7 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) { /*here, if there is an open txn, we want to commit it; this behavior matches * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/ - releaseLocksAndCommitOrRollback(true); + releaseLocksAndCommitOrRollback(true, null); txnManager.setAutoCommit(true); } else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { @@ -1318,10 +1343,10 @@ else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { //if needRequireLock is false, the release here will do nothing because there is no lock try { if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) { - releaseLocksAndCommitOrRollback(true); + releaseLocksAndCommitOrRollback(true, null); } else if(plan.getOperation() == HiveOperation.ROLLBACK) { - releaseLocksAndCommitOrRollback(false); + releaseLocksAndCommitOrRollback(false, null); } else { //txn (if there is one started) is not finished @@ -1352,7 +1377,7 @@ else if(plan.getOperation() == HiveOperation.ROLLBACK) { private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { //console.printError(cpr.toString()); try { - releaseLocksAndCommitOrRollback(false); + releaseLocksAndCommitOrRollback(false, null); } catch (LockException e) { LOG.error("rollback() FAILED: " + cpr);//make sure not to loose @@ -1920,7 +1945,7 @@ public void destroy() { destroyed = true; if (!hiveLocks.isEmpty()) { try { - releaseLocksAndCommitOrRollback(false); + releaseLocksAndCommitOrRollback(false, null); } catch (LockException e) { LOG.warn("Exception when releasing locking in destroy: " + e.getMessage());