diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index 7b86b0c..7f0a6b3 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -76,6 +76,21 @@ public void testTxns() throws Exception { } @Test + public void testOpenTxnNotExcluded() throws Exception { + List tids = client.openTxns("me", 3).getTxn_ids(); + Assert.assertEquals(1L, (long) tids.get(0)); + Assert.assertEquals(2L, (long) tids.get(1)); + Assert.assertEquals(3L, (long) tids.get(2)); + client.rollbackTxn(1); + client.commitTxn(2); + ValidTxnList validTxns = client.getValidTxns(3); + Assert.assertFalse(validTxns.isTxnCommitted(1)); + Assert.assertTrue(validTxns.isTxnCommitted(2)); + Assert.assertTrue(validTxns.isTxnCommitted(3)); + Assert.assertFalse(validTxns.isTxnCommitted(4)); + } + + @Test public void testTxnRange() throws Exception { ValidTxnList validTxns = client.getValidTxns(); Assert.assertEquals(ValidTxnList.RangeResponse.NONE, diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 8000081..94c2245 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -1689,7 +1689,12 @@ public void cancelDelegationToken(String tokenStrForm) throws MetaException, TEx @Override public ValidTxnList getValidTxns() throws TException { - return TxnHandler.createValidTxnList(client.get_open_txns()); + return TxnHandler.createValidTxnList(client.get_open_txns(), 0); + } + + @Override + public ValidTxnList getValidTxns(long currentTxn) throws TException { + return 
TxnHandler.createValidTxnList(client.get_open_txns(), currentTxn); } @Override diff --git metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 98739f3..066ab68 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -1086,6 +1086,15 @@ Function getFunction(String dbName, String funcName) ValidTxnList getValidTxns() throws TException; /** + * Get a structure that details valid transactions. + * @param currentTxn The current transaction of the caller. This will be removed from the + * exceptions list so that the caller sees records from his own transaction. + * @return list of valid transactions + * @throws TException + */ + ValidTxnList getValidTxns(long currentTxn) throws TException; + + /** * Initiate a transaction. * @param user User who is opening this transaction. This is the Hive user, * not necessarily the OS user. It is assumed that this user has already been diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index b71bb41..6f44169 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -233,12 +233,22 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { } } - public static ValidTxnList createValidTxnList(GetOpenTxnsResponse txns) { + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnList}. + * @param txns txn list from the metastore + * @param currentTxn Current transaction that the user has open. If this is greater than 0 it + * will be removed from the exceptions list so that the user sees his own + * transaction as valid. 
+ * @return a valid txn list. */ + public static ValidTxnList createValidTxnList(GetOpenTxnsResponse txns, long currentTxn) { long highWater = txns.getTxn_high_water_mark(); Set open = txns.getOpen_txns(); - long[] exceptions = new long[open.size()]; + long[] exceptions = new long[open.size() - (currentTxn > 0 && open.contains(currentTxn) ? 1 : 0)]; int i = 0; for(long txn: open) { + if (currentTxn > 0 && currentTxn == txn) continue; exceptions[i++] = txn; } return new ValidTxnListImpl(exceptions, highWater); diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 5e46590..4826abc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -390,6 +390,9 @@ public int compile(String command, boolean resetTaskIds) { tree = ParseUtils.findRootNonNullToken(tree); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE); + // Initialize the transaction manager. This must be done before analyze is called + SessionState.get().initTxnMgr(conf); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE); BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree); List saHooks = @@ -889,9 +892,12 @@ private int recordValidTxns() { /** * Acquire read and write locks needed by the statement. The list of objects to be locked are - * obtained from he inputs and outputs populated by the compiler. The lock acuisition scheme is + * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making * sure that the locks are lexicographically sorted. + * + * This method also records the list of valid transactions. This must be done after any + * transactions have been opened and locks acquired. 
**/ private int acquireLocksAndOpenTxn() { PerfLogger perfLogger = PerfLogger.getPerfLogger(); @@ -931,7 +937,7 @@ private int acquireLocksAndOpenTxn() { txnMgr.acquireLocks(plan, ctx, userFromUGI); - return 0; + return recordValidTxns(); } catch (LockException e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); SQLState = ErrorMsg.findSQLState(e.getMessage()); @@ -1108,11 +1114,6 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp SessionState ss = SessionState.get(); try { ckLock = checkConcurrency(); - try { - ss.initTxnMgr(conf); - } catch (LockException e) { - throw new SemanticException(e.getMessage(), e); - } } catch (SemanticException e) { errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage(); SQLState = ErrorMsg.findSQLState(e.getMessage()); @@ -1121,11 +1122,8 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp + org.apache.hadoop.util.StringUtils.stringifyException(e)); return createProcessorResponse(10); } - int ret = recordValidTxns(); - if (ret != 0) { - return createProcessorResponse(ret); - } + int ret; if (!alreadyCompiled) { ret = compileInternal(command); if (ret != 0) { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 8f7c759..46b441a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -286,7 +286,7 @@ public void heartbeat() throws LockException { public ValidTxnList getValidTxns() throws LockException { init(); try { - return client.getValidTxns(); + return client.getValidTxns(txnId); } catch (TException e) { throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 4b0009f..c1d0fe1 
100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -76,7 +76,7 @@ public void run() { // don't doom the entire thread. try { ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); - ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns()); + ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0); Set potentials = txnHandler.findPotentialCompactions(abortedThreshold); LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 347bf65..249fece 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -120,7 +120,7 @@ public void run() { final boolean isMajor = ci.isMajorCompaction(); final ValidTxnList txns = - TxnHandler.createValidTxnList(txnHandler.getOpenTxns()); + TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0); final StringBuffer jobName = new StringBuffer(name); jobName.append("-compactor-"); jobName.append(ci.getFullPartitionName());