diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 0533ae8..91955e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -135,7 +135,6 @@ private String errorMessage; private String SQLState; private Throwable downstreamError; - private HiveTxnManager txnMgr; // A limit on the number of threads that can be launched private int maxthreads; @@ -145,16 +144,6 @@ private String userName; - private void createTxnManager() throws SemanticException { - if (txnMgr == null) { - try { - txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - } catch (LockException e) { - throw new SemanticException(e.getMessage(), e); - } - } - } - private boolean checkConcurrency() throws SemanticException { boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); if (!supportConcurrency) { @@ -868,7 +857,7 @@ public QueryPlan getPlan() { // the input format. private int recordValidTxns() { try { - ValidTxnList txns = txnMgr.getValidTxns(); + ValidTxnList txns = SessionState.get().getTxnMgr().getValidTxns(); conf.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); return 0; } catch (LockException e) { @@ -893,7 +882,7 @@ private int acquireReadWriteLocks() { try { - txnMgr.acquireLocks(plan, ctx, userName); + SessionState.get().getTxnMgr().acquireLocks(plan, ctx, userName); return 0; } catch (LockException e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); @@ -917,7 +906,7 @@ private void releaseLocks(List hiveLocks) throws LockException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS); if (hiveLocks != null) { - ctx.getHiveTxnManager().getLockManager().releaseLocks(hiveLocks); + SessionState.get().getTxnMgr().getLockManager().releaseLocks(hiveLocks); } ctx.setHiveLocks(null); @@ -1050,7 +1039,6 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp boolean ckLock = false; try { ckLock = checkConcurrency(); - createTxnManager(); } catch (SemanticException e) { errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage(); SQLState = ErrorMsg.findSQLState(e.getMessage()); @@ -1074,7 +1062,7 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - ctx.setHiveTxnManager(txnMgr); + ctx.setHiveTxnManager(SessionState.get().getTxnMgr()); if (ckLock) { boolean lockOnlyMapred = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY); @@ -1670,9 +1658,6 @@ public void destroy() { e.getMessage()); } } - if (txnMgr != null) { - txnMgr.closeTxnManager(); - } } public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 9798cf3..0d9416e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -52,6 +52,8 @@ import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.history.HiveHistoryImpl; import org.apache.hadoop.hive.ql.history.HiveHistoryProxyHandler; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -205,6 +207,11 @@ private Path localSessionPath; /** + * Transaction manager to user for this session. + */ + private HiveTxnManager txnMgr; + + /** * Get the lineage state stored in this session. * * @return LineageState @@ -305,6 +312,10 @@ public String getSessionId() { return (conf.getVar(HiveConf.ConfVars.HIVESESSIONID)); } + public HiveTxnManager getTxnMgr() { + return txnMgr; + } + /** * Singleton Session object per thread. * @@ -368,6 +379,7 @@ public static SessionState start(SessionState startSs) { ShimLoader.getHadoopShims().getUGIForConf(startSs.conf); FileSystem.get(startSs.conf); startSs.createSessionPaths(startSs.conf); + startSs.txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(startSs.conf); } catch (Exception e) { // catch-all due to some exec time dependencies on session state // that would cause ClassNoFoundException otherwise @@ -1046,6 +1058,7 @@ public void setCurrentDatabase(String currentDatabase) { } public void close() throws IOException { + txnMgr.closeTxnManager(); JavaUtils.closeClassLoadersTo(conf.getClassLoader(), parentLoader); File resourceDir = new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));