diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 28c98bd75f..db3109e069 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -19,9 +19,9 @@ package org.apache.hive.hcatalog.streaming; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.cli.CliSessionState; @@ -577,6 +577,14 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo */ private volatile boolean isClosed = false; private final String agentInfo; + /** + * Tracks the state of each transaction + */ + private final TxnState[] txnStatus; + /** + * ID of the last txn used by {@link #beginNextTransactionImpl()} + */ + private long lastTxnUsed; /** * Represents a batch of transactions acquired from MetaStore @@ -606,6 +614,10 @@ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEn this.agentInfo = agentInfo; txnIds = openTxnImpl(msClient, user, numTxns, ugi); + txnStatus = new TxnState[numTxns]; + for(int i = 0; i < txnStatus.length; i++) { + txnStatus[i] = TxnState.OPEN;//Open matches Metastore state + } this.state = TxnState.INACTIVE; recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1)); @@ -639,8 +651,14 @@ public String toString() { if (txnIds==null || txnIds.isEmpty()) { return "{}"; } + StringBuilder sb = new StringBuilder(" TxnStatus["); + for(TxnState state : txnStatus) { + //'state' should not be null - future proofing + sb.append(state == null ? 
"N" : state); + } + sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); return "TxnIds=[" + txnIds.get(0) + "..." + txnIds.get(txnIds.size()-1) - + "] on endPoint = " + endPt; + + "] on endPoint = " + endPt + "; " + sb; } /** @@ -678,6 +696,7 @@ private void beginNextTransactionImpl() throws TransactionError { " current batch for end point : " + endPt); ++currentTxnIndex; state = TxnState.OPEN; + lastTxnUsed = getCurrentTxnId(); lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo); try { LockResponse res = msClient.lock(lockRequest); @@ -862,6 +881,7 @@ private void commitImpl() throws TransactionError, StreamingException { recordWriter.flush(); msClient.commitTxn(txnIds.get(currentTxnIndex)); state = TxnState.COMMITTED; + txnStatus[currentTxnIndex] = TxnState.COMMITTED; } catch (NoSuchTxnException e) { throw new TransactionError("Invalid transaction id : " + getCurrentTxnId(), e); @@ -924,12 +944,14 @@ private void abortImpl(boolean abortAllRemaining) throws TransactionError, Strea for(currentTxnIndex = minOpenTxnIndex; currentTxnIndex < txnIds.size(); currentTxnIndex++) { msClient.rollbackTxn(txnIds.get(currentTxnIndex)); + txnStatus[currentTxnIndex] = TxnState.ABORTED; } currentTxnIndex--;//since the loop left it == txnId.size() } else { if (getCurrentTxnId() > 0) { msClient.rollbackTxn(getCurrentTxnId()); + txnStatus[currentTxnIndex] = TxnState.ABORTED; } } state = TxnState.ABORTED; diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java index 3bcc510e38..372ad2dc37 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java @@ -33,7 +33,17 @@ * */ public interface TransactionBatch { - public enum TxnState {INACTIVE, OPEN, COMMITTED, ABORTED } + enum 
TxnState { + INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"); + + private final String code; + TxnState(String code) { + this.code = code; + } + public String toString() { + return code; + } + } /** * Activate the next available transaction in the current transaction batch diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 1e73a4b9ba..921bbd3707 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.Validator; @@ -1974,7 +1975,12 @@ public void testErrorHandling() throws Exception { txnBatch.write("name4,2,more Streaming unlimited".getBytes()); txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); txnBatch.commit(); - + + //test toString() + String s = txnBatch.toString(); + Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()))); + Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CO]")); + expectedEx = null; txnBatch.beginNextTransaction(); writer.enableErrors(); @@ -1998,6 +2004,11 @@ public void testErrorHandling() throws Exception { Assert.assertTrue("commit() should have failed", expectedEx != null && expectedEx.getMessage().contains("has been closed()")); + //test toString() + s = txnBatch.toString(); + Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()))); + Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]")); + 
r = msClient.showTxns(); Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark()); ti = r.getOpen_txns();