diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 60674eb..f72c379 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -291,6 +291,9 @@ public static int countQueryAgent(String countQuery) throws Exception { } @VisibleForTesting public static String queryToString(String query) throws Exception { + return queryToString(query, true); + } + public static String queryToString(String query, boolean includeHeader) throws Exception { Connection conn = null; Statement stmt = null; ResultSet rs = null; @@ -300,10 +303,12 @@ public static String queryToString(String query) throws Exception { stmt = conn.createStatement(); rs = stmt.executeQuery(query); ResultSetMetaData rsmd = rs.getMetaData(); - for(int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { - sb.append(rsmd.getColumnName(colPos)).append(" "); + if(includeHeader) { + for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { + sb.append(rsmd.getColumnName(colPos)).append(" "); + } + sb.append('\n'); } - sb.append('\n'); while(rs.next()) { for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { sb.append(rs.getObject(colPos)).append(" "); diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 5b6f20c..82d47a2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -445,7 +445,7 @@ public void heartbeat() throws LockException { private Heartbeater startHeartbeat(long initialDelay) throws LockException { long heartbeatInterval = getHeartbeatInterval(conf); assert heartbeatInterval > 0; - Heartbeater heartbeater = new Heartbeater(this, conf); + Heartbeater heartbeater = new Heartbeater(this, conf, 
queryId); // For negative testing purpose.. if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { initialDelay = 0; @@ -601,20 +601,21 @@ private static long getHeartbeatInterval(Configuration conf) throws LockExceptio public static class Heartbeater implements Runnable { private HiveTxnManager txnMgr; private HiveConf conf; - LockException lockException; + private final String queryId; + public LockException getLockException() { return lockException; } - /** * * @param txnMgr transaction manager for this operation */ - Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { + Heartbeater(HiveTxnManager txnMgr, HiveConf conf, String queryId) { this.txnMgr = txnMgr; this.conf = conf; lockException = null; + this.queryId = queryId; } /** @@ -627,13 +628,16 @@ public void run() { if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true"); } - LOG.debug("Heartbeating..."); txnMgr.heartbeat(); } catch (LockException e) { - LOG.error("Failed trying to heartbeat " + e.getMessage()); + LOG.error("Failed trying to heartbeat queryId=" + queryId + ": " + e.getMessage()); lockException = e; } + catch(Throwable t) { + LOG.error("Failed trying to heartbeat queryId=" + queryId + ": " + t.getMessage(), t); + lockException = new LockException("Failed trying to heartbeat queryId=" + queryId + ": " + t.getMessage(), t); + } } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 388b7c5..d995e85 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -21,10 +21,13 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import 
org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -428,26 +431,69 @@ public void testTimeOutReaper() throws Exception { Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1")); //now test that we don't timeout locks we should not - hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 10, TimeUnit.MINUTES); + //heartbeater should be running in the background every 1/2 second + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS); + //hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true); runStatementOnDriver("start transaction"); runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17"); + pause(750); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + + //since there is txn open, we are heartbeating the txn not individual locks + GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo(); + Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size()); + TxnInfo txnInfo = null; + for(TxnInfo ti : txnsInfoResponse.getOpen_txns()) { + if(ti.getState() == TxnState.OPEN) { + txnInfo = ti; + break; + } + } + Assert.assertNotNull(txnInfo); + Assert.assertEquals(2, txnInfo.getId()); + Assert.assertEquals(TxnState.OPEN, txnInfo.getState()); + String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false); + String[] vals = 
s.split("\\s+"); + Assert.assertEquals("Didn't get expected timestamps", 2, vals.length); + long lastHeartbeat = Long.parseLong(vals[1]); + //these 2 values are equal when TXN entry is made. Should never be equal after 1st heartbeat, which we + //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec + Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat); + ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest()); TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0)); + pause(750); TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf); + pause(750); slr = txnHandler.showLocks(new ShowLocksRequest()); + Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size()); TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0)); - Assert.assertEquals("Unexpected lock count", 1, slr.getLocks().size()); + pause(750); TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf); slr = txnHandler.showLocks(new ShowLocksRequest()); + Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size()); TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0)); - Assert.assertEquals("Unexpected lock count", 1, slr.getLocks().size()); + + //should've done several heartbeats + s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false); + vals = s.split("\\s+"); + Assert.assertEquals("Didn't get expected timestamps", 2, vals.length); + Assert.assertTrue("Heartbeat didn't progress: (old,new) (" + lastHeartbeat + "," + vals[1]+ ")", + lastHeartbeat < Long.parseLong(vals[1])); runStatementOnDriver("rollback"); slr = txnHandler.showLocks(new ShowLocksRequest()); Assert.assertEquals("Unexpected lock 
count", 0, slr.getLocks().size()); } + private static void pause(int timeMillis) { + try { + Thread.sleep(timeMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it, so the test runner can still observe the interruption + } + } /** * takes raw data and turns it into a string as if from Driver.getResults()