diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 28c98bd75f..73f2907e55 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -19,9 +19,9 @@ package org.apache.hive.hcatalog.streaming; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.cli.CliSessionState; @@ -577,6 +577,17 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo */ private volatile boolean isClosed = false; private final String agentInfo; + /** + * Tracks the state of each transaction + * {@code null} - means Open + * {@code true} - means Committed + * {@code false} - means Aborted + */ + private final Boolean[] txnStatus; + /** + * ID of the last txn used by {@link #beginNextTransactionImpl()} + */ + private long lastTxnUsed; /** * Represents a batch of transactions acquired from MetaStore @@ -604,7 +615,7 @@ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEn this.heartbeaterMSClient = heartbeaterMSClient; this.recordWriter = recordWriter; this.agentInfo = agentInfo; - + txnStatus = new Boolean[numTxns]; txnIds = openTxnImpl(msClient, user, numTxns, ugi); this.state = TxnState.INACTIVE; @@ -639,8 +650,13 @@ public String toString() { if (txnIds==null || txnIds.isEmpty()) { return "{}"; } + StringBuilder sb = new StringBuilder(" TxnStatus["); + for(Boolean txnStat : txnStatus) { + sb.append(txnStat == null ? "O" : (txnStat ? "C" : "A")); + } + sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); return "TxnIds=[" + txnIds.get(0) + "..." + txnIds.get(txnIds.size()-1) - + "] on endPoint = " + endPt; + + "] on endPoint = " + endPt + "; " + sb; } /** @@ -678,6 +694,7 @@ private void beginNextTransactionImpl() throws TransactionError { " current batch for end point : " + endPt); ++currentTxnIndex; state = TxnState.OPEN; + lastTxnUsed = getCurrentTxnId(); lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo); try { LockResponse res = msClient.lock(lockRequest); @@ -862,6 +879,7 @@ private void commitImpl() throws TransactionError, StreamingException { recordWriter.flush(); msClient.commitTxn(txnIds.get(currentTxnIndex)); state = TxnState.COMMITTED; + txnStatus[currentTxnIndex] = true; } catch (NoSuchTxnException e) { throw new TransactionError("Invalid transaction id : " + getCurrentTxnId(), e); @@ -924,12 +942,14 @@ private void abortImpl(boolean abortAllRemaining) throws TransactionError, Strea for(currentTxnIndex = minOpenTxnIndex; currentTxnIndex < txnIds.size(); currentTxnIndex++) { msClient.rollbackTxn(txnIds.get(currentTxnIndex)); + txnStatus[currentTxnIndex] = false; } currentTxnIndex--;//since the loop left it == txnId.size() } else { if (getCurrentTxnId() > 0) { msClient.rollbackTxn(getCurrentTxnId()); + txnStatus[currentTxnIndex] = false; } } state = TxnState.ABORTED; diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 1e73a4b9ba..921bbd3707 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.Validator; @@ -1974,7 +1975,12 @@ public void testErrorHandling() throws Exception { txnBatch.write("name4,2,more Streaming unlimited".getBytes()); txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); txnBatch.commit(); - + + //test toString() + String s = txnBatch.toString(); + Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()))); + Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CO]")); + expectedEx = null; txnBatch.beginNextTransaction(); writer.enableErrors(); @@ -1998,6 +2004,11 @@ public void testErrorHandling() throws Exception { Assert.assertTrue("commit() should have failed", expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + //test toString() + s = txnBatch.toString(); + Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()))); + Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]")); + r = msClient.showTxns(); Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark()); ti = r.getOpen_txns();