From 2415dfae71da8834211fc37828e1dda7b4655ec3 Mon Sep 17 00:00:00 2001
From: Denys Kuzmenko
Date: Thu, 21 Mar 2019 11:35:15 +0100
Subject: [PATCH] HIVE-9995: ACID compaction tries to compact a single file

---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java   | 57 +++++++++-----------
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java | 63 ++++++++++++----------
 .../apache/hadoop/hive/ql/TestTxnCommands2.java   |  2 +-
 .../hadoop/hive/ql/txn/compactor/TestWorker.java  | 18 ++-----
 4 files changed, 66 insertions(+), 74 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 50a233d5de..48d87c8894 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -723,7 +723,10 @@ public String toString() {
     }
   }
 
-  public static interface Directory {
+  /**
+   * Interface used to provide ACID directory information.
+   */
+  public interface Directory {
 
     /**
      * Get the base directory.
@@ -1137,6 +1140,25 @@ public static Directory getAcidState(Path directory,
     return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
   }
 
+  private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds,
+      FileSystem fs, Path directory) {
+    Boolean val = useFileIds.value;
+    List<HdfsFileStatusWithId> childrenWithId = null;
+    if (val == null || val) {
+      try {
+        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
+        if (val == null) {
+          useFileIds.value = true;
+        }
+      } catch (Throwable t) {
+        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+        if (val == null && t instanceof UnsupportedOperationException) {
+          useFileIds.value = false;
+        }
+      }
+    }
+    return childrenWithId;
+  }
   public static Directory getAcidState(Path directory,
                                        Configuration conf,
                                        ValidWriteIdList writeIdList,
@@ -1167,21 +1189,8 @@ public static Directory getAcidState(Path directory,
     List originalDirectories = new ArrayList<>();
     final List obsolete = new ArrayList<>();
     final List abortedDirectories = new ArrayList<>();
-    List<HdfsFileStatusWithId> childrenWithId = null;
-    Boolean val = useFileIds.value;
-    if (val == null || val) {
-      try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
-        if (val == null) {
-          useFileIds.value = true;
-        }
-      } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        if (val == null && t instanceof UnsupportedOperationException) {
-          useFileIds.value = false;
-        }
-      }
-    }
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory);
+
     TxnBase bestBase = new TxnBase();
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
     if (childrenWithId != null) {
@@ -1450,21 +1459,7 @@ public Long getFileId() {
 
   public static void findOriginals(FileSystem fs, Path dir, List<HdfsFileStatusWithId> original,
       Ref<Boolean> useFileIds, boolean ignoreEmptyFiles, boolean recursive) throws IOException {
-    List<HdfsFileStatusWithId> childrenWithId = null;
-    Boolean val = useFileIds.value;
-    if (val == null || val) {
-      try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, dir, hiddenFileFilter);
-        if (val == null) {
-          useFileIds.value = true;
-        }
-      } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        if (val == null && t instanceof UnsupportedOperationException) {
-          useFileIds.value = false;
-        }
-      }
-    }
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, dir);
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         if (child.getFileStatus().isDirectory()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 10192859a7..8c099dee48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -266,10 +266,14 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
 
     // and discovering that in getSplits is too late as we then have no way to pass it to our
    // mapper.
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds, false, true);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(
+        new Path(sd.getLocation()), conf, writeIds, false, true);
+    if(!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+      return;
+    }
     List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
-    int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
-    if(parsedDeltas.size() > maxDeltastoHandle) {
+    int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
+    if(parsedDeltas.size() > maxDeltasToHandle) {
       /**
        * if here, that means we have very high number of delta files. This may be sign of a temporary
        * glitch or a real issue. For example, if transaction batch size or transaction size is set too
@@ -282,13 +286,13 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
           + " located at " + sd.getLocation() + "! This is likely a sign of misconfiguration, "
          + "especially if this message repeats. Check that compaction is running properly. Check for any "
          + "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
-      int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
+      int numMinorCompactions = parsedDeltas.size() / maxDeltasToHandle;
       for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
         JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd,
             writeIds, ci);
         launchCompactionJob(jobMinorCompact, null, CompactionType.MINOR, null,
-            parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
-            maxDeltastoHandle, -1, conf, msc, ci.id, jobName);
+            parsedDeltas.subList(jobSubId * maxDeltasToHandle, (jobSubId + 1) * maxDeltasToHandle),
+            maxDeltasToHandle, -1, conf, msc, ci.id, jobName);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
       dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds);
@@ -319,17 +323,6 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
         dirsToSearch.add(baseDir);
       }
     }
-    if (parsedDeltas.size() == 0 && dir.getOriginalFiles().size() == 0) {
-      // Skip compaction if there's no delta files AND there's no original files
-      String minOpenInfo = ".";
-      if(writeIds.getMinOpenWriteId() != null) {
-        minOpenInfo = " with min Open " + JavaUtils.writeIdToString(writeIds.getMinOpenWriteId()) +
-            ". Compaction cannot compact above this writeId";
-      }
-      LOG.error("No delta files or original files found to compact in " + sd.getLocation() +
-          " for compactionId=" + ci.id + minOpenInfo);
-      return;
-    }
 
     launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
         dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, msc, ci.id, jobName);
@@ -347,13 +340,11 @@ private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageD
     AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
     AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(false),
         false, t.getParameters());
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
-      LOG.debug("Not compacting {}; current base is {} and there are {} deltas and {} originals", sd.getLocation(), dir
-          .getBaseDirectory(), deltaCount, origCount);
+
+    if(!isEnoughToCompact(dir, sd)) {
       return;
     }
+
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, false); // Set up the session for driver.
 
@@ -411,6 +402,26 @@ private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageD
     }
   }
 
+  private static boolean isEnoughToCompact(AcidUtils.Directory dir, StorageDescriptor sd) {
+    return isEnoughToCompact(true, dir, sd);
+  }
+
+  private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
+    int deltaCount = dir.getCurrentDirectories().size();
+    int origCount = dir.getOriginalFiles().size();
+
+    if (isMajorCompaction ? origCount == 0 &&
+        deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) <= 1 :
+        deltaCount <= 1) {
+
+      LOG.debug("Skipping compaction {}; type: {}, current base: {}, delta files: {}, originals: {}",
+          sd.getLocation(), isMajorCompaction ? CompactionType.MAJOR : CompactionType.MINOR,
+          dir.getBaseDirectory(), deltaCount, origCount);
+      return false;
+    }
+    return true;
+  }
+
   private void runMmCompaction(HiveConf conf, Table t, Partition p, StorageDescriptor sd,
       ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
     LOG.debug("Going to delete directories for aborted transactions for MM table "
@@ -425,13 +436,7 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p,
       LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction");
       return;
     }
-
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
-      LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
-        + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas and "
-        + origCount + " originals");
+    if(!isEnoughToCompact(dir, sd)) {
       return;
     }
     try {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 0734ed959c..d4abf4277b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -953,7 +953,7 @@ public void updateDeletePartitioned() throws Exception {
   public void testEmptyInTblproperties() throws Exception {
     runStatementOnDriver("create table t1 " + "(a int, b int) stored as orc TBLPROPERTIES ('serialization.null.format'='', 'transactional'='true')");
     runStatementOnDriver("insert into t1 " + "(a,b) values(1,7),(3,7)");
-    runStatementOnDriver("update t1" + " set b = -2 where b = 2");
+    runStatementOnDriver("update t1" + " set b = -2 where a = 1");
     runStatementOnDriver("alter table t1 " + " compact 'MAJOR'");
     runWorker(hiveConf);
     TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index d9e4468c34..553addbe44 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -351,25 +351,17 @@ public void minorWithOpenInMiddle() throws Exception {
     Assert.assertEquals(1, compacts.size());
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
 
-    // There should still now be 5 directories in the location
+    // There should still be 4 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(toString(stat),6 , stat.length);
+    Assert.assertEquals(toString(stat), 4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    /**
-     * this may look a bit odd. Compactor is capped at min open write id which is 23 in this case
-     * so the minor compaction above only 1 dir as input, delta_21_22 and outputs
-     * delta_21_22_v28 (and matching delete_delta) (HIVE-9995/HIVE-20901)
-     */
-    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22) + "_v0000028",
-        stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22) + "_v0000028", stat[3].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
   }
 
   @Test
-- 
2.15.1 (Apple Git-101)
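
The core of the change above is the new isEnoughToCompact() gate in CompactorMR.java, which makes the compactor skip a request when there is effectively nothing to merge. A minimal standalone sketch of that rule follows, using plain counters in place of AcidUtils.Directory; it is not part of the patch, and the class name CompactionGateSketch and its main() driver are hypothetical, for illustration only.

public class CompactionGateSketch {

  // Mirrors the patched check: a major compaction is worthwhile only if there are
  // original (pre-ACID) files, or more than one base/delta directory in total;
  // a minor compaction needs at least two deltas to merge.
  static boolean isEnoughToCompact(boolean isMajor, boolean hasBase, int deltaCount, int origCount) {
    if (isMajor) {
      return !(origCount == 0 && deltaCount + (hasBase ? 1 : 0) <= 1);
    }
    return deltaCount > 1;
  }

  public static void main(String[] args) {
    // A single delta and no base: nothing to merge, so both compaction types are skipped.
    System.out.println(isEnoughToCompact(true, false, 1, 0));   // false
    System.out.println(isEnoughToCompact(false, false, 1, 0));  // false
    // A base plus one delta: a major compaction can still fold the delta into a new base.
    System.out.println(isEnoughToCompact(true, true, 1, 0));    // true
  }
}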