diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 0533ae8..9953919 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -135,7 +135,6 @@ private String errorMessage; private String SQLState; private Throwable downstreamError; - private HiveTxnManager txnMgr; // A limit on the number of threads that can be launched private int maxthreads; @@ -145,16 +144,6 @@ private String userName; - private void createTxnManager() throws SemanticException { - if (txnMgr == null) { - try { - txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - } catch (LockException e) { - throw new SemanticException(e.getMessage(), e); - } - } - } - private boolean checkConcurrency() throws SemanticException { boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); if (!supportConcurrency) { @@ -868,7 +857,7 @@ public QueryPlan getPlan() { // the input format. private int recordValidTxns() { try { - ValidTxnList txns = txnMgr.getValidTxns(); + ValidTxnList txns = SessionState.get().getTxnMgr().getValidTxns(); conf.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); return 0; } catch (LockException e) { @@ -893,7 +882,7 @@ private int acquireReadWriteLocks() { try { - txnMgr.acquireLocks(plan, ctx, userName); + SessionState.get().getTxnMgr().acquireLocks(plan, ctx, userName); return 0; } catch (LockException e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); @@ -917,7 +906,7 @@ private void releaseLocks(List hiveLocks) throws LockException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS); if (hiveLocks != null) { - ctx.getHiveTxnManager().getLockManager().releaseLocks(hiveLocks); + SessionState.get().getTxnMgr().getLockManager().releaseLocks(hiveLocks); } ctx.setHiveLocks(null); @@ -1048,9 +1037,14 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp boolean requireLock = false; boolean ckLock = false; + SessionState ss = SessionState.get(); try { ckLock = checkConcurrency(); - createTxnManager(); + try { + ss.initTxnMgr(conf); + } catch (LockException e) { + throw new SemanticException(e.getMessage(), e); + } } catch (SemanticException e) { errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage(); SQLState = ErrorMsg.findSQLState(e.getMessage()); @@ -1074,7 +1068,7 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - ctx.setHiveTxnManager(txnMgr); + ctx.setHiveTxnManager(ss.getTxnMgr()); if (ckLock) { boolean lockOnlyMapred = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY); @@ -1670,9 +1664,6 @@ public void destroy() { e.getMessage()); } } - if (txnMgr != null) { - txnMgr.closeTxnManager(); - } } public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index df66f83..48543d8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -54,6 +54,9 @@ import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.history.HiveHistoryImpl; import org.apache.hadoop.hive.ql.history.HiveHistoryProxyHandler; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -209,6 +212,29 @@ private Path localSessionPath; /** + * Transaction manager to use for this session. This is instantiated lazily by + * {@link #initTxnMgr(org.apache.hadoop.hive.conf.HiveConf)} + */ + private HiveTxnManager txnMgr = null; + + /** + * When {@link #setCurrentTxn(long)} is set to this or {@link #getCurrentTxn()}} returns this it + * indicates that there is not a current transaction in this session. + */ + public static final long NO_CURRENT_TXN = -1L; + + /** + * Transaction currently open + */ + private long currentTxn = NO_CURRENT_TXN; + + /** + * Whether we are in auto-commit state or not. Currently we are always in auto-commit, + * so there are not setters for this yet. + */ + private boolean txnAutoCommit = true; + + /** * Get the lineage state stored in this session. * * @return LineageState @@ -310,6 +336,37 @@ public String getSessionId() { } /** + * Initialize the transaction manager. This is done lazily to avoid hard wiring one + * transaction manager at the beginning of the session. In general users shouldn't change + * this, but it's useful for testing. + * @param conf Hive configuration to initialize transaction manager + * @return transaction manager + * @throws LockException + */ + public HiveTxnManager initTxnMgr(HiveConf conf) throws LockException { + if (txnMgr == null) { + txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + } + return txnMgr; + } + + public HiveTxnManager getTxnMgr() { + return txnMgr; + } + + public long getCurrentTxn() { + return currentTxn; + } + + public void setCurrentTxn(long currTxn) { + currentTxn = currTxn; + } + + public boolean isAutoCommit() { + return txnAutoCommit; + } + + /** * Singleton Session object per thread. * **/ @@ -1050,6 +1107,7 @@ public void setCurrentDatabase(String currentDatabase) { } public void close() throws IOException { + if (txnMgr != null) txnMgr.closeTxnManager(); JavaUtils.closeClassLoadersTo(conf.getClassLoader(), parentLoader); File resourceDir = new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));