diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 1a7cfae..81f6155 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -562,10 +562,11 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo private final RecordWriter recordWriter; private final List txnIds; - private int currentTxnIndex = -1; + //volatile because heartbeat() may be in a "different" thread; updates of this are "piggybacking" + private volatile int currentTxnIndex = -1; private final String partNameForLock; - - private TxnState state; + //volatile because heartbeat() may be in a "different" thread + private volatile TxnState state; private LockRequest lockRequest = null; /** * once any operation on this batch encounters a system exception @@ -945,7 +946,14 @@ public void heartbeat() throws StreamingException, HeartBeatFailure { if(isClosed) { return; } - Long first = txnIds.get(currentTxnIndex); + if(state != TxnState.OPEN && currentTxnIndex >= txnIds.size() - 1) { + //here means last txn in the batch is resolved but the close() hasn't been called yet so + //there is nothing to heartbeat + return; + } + //if here after commit()/abort() but before next beginNextTransaction(), currentTxnIndex still + //points at the last txn which we don't want to heartbeat + Long first = txnIds.get(state == TxnState.OPEN ? 
currentTxnIndex : currentTxnIndex + 1); Long last = txnIds.get(txnIds.size()-1); try { HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last); diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 197ca7b..bf29993 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -669,6 +669,25 @@ public void testHeartbeat() throws Exception { Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat()); Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() + ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == heartbeatAt); + txnBatch.close(); + int txnBatchSize = 200; + txnBatch = connection.fetchTransactionBatch(txnBatchSize, writer); + for(int i = 0; i < txnBatchSize; i++) { + txnBatch.beginNextTransaction(); + if(i % 47 == 0) { + txnBatch.heartbeat(); + } + if(i % 10 == 0) { + txnBatch.abort(); + } + else { + txnBatch.commit(); + } + if(i % 37 == 0) { + txnBatch.heartbeat(); + } + } + } @Test public void testTransactionBatchEmptyAbort() throws Exception { diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 805db34..c0518ad 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1395,6 +1395,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst throws MetaException { try { Connection dbConn = null; + Statement stmt = null; HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse(); Set nosuch = new HashSet(); Set aborted = new HashSet(); @@ -1408,11 +1409,32 @@ public 
HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst * would care about (which would have required SERIALIZABLE) */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + /*do fast path first (in 1 statement); if it doesn't work, rollback and do the long version*/ + stmt = dbConn.createStatement(); + List queries = new ArrayList<>(); + int numTxnsToHeartbeat = (int) (rqst.getMax() - rqst.getMin() + 1); + List txnIds = new ArrayList<>(numTxnsToHeartbeat); + for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { + txnIds.add(txn); + } + TxnUtils.buildQueryWithINClause(conf, queries, + new StringBuilder("update TXNS set txn_last_heartbeat = " + getDbTime(dbConn) + + " where txn_state = " + quoteChar(TXN_OPEN) + " and "), + new StringBuilder(""), txnIds, "txn_id", true, false); + int updateCnt = 0; + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + updateCnt += stmt.executeUpdate(query); + } + if (updateCnt == numTxnsToHeartbeat) { + //fast path worked, i.e. all txns we were asked to heartbeat were Open as expected + dbConn.commit(); + return rsp; + } + //if here, do the slow path so that we can return info about txns which were not in expected state + dbConn.rollback(); for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { try { - //todo: do all updates in 1 SQL statement and check update count - //if update count is less than was requested, go into more expensive checks - //for each txn heartbeatTxn(dbConn, txn); } catch (NoSuchTxnException e) { nosuch.add(txn); @@ -1428,7 +1450,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); + close(null, stmt, dbConn); } } catch (RetryException e) { return heartbeatTxnRange(rqst);