diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 81967b26c6..e9f48faedb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1207,7 +1207,7 @@ public static Directory getAcidState(FileSystem fileSystem, Path candidateDirect
     TxnBase bestBase = new TxnBase();
     final List original = new ArrayList<>();
-    List<HdfsDirSnapshot> dirSnapshots = null;
+    Map<Path, HdfsDirSnapshot> dirSnapshots = null;
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete,
@@ -1297,7 +1297,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
     }
     if(bestBase.oldestBase != null && bestBase.status == null &&
-        isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs)) {
+        isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs, dirSnapshots)) {
       /**
        * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
        * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus
@@ -1321,7 +1321,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
     boolean isBaseInRawFormat = false;
     if (bestBase.status != null) {
       base = bestBase.status.getPath();
-      isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, null);
+      isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, dirSnapshots != null ? dirSnapshots.get(base) : null);
       if (isBaseInRawFormat && (bestBase.dirSnapShot != null)) {
         for (FileStatus stat : bestBase.dirSnapShot.getFiles()) {
           if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
@@ -1347,7 +1347,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
         obsolete, deltas, base);
   }
 
-  public static List<HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path) throws IOException {
+  public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path) throws IOException {
     try {
       Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<>();
       RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
@@ -1383,7 +1383,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
           }
         }
       }
-      return new ArrayList<>(dirToSnapshots.values());
+      return dirToSnapshots;
     } catch (IOException e) {
       e.printStackTrace();
       throw new IOException(e);
@@ -1435,6 +1435,8 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
     public Boolean isCompactedBase();
 
     public void setIsCompactedBase(boolean isCompactedBase);
+
+    public boolean contains(Path path);
   }
 
   public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
@@ -1549,7 +1551,17 @@ public void addMetadataFile(FileStatus fStatus) {
     public FileStatus getMetadataFile(FileStatus fStatus) {
       return metadataFStatus;
     }
-
+
+    @Override
+    public boolean contains(Path path) {
+      for (FileStatus fileStatus : getFiles()) {
+        if (fileStatus.getPath().equals(path)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
@@ -1582,7 +1594,7 @@ private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList write
       //By definition there are no open txns with id < 1.
       return true;
     }
-    if(isCompactedBase(parsedBase, fs)) {
+    if(isCompactedBase(parsedBase, fs, (HdfsDirSnapshot) null)) {
       return writeIdList.isValidBase(parsedBase.getWriteId());
     }
     //if here, it's a result of IOW
@@ -1595,7 +1607,11 @@ private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parse
     if (dirSnapshot.isValidBase() != null) {
       isValidBase = dirSnapshot.isValidBase();
     } else {
-      isValidBase = isValidBase(parsedBase, writeIdList, fs);
+      if (isCompactedBase(parsedBase, fs, dirSnapshot)) {
+        isValidBase = writeIdList.isValidBase(parsedBase.getWriteId());
+      } else {
+        isValidBase = writeIdList.isWriteIdValid(parsedBase.getWriteId());
+      }
       dirSnapshot.setIsValidBase(isValidBase);
     }
     return isValidBase;
@@ -1607,9 +1623,12 @@ private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parse
    * presence of {@link AcidUtils#VISIBILITY_PATTERN} suffix. Base directories written prior to
    * that, have to rely on the {@link MetaDataFile} in the directory. So look at the filename first
    * since that is the cheaper test.*/
-  private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) throws IOException {
-    return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
+  private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs, Map<Path, HdfsDirSnapshot> dirSnapshotMap) throws IOException {
+    return isCompactedBase(parsedBase, fs, dirSnapshotMap != null ? dirSnapshotMap.get(parsedBase.getBaseDirPath()) : null);
+  }
+
+  private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException {
+    return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs, dirSnapshot);
   }
 
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
@@ -1673,15 +1692,20 @@ else if (writeIdList.isWriteIdRangeValid(
     }
   }
 
-  private static void getChildState(Path candidateDirectory, List<HdfsDirSnapshot> dirSnapshots, ValidWriteIdList writeIdList,
+  private static void getChildState(Path candidateDirectory, Map<Path, HdfsDirSnapshot> dirSnapshots, ValidWriteIdList writeIdList,
       List working, List originalDirectories, List original, List obsolete, TxnBase bestBase,
       boolean ignoreEmptyFiles, List aborted, Map tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException {
-    for (HdfsDirSnapshot dirSnapshot : dirSnapshots) {
+    for (HdfsDirSnapshot dirSnapshot : dirSnapshots.values()) {
       FileStatus fStat = dirSnapshot.getFileStatus();
       Path dirPath = dirSnapshot.getPath();
       String dirName = dirPath.getName();
-      if (dirName.startsWith(BASE_PREFIX)) {
+      if (dirPath.equals(candidateDirectory)) {
+        for (FileStatus fileStatus : dirSnapshot.getFiles())
+          if (!ignoreEmptyFiles || fileStatus.getLen() != 0) {
+            original.add(createOriginalObj(null, fileStatus));
+          }
+      } else if (dirName.startsWith(BASE_PREFIX)) {
         bestBase.dirSnapShot = dirSnapshot;
         ParsedBase parsedBase = ParsedBase.parseBase(dirPath);
         if (!isDirUsable(dirPath, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
@@ -1718,23 +1742,15 @@ private static void getChildState(Path candidateDirectory, List
         aborted.add(dirPath);
       } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, delta.maxWriteId) !=
           ValidWriteIdList.RangeResponse.NONE) {
-        if (delta.isRawFormat) {
-          for (FileStatus stat : dirSnapshot.getFiles()) {
-            if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
-              original.add(createOriginalObj(null, stat));
-            }
-          }
-        } else {
-          working.add(delta);
-        }
+        working.add(delta);
       }
     } else {
       if (!candidateDirectory.equals(dirPath)) {
         originalDirectories.add(dirPath);
-      }
-      for (FileStatus stat : dirSnapshot.getFiles()) {
-        if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
-          original.add(createOriginalObj(null, stat));
+        for (FileStatus stat : dirSnapshot.getFiles()) {
+          if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
+            original.add(createOriginalObj(null, stat));
+          }
         }
       }
     }
@@ -2355,14 +2371,17 @@ public static String getFullTableName(String dbName, String tableName) {
     String COMPACTED = "compacted";
   }
 
-  static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+  static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException {
     /**
      * this file was written by Hive versions before 4.0 into a base_x/ dir
      * created by compactor so that it can be distinguished from the one
      * created by Insert Overwrite
      */
     Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
-    if(!fs.exists(formatFile)) {
+    if (dirSnapshot != null && !dirSnapshot.contains(formatFile)) {
+      return false;
+    }
+    if(dirSnapshot == null && !fs.exists(formatFile)) {
       return false;
     }
     try (FSDataInputStream strm = fs.open(formatFile)) {
@@ -2440,14 +2459,18 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs, HdfsDirSna
       }
       else {
         //must be base_x
-        if(isCompactedBase(ParsedBase.parseBase(baseOrDeltaDir), fs)) {
+        if(isCompactedBase(ParsedBase.parseBase(baseOrDeltaDir), fs, dirSnapshot)) {
          return false;
        }
      }
      //if here, have to check the files
-      Path dataFile;
+      Path dataFile = null;
      if ((dirSnapshot != null) && (dirSnapshot.getFiles() != null) && (dirSnapshot.getFiles().size() > 0)) {
-        dataFile = dirSnapshot.getFiles().get(0).getPath();
+        for (FileStatus fileStatus : dirSnapshot.getFiles()) {
+          if (originalBucketFilter.accept(fileStatus.getPath())) {
+            dataFile = fileStatus.getPath();
+          }
+        }
      } else {
        dataFile = chooseFile(baseOrDeltaDir, fs);
      }
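
Net effect of the AcidUtils.java changes above: getHdfsDirSnapshots() now returns one map keyed by directory path, built from a single recursive listing, and the later per-directory checks (isCompactedBase, MetaDataFile.isCompacted/isRawFormat) consult that snapshot before falling back to FileSystem calls. A minimal sketch of that lookup-with-fallback shape, using only the members visible in the hunks above; the helper class, its parameters and the side-file name are illustrative, not part of the patch:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.io.AcidUtils.HdfsDirSnapshot;

    public class SnapshotLookupSketch {
      // Answers "does this base/delta dir contain the given side file?" from the snapshot
      // when one is available, mirroring the new MetaDataFile.isCompacted() logic.
      static boolean hasSideFile(FileSystem fs, Path tableOrPartitionDir, Path baseOrDeltaDir,
          String sideFileName) throws IOException {
        // One recursive listing for the whole table/partition directory.
        Map<Path, HdfsDirSnapshot> snapshots = AcidUtils.getHdfsDirSnapshots(fs, tableOrPartitionDir);
        Path sideFile = new Path(baseOrDeltaDir, sideFileName);
        HdfsDirSnapshot snapshot = snapshots.get(baseOrDeltaDir);
        if (snapshot != null) {
          // Answered from the in-memory listing, no extra NameNode round trip.
          return snapshot.contains(sideFile);
        }
        // No snapshot for that directory: fall back to the old exists() probe.
        return fs.exists(sideFile);
      }
    }
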
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index e88b10a21c..0fdcfd00a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1259,7 +1259,7 @@ private AcidDirInfo callInternal() throws IOException {
       }
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
-          fs, dir, context.conf, context.writeIdList, useFileIds, true, null, false);
+          fs, dir, context.conf, context.writeIdList, useFileIds, true, null, true);
       // find the base files (original or new style)
       List baseFiles = new ArrayList<>();
       if (dirInfo.getBaseDirectory() == null) {
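
The production change in OrcInputFormat is a single flipped flag at the end of the getAcidState(...) call; judging from the new TestAcidUtils cases below (testRecursiveDirListingIsReused_whenSnapshotTrue / ..._whenSnapshotFalse), that last argument controls whether directory snapshots built from one recursive listing are used instead of per-directory probes. A stripped-down illustration of the single-listing idea itself, using only stock Hadoop APIs (the class and method names below are invented for the example):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    class OneListingSketch {
      // Group every file under root by its parent directory with a single recursive listing,
      // the same listFiles(path, true) call getHdfsDirSnapshots() issues in the patch above.
      static Map<Path, List<FileStatus>> listOnce(FileSystem fs, Path root) throws IOException {
        Map<Path, List<FileStatus>> filesByDir = new HashMap<>();
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(root, true);
        while (files.hasNext()) {
          LocatedFileStatus file = files.next();
          filesByDir.computeIfAbsent(file.getPath().getParent(), dir -> new ArrayList<>()).add(file);
        }
        // Later per-directory questions ("is there a compaction marker?", "which file do we
        // sniff for raw format?") can be answered from this map with no further RPCs.
        return filesByDir;
      }
    }
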
{"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009_v0000042/bucket_00000"}, - {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009_v0000042/bucket_00000"}, - {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000042/bucket_00000"} + {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009_v0000048/bucket_00000"}, + {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009_v0000048/bucket_00000"}, + {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000048/bucket_00000"} }; checkResult(expected6, testQuery, isVectorized, "load data inpath compact major"); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 8fb09b747d..1dd8a9c3f4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hive.ql.io; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.BitSet; import java.util.HashMap; import java.util.HashSet; @@ -265,6 +267,40 @@ public void testBaseDeltas() throws Exception { assertEquals(105, delt.getMaxWriteId()); } + @Test + public void testRecursiveDirListingIsReused_whenSnapshotTrue() throws IOException { + Configuration conf = new Configuration(); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/base_0/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_0/_orc_acid_version", 10, new byte[0])); + conf.set(ValidTxnList.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf, + new ValidReaderWriteIdList(), null, false, null, true); + assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString()); + assertEquals(0, dir.getObsolete().size()); + assertEquals(0, dir.getOriginalFiles().size()); + assertEquals(0, dir.getCurrentDirectories().size()); + assertEquals(0, fs.getNumExistsCalls()); + } + + @Test + public void testRecursiveDirListingIsNotReused_whenSnapshotFalse() throws IOException { + Configuration conf = new Configuration(); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/base_0/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_0/_orc_acid_version", 10, new byte[0])); + conf.set(ValidTxnList.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf, + new ValidReaderWriteIdList(), null, false, null, false); + assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString()); + assertEquals(0, dir.getObsolete().size()); + assertEquals(0, dir.getOriginalFiles().size()); + assertEquals(0, dir.getCurrentDirectories().size()); + assertEquals(2, fs.getNumExistsCalls()); + } + @Test public void testObsoleteOriginals() throws Exception { Configuration conf = new Configuration(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 51723f7812..9574f2fc23 100644 --- 
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 51723f7812..9574f2fc23 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -1211,11 +1211,18 @@ public String toString() {
     private static String blockedUgi = null;
     private final static List globalFiles = new ArrayList();
     protected Statistics statistics;
+    private int numExistsCalls;
 
     public MockFileSystem() {
       // empty
     }
 
+    @Override
+    public boolean exists(Path f) throws IOException {
+      numExistsCalls++;
+      return super.exists(f);
+    }
+
     @Override
     public void initialize(URI uri, Configuration conf) {
       setConf(conf);
@@ -1572,6 +1579,10 @@ public String toString() {
       return buffer.toString();
     }
 
+    public int getNumExistsCalls() {
+      return numExistsCalls;
+    }
+
     public static void addGlobalFile(MockFile mockFile) {
       globalFiles.add(mockFile);
     }
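
A closing note on the test support: exposing a numExistsCalls counter on MockFileSystem is what makes the saved round trips assertable — the two new TestAcidUtils cases expect 0 exists() probes with snapshots on and 2 with them off. The same counting idea, sketched as a generic wrapper rather than the ORC test mock (illustrative only, not part of the patch):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FilterFileSystem;
    import org.apache.hadoop.fs.Path;

    class CountingFileSystem extends FilterFileSystem {
      private final AtomicInteger existsCalls = new AtomicInteger();

      CountingFileSystem(FileSystem delegate) {
        super(delegate);
      }

      @Override
      public boolean exists(Path f) throws IOException {
        existsCalls.incrementAndGet();   // same idea as MockFileSystem.numExistsCalls
        return super.exists(f);
      }

      int getExistsCalls() {
        return existsCalls.get();
      }
    }
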