diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 2f6baec..340ab6c 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -553,6 +555,31 @@ public void testTimeOutReaper() throws Exception {
     txnBatch.close();
     connection.close();
   }
+
+  @Test
+  public void testHeartbeat() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2, ",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, null);
+
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
+    txnBatch.beginNextTransaction();
+    //todo: this should ideally check Transaction heartbeat as well, but heartbeat
+    //timestamp is not reported yet
+    //GetOpenTxnsInfoResponse txnresp = msClient.showTxns();
+    ShowLocksResponse response = msClient.showLocks();
+    Assert.assertEquals("Wrong number of locks: " + response, 1, response.getLocks().size());
+    ShowLocksResponseElement lock = response.getLocks().get(0);
+    long acquiredAt = lock.getAcquiredat();
+    long heartbeatAt = lock.getLastheartbeat();
+    txnBatch.heartbeat();
+    response = msClient.showLocks();
+    Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
+    lock = response.getLocks().get(0);
+    Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
+    Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
+      ") > old heartbeat (" + heartbeatAt + ")", lock.getLastheartbeat() > heartbeatAt);
+  }
   @Test
   public void testTransactionBatchEmptyAbort() throws Exception {
     // 1) to partitioned table
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 6218a03..ca485fa 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1742,6 +1742,10 @@ private void heartbeatTxn(Connection dbConn, long txnid)
         dbConn.rollback();
         throw new NoSuchTxnException("No such txn: " + txnid);
       }
+      //update locks for this txn to the same heartbeat
+      s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_txnid = " + txnid;
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
       LOG.debug("Going to commit");
       dbConn.commit();
     } finally {
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 39b44e8..219a54a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -293,17 +293,28 @@ public void rollbackTxn() throws LockException {
 
   @Override
   public void heartbeat() throws LockException {
-    LOG.debug("Heartbeating lock and transaction " + JavaUtils.txnIdToString(txnId));
-    List<HiveLock> locks = lockMgr.getLocks(false, false);
-    if (locks.size() == 0) {
-      if (!isTxnOpen()) {
-        // No locks, no txn, we outta here.
-        return;
-      } else {
-        // Create one dummy lock so we can go through the loop below
-        DbLockManager.DbHiveLock dummyLock = new DbLockManager.DbHiveLock(0L);
-        locks.add(dummyLock);
+    List<HiveLock> locks;
+    if(isTxnOpen()) {
+      // Create one dummy lock so we can go through the loop below, though we only
+      // really need txnId
+      DbLockManager.DbHiveLock dummyLock = new DbLockManager.DbHiveLock(0L);
+      locks = new ArrayList<>(1);
+      locks.add(dummyLock);
+    }
+    else {
+      locks = lockMgr.getLocks(false, false);
+    }
+    if(LOG.isInfoEnabled()) {
+      StringBuilder sb = new StringBuilder("Sending heartbeat for ")
+        .append(JavaUtils.txnIdToString(txnId)).append(" and");
+      for(HiveLock lock : locks) {
+        sb.append(" ").append(lock.toString());
       }
+      LOG.info(sb.toString());
+    }
+    if(!isTxnOpen() && locks.isEmpty()) {
+      // No locks, no txn, we outta here.
+      return;
     }
     for (HiveLock lock : locks) {
       long lockId = ((DbLockManager.DbHiveLock)lock).lockId;
@@ -320,7 +331,8 @@ public void heartbeat() throws LockException {
         throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
       } catch (TException e) {
         throw new LockException(
-            ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+            ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(txnId) +
+            "," + lock.toString() + ")", e);
       }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 3ee9346..391f99a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -402,10 +402,10 @@ public String toString() {
           dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
         boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
         FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
-        for (int j = 0; j < files.length; j++) {
+        for(FileStatus f : files) {
           // For each file, figure out which bucket it is.
-          Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(files[j].getPath().getName());
-          addFileToMap(matcher, files[j].getPath(), sawBase, splitToBucketMap);
+          Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+          addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
         }
       } else {
         // Legacy file, see if it's a bucket file
@@ -434,7 +434,7 @@ private void addFileToMap(Matcher matcher, Path file, boolean sawBase,
                             Map<Integer, BucketTracker> splitToBucketMap) {
     if (!matcher.find()) {
       LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
-          file.toString());
+          file.toString() + " Matcher=" + matcher.toString());
     }
     int bucketNum = Integer.valueOf(matcher.group());
     BucketTracker bt = splitToBucketMap.get(bucketNum);