diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 50a233d5de..48d87c8894 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -723,7 +723,10 @@ public String toString() {
     }
   }
 
-  public static interface Directory {
+  /**
+   * Interface used to provide ACID directory information.
+   */
+  public interface Directory {
 
     /**
      * Get the base directory.
@@ -1137,6 +1140,25 @@ public static Directory getAcidState(Path directory,
     return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
   }
 
+  private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds,
+      FileSystem fs, Path directory) {
+    Boolean val = useFileIds.value;
+    List<HdfsFileStatusWithId> childrenWithId = null;
+    if (val == null || val) {
+      try {
+        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
+        if (val == null) {
+          useFileIds.value = true;
+        }
+      } catch (Throwable t) {
+        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+        if (val == null && t instanceof UnsupportedOperationException) {
+          useFileIds.value = false;
+        }
+      }
+    }
+    return childrenWithId;
+  }
   public static Directory getAcidState(Path directory,
       Configuration conf,
       ValidWriteIdList writeIdList,
@@ -1167,21 +1189,8 @@ public static Directory getAcidState(Path directory,
     List originalDirectories = new ArrayList<>();
     final List obsolete = new ArrayList<>();
     final List abortedDirectories = new ArrayList<>();
-    List<HdfsFileStatusWithId> childrenWithId = null;
-    Boolean val = useFileIds.value;
-    if (val == null || val) {
-      try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
-        if (val == null) {
-          useFileIds.value = true;
-        }
-      } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        if (val == null && t instanceof UnsupportedOperationException) {
-          useFileIds.value = false;
-        }
-      }
-    }
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory);
+
     TxnBase bestBase = new TxnBase();
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
     if (childrenWithId != null) {
@@ -1450,21 +1459,7 @@ public Long getFileId() {
 
   public static void findOriginals(FileSystem fs, Path dir, List<HdfsFileStatusWithId> original,
       Ref<Boolean> useFileIds, boolean ignoreEmptyFiles, boolean recursive) throws IOException {
-    List<HdfsFileStatusWithId> childrenWithId = null;
-    Boolean val = useFileIds.value;
-    if (val == null || val) {
-      try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, dir, hiddenFileFilter);
-        if (val == null) {
-          useFileIds.value = true;
-        }
-      } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        if (val == null && t instanceof UnsupportedOperationException) {
-          useFileIds.value = false;
-        }
-      }
-    }
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, dir);
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         if (child.getFileStatus().isDirectory()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 10192859a7..e3dad28d74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -266,10 +266,14 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
     // and discovering that in getSplits is too late as we then have no way to pass it to our
     // mapper.
 
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds, false, true);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(
+        new Path(sd.getLocation()), conf, writeIds, false, true);
+    if(!isEnoughToCompact(dir, sd)) {
+      return;
+    }
     List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
-    int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
-    if(parsedDeltas.size() > maxDeltastoHandle) {
+    int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
+    if(parsedDeltas.size() > maxDeltasToHandle) {
       /**
        * if here, that means we have very high number of delta files. This may be sign of a temporary
        * glitch or a real issue. For example, if transaction batch size or transaction size is set too
@@ -282,13 +286,13 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
         + " located at " + sd.getLocation() + "! This is likely a sign of misconfiguration, "
         + "especially if this message repeats. Check that compaction is running properly. Check for any "
         + "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
-      int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
+      int numMinorCompactions = parsedDeltas.size() / maxDeltasToHandle;
       for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
         JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd,
           writeIds, ci);
         launchCompactionJob(jobMinorCompact, null, CompactionType.MINOR, null,
-          parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
-          maxDeltastoHandle, -1, conf, msc, ci.id, jobName);
+          parsedDeltas.subList(jobSubId * maxDeltasToHandle, (jobSubId + 1) * maxDeltasToHandle),
+          maxDeltasToHandle, -1, conf, msc, ci.id, jobName);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
       dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds);
@@ -347,13 +351,11 @@ private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageD
     AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
     AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(false), false,
         t.getParameters());
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
-      LOG.debug("Not compacting {}; current base is {} and there are {} deltas and {} originals", sd.getLocation(), dir
-          .getBaseDirectory(), deltaCount, origCount);
+
+    if(!isEnoughToCompact(dir, sd)) {
       return;
     }
+
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, false);
     // Set up the session for driver.
@@ -410,7 +412,16 @@ private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageD
       }
     }
   }
-
+  private static boolean isEnoughToCompact(AcidUtils.Directory dir, StorageDescriptor sd) {
+    int deltaCount = dir.getCurrentDirectories().size();
+    int origCount = dir.getOriginalFiles().size();
+    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
+      LOG.debug("Not compacting {}; current base is {} and there are {} deltas and {} originals", sd.getLocation(), dir
+          .getBaseDirectory(), deltaCount, origCount);
+      return false;
+    }
+    return true;
+  }
   private void runMmCompaction(HiveConf conf, Table t, Partition p,
       StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
     LOG.debug("Going to delete directories for aborted transactions for MM table "
@@ -425,13 +436,7 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p,
       LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction");
       return;
     }
-
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
-      LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
-        + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas and "
-        + origCount + " originals");
+    if(!isEnoughToCompact(dir, sd)) {
       return;
     }
     try {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 0734ed959c..d4abf4277b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -953,7 +953,7 @@ public void updateDeletePartitioned() throws Exception {
   public void testEmptyInTblproperties() throws Exception {
     runStatementOnDriver("create table t1 " + "(a int, b int) stored as orc TBLPROPERTIES ('serialization.null.format'='', 'transactional'='true')");
     runStatementOnDriver("insert into t1 " + "(a,b) values(1,7),(3,7)");
-    runStatementOnDriver("update t1" + " set b = -2 where b = 2");
+    runStatementOnDriver("update t1" + " set b = -2 where a = 1");
    runStatementOnDriver("alter table t1 " + " compact 'MAJOR'");
     runWorker(hiveConf);
     TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
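
The new isEnoughToCompact helper reduces to a simple count: the base directory contributes one item, each delta directory one, and each original file one; compaction is only attempted when the total exceeds one. A minimal standalone sketch of that arithmetic follows (not part of the patch; the boolean/int parameters are stand-ins for the AcidUtils.Directory accessors the real method calls):

// Standalone sketch, not part of the patch: mirrors the arithmetic of the new
// isEnoughToCompact helper with plain values so the threshold is easy to verify.
// hasBase stands in for dir.getBaseDirectory() != null, deltaCount for
// dir.getCurrentDirectories().size(), origCount for dir.getOriginalFiles().size().
public class IsEnoughToCompactSketch {

  static boolean isEnoughToCompact(boolean hasBase, int deltaCount, int origCount) {
    // Compaction only makes sense when there is more than one item to merge.
    return (deltaCount + (hasBase ? 1 : 0)) + origCount > 1;
  }

  public static void main(String[] args) {
    System.out.println(isEnoughToCompact(true, 0, 0));  // false: a lone base, nothing to merge
    System.out.println(isEnoughToCompact(false, 1, 0)); // false: a single delta
    System.out.println(isEnoughToCompact(true, 1, 0));  // true: base plus one delta
    System.out.println(isEnoughToCompact(false, 0, 2)); // true: two original files
  }
}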
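The pre-existing batching path in CompactorMR.run splits an oversized delta list into fixed-size minor-compaction jobs before the main compaction re-reads the directory state. A standalone sketch of that arithmetic (also not part of the patch; plain integers stand in for ParsedDelta entries and a hard-coded 500 for HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA):

import java.util.ArrayList;
import java.util.List;

// Standalone sketch, not part of the patch: the subList arithmetic CompactorMR.run
// uses to break an oversized delta list into minor-compaction batches.
public class DeltaBatchingSketch {
  public static void main(String[] args) {
    int maxDeltasToHandle = 500;
    List<Integer> parsedDeltas = new ArrayList<>();
    for (int i = 0; i < 1203; i++) {
      parsedDeltas.add(i);
    }
    // Integer division: 1203 / 500 = 2 full minor-compaction batches; the remaining
    // 203 deltas are picked up by the job that runs after the directory state is re-read.
    int numMinorCompactions = parsedDeltas.size() / maxDeltasToHandle;
    for (int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
      List<Integer> batch = parsedDeltas.subList(
          jobSubId * maxDeltasToHandle, (jobSubId + 1) * maxDeltasToHandle);
      System.out.println("job " + jobSubId + " compacts deltas "
          + batch.get(0) + ".." + batch.get(batch.size() - 1));
    }
  }
}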