diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 5b6f747516..6ae44293d1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1217,7 +1217,7 @@ public static Directory getAcidState(FileSystem fileSystem, Path candidateDirect
 
     TxnBase bestBase = new TxnBase();
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
-    List<HdfsDirSnapshot> dirSnapshots = null;
+    Map<Path, HdfsDirSnapshot> dirSnapshots = null;
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete,
@@ -1307,7 +1307,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
     }
 
     if(bestBase.oldestBase != null && bestBase.status == null &&
-        isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs)) {
+        isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs, dirSnapshots)) {
       /**
        * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
        * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus
@@ -1331,7 +1331,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
     boolean isBaseInRawFormat = false;
     if (bestBase.status != null) {
       base = bestBase.status.getPath();
-      isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, null);
+      isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, dirSnapshots != null ? dirSnapshots.get(base) : null);
       if (isBaseInRawFormat && (bestBase.dirSnapShot != null)) {
         for (FileStatus stat : bestBase.dirSnapShot.getFiles()) {
           if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
@@ -1357,7 +1357,8 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
         obsolete, deltas, base);
   }
 
-  public static List<HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path) throws IOException {
+  public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs,
+      final Path path) throws IOException {
     try {
       Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<Path, HdfsDirSnapshot>();
       RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
@@ -1374,8 +1375,11 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
         } else {
           Path parentDirPath = fPath.getParent();
           if (acidTempDirFilter.accept(parentDirPath)) {
-            FileStatus parentDirFStatus = fs.getFileStatus(parentDirPath);
             HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
+            FileStatus parentDirFStatus = null;
+            if (!parentDirPath.equals(path)) {
+              parentDirFStatus = fs.getFileStatus(parentDirPath);
+            }
             if (dirSnapshot == null) {
               dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath, parentDirFStatus);
               dirToSnapshots.put(parentDirPath, dirSnapshot);
@@ -1393,7 +1397,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
           }
         }
       }
-      return new ArrayList<HdfsDirSnapshot>(dirToSnapshots.values());
+      return dirToSnapshots;
     } catch (IOException e) {
       e.printStackTrace();
       throw new IOException(e);
@@ -1410,41 +1414,43 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
     public Path getPath();
 
     public void addOrcAcidFormatFile(FileStatus fStatus);
-
+
     public FileStatus getOrcAcidFormatFile();
 
     public void addMetadataFile(FileStatus fStatus);
-
+
     public FileStatus getMetadataFile(FileStatus fStatus);
 
     // FileStatus of this HDFS directory
     public FileStatus getFileStatus();
-
+
     // Get the list of files if any within this directory
     public List<FileStatus> getFiles();
-
+
     public void setFileStatus(FileStatus fStatus);
-
+
     public void addFile(FileStatus file);
-
+
     // File id or null
     public Long getFileId();
-
+
    public Boolean isRawFormat();
-
+
    public void setIsRawFormat(boolean isRawFormat);
-
+
    public Boolean isBase();
-
+
    public void setIsBase(boolean isBase);
-
-    public Boolean isValidBase();
-
+
+    Boolean isValidBase();
+
    public void setIsValidBase(boolean isValidBase);
-
-    public Boolean isCompactedBase();
-
+
+    Boolean isCompactedBase();
+
    public void setIsCompactedBase(boolean isCompactedBase);
+
+    boolean contains(Path path);
   }
 
   public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
@@ -1559,7 +1565,17 @@ public void addMetadataFile(FileStatus fStatus) {
     public FileStatus getMetadataFile(FileStatus fStatus) {
       return metadataFStatus;
     }
-
+
+    @Override
+    public boolean contains(Path path) {
+      for (FileStatus fileStatus: getFiles()) {
+        if (fileStatus.getPath().equals(path)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
@@ -1592,7 +1608,7 @@ private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList write
       //By definition there are no open txns with id < 1.
       return true;
     }
-    if(isCompactedBase(parsedBase, fs)) {
+    if(isCompactedBase(parsedBase, fs, (HdfsDirSnapshot) null)) {
       return writeIdList.isValidBase(parsedBase.getWriteId());
     }
     //if here, it's a result of IOW
@@ -1605,7 +1621,11 @@ private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parse
     if (dirSnapshot.isValidBase() != null) {
       isValidBase = dirSnapshot.isValidBase();
     } else {
-      isValidBase = isValidBase(parsedBase, writeIdList, fs);
+      if (isCompactedBase(parsedBase, fs, dirSnapshot)) {
+        isValidBase = writeIdList.isValidBase(parsedBase.getWriteId());
+      } else {
+        isValidBase = writeIdList.isWriteIdValid(parsedBase.getWriteId());
+      }
       dirSnapshot.setIsValidBase(isValidBase);
     }
     return isValidBase;
@@ -1617,9 +1637,14 @@ private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parse
    * presence of {@link AcidUtils#VISIBILITY_PATTERN} suffix. Base directories written prior to
    * that, have to rely on the {@link MetaDataFile} in the directory. So look at the filename first
    * since that is the cheaper test.*/
-  private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) throws IOException {
-    return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
+  private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs,
+      Map<Path, HdfsDirSnapshot> snapshotMap) throws IOException {
+    return isCompactedBase(parsedBase, fs, snapshotMap != null ? snapshotMap.get(parsedBase.getBaseDirPath()) : null);
+  }
+  private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs,
+      HdfsDirSnapshot snapshot) throws IOException {
+    return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs, snapshot);
   }
 
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
@@ -1683,15 +1708,24 @@ else if (writeIdList.isWriteIdRangeValid(
       }
     }
 
-  private static void getChildState(Path candidateDirectory, List<HdfsDirSnapshot> dirSnapshots, ValidWriteIdList writeIdList,
-      List<ParsedDelta> working, List<Path> originalDirectories, List<HdfsFileStatusWithId> original,
+  private static void getChildState(Path candidateDirectory, Map<Path, HdfsDirSnapshot> dirSnapshots,
+      ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
+      List<HdfsFileStatusWithId> original,
       List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted,
       Map<String, String> tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException {
-    for (HdfsDirSnapshot dirSnapshot : dirSnapshots) {
+    for (HdfsDirSnapshot dirSnapshot : dirSnapshots.values()) {
       FileStatus fStat = dirSnapshot.getFileStatus();
       Path dirPath = dirSnapshot.getPath();
       String dirName = dirPath.getName();
-      if (dirName.startsWith(BASE_PREFIX)) {
+      if (dirPath.equals(candidateDirectory)) {
+        // if the candidateDirectory is itself a delta directory, we need to add originals in that directory
+        // and return. This is the case when compaction thread calls getChildState.
+        for (FileStatus fileStatus : dirSnapshot.getFiles()) {
+          if (!ignoreEmptyFiles || fileStatus.getLen() != 0) {
+            original.add(createOriginalObj(null, fileStatus));
+          }
+        }
+      } else if (dirName.startsWith(BASE_PREFIX)) {
         bestBase.dirSnapShot = dirSnapshot;
         ParsedBase parsedBase = ParsedBase.parseBase(dirPath);
         if (!isDirUsable(dirPath, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
@@ -1723,25 +1757,15 @@ private static void getChildState(Path candidateDirectory, List
         if (!isDirUsable(dirPath, delta.getVisibilityTxnId(), aborted, validTxnList)) {
           continue;
         }
-        if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId,
-            delta.maxWriteId)) {
+        if (ValidWriteIdList.RangeResponse.ALL == writeIdList
+            .isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
           aborted.add(dirPath);
-        } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId,
-            delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) {
-          if (delta.isRawFormat) {
-            for (FileStatus stat : dirSnapshot.getFiles()) {
-              if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
-                original.add(createOriginalObj(null, stat));
-              }
-            }
-          } else {
-            working.add(delta);
-          }
+        } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, delta.maxWriteId)
+            != ValidWriteIdList.RangeResponse.NONE) {
+          working.add(delta);
         }
       } else {
-        if (!candidateDirectory.equals(dirPath)) {
-          originalDirectories.add(dirPath);
-        }
+        originalDirectories.add(dirPath);
         for (FileStatus stat : dirSnapshot.getFiles()) {
           if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
             original.add(createOriginalObj(null, stat));
@@ -2365,14 +2389,17 @@ public static String getFullTableName(String dbName, String tableName) {
       String COMPACTED = "compacted";
     }
 
-    static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+    static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException {
      /**
       * this file was written by Hive versions before 4.0 into a base_x/ dir
       * created by compactor so that it can be distinguished from the one
       * created by Insert Overwrite
       */
      Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
-      if(!fs.exists(formatFile)) {
+      if (dirSnapshot != null && !dirSnapshot.contains(formatFile)) {
+        return false;
+      }
+      if(dirSnapshot == null && !fs.exists(formatFile)) {
        return false;
      }
      try (FSDataInputStream strm = fs.open(formatFile)) {
@@ -2450,14 +2477,18 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs, HdfsDirSna
      }
      else {
        //must be base_x
-        if(isCompactedBase(ParsedBase.parseBase(baseOrDeltaDir), fs)) {
+        if(isCompactedBase(ParsedBase.parseBase(baseOrDeltaDir), fs, dirSnapshot)) {
          return false;
        }
      }
      //if here, have to check the files
-      Path dataFile;
+      Path dataFile = null;
      if ((dirSnapshot != null) && (dirSnapshot.getFiles() != null) && (dirSnapshot.getFiles().size() > 0)) {
-        dataFile = dirSnapshot.getFiles().get(0).getPath();
+        for (FileStatus fileStatus: dirSnapshot.getFiles()) {
+          if (originalBucketFilter.accept(fileStatus.getPath())) {
+            dataFile = fileStatus.getPath();
+          }
+        }
      } else {
        dataFile = chooseFile(baseOrDeltaDir, fs);
      }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8b2890b496..76bf283115 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1273,7 +1273,7 @@ private AcidDirInfo callInternal() throws IOException {
       }
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
-          fs, dir, context.conf, context.writeIdList, useFileIds, true, null, false);
+          fs, dir, context.conf, context.writeIdList, useFileIds, true, null, true);
       // find the base files (original or new style)
       List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
       if (dirInfo.getBaseDirectory() == null) {
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 13c739d746..bb55d9fd79 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -123,13 +123,24 @@ private void loadDataUpdate(boolean isVectorized) throws Exception {
     checkResult(expected2, testQuery, isVectorized, "update");
 
     runStatementOnDriver("insert into T values(2,2)");
+    String[][] expectedInter2 = new String[][] {
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000"}
+    };
+    checkResult(expectedInter2, testQuery, isVectorized, "insert");
     runStatementOnDriver("delete from T where a = 3");
+    String[][] expectedInter3 = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000"}
+    };
+    checkResult(expectedInter3, testQuery, isVectorized, "delete");
     //test minor compaction
     runStatementOnDriver("alter table T compact 'minor'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected3 = new String[][] {
-        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004_v0000029/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004_v0000029/bucket_00000"}
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004_v0000033/bucket_00000"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004_v0000033/bucket_00000"}
     };
     checkResult(expected3, testQuery, isVectorized, "delete compact minor");
 
@@ -141,6 +152,13 @@ private void loadDataUpdate(boolean isVectorized) throws Exception {
 
     //load same data again (additive)
     runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+    String[][] expectedInt1 = new String[][] {
+        {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000005/000000_0"},
+        {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000005/000000_0"},
+        {"{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000006_0000006_0000/000000_0"},
+        {"{\"writeid\":6,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000006_0000006_0000/000000_0"}
+    };
+    checkResult(expectedInt1, testQuery, isVectorized, "load data local inpath");
     runStatementOnDriver("update T set b = 17 where a = 1");//matches 2 rows
     runStatementOnDriver("delete from T where a = 3");//matches 2 rows
     runStatementOnDriver("insert into T values(2,2)");
@@ -155,9 +173,9 @@ private void loadDataUpdate(boolean isVectorized) throws Exception {
     runStatementOnDriver("alter table T compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected6 = new String[][]{
-        {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009_v0000042/bucket_00000"},
-        {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009_v0000042/bucket_00000"},
-        {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000042/bucket_00000"}
+        {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009_v0000048/bucket_00000"},
+        {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009_v0000048/bucket_00000"},
+        {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000048/bucket_00000"}
     };
     checkResult(expected6, testQuery, isVectorized, "load data inpath compact major");
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index e4f0529761..9e6d47ebc5 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -265,6 +266,40 @@ public void testBaseDeltas() throws Exception {
     assertEquals(105, delt.getMaxWriteId());
   }
 
+  @Test
+  public void testRecursiveDirListingIsReusedWhenSnapshotTrue() throws IOException {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/base_0/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/base_0/_orc_acid_version", 10, new byte[0]));
+    conf.set(ValidTxnList.VALID_TXNS_KEY,
+        new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
+        new ValidReaderWriteIdList(), null, false, null, true);
+    assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString());
+    assertEquals(0, dir.getObsolete().size());
+    assertEquals(0, dir.getOriginalFiles().size());
+    assertEquals(0, dir.getCurrentDirectories().size());
+    assertEquals(0, fs.getNumExistsCalls());
+  }
+
+  @Test
+  public void testRecursiveDirListingIsNotReusedWhenSnapshotFalse() throws IOException {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/base_0/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/base_0/_orc_acid_version", 10, new byte[0]));
+    conf.set(ValidTxnList.VALID_TXNS_KEY,
+        new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
+        new ValidReaderWriteIdList(), null, false, null, false);
+    assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString());
+    assertEquals(0, dir.getObsolete().size());
+    assertEquals(0, dir.getOriginalFiles().size());
+    assertEquals(0, dir.getCurrentDirectories().size());
+    assertEquals(2, fs.getNumExistsCalls());
+  }
+
   @Test
   public void testObsoleteOriginals() throws Exception {
     Configuration conf = new Configuration();
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 5e2a8cfba1..9d6fd35bbe 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -1211,11 +1211,18 @@ public String toString() {
     private static String blockedUgi = null;
     private final static List<MockFile> globalFiles = new ArrayList<MockFile>();
     protected Statistics statistics;
+    private int numExistsCalls;
 
     public MockFileSystem() {
       // empty
     }
 
+    @Override
+    public boolean exists(Path f) throws IOException {
+      numExistsCalls++;
+      return super.exists(f);
+    }
+
     @Override
     public void initialize(URI uri, Configuration conf) {
       setConf(conf);
@@ -1572,6 +1579,10 @@ public String toString() {
       return buffer.toString();
     }
 
+    public int getNumExistsCalls() {
+      return numExistsCalls;
+    }
+
     public static void addGlobalFile(MockFile mockFile) {
       globalFiles.add(mockFile);
     }
@@ -3521,7 +3532,7 @@ public void testACIDReaderNoFooterSerialize() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    assertEquals(10, readOpsDelta);
+    assertEquals(6, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3600,7 +3611,7 @@ public void testACIDReaderFooterSerialize() throws Exception {
     // call-4: AcidUtils.getAcidState - split 2 => ls mock:/mocktable6
     // call-5: read footer - split 2 => mock:/mocktable6/0_0 (to get offset since it's original file)
     // call-6: file stat - split 2 => mock:/mocktable6/0_0
-    assertEquals(10, readOpsDelta);
+    assertEquals(6, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3675,7 +3686,7 @@ public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception {
        readOpsDelta = statistics.getReadOps() - readOpsBefore;
      }
    }
-    assertEquals(10, readOpsDelta);
+    assertEquals(7, readOpsDelta);
 
    // revert back to local fs
    conf.set("fs.defaultFS", "file:///");
@@ -3749,7 +3760,7 @@ public void testACIDReaderFooterSerializeWithDeltas() throws Exception {
        readOpsDelta = statistics.getReadOps() - readOpsBefore;
      }
    }
-    assertEquals(10, readOpsDelta);
+    assertEquals(7, readOpsDelta);
 
    // revert back to local fs
    conf.set("fs.defaultFS", "file:///");