diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index d59cfe51e9..b3722018db 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -345,7 +345,7 @@ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws
     // Clean up
     executeStatementOnDriver("drop table " + tblName, driver);
   }
-
+
   private void runCompaction(String dbName, String tblName, CompactionType compactionType, String... partNames)
       throws Exception {
     HiveConf hiveConf = new HiveConf(conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 295fe7cbd0..e7b7a6d1b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -27,10 +27,12 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.regex.Pattern;
 
 import com.google.common.base.Strings;
@@ -40,8 +42,10 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.*;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -493,6 +497,12 @@ public boolean isBaseInRawFormat() {
     public List<Path> getAbortedDirectories() {
       return abortedDirectories;
     }
+
+    @Override
+    public String toString() {
+      return "Aborted Directories: " + abortedDirectories + "; isBaseInRawFormat: " + isBaseInRawFormat + "; original: "
+          + original + "; obsolete: " + obsolete + "; deltas: " + deltas + "; base: " + base;
+    }
   }
 
   //This is used for (full) Acid tables. InsertOnly use NOT_ACID
@@ -1198,17 +1208,16 @@ public static Directory getAcidState(FileSystem fileSystem, Path directory, Conf
     TxnBase bestBase = new TxnBase();
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
+    List<HdfsDirSnapshot> dirSnapshots = null;
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete,
             bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
       }
     } else {
-      List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
-      for (FileStatus child : children) {
-        getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete,
-            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
-      }
+      dirSnapshots = getHdfsDirSnapshots(fs, directory, hiddenFileFilter);
+      getChildState(dirSnapshots, writeIdList, working, originalDirectories, original, obsolete, bestBase,
+          ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
     }
     // If we have a base, the original files are obsolete.
@@ -1218,16 +1227,22 @@ public static Directory getAcidState(FileSystem fileSystem, Path directory, Conf
         obsolete.add(fswid.getFileStatus().getPath());
       }
       // Add original directories to obsolete list if any
-      obsolete.addAll(originalDirectories);
+      if (dirSnapshots == null) {
+        obsolete.addAll(originalDirectories);
+      }
       // remove the entries so we don't get confused later and think we should
       // use them.
       original.clear();
       originalDirectories.clear();
     } else {
-      // Okay, we're going to need these originals. Recurse through them and figure out what we
-      // really need.
-      for (Path origDir : originalDirectories) {
-        findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true);
+      // Okay, we're going to need these originals.
+      // Recurse through them and figure out what we really need.
+      // If we already have the original list, do nothing
+      // If dirSnapshots != null, we would have already populated "original"
+      if (dirSnapshots == null) {
+        for (Path origDir : originalDirectories) {
+          findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true);
+        }
       }
     }
 
@@ -1319,6 +1334,140 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
     return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, obsolete, deltas, base);
   }
+
+  public static List<HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path, final PathFilter filter)
+      throws IOException {
+    try {
+      Map<Path, HdfsDirSnapshot> dirToSnapshots = new TreeMap<Path, HdfsDirSnapshot>();
+      RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
+      while (itr.hasNext()) {
+        FileStatus fStatus = itr.next();
+        if (filter == null || filter.accept(fStatus.getPath())) {
+          if (fStatus.isDirectory()) {
+            Path dirPath = fStatus.getPath();
+            HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(dirPath);
+            if (dirSnapshot == null) {
+              dirSnapshot = new HdfsDirSnapshotImpl(dirPath, fStatus);
+              dirToSnapshots.put(dirPath, dirSnapshot);
+            }
+          } else {
+            Path parentDirPath = fStatus.getPath().getParent();
+            FileStatus parentDirFStatus = fs.getFileStatus(parentDirPath);
+            HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
+            if (dirSnapshot == null) {
+              dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath, parentDirFStatus);
+              dirToSnapshots.put(parentDirPath, dirSnapshot);
+            }
+            dirSnapshot.addFile(fStatus);
+          }
+        }
+      }
+      return new ArrayList<HdfsDirSnapshot>(dirToSnapshots.values());
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Recursive hdfs dir listing
+   *
+   */
+  public static interface HdfsDirSnapshot {
+    public Path getPath();
+
+    // FileStatus of this HDFS directory
+    public FileStatus getFileStatus();
+
+    // Get the list of files if any within this directory
+    public List<FileStatus> getFiles();
+
+    public void setFileStatus(FileStatus fStatus);
+
+    public void addFile(FileStatus file);
+
+    // File id or null
+    public Long getFileId();
+
+    public Boolean isRawFormat();
+
+    public void setIsRawFormat(boolean isRawFormat);
+
+  }
+
+  public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
+    private Path path;
+    private FileStatus fStatus;
+    private List<FileStatus> files = new ArrayList<FileStatus>();
+    private Long fileId = null;
+    private Boolean isRawFormat = null;
+
+    public HdfsDirSnapshotImpl(Path path, FileStatus fStatus, List<FileStatus> files) {
+      this.path = path;
+      this.fStatus = fStatus;
+      this.files = files;
+    }
+
+    public HdfsDirSnapshotImpl(Path path, FileStatus fStatus) {
+      this.path = path;
+      this.fStatus = fStatus;
+    }
+
+    @Override
+    public Path getPath() {
+      return path;
+    }
+
+    @Override
+    public FileStatus getFileStatus() {
+      return fStatus;
+    }
+
+    @Override
+    public void setFileStatus(FileStatus fStatus) {
+      this.fStatus = fStatus;
+    }
+
+    @Override
+    public List<FileStatus> getFiles() {
+      return files;
+    }
+
+    @Override
+    public void addFile(FileStatus file) {
+      files.add(file);
+    }
+
+    @Override
+    public Long getFileId() {
+      return fileId;
+    }
+
+    @Override
+    public Boolean isRawFormat() {
+      return isRawFormat;
+    }
+
+    @Override
+    public void setIsRawFormat(boolean isRawFormat) {
+      this.isRawFormat = isRawFormat;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Path: " + path);
+      sb.append("; ");
+      sb.append("Files: { ");
+      for (FileStatus fstatus : files) {
+        sb.append(fstatus);
+        sb.append(", ");
+      }
+      sb.append(" }");
+      return sb.toString();
+    }
+  }
+
   /**
    * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view)
    * A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct
@@ -1353,6 +1502,7 @@ private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) thr
         MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
   }
 
+
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidWriteIdList writeIdList,
       List<ParsedDelta> working, List<Path> originalDirectories, List<HdfsFileStatusWithId> original,
       List<Path> obsolete, TxnBase bestBase,
@@ -1413,6 +1563,62 @@ else if (writeIdList.isWriteIdRangeValid(
       originalDirectories.add(child.getPath());
     }
   }
+
+  private static void getChildState(List<HdfsDirSnapshot> dirSnapshots, ValidWriteIdList writeIdList,
+      List<ParsedDelta> working, List<Path> originalDirectories, List<HdfsFileStatusWithId> original,
+      List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted,
+      Map<String, String> tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException {
+    for (HdfsDirSnapshot dirSnapshot : dirSnapshots) {
+      FileStatus fStat = dirSnapshot.getFileStatus();
+      Path dirPath = dirSnapshot.getPath();
+      String dirName = dirPath.getName();
+      if (dirName.startsWith(BASE_PREFIX)) {
+        ParsedBase parsedBase = ParsedBase.parseBase(dirPath);
+        if (!isDirUsable(dirPath, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
+          continue;
+        }
+        final long writeId = parsedBase.getWriteId();
+        if (bestBase.oldestBaseWriteId > writeId) {
+          //keep track for error reporting
+          bestBase.oldestBase = dirPath;
+          bestBase.oldestBaseWriteId = writeId;
+        }
+        if (bestBase.status == null) {
+          if (isValidBase(parsedBase, writeIdList, fs)) {
+            bestBase.status = fStat;
+            bestBase.writeId = writeId;
+          }
+        } else if (bestBase.writeId < writeId) {
+          if (isValidBase(parsedBase, writeIdList, fs)) {
+            obsolete.add(bestBase.status.getPath());
+            bestBase.status = fStat;
+            bestBase.writeId = writeId;
+          }
+        } else {
+          obsolete.add(dirPath);
+        }
+      } else if (dirName.startsWith(DELTA_PREFIX) || dirName.startsWith(DELETE_DELTA_PREFIX)) {
+        String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+        ParsedDelta delta = parseDelta(dirPath, deltaPrefix, fs);
+        if (!isDirUsable(dirPath, delta.getVisibilityTxnId(), aborted, validTxnList)) {
+          continue;
+        }
+        if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId,
+            delta.maxWriteId)) {
+          aborted.add(dirPath);
+        } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId,
+            delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) {
+          working.add(delta);
+        }
+      } else {
+        originalDirectories.add(dirPath);
+        for (FileStatus stat : dirSnapshot.getFiles()) {
+          original.add(createOriginalObj(null, stat));
+        }
+      }
+    }
+  }
+
   /**
    * checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index 9d5ba3d310..4de5c8cff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -26,8 +26,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-
+import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index c5faec5e95..8506ecbbd4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -18,14 +18,16 @@
 package org.apache.hadoop.hive.ql.io;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.BitSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -246,13 +248,17 @@ public void testBaseDeltas() throws Exception {
         AcidUtils.getAcidState(new MockPath(fs, "mock:/tbl/part1"), conf,
             new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
-    List<Path> obsolete = dir.getObsolete();
-    assertEquals(5, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString());
-    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(2).toString());
-    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(3).toString());
-    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(4).toString());
+    List<Path> obsoletes = dir.getObsolete();
+    assertEquals(5, obsoletes.size());
+    Set<String> obsoletePathNames = new HashSet<String>();
+    for (Path obsolete : obsoletes) {
+      obsoletePathNames.add(obsolete.toString());
+    }
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029"));
     assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     assertEquals(1, deltas.size());
@@ -276,9 +282,9 @@ public void testObsoleteOriginals() throws Exception {
     AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf,
         new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":"));
     // Obsolete list should include the two original bucket files, and the old base dir
-    List<Path> obsolete = dir.getObsolete();
-    assertEquals(3, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).toString());
+    List<Path> obsoletes = dir.getObsolete();
+    assertEquals(3, obsoletes.size());
+    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).toString());
     assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
   }
 
@@ -451,15 +457,19 @@ public void testBaseWithDeleteDeltas() throws Exception {
         AcidUtils.getAcidState(new MockPath(fs, "mock:/tbl/part1"), conf,
             new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
-    List<Path> obsolete = dir.getObsolete();
-    assertEquals(7, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString());
-    assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).toString());
-    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).toString());
-    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).toString());
-    assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).toString());
-    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).toString());
+    List<Path> obsoletes = dir.getObsolete();
+    assertEquals(7, obsoletes.size());
+    Set<String> obsoletePathNames = new HashSet<String>();
+    for (Path obsolete : obsoletes) {
+      obsoletePathNames.add(obsolete.toString());
+    }
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_025_030"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_029_029"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029"));
     assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
    assertEquals(2, deltas.size());
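
For reference, a minimal sketch of how the new single-pass listing introduced above could be exercised once the patch is applied. The class name, the local-filesystem setup, and the /tmp/acid-demo layout are illustrative assumptions, not part of the patch; a real caller would typically pass the same hiddenFileFilter that getAcidState() passes internally rather than null (null is accepted by the new method and means "no filtering").

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils.HdfsDirSnapshot;

public class DirSnapshotDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Local FS stands in for HDFS here; /tmp/acid-demo is a hypothetical partition directory
    // containing e.g. base_0000005/bucket_00000 and delta_0000006_0000006/bucket_00000.
    FileSystem fs = FileSystem.getLocal(conf);
    Path partition = new Path("/tmp/acid-demo");
    // One recursive listFiles() call builds per-directory snapshots, replacing the separate
    // listing call that was previously issued for every base/delta directory.
    for (HdfsDirSnapshot snapshot : AcidUtils.getHdfsDirSnapshots(fs, partition, null)) {
      System.out.println(snapshot.getPath() + " -> " + snapshot.getFiles().size() + " file(s)");
    }
  }
}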