diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index ee2c0f3e23..90dc4a31bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -27,7 +27,6 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.regex.Matcher;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -242,10 +241,6 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
     AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds,
         Ref.from(false), true, null, false);
 
-    if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
-      return;
-    }
-
     List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
     int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
     if (parsedDeltas.size() > maxDeltasToHandle) {
@@ -305,38 +300,6 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
     su.gatherStats();
   }
 
-  private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-
-    StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
-    boolean isEnoughToCompact;
-
-    if (isMajorCompaction) {
-      isEnoughToCompact = (origCount > 0
-          || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1);
-
-    } else {
-      isEnoughToCompact = (deltaCount > 1);
-
-      if (deltaCount == 2) {
-        Map<String, Long> deltaByType = dir.getCurrentDirectories().stream()
-            .collect(Collectors.groupingBy(delta ->
-                    (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX),
-                Collectors.counting()));
-
-        isEnoughToCompact = (deltaByType.size() != deltaCount);
-        deltaInfo.append(" ").append(deltaByType);
-      }
-    }
-
-    if (!isEnoughToCompact) {
-      LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}",
-          sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount);
-    }
-    return isEnoughToCompact;
-  }
-
   private String generateTmpPath(StorageDescriptor sd) {
     return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 10681c0202..38689ef86c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -36,7 +36,6 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,13 +54,6 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
       ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
     AcidUtils
        .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
-    AcidUtils.Directory dir = AcidUtils
-        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
-            table.getParameters(), false);
-
-    if (!Util.isEnoughToCompact(true, dir, storageDescriptor)) {
-      return;
-    }
 
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true);
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 27a3ce8d2d..9b8420902f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -82,10 +82,6 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
       return;
     }
 
-    if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
-      return;
-    }
-
     try {
       String tmpLocation = Util.generateTmpPath(storageDescriptor);
       Path baseLocation = new Path(tmpLocation, "_base");
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 80119de22f..1eab5b888d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -94,6 +94,24 @@ static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory
       return isEnoughToCompact;
     }
 
+    /**
+     * Check for obsolete directories, and return true if any exist and the Cleaner should be
+     * run. For example, if we insert overwrite into a table with only deltas, a new base file
+     * with the highest writeId is created, so there will be no live delta directories, only
+     * obsolete ones. Compaction is not needed, but the Cleaner should still be run.
+     *
+     * @return true if cleaning is needed
+     */
+    public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor sd) {
+      int numObsoleteDirs = dir.getObsolete().size();
+      boolean needsJustCleaning = numObsoleteDirs > 0;
+      if (needsJustCleaning) {
+        LOG.debug("{} obsolete directories in {} found; marked for cleaning.",
+            numObsoleteDirs, sd.getLocation());
+      }
+      return needsJustCleaning;
+    }
+
     /**
      * Generate a random tmp path, under the provided storage.
      * @param sd storage descriptor, must be not null.
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 3270175a80..96aaa870cf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -30,7 +31,9 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.common.util.Ref;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -182,6 +185,19 @@ public void run() {
         jobName.append("-compactor-");
         jobName.append(ci.getFullPartitionName());
 
+        // Don't start compaction or cleaning if it is not necessary
+        AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
+            tblValidWriteIds, Ref.from(false), true, null, false);
+        if (!QueryCompactor.Util.isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+          if (QueryCompactor.Util.needsCleaning(dir, sd)) {
+            msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+          } else {
+            // Nothing to compact and nothing to clean; close out the request
+            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+          }
+          continue;
+        }
+
         LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName()
             + " in " + JavaUtils.txnIdToString(compactorTxnId));
         final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats(
             CompactionInfo.compactionInfoToStruct(ci)), conf,
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index cbc72b47ea..fc255af129 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1323,7 +1323,8 @@ public void testCompactWithDelete() throws Exception {
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
     Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
-    Assert.assertEquals("Unexpected 1 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(1).getState());
+    Assert.assertEquals("Unexpected 1 compaction state", TxnStore.SUCCEEDED_RESPONSE,
+        resp.getCompacts().get(1).getState());
   }
 
   /**
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 8f111b9b34..cffa6723d0 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -469,12 +469,13 @@ public void testMetastoreTablesCleanup() throws Exception {
     driver.run("insert into temp.T11 values (4, 4)");
     driver.run("insert into temp.T12p partition (ds='today', hour='1') values (5, 5)");
     driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (6, 6)");
+    driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (13, 13)");
     driver.run("insert into temp.T13p partition (ds='today', hour='1') values (7, 7)");
     driver.run("insert into temp.T13p partition (ds='tomorrow', hour='2') values (8, 8)");
     int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')");
     Assert.assertEquals(4, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')");
-    Assert.assertEquals(4, count);
+    Assert.assertEquals(5, count);
 
     // Fail some inserts, so that we have records in TXN_COMPONENTS
     conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
@@ -532,7 +533,10 @@ public void testMetastoreTablesCleanup() throws Exception {
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'");
     Assert.assertEquals(1, count);
 
-    // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS
+    // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS.
+    // Tables need at least 2 delta files to compact, and minor compaction was just run, so insert more data
+    driver.run("insert into temp.T11 values (14, 14)");
+    driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (15, 15)");
     conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
     driver.run("alter table temp.T11 compact 'major'");
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 553addbe44..70ae85c458 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -24,6 +24,8 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -32,6 +34,10 @@
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -47,12 +53,12 @@
 import java.io.DataOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Tests for the worker thread and its MR jobs.
@@ -346,10 +352,11 @@ public void minorWithOpenInMiddle() throws Exception {
     startWorker();
 
+    // Since compaction was not run, the state should not be "ready for cleaning" but "succeeded"
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(0).getState());
 
     // There should still be 4 directories in the location
     FileSystem fs = FileSystem.get(conf);
@@ -1005,6 +1012,61 @@ public void droppedPartition() throws Exception {
     Assert.assertEquals(0, compacts.size());
   }
 
+  @Test
+  public void oneDeltaWithAbortedTxn() throws Exception {
+    Table t = newTable("default", "delta1", false);
+    addDeltaFile(t, null, 0, 2L, 3);
+    Set<Long> aborted = new HashSet<>();
+    aborted.add(1L);
+    burnThroughTransactions("default", "delta1", 3, null, aborted);
+
+    // MR
+    verifyTxn1IsAborted(0, t, CompactionType.MAJOR);
+    verifyTxn1IsAborted(1, t, CompactionType.MINOR);
+
+    // Query-based
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    verifyTxn1IsAborted(2, t, CompactionType.MAJOR);
+    verifyTxn1IsAborted(3, t, CompactionType.MINOR);
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, false);
+
+    // Insert-only
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+        TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
+    Table mm = newTable("default", "delta1", false, parameters);
+    addDeltaFile(mm, null, 0, 2L, 3);
+    burnThroughTransactions("default", "delta1", 3, null, aborted);
+    verifyTxn1IsAborted(0, t, CompactionType.MAJOR);
+    verifyTxn1IsAborted(1, t, CompactionType.MINOR);
+  }
+
+  private void verifyTxn1IsAborted(int compactionNum, Table t, CompactionType type)
+      throws Exception {
+    CompactionRequest rqst = new CompactionRequest("default", t.getTableName(), type);
+    txnHandler.compact(rqst);
+    startWorker();
+
+    // Compaction should not have run on a single delta file
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+    Assert.assertEquals(1, stat.length);
+    Assert.assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName());
+
+    // State should not be "ready for cleaning" because we skip cleaning
+    List<ShowCompactResponseElement> compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(compactionNum + 1, compacts.size());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(compactionNum).getState());
+
+    // Assert that the transaction with txnId=1 is still aborted after the Cleaner has run
+    startCleaner();
+    List<TxnInfo> openTxns =
+        HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+    Assert.assertEquals(1, openTxns.get(0).getId());
+    Assert.assertEquals(TxnState.ABORTED, openTxns.get(0).getState());
+  }
+
   @After
   public void tearDown() throws Exception {
     compactorTestCleanup();
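Note (reviewer aid, not part of the patch): the sketch below is a minimal, self-contained model of the short-circuit the Worker now performs before launching a job. isEnoughToCompact decides whether a compaction job is worth starting, and needsCleaning decides whether a skipped request is handed off to the Cleaner (markCompacted) or closed out immediately (markCleaned). The class, enum and method names are illustrative only, directory state is reduced to plain counts rather than AcidUtils.Directory, and the two-delta delete-delta check in isEnoughToCompact is omitted.

// Simplified model of the Worker-side decision added by this patch; names are hypothetical.
public class CompactionShortCircuitSketch {

  enum Outcome { RUN_COMPACTION, MARK_COMPACTED_FOR_CLEANER, MARK_CLEANED_NO_WORK }

  static Outcome decide(boolean isMajor, boolean hasBase, int liveDeltas, int originals,
      int obsoleteDirs) {
    // Mirrors isEnoughToCompact: major compaction needs original files or more than one
    // base/delta directory; minor needs at least two live deltas (delete-delta pairing omitted).
    boolean enoughToCompact = isMajor
        ? (originals > 0 || liveDeltas + (hasBase ? 1 : 0) > 1)
        : liveDeltas > 1;
    if (enoughToCompact) {
      return Outcome.RUN_COMPACTION;      // Worker proceeds with the compaction job
    }
    // Mirrors needsCleaning: obsolete directories alone (e.g. after insert overwrite) mean the
    // request goes to the Cleaner via markCompacted; otherwise markCleaned closes it out and it
    // shows up as "succeeded" in SHOW COMPACTIONS without any work being done.
    return obsoleteDirs > 0 ? Outcome.MARK_COMPACTED_FOR_CLEANER : Outcome.MARK_CLEANED_NO_WORK;
  }

  public static void main(String[] args) {
    // Insert overwrite left a single new base and only obsolete deltas: skip compaction, clean.
    System.out.println(decide(true, true, 0, 0, 3));   // MARK_COMPACTED_FOR_CLEANER
    // A single delta and nothing obsolete: nothing to do, the request succeeds immediately.
    System.out.println(decide(false, false, 1, 0, 0)); // MARK_CLEANED_NO_WORK
  }
}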