diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 36f38f6..a7161b4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -566,6 +566,12 @@ else if(next.maxTransaction == current && lastStmtId >= 0) { //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete deltas.add(next); } + else if (next.maxTransaction < current) { + if (txnList.isTxnRangeValid(next.minTransaction, next.maxTransaction) != + ValidTxnList.RangeResponse.ALL) { + throw new IOException("Transaction has to be aborted since its transaction id is not valid"); + } + } else { obsolete.add(next.path); } @@ -606,13 +612,17 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi String fn = p.getName(); if (fn.startsWith(BASE_PREFIX) && child.isDir()) { long txn = parseBase(p); - if (bestBase.status == null) { - bestBase.status = child; - bestBase.txn = txn; - } else if (bestBase.txn < txn) { - obsolete.add(bestBase.status); - bestBase.status = child; - bestBase.txn = txn; + if (txnList.isTxnValid(txn)) { + if (bestBase.status == null) { + bestBase.status = child; + bestBase.txn = txn; + } else if (bestBase.txn < txn) { + obsolete.add(bestBase.status); + bestBase.status = child; + bestBase.txn = txn; + } else { + obsolete.add(child); + } } else { obsolete.add(child); } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java index f5eb8a1..c2af0f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java @@ -49,6 +49,7 @@ public String getServiceDescription() { private static final class OpenTxnsCounter implements Runnable { private final TxnStore txnHandler; private final AtomicInteger isAliveCounter; + private static int logCounter = 0; private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) { txnHandler = TxnUtils.getTxnStore(hiveConf); this.isAliveCounter = isAliveCounter; @@ -59,7 +60,10 @@ public void run() { long startTime = System.currentTimeMillis(); txnHandler.countOpenTxns(); int count = isAliveCounter.incrementAndGet(); - LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count); + logCounter++; + if (logCounter % 100 == 0) { // To avoid excessive logging, only print log every 100 runs + LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime) / 1000 + "seconds. isAliveCounter=" + count); + } } catch(Throwable t) { LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 5745dee..b94b579 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -221,7 +221,7 @@ public void testBestBase() throws Exception { new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "/tbl/part1"); AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:")); + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("250:")); assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString()); List obsoletes = dir.getObsolete(); assertEquals(4, obsoletes.size()); @@ -232,8 +232,26 @@ public void testBestBase() throws Exception { assertEquals(0, dir.getOriginalFiles().size()); assertEquals(0, dir.getCurrentDirectories().size()); // we should always get the latest base - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:")); + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("300:")); assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString()); + // If a transaction id is higher than HWM, it's not valid and cannot be used as best base + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:")); + assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString()); + obsoletes = dir.getObsolete(); + assertEquals(4, obsoletes.size()); + assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/base_200", obsoletes.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/base_25", obsoletes.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/base_5", obsoletes.get(3).getPath().toString()); + // If a transaction id is one that we must exclude, we cannot use it as the best base + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("250:100:200")); + assertEquals("mock:/tbl/part1/base_25", dir.getBaseDirectory().toString()); + obsoletes = dir.getObsolete(); + assertEquals(4, obsoletes.size()); + assertEquals("mock:/tbl/part1/base_100", obsoletes.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/base_200", obsoletes.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/base_10", obsoletes.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/base_5", obsoletes.get(3).getPath().toString()); } @Test