diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index bb70db4524..7e3d77f53a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -231,9 +231,6 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor } JobConf job = createBaseJobConf(conf, jobName, t, sd, writeIds, ci); - if (!QueryCompactor.Util.isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) { - return; - } List parsedDeltas = dir.getCurrentDirectories(); int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java index f96a0481b8..59dcf2c5c2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java @@ -56,9 +56,6 @@ AcidUtils.Directory dir = AcidUtils .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false); - if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) { - return; - } // Set up the session for driver. HiveConf conf = new HiveConf(hiveConf); conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index bad5d00a8d..a9a88d59e0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -75,10 +75,6 @@ return; } - if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) { - return; - } - String tmpLocation = Util.generateTmpPath(storageDescriptor); Path baseLocation = new Path(tmpLocation, "_base"); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index d234910490..3ce4dde4c6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -40,10 +40,8 @@ import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; /** * Common interface for query based compactions. @@ -143,61 +141,6 @@ protected void runCompactionQueries(HiveConf conf, String tmpTableName, StorageD * Collection of some helper functions. */ static class Util { - /** - * Determine if compaction can run in a specified directory. - * @param isMajorCompaction type of compaction. - * @param dir the delta directory - * @param sd resolved storage descriptor - * @return true, if compaction can run. - */ - static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) { - int deltaCount = dir.getCurrentDirectories().size(); - int origCount = dir.getOriginalFiles().size(); - - StringBuilder deltaInfo = new StringBuilder().append(deltaCount); - boolean isEnoughToCompact; - - if (isMajorCompaction) { - isEnoughToCompact = (origCount > 0 || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1); - - } else { - isEnoughToCompact = (deltaCount > 1); - - if (deltaCount == 2) { - Map deltaByType = dir.getCurrentDirectories().stream().collect(Collectors - .groupingBy(delta -> (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX), - Collectors.counting())); - - isEnoughToCompact = (deltaByType.size() != deltaCount); - deltaInfo.append(" ").append(deltaByType); - } - } - - if (!isEnoughToCompact) { - LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", sd.getLocation(), - dir.getBaseDirectory(), deltaInfo, origCount); - } - return isEnoughToCompact; - } - - /** - * Check for obsolete directories, and return true if any exist and Cleaner should be - * run. For example if we insert overwrite into a table with only deltas, a new base file with - * the highest writeId is created so there will be no live delta directories, only obsolete - * ones. Compaction is not needed, but the cleaner should still be run. - * - * @return true if cleaning is needed - */ - public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor sd) { - int numObsoleteDirs = dir.getObsolete().size(); - boolean needsJustCleaning = numObsoleteDirs > 0; - if (needsJustCleaning) { - LOG.debug("{} obsolete directories in {} found; marked for cleaning.", - numObsoleteDirs, sd.getLocation()); - } - return needsJustCleaning; - } - /** * Generate a random tmp path, under the provided storage. * @param sd storage descriptor, must be not null. diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 5aff71e0e9..7726b46431 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -58,6 +58,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; /** * A class to do compactions. This will run in a separate thread. It will spin on the @@ -188,8 +189,8 @@ public void run() { // Don't start compaction or cleaning if not necessary AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, tblValidWriteIds, Ref.from(false), true, null, false); - if (!QueryCompactor.Util.isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) { - if (QueryCompactor.Util.needsCleaning(dir, sd)) { + if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) { + if (needsCleaning(dir, sd)) { msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); } else { // do nothing @@ -425,4 +426,62 @@ public void cancel() { } } } + + /** + * Determine if compaction can run in a specified directory. + * @param isMajorCompaction type of compaction. + * @param dir the delta directory + * @param sd resolved storage descriptor + * @return true, if compaction can run. + */ + static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, + StorageDescriptor sd) { + int deltaCount = dir.getCurrentDirectories().size(); + int origCount = dir.getOriginalFiles().size(); + + StringBuilder deltaInfo = new StringBuilder().append(deltaCount); + boolean isEnoughToCompact; + + if (isMajorCompaction) { + isEnoughToCompact = + (origCount > 0 || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1); + + } else { + isEnoughToCompact = (deltaCount > 1); + + if (deltaCount == 2) { + Map deltaByType = dir.getCurrentDirectories().stream().collect(Collectors + .groupingBy(delta -> (delta + .isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX), + Collectors.counting())); + + isEnoughToCompact = (deltaByType.size() != deltaCount); + deltaInfo.append(" ").append(deltaByType); + } + } + + if (!isEnoughToCompact) { + LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", + sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount); + } + return isEnoughToCompact; + } + + /** + * Check for obsolete directories, and return true if any exist and Cleaner should be + * run. For example if we insert overwrite into a table with only deltas, a new base file with + * the highest writeId is created so there will be no live delta directories, only obsolete + * ones. Compaction is not needed, but the cleaner should still be run. + * + * @return true if cleaning is needed + */ + public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor sd) { + int numObsoleteDirs = dir.getObsolete().size(); + boolean needsJustCleaning = numObsoleteDirs > 0; + if (needsJustCleaning) { + LOG.debug("{} obsolete directories in {} found; marked for cleaning.", numObsoleteDirs, + sd.getLocation()); + } + return needsJustCleaning; + } }