diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift
index a0c6e59..0ca426a 100755
--- metastore/if/hive_metastore.thrift
+++ metastore/if/hive_metastore.thrift
@@ -127,6 +127,7 @@ enum LockType {
 enum CompactionType {
     MINOR = 1,
     MAJOR = 2,
+    DELETE = 3,
 }
 
 enum GrantRevokeType {
diff --git metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
index 7450b27..dfe11ec 100644
--- metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
+++ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
@@ -13,7 +13,8 @@
 public enum CompactionType implements org.apache.thrift.TEnum {
   MINOR(1),
-  MAJOR(2);
+  MAJOR(2),
+  DELETE(3); // TODO: this should be generated
 
   private final int value;
@@ -38,6 +39,8 @@ public static CompactionType findByValue(int value) {
       return MINOR;
     case 2:
       return MAJOR;
+    case 3:
+      return DELETE;
     default:
       return null;
   }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 60839fa..d785d87 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -298,6 +298,7 @@ public void markCompacted(CompactionInfo info) throws MetaException {
         switch (rs.getString(5).charAt(0)) {
           case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
           case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+          case DELETE_TYPE: info.type = CompactionType.DELETE; break;
           default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
         }
         info.runAs = rs.getString(6);
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d378d06..9dcc11b 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -127,6 +127,7 @@
   // Compactor types
   static final protected char MAJOR_TYPE = 'a';
   static final protected char MINOR_TYPE = 'i';
+  static final protected char DELETE_TYPE = 'd';
 
   // Transaction states
   static final protected char TXN_ABORTED = 'a';
@@ -1547,6 +1548,10 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
           buf.append(MINOR_TYPE);
           break;
 
+        case DELETE:
+          buf.append(DELETE_TYPE);
+          break;
+
         default:
           LOG.debug("Going to rollback");
           dbConn.rollback();
@@ -3372,6 +3377,8 @@ static CompactionType dbCompactionType2ThriftType(char dbValue) {
       return CompactionType.MAJOR;
     case MINOR_TYPE:
       return CompactionType.MINOR;
+    case DELETE_TYPE:
+      return CompactionType.DELETE;
     default:
       LOG.warn("Unexpected compaction type " + dbValue);
       return null;
@@ -3383,6 +3390,8 @@ static Character thriftCompactionType2DbType(CompactionType ct) {
       return MAJOR_TYPE;
     case MINOR:
       return MINOR_TYPE;
+    case DELETE:
+      return DELETE_TYPE;
     default:
       LOG.warn("Unexpected compaction type " + ct);
       return null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 902caa3..fabae09 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -534,6 +534,13 @@ public String toString() {
      * more up to date ones. Not {@code null}.
      */
     List<FileStatus> getObsolete();
+
+    /**
+     * Get the list of directories for aborted transactions.
+     * This is only used by MM tables.
+     * @return the list of aborted directories
+     */
+    List<FileStatus> getAbortedDirectories();
   }
 
   public static class ParsedDelta implements Comparable<ParsedDelta> {
@@ -795,21 +802,22 @@ public static Directory getAcidState(Path directory,
                                        boolean useFileIds,
                                        boolean ignoreEmptyFiles
                                        ) throws IOException {
-    return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles);
+    return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles, null);
   }
 
   public static Directory getAcidState(Path directory,
                                        Configuration conf,
                                        ValidTxnList txnList,
                                        Ref<Boolean> useFileIds,
-                                       boolean ignoreEmptyFiles
-                                       ) throws IOException {
+                                       boolean ignoreEmptyFiles,
+                                       Map<String, String> tblproperties) throws IOException {
     FileSystem fs = directory.getFileSystem(conf);
     // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
     List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
     final List<FileStatus> obsolete = new ArrayList<FileStatus>();
+    final List<FileStatus> abortedDirectories = new ArrayList<>();
     List<HdfsFileStatusWithId> childrenWithId = null;
     Boolean val = useFileIds.value;
     if (val == null || val) {
@@ -830,13 +838,13 @@ public static Directory getAcidState(Path directory,
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         getChildState(child.getFileStatus(), child, txnList, working,
-            originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles);
+            originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
       }
     } else {
       List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
       for (FileStatus child : children) {
         getChildState(
-            child, null, txnList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles);
+            child, null, txnList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
       }
     }
@@ -913,6 +921,7 @@ else if (prev != null && next.maxTransaction == prev.maxTransaction
      * {@link txnList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus
      * cannot have any data for an open txn. We could check {@link deltas} has files to cover
      * [1,n] w/o gaps but this would almost never happen...*/
+    // Only the MM path cares about this. Note that invalid transactions include both open and aborted txns.
     long[] exceptions = txnList.getInvalidTransactions();
     String minOpenTxn = exceptions != null && exceptions.length > 0 ?
       Long.toString(exceptions[0]) : "x";
@@ -946,6 +955,11 @@ public Path getBaseDirectory() {
       public List<FileStatus> getObsolete() {
         return obsolete;
       }
+
+      @Override
+      public List<FileStatus> getAbortedDirectories() {
+        return abortedDirectories;
+      }
     };
   }
   /**
@@ -964,9 +978,9 @@ private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) {
     return txnList.isValidBase(baseTxnId);
   }
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
-      ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
-      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
-      boolean ignoreEmptyFiles) throws IOException {
+      ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
+      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
+      boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -995,6 +1009,10 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi
       String deltaPrefix = (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
       ParsedDelta delta = parseDelta(child, deltaPrefix);
+      if (MetaStoreUtils.isInsertOnlyTable(tblproperties) &&
+          txnList.isTxnAborted(delta.minTransaction)) { // for MM tables, minTxnId and maxTxnId are the same
+        aborted.add(child);
+      }
       if (txnList.isTxnRangeValid(delta.minTransaction, delta.maxTransaction) !=
           ValidTxnList.RangeResponse.NONE) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 23b1b7f..e361395 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -219,7 +221,7 @@ public void run() {
   private void clean(CompactionInfo ci) throws MetaException {
     LOG.info("Starting cleaning for " + ci.getFullPartitionName());
     try {
-      Table t = resolveTable(ci);
+      final Table t = resolveTable(ci);
       if (t == null) {
         // The table was dropped before we got around to cleaning it.
         LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped");
@@ -256,11 +258,16 @@ private void clean(CompactionInfo ci) throws MetaException {
        * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
        * unless ValidTxnList is "capped" at highestTxnId.
        */
-      final ValidTxnList txnList = ci.highestTxnId > 0 ?
-        new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList();
+      final ValidTxnList txnList;
+      if (MetaStoreUtils.isInsertOnlyTable(t.getParameters())) {
+        txnList = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+      } else {
+        txnList = ci.highestTxnId > 0 ?
+          new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList();
+      }
 
       if (runJobAsSelf(ci.runAs)) {
-        removeFiles(location, txnList);
+        removeFiles(location, txnList, t);
       } else {
         LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
         UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
@@ -268,7 +275,7 @@ public Object run() throws Exception {
         ugi.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
-            removeFiles(location, txnList);
+            removeFiles(location, txnList, t);
             return null;
           }
         });
@@ -287,9 +294,18 @@ public Object run() throws Exception {
       }
     }
 
-  private void removeFiles(String location, ValidTxnList txnList) throws IOException {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList);
-    List<FileStatus> obsoleteDirs = dir.getObsolete();
+  private void removeFiles(String location, ValidTxnList txnList, Table t) throws IOException {
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList, Ref.from(false), false, t.getParameters());
+    List<FileStatus> obsoleteDirs;
+
+    // TODO: for MM, we must not accidentally delete earlier delta dirs (lower than highestTxnId)
+    // because they are still valid; we only want to delete the delta dirs of aborted txns.
+    if (MetaStoreUtils.isInsertOnlyTable(t.getParameters())) {
+      // for MM, obsoleteDirs has a different meaning: it holds the aborted directories
+      obsoleteDirs = dir.getAbortedDirectories();
+    } else {
+      obsoleteDirs = dir.getObsolete();
+    }
     List<Path> filesToDelete = new ArrayList<Path>(obsoleteDirs.size());
     for (FileStatus stat : obsoleteDirs) {
       filesToDelete.add(stat.getPath());
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 05b6fc4..11777547 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -204,6 +205,13 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
+
+    // We skip compaction for MM tables as it's not needed; we only wait for the Cleaner to remove the aborted (obsolete) dirs.
+    if (MetaStoreUtils.isInsertOnlyTable(t.getParameters())) {
+      LOG.debug("No compaction is necessary for MM table " + t.getDbName() + "." + t.getTableName());
+      return;
+    }
+
     JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci);
 
     // Figure out and encode what files we need to read.  We do this here (rather than in
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index af4a1da..2512e9d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -251,6 +252,11 @@ public CompactionType run() throws Exception {
   private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
       StorageDescriptor sd, Map<String, String> tblproperties) throws IOException, InterruptedException {
+
+    if (MetaStoreUtils.isInsertOnlyTable(tblproperties)) {
+      return CompactionType.DELETE;
+    }
+
     boolean noBase = false;
     Path location = new Path(sd.getLocation());
     FileSystem fs = location.getFileSystem(conf);
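
Note on the type encoding: the patch stores the new compaction type in the queue metadata as the single character 'd', next to the existing 'a' (major) and 'i' (minor), and reads it back via rs.getString(5). A minimal standalone sketch of the round-trip property the two TxnHandler helpers must preserve (the enum and constants are reproduced here for illustration, not taken verbatim from the patched classes):

// Standalone sketch mirroring TxnHandler's char <-> enum mapping for compaction types.
public class CompactionTypeCodecSketch {
  enum CompactionType { MINOR, MAJOR, DELETE }

  static final char MAJOR_TYPE = 'a';
  static final char MINOR_TYPE = 'i';
  static final char DELETE_TYPE = 'd';

  // Mirrors thriftCompactionType2DbType(): enum -> db char.
  static Character toDbType(CompactionType ct) {
    switch (ct) {
      case MAJOR:  return MAJOR_TYPE;
      case MINOR:  return MINOR_TYPE;
      case DELETE: return DELETE_TYPE;
      default:     return null; // unknown type: the caller rolls back
    }
  }

  // Mirrors dbCompactionType2ThriftType(): db char -> enum.
  static CompactionType fromDbType(char dbValue) {
    switch (dbValue) {
      case MAJOR_TYPE:  return CompactionType.MAJOR;
      case MINOR_TYPE:  return CompactionType.MINOR;
      case DELETE_TYPE: return CompactionType.DELETE;
      default:          return null;
    }
  }

  public static void main(String[] args) {
    // Every enum value must survive the db round-trip losslessly.
    for (CompactionType ct : CompactionType.values()) {
      if (fromDbType(toDbType(ct)) != ct) {
        throw new AssertionError("round-trip failed for " + ct);
      }
    }
    System.out.println("char <-> enum round-trip OK");
  }
}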
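End to end, the Cleaner-side flow for MM tables is: build a ValidTxnList that still treats aborted txns as invalid, pass the table properties into getAcidState() so it can populate getAbortedDirectories(), then delete only those directories, since valid deltas below highestTxnId must survive. A hedged sketch of that flow against the patched APIs (class and method names follow the patch, but error handling and the proxy-user logic are elided, and the plain fs.delete stands in for the Cleaner's actual removal helper):

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hive.common.util.Ref;

class MmCleanSketch {
  // Remove only the delta dirs written by aborted txns. getObsolete() is
  // deliberately not used: for MM it would also sweep still-valid deltas.
  static void removeAbortedDirs(Path tableOrPartLocation, Configuration conf,
      ValidTxnList txnList, Map<String, String> tblproperties) throws IOException {
    AcidUtils.Directory dir = AcidUtils.getAcidState(
        tableOrPartLocation, conf, txnList, Ref.from(false), false, tblproperties);
    FileSystem fs = tableOrPartLocation.getFileSystem(conf);
    for (FileStatus stat : dir.getAbortedDirectories()) {
      fs.delete(stat.getPath(), true); // recursive: each entry is a delta dir
    }
  }
}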
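All three new branches (Initiator, CompactorMR, Cleaner) hinge on MetaStoreUtils.isInsertOnlyTable(). For readers unfamiliar with MM tables: they are transactional tables whose transactional_properties are set to insert_only. A hedged sketch of the property convention that gate relies on (the literal keys shown are the usual tblproperties names; the authoritative check lives in MetaStoreUtils, not here):

import java.util.HashMap;
import java.util.Map;

class InsertOnlyCheckSketch {
  // Sketch of the table-property convention behind isInsertOnlyTable().
  static boolean isInsertOnlyTable(Map<String, String> params) {
    if (params == null) {
      return false;
    }
    return "true".equalsIgnoreCase(params.get("transactional"))
        && "insert_only".equalsIgnoreCase(params.get("transactional_properties"));
  }

  public static void main(String[] args) {
    Map<String, String> mm = new HashMap<>();
    mm.put("transactional", "true");
    mm.put("transactional_properties", "insert_only");
    System.out.println(isInsertOnlyTable(mm)); // true
  }
}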