diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 6e0070b..b2ced6b 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -99,7 +99,7 @@ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txn
     }
     highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
     BitSet bitSet = new BitSet(exceptions.length);
-    bitSet.set(0, bitSet.length()); // for ValidCompactorTxnList, everything in exceptions are aborted
+    bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything in exceptions are aborted
     return new ValidCompactorTxnList(exceptions, bitSet, highWater);
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 733595c..f67831e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -1860,7 +1860,7 @@ private void checkArchiveProperty(int partSpecLevel,
   private int compact(Hive db, AlterTableSimpleDesc desc) throws HiveException {
 
     Table tbl = db.getTable(desc.getTableName());
-    if (!AcidUtils.isFullAcidTable(tbl)) {
+    if (!AcidUtils.isFullAcidTable(tbl) && !MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())) {
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, tbl.getDbName(),
           tbl.getTableName());
     }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 902caa3..e723e2f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -534,6 +534,12 @@ public String toString() {
      * more up to date ones. Not {@code null}.
      */
     List<FileStatus> getObsolete();
+
+    /**
+     * Get the list of directories that have nothing but aborted transactions.
+     * @return the list of aborted directories
+     */
+    List<FileStatus> getAbortedDirectories();
   }
 
   public static class ParsedDelta implements Comparable<ParsedDelta> {
@@ -795,21 +801,22 @@ public static Directory getAcidState(Path directory,
                                        boolean useFileIds,
                                        boolean ignoreEmptyFiles
                                        ) throws IOException {
-    return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles);
+    return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles, null);
   }
 
   public static Directory getAcidState(Path directory,
                                        Configuration conf,
                                        ValidTxnList txnList,
                                        Ref<Boolean> useFileIds,
-                                       boolean ignoreEmptyFiles
-                                       ) throws IOException {
+                                       boolean ignoreEmptyFiles,
+                                       Map<String, String> tblproperties) throws IOException {
     FileSystem fs = directory.getFileSystem(conf);
     // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
     List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
     final List<FileStatus> obsolete = new ArrayList<FileStatus>();
+    final List<FileStatus> abortedDirectories = new ArrayList<>();
     List<HdfsFileStatusWithId> childrenWithId = null;
     Boolean val = useFileIds.value;
     if (val == null || val) {
@@ -829,14 +836,14 @@ public static Directory getAcidState(Path directory,
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
-        getChildState(child.getFileStatus(), child, txnList, working,
-            originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles);
+        getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original,
+            obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
       }
     } else {
       List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
       for (FileStatus child : children) {
-        getChildState(
-            child, null, txnList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles);
+        getChildState(child, null, txnList, working, originalDirectories, original, obsolete,
+            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
       }
     }
 
@@ -946,6 +953,11 @@ public Path getBaseDirectory() {
       public List<FileStatus> getObsolete() {
         return obsolete;
       }
+
+      @Override
+      public List<FileStatus> getAbortedDirectories() {
+        return abortedDirectories;
+      }
     };
   }
 
  /**
@@ -966,7 +978,7 @@ private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) {
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
       List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
-      boolean ignoreEmptyFiles) throws IOException {
+      boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -995,6 +1007,10 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi
       String deltaPrefix = (fn.startsWith(DELTA_PREFIX)) ?
          DELTA_PREFIX : DELETE_DELTA_PREFIX;
       ParsedDelta delta = parseDelta(child, deltaPrefix);
+      if (tblproperties != null && MetaStoreUtils.isInsertOnlyTable(tblproperties) &&
+          ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) {
+        aborted.add(child);
+      }
       if (txnList.isTxnRangeValid(delta.minTransaction,
           delta.maxTransaction) != ValidTxnList.RangeResponse.NONE) {
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8fb7211..db8c23f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1094,7 +1094,7 @@ public AcidDirInfo run() throws Exception {
     private AcidDirInfo callInternal() throws IOException {
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
-          context.transactionList, useFileIds, true);
+          context.transactionList, useFileIds, true, null);
       Path base = dirInfo.getBaseDirectory();
       // find the base files (original or new style)
       List<HdfsFileStatusWithId> baseFiles = new ArrayList<>();
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index f83b6db..c0b39ae 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -70,6 +71,7 @@
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -204,6 +206,16 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) &&
       conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
+
+    // For MM tables we don't need to launch MR jobs as there is no compaction needed.
+    // We just need to delete the directories for aborted transactions.
+    if (MetaStoreUtils.isInsertOnlyTable(t.getParameters())) {
+      LOG.debug("Going to delete directories for aborted transactions for MM table " +
+          t.getDbName() + "." + t.getTableName());
+      removeFiles(conf, sd.getLocation(), txns, t);
+      return;
+    }
+
     JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci);
 
     // Figure out and encode what files we need to read.  We do this here (rather than in
 
@@ -344,6 +356,30 @@ private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
     HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
   }
 
+  // Remove the directories for aborted transactions only
+  private void removeFiles(HiveConf conf, String location, ValidTxnList txnList, Table t)
+      throws IOException {
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList,
+        Ref.from(false), false, t.getParameters());
+    // For MM tables, we only want to delete delta dirs for aborted txns.
+    List<FileStatus> abortedDirs = dir.getAbortedDirectories();
+    List<Path> filesToDelete = new ArrayList<>(abortedDirs.size());
+    for (FileStatus stat : abortedDirs) {
+      filesToDelete.add(stat.getPath());
+    }
+    if (filesToDelete.size() < 1) {
+      LOG.warn("Hmm, nothing to delete in the worker for directory " + location +
+          ", that hardly seems right.");
+      return;
+    }
+    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + location);
+    FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+    for (Path dead : filesToDelete) {
+      LOG.debug("Going to delete path " + dead.toString());
+      fs.delete(dead, true);
+    }
+  }
+
   public JobConf getMrJob() {
     return mrJob;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index af4a1da..c52bd3e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -251,6 +252,11 @@ public CompactionType run() throws Exception {
   private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
                                                  StorageDescriptor sd, Map<String, String> tblproperties)
       throws IOException, InterruptedException {
+
+    if (MetaStoreUtils.isInsertOnlyTable(tblproperties)) {
+      return CompactionType.MINOR;
+    }
+
     boolean noBase = false;
     Path location = new Path(sd.getLocation());
     FileSystem fs = location.getFileSystem(conf);
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5786c4f..7a73a17 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -91,7 +91,8 @@
     NONACIDORCTBL("nonAcidOrcTbl"),
     NONACIDPART("nonAcidPart"),
     NONACIDPART2("nonAcidPart2"),
-    ACIDNESTEDPART("acidNestedPart");
+    ACIDNESTEDPART("acidNestedPart"),
+    MMTBL("mmTbl");
 
     private final String name;
     @Override
@@ -143,6 +144,7 @@ protected void setUpWithTableProperties(String tableProperties) throws Exception
     runStatementOnDriver("create table " + Table.ACIDNESTEDPART +
         "(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
         " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
+    runStatementOnDriver("create table " + Table.MMTBL + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
   }
 
   protected void dropTables() throws Exception {
@@ -662,11 +664,11 @@ public void testNonAcidToAcidConversion3() throws Exception {
       FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
       Arrays.sort(buckets);
       if (numDelta == 1) {
-        Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName());
+        Assert.assertEquals("delta_0000024_0000024_0000", status[i].getPath().getName());
         Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
         Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
       } else if (numDelta == 2) {
-        Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
+        Assert.assertEquals("delta_0000025_0000025_0000", status[i].getPath().getName());
         Assert.assertEquals(BUCKET_COUNT, buckets.length);
         Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
         Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
@@ -711,7 +713,7 @@ public void testNonAcidToAcidConversion3() throws Exception {
         Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
       } else if (numBase == 2) {
         // The new base dir now has two bucket files, since the delta dir has two bucket files
-        Assert.assertEquals("base_0000023", status[i].getPath().getName());
+        Assert.assertEquals("base_0000025", status[i].getPath().getName());
         Assert.assertEquals(BUCKET_COUNT, buckets.length);
         Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
         Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
@@ -738,7 +740,7 @@ public void testNonAcidToAcidConversion3() throws Exception {
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
         (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(1, status.length);
-    Assert.assertEquals("base_0000023", status[0].getPath().getName());
+    Assert.assertEquals("base_0000025", status[0].getPath().getName());
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
     Arrays.sort(buckets);
     Assert.assertEquals(BUCKET_COUNT, buckets.length);
@@ -1642,6 +1644,64 @@ public void testValuesSource() throws Exception {
   }
 
   /**
+   * Test compaction for micro-managed (insert-only) tables.
+   * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables.
+   * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any.
+   * @throws Exception
+   */
+  @Test
+  public void testMmTableCompaction() throws Exception {
+    // 1. Insert some rows into MM table
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)");
+    // There should be 2 delta directories
+    verifyDirAndResult(2);
+
+    // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay.
+    runStatementOnDriver("alter table " + Table.MMTBL + " compact 'MINOR'");
+    runWorker(hiveConf);
+    verifyDirAndResult(2);
+
+    // 3. Let a transaction be aborted
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+    // There should be 3 delta directories. The new one is the aborted one.
+    verifyDirAndResult(3);
+
+    // 4. Perform a MINOR compaction again. This time it will remove the subdir for the aborted transaction.
+    runStatementOnDriver("alter table " + Table.MMTBL + " compact 'MINOR'");
+    runWorker(hiveConf);
+    // The worker should remove the subdir for the aborted transaction
+    verifyDirAndResult(2);
+
+    // 5. Run Cleaner. Shouldn't impact anything.
+    runCleaner(hiveConf);
+    verifyDirAndResult(2);
+  }
+
+  private void verifyDirAndResult(int expectedDeltas) throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    // Verify the content of subdirs
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    int sawDeltaTimes = 0;
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+      sawDeltaTimes++;
+      FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+      Assert.assertEquals(1, files.length);
+      Assert.assertTrue(files[0].getPath().getName().equals("000000_0"));
+    }
+    Assert.assertEquals(expectedDeltas, sawDeltaTimes);
+
+    // Verify query result
+    int[][] resultData = new int[][] {{1,2}, {3,4}};
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.MMTBL);
+    Assert.assertEquals(stringifyValues(resultData), rs);
+  }
+
+  /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
    */
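
For context, here is a minimal sketch (not part of the patch) of how the new getAcidState(..., tblproperties) overload and Directory.getAbortedDirectories() are intended to be used together, mirroring the removeFiles() method added to CompactorMR above. The class name MmAbortedCleanupSketch and the variables conf, location, validTxns, and tableProps are illustrative placeholders, not names from the patch.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hive.common.util.Ref;

public class MmAbortedCleanupSketch {
  /** Deletes delta directories that contain only aborted transactions of an MM table. */
  static void deleteAbortedDeltas(HiveConf conf, Path location, ValidTxnList validTxns,
      Map<String, String> tableProps) throws IOException {
    if (!MetaStoreUtils.isInsertOnlyTable(tableProps)) {
      return; // only insert-only (MM) tables take this no-MR shortcut
    }
    // Passing the table properties lets getAcidState collect fully aborted delta dirs.
    AcidUtils.Directory dir =
        AcidUtils.getAcidState(location, conf, validTxns, Ref.from(false), false, tableProps);
    FileSystem fs = location.getFileSystem(conf);
    for (FileStatus aborted : dir.getAbortedDirectories()) {
      // Recursively drop each aborted delta directory; valid deltas are left untouched.
      fs.delete(aborted.getPath(), true);
    }
  }
}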