diff --git common/src/java/org/apache/hadoop/hive/common/JavaUtils.java common/src/java/org/apache/hadoop/hive/common/JavaUtils.java index 3dd8f75..dc3a4ae 100644 --- common/src/java/org/apache/hadoop/hive/common/JavaUtils.java +++ common/src/java/org/apache/hadoop/hive/common/JavaUtils.java @@ -137,12 +137,19 @@ public static void closeClassLoader(ClassLoader loader) throws IOException { } /** - * Utility method for ACID to normalize logging info - * @param extLockId LockResponse.lockid + * Utility method for ACID to normalize logging info. Matches + * {@link org.apache.hadoop.hive.metastore.api.LockRequest#toString()} */ public static String lockIdToString(long extLockId) { return "lockid:" + extLockId; } + /** + * Utility method for ACID to normalize logging info. Matches + * {@link org.apache.hadoop.hive.metastore.api.LockResponse#toString()} + */ + public static String txnIdToString(long txnId) { + return "txnid:" + txnId; + } private JavaUtils() { // prevent instantiation diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index c0e83c6..88e007c 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -339,7 +339,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept if (abortTxns(dbConn, Collections.singletonList(txnid)) != 1) { LOG.debug("Going to rollback"); dbConn.rollback(); - throw new NoSuchTxnException("No such transaction: " + txnid); + throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); } LOG.debug("Going to commit"); @@ -382,7 +382,7 @@ public void commitTxn(CommitTxnRequest rqst) if (stmt.executeUpdate(s) < 1) { //this can be reasonable for an empty txn START/COMMIT LOG.info("Expected to move at least one record from txn_components to " + - "completed_txn_components when committing txn! txnid:" + txnid); + "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); } // Always access TXN_COMPONENTS before HIVE_LOCKS; @@ -508,8 +508,8 @@ public void unlock(UnlockRequest rqst) LOG.debug("Going to rollback"); dbConn.rollback(); String msg = "Unlocking locks associated with transaction" + - " not permitted. Lockid " + extLockId + " is associated with " + - "transaction " + txnid; + " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " + + "transaction " + JavaUtils.txnIdToString(txnid); LOG.error(msg); throw new TxnOpenException(msg); } @@ -520,7 +520,7 @@ public void unlock(UnlockRequest rqst) if (rc < 1) { LOG.debug("Going to rollback"); dbConn.rollback(); - throw new NoSuchLockException("No such lock: " + extLockId); + throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); } LOG.debug("Going to commit"); dbConn.commit(); @@ -1175,8 +1175,8 @@ public boolean equals(Object other) { @Override public String toString() { return JavaUtils.lockIdToString(extLockId) + " intLockId:" + - intLockId + " txnId:" + Long.toString - (txnId) + " db:" + db + " table:" + table + " partition:" + + intLockId + " " + JavaUtils.txnIdToString(txnId) + + " db:" + db + " table:" + table + " partition:" + partition + " state:" + (state == null ? "null" : state.toString()) + " type:" + (type == null ? "null" : type.toString()); } @@ -1315,7 +1315,8 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException LOG.debug("Going to execute update <" + buf.toString() + ">"); stmt.executeUpdate(buf.toString()); - buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in ("); + buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + + "' where txn_state = '" + TXN_OPEN + "' and txn_id in ("); first = true; for (Long id : txnids) { if (first) first = false; @@ -1344,7 +1345,7 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException * of NOT_ACQUIRED. The caller will need to call * {@link #lockNoWait(org.apache.hadoop.hive.metastore.api.LockRequest)} again to * attempt another lock. - * @return informatino on whether the lock was acquired. + * @return information on whether the lock was acquired. * @throws NoSuchTxnException * @throws TxnAbortedException */ @@ -1733,12 +1734,12 @@ private void heartbeatTxn(Connection dbConn, long txnid) if (!rs.next()) { LOG.debug("Going to rollback"); dbConn.rollback(); - throw new NoSuchTxnException("No such transaction: " + txnid); + throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); } if (rs.getString(1).charAt(0) == TXN_ABORTED) { LOG.debug("Going to rollback"); dbConn.rollback(); - throw new TxnAbortedException("Transaction " + txnid + + throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + " already aborted"); } s = "update TXNS set txn_last_heartbeat = " + now + @@ -1767,7 +1768,7 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId) "checked the lock existed but now we can't find it!"); } long txnid = rs.getLong(1); - LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid)); + LOG.debug("Return " + JavaUtils.txnIdToString(rs.wasNull() ? -1 : txnid)); return (rs.wasNull() ? -1 : txnid); } finally { closeStmt(stmt); diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 39b287a..63c8f96 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -385,14 +385,14 @@ "set hive.txn.manager"), TXNMGR_NOT_INSTANTIATED(10261, "Transaction manager could not be " + "instantiated, check hive.txn.manager"), - TXN_NO_SUCH_TRANSACTION(10262, "No record of transaction could be found, " + - "may have timed out"), - TXN_ABORTED(10263, "Transaction manager has aborted the transaction."), + TXN_NO_SUCH_TRANSACTION(10262, "No record of transaction {0} could be found, " + + "may have timed out", true), + TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.", true), DBTXNMGR_REQUIRES_CONCURRENCY(10264, "To use DbTxnManager you must set hive.support.concurrency=true"), - LOCK_NO_SUCH_LOCK(10270, "No record of lock could be found, " + - "may have timed out"), + LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " + + "may have timed out", true), LOCK_REQUEST_UNSUPPORTED(10271, "Current transaction manager does not " + "support explicit lock requests. Transaction manager: "), diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index aa00573..82e227f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -48,7 +48,7 @@ private long nextSleep = 50; DbLockManager(IMetaStoreClient client) { - locks = new HashSet(); + locks = new HashSet<>(); this.client = client; } @@ -104,8 +104,8 @@ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List locks = lockMgr.getLocks(false, false); if (locks.size() == 0) { if (!isTxnOpen()) { @@ -309,14 +310,14 @@ public void heartbeat() throws LockException { try { client.heartbeat(txnId, lockId); } catch (NoSuchLockException e) { - LOG.error("Unable to find lock " + lockId); - throw new LockException(ErrorMsg.LOCK_NO_SUCH_LOCK.getMsg(), e); + LOG.error("Unable to find lock " + JavaUtils.lockIdToString(lockId)); + throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId)); } catch (NoSuchTxnException e) { - LOG.error("Unable to find transaction " + txnId); - throw new LockException(ErrorMsg.TXN_NO_SUCH_TRANSACTION.getMsg(), e); + LOG.error("Unable to find transaction " + JavaUtils.txnIdToString(txnId)); + throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId)); } catch (TxnAbortedException e) { - LOG.error("Transaction aborted " + txnId); - throw new LockException(ErrorMsg.TXN_ABORTED.getMsg(), e); + LOG.error("Transaction aborted " + JavaUtils.txnIdToString(txnId)); + throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId)); } catch (TException e) { throw new LockException( ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index 2fb78fd..f57350d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.hive.ql.lockmgr; -import junit.framework.Assert; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -31,16 +31,19 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.log4j.Level; import org.apache.log4j.LogManager; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import org.junit.After; -import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.util.*; +import java.util.concurrent.TimeUnit; /** * Unit tests for {@link DbTxnManager}. - * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager} + * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2} */ public class TestDbTxnManager { @@ -53,6 +56,7 @@ HashSet writeEntities; public TestDbTxnManager() throws Exception { + conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 500, TimeUnit.MILLISECONDS); TxnDbUtil.setConfValues(conf); SessionState.start(conf); ctx = new Context(conf); @@ -175,6 +179,51 @@ public void testWriteDynamicPartition() throws Exception { locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } + @Test + public void testExceptions() throws Exception { + WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); + QueryPlan qp = new MockQueryPlan(this); + txnMgr.acquireLocks(qp, ctx, "PeterI"); + txnMgr.openTxn("NicholasII"); + Thread.sleep(1000);//let txn timeout + txnMgr.getValidTxns(); + LockException exception = null; + try { + txnMgr.commitTxn(); + } + catch(LockException ex) { + exception = ex; + } + Assert.assertNotNull("Expected exception1", exception); + Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); + exception = null; + txnMgr.openTxn("AlexanderIII"); + Thread.sleep(1000); + txnMgr.getValidTxns(); + try { + txnMgr.rollbackTxn(); + } + catch (LockException ex) { + exception = ex; + } + Assert.assertNotNull("Expected exception2", exception); + Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_NO_SUCH_TRANSACTION, exception.getCanonicalErrorMsg()); + exception = null; + txnMgr.openTxn("PeterI"); + txnMgr.acquireLocks(qp, ctx, "PeterI"); + List locks = ctx.getHiveLocks(); + Assert.assertThat("Unexpected lock count", locks.size(), is(1)); + Thread.sleep(1000); + txnMgr.getValidTxns(); + try { + txnMgr.heartbeat(); + } + catch(LockException ex) { + exception = ex; + } + Assert.assertNotNull("Expected exception3", exception); + Assert.assertEquals("Wrong Exception3", ErrorMsg.LOCK_NO_SUCH_LOCK, exception.getCanonicalErrorMsg()); + } @Test public void testReadWrite() throws Exception { diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 6a69641..44ad8b0 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -39,7 +39,7 @@ /** * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager} - * Tests here + * Tests here are "end-to-end"ish and simulate concurrent queries. */ public class TestDbTxnManager2 { private static HiveConf conf = new HiveConf(Driver.class);