diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 71e51315d3..75a8f8b4e7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1046,6 +1046,12 @@ public static Directory getAcidState(Path directory,
       Ref<Boolean> useFileIds,
       boolean ignoreEmptyFiles,
       Map<String, String> tblproperties) throws IOException {
+    String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    if (s == null || s.isEmpty()) {
+      throw new RuntimeException("No " + ValidTxnList.VALID_TXNS_KEY + " found for " + directory);
+    }
+    ValidTxnList validTxnList = new ValidReadTxnList();
+    validTxnList.readFromString(s);
     FileSystem fs = directory.getFileSystem(conf);
     // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index d9f186cd03..be7310a742 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,11 +18,13 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,7 +146,20 @@ public void run() {
         // Compaction doesn't work under a transaction and hence pass 0 for current txn Id
         // The response will have one entry per table and hence we get only one OpenWriteIds
         String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+
+        ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
         GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+        // With this, the resulting ValidWriteIdList is capped at whatever HWM validTxnList has.
+        rqst.setValidTxnList(validTxnList.writeToString());
+        // And now getAcidState() will find it. The compactor must see all compacted files
+        // (unless aborted), i.e. we don't want to cap ValidTxnList at the min open txn.
+        // TODO: document this better.
+        String oldValue = conf.get(ValidTxnList.VALID_TXNS_KEY);
+        if (!(oldValue == null || oldValue.isEmpty())) {
+          throw new RuntimeException(ValidTxnList.VALID_TXNS_KEY + "=" + oldValue + " for " + ci);
+        }
+        conf.set(ValidTxnList.VALID_TXNS_KEY, rqst.getValidTxnList());
+
         final ValidWriteIdList tblValidWriteIds =
             TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
         LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
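
Reader-side implication of the two hunks above, as a minimal sketch (illustration only, not part of this patch; the class and helper names and the snapshot argument are hypothetical): after the AcidUtils change, any code path that reaches getAcidState() must first publish a serialized ValidTxnList under ValidTxnList.VALID_TXNS_KEY in the conf, exactly as the Worker now does, or it fails fast with the RuntimeException above instead of silently reading without a transaction snapshot.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

public class GetAcidStateCaller {
  // Hypothetical helper: txnListSnapshot would normally come from the metastore,
  // e.g. TxnCommonUtils.createValidReadTxnList(openTxns, 0).writeToString().
  static void publishTxnSnapshot(Configuration conf, String txnListSnapshot) {
    ValidTxnList validTxnList = new ValidReadTxnList();
    validTxnList.readFromString(txnListSnapshot);  // parse the serialized snapshot
    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
    // ...only now is it safe to call AcidUtils.getAcidState(directory, conf, ...).
  }
}
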
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 057fd7704c..42eb29d587 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1201,7 +1201,7 @@ public static void runWorker(HiveConf hiveConf) throws MetaException {
     AtomicBoolean stop = new AtomicBoolean(true);
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
-    t.setConf(hiveConf);
+    t.setConf(new HiveConf(hiveConf));
     AtomicBoolean looped = new AtomicBoolean();
     t.init(stop, looped);
     t.run();
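
Why the one-line test change matters, sketched under the assumption that the test's HiveConf is shared across a whole test run (the class and method names below are illustrative, not from the patch): Worker.run() now writes ValidTxnList.VALID_TXNS_KEY into its conf and deliberately throws if a value is already present, so handing the same HiveConf to two compaction runs would trip that guard on the second run. The copy constructor keeps each Worker's mutation private.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;

public class WorkerConfIsolation {
  // Illustrative only: runs two Workers off one shared conf without cross-talk.
  static void runTwoWorkers(HiveConf shared) {
    for (int i = 0; i < 2; i++) {
      Worker w = new Worker();
      // HiveConf's copy constructor gives each Worker a private conf, so its
      // conf.set(ValidTxnList.VALID_TXNS_KEY, ...) never leaks into 'shared'
      // and the "already set" sanity check cannot fire on the second run.
      w.setConf(new HiveConf(shared));
    }
  }
}
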