diff --git common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java index c022577bbb..eaa0b34370 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java +++ common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java @@ -39,14 +39,18 @@ public ValidCompactorTxnList() { super(); } + public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) { + this(abortedTxnList, abortedBits, highWatermark, Long.MAX_VALUE); + } /** * @param abortedTxnList list of all aborted transactions * @param abortedBits bitset marking whether the corresponding transaction is aborted * @param highWatermark highest committed transaction to be considered for compaction, * equivalently (lowest_open_txn - 1). */ - public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) { - super(abortedTxnList, abortedBits, highWatermark); // abortedBits should be all true as everything in exceptions are aborted txns + public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark, long minOpenTxnId) { + // abortedBits should be all true as everything in exceptions are aborted txns + super(abortedTxnList, abortedBits, highWatermark, minOpenTxnId); if(this.exceptions.length <= 0) { return; } diff --git common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java index 4e57772eb0..002afd6ab5 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java +++ common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java @@ -193,9 +193,10 @@ public long getHighWatermark() { public long[] getInvalidTransactions() { return exceptions; } - @VisibleForTesting - public long getMinOpenTxn() { - return minOpenTxn; + + @Override + public Long getMinOpenTxn() { + return minOpenTxn == Long.MAX_VALUE ? null : minOpenTxn; } @Override diff --git common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java index d4ac02cce5..108e5ca85c 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java +++ common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java @@ -105,4 +105,8 @@ */ public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId); + /** + * Returns smallest Open transaction in this set, {@code null} if there is none. + */ + Long getMinOpenTxn(); } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index e6c62d3830..30b155f3b3 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -81,6 +81,7 @@ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long * @return a valid txn list. */ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { + //highWater is the last txn id that has been allocated long highWater = txns.getTxn_high_water_mark(); long minOpenTxn = Long.MAX_VALUE; long[] exceptions = new long[txns.getOpen_txnsSize()]; @@ -100,7 +101,12 @@ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txn highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1; BitSet bitSet = new BitSet(exceptions.length); bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything in exceptions are aborted - return new ValidCompactorTxnList(exceptions, bitSet, highWater); + if(minOpenTxn == Long.MAX_VALUE) { + return new ValidCompactorTxnList(exceptions, bitSet, highWater); + } + else { + return new ValidCompactorTxnList(exceptions, bitSet, highWater, minOpenTxn); + } } /** diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java index 91d621536e..eb88e32d66 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java @@ -103,11 +103,11 @@ public void writeToString() { public void readFromString() { ValidCompactorTxnList txns = new ValidCompactorTxnList("37:" + Long.MAX_VALUE + "::7,9,10"); Assert.assertEquals(37L, txns.getHighWatermark()); - Assert.assertEquals(Long.MAX_VALUE, txns.getMinOpenTxn()); + Assert.assertNull(txns.getMinOpenTxn()); Assert.assertArrayEquals(new long[]{7L, 9L, 10L}, txns.getInvalidTransactions()); txns = new ValidCompactorTxnList("21:" + Long.MAX_VALUE + ":"); Assert.assertEquals(21L, txns.getHighWatermark()); - Assert.assertEquals(Long.MAX_VALUE, txns.getMinOpenTxn()); + Assert.assertNull(txns.getMinOpenTxn()); Assert.assertEquals(0, txns.getInvalidTransactions().length); } @@ -115,7 +115,7 @@ public void readFromString() { public void testAbortedTxn() throws Exception { ValidCompactorTxnList txnList = new ValidCompactorTxnList("5:4::1,2,3"); Assert.assertEquals(5L, txnList.getHighWatermark()); - Assert.assertEquals(4, txnList.getMinOpenTxn()); + Assert.assertEquals(4, txnList.getMinOpenTxn().longValue()); Assert.assertArrayEquals(new long[]{1L, 2L, 3L}, txnList.getInvalidTransactions()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 04ef7fc86a..7f5c6d3fbe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -265,10 +265,15 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, dirsToSearch.add(baseDir); } } - if (parsedDeltas.size() == 0 && dir.getOriginalFiles().size() == 0) { // Skip compaction if there's no delta files AND there's no original files - LOG.error("No delta files or original files found to compact in " + sd.getLocation() + " for compactionId=" + ci.id); + String minOpenInfo = "."; + if(txns.getMinOpenTxn() != null) { + minOpenInfo = " with min Open " + JavaUtils.txnIdToString(txns.getMinOpenTxn()) + + ". Compaction cannot compact above this txnid"; + } + LOG.error("No delta files or original files found to compact in " + sd.getLocation() + + " for compactionId=" + ci.id + minOpenInfo); return; }