diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 295fe7cbd0..53597a2d96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.*;
@@ -73,6 +74,7 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithIdAndPartialRecursiveListing;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.Ref;
 import org.apache.orc.FileFormatException;
@@ -1194,12 +1196,13 @@ public static Directory getAcidState(FileSystem fileSystem, Path directory, Conf
     List<Path> originalDirectories = new ArrayList<>();
     final List<Path> obsolete = new ArrayList<>();
     final List<Path> abortedDirectories = new ArrayList<>();
-    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory);
+    List<HdfsFileStatusWithIdAndPartialRecursiveListing> childrenWithIdAndRecursiveListing =
+        tryListLocatedHdfsStatusRecursive(useFileIds, fs, directory);
     TxnBase bestBase = new TxnBase();
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
-    if (childrenWithId != null) {
-      for (HdfsFileStatusWithId child : childrenWithId) {
+    if (childrenWithIdAndRecursiveListing != null) {
+      for (HdfsFileStatusWithIdAndPartialRecursiveListing child : childrenWithIdAndRecursiveListing) {
         getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete,
             bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
       }
@@ -1353,16 +1356,17 @@ private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) thr
         MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
   }
 
-  private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
-      ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
-      List<HdfsFileStatusWithId> original, List<Path> obsolete, TxnBase bestBase,
-      boolean ignoreEmptyFiles, List<Path> aborted, Map<String, String> tblproperties,
-      FileSystem fs, ValidTxnList validTxnList) throws IOException {
+
+  private static void getChildState(FileStatus child,
+      HdfsFileStatusWithIdAndPartialRecursiveListing childWithIdAndRecursiveListing, ValidWriteIdList writeIdList,
+      List<ParsedDelta> working, List<Path> originalDirectories, List<HdfsFileStatusWithId> original,
+      List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted,
+      Map<String, String> tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (!child.isDirectory()) {
       if (!ignoreEmptyFiles || child.getLen() != 0) {
-        original.add(createOriginalObj(childWithId, child));
+        original.add(createOriginalObj(childWithIdAndRecursiveListing, child));
       }
       return;
     }
@@ -1431,9 +1435,11 @@ private static boolean isDirUsable(Path child, long visibilityTxnId,
     }
     return true;
   }
-  public static HdfsFileStatusWithId createOriginalObj(
-      HdfsFileStatusWithId childWithId, FileStatus child) {
-    return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child);
+
+  public static HdfsFileStatusWithIdAndPartialRecursiveListing createOriginalObj(
+      HdfsFileStatusWithIdAndPartialRecursiveListing childWithIdAndRecursiveListing, FileStatus child) {
+    return childWithIdAndRecursiveListing != null ? childWithIdAndRecursiveListing
+        : new HdfsFileStatusWithoutIdAndRecursiveListing(child);
   }
 
   private static class HdfsFileStatusWithoutId implements HdfsFileStatusWithId {
@@ -1453,6 +1459,25 @@ public Long getFileId() {
       return null;
     }
   }
+
+  private static class HdfsFileStatusWithoutIdAndRecursiveListing extends HdfsFileStatusWithoutId implements
+      HdfsFileStatusWithIdAndPartialRecursiveListing {
+    private List<FileStatus> fileStatuses;
+
+    public HdfsFileStatusWithoutIdAndRecursiveListing(LocatedFileStatus lfs, List<FileStatus> fileStatuses) {
+      super(lfs);
+      this.fileStatuses = fileStatuses;
+    }
+
+    public HdfsFileStatusWithoutIdAndRecursiveListing(FileStatus child) {
+      super(child);
+    }
+
+    @Override
+    public List<FileStatus> getFileStatuses() {
+      return fileStatuses;
+    }
+  }
 
   /**
    * Find the original files (non-ACID layout) recursively under the partition directory.
@@ -1464,7 +1489,8 @@ public Long getFileId() {
   public static void findOriginals(FileSystem fs, Path dir,
       List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds,
       boolean ignoreEmptyFiles, boolean recursive) throws IOException {
-    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, dir);
+    List<HdfsFileStatusWithIdAndPartialRecursiveListing> childrenWithId = tryListLocatedHdfsStatusRecursive(useFileIds,
+        fs, dir);
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         if (child.getFileStatus().isDirectory()) {
@@ -1494,13 +1520,13 @@ public static void findOriginals(FileSystem fs, Path dir,
     }
   }
 
-  private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds,
-      FileSystem fs, Path directory) {
+  private static List<HdfsFileStatusWithIdAndPartialRecursiveListing> tryListLocatedHdfsStatusRecursive(
+      Ref<Boolean> useFileIds, FileSystem fs, Path directory) {
     Boolean val = useFileIds.value;
-    List<HdfsFileStatusWithId> childrenWithId = null;
+    List<HdfsFileStatusWithIdAndPartialRecursiveListing> childrenWithIdAndRecursiveListing = null;
     if (val == null || val) {
       try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
+        childrenWithIdAndRecursiveListing = SHIMS.listPartialLocatedHdfsStatusRecursive(fs, directory, hiddenFileFilter);
         if (val == null) {
           useFileIds.value = true;
         }
@@ -1511,7 +1537,7 @@ public static void findOriginals(FileSystem fs, Path dir,
         }
       }
     }
-    return childrenWithId;
+    return childrenWithIdAndRecursiveListing;
   }
 
   public static boolean isTablePropertyTransactional(Properties props) {
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 9a1e590f5d..986aac605b 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -69,6 +69,7 @@
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.ipc.CallerContext;
@@ -741,7 +742,7 @@ public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi
     return new WebHCatJTShim23(conf, ugi);//this has state, so can't be cached
   }
 
-  private static final class HdfsFileStatusWithIdImpl implements HdfsFileStatusWithId {
+  private static class HdfsFileStatusWithIdImpl implements HdfsFileStatusWithId {
     private final LocatedFileStatus lfs;
     private final long fileId;
 
@@ -763,7 +764,81 @@ public Long getFileId() {
       return fileId;
     }
   }
+
+  private static class HdfsFileStatusWithIdAndPartialRecursiveListingImpl extends HdfsFileStatusWithIdImpl implements
+      HdfsFileStatusWithIdAndPartialRecursiveListing {
+    private final List<FileStatus> fileStatuses;
+
+    public HdfsFileStatusWithIdAndPartialRecursiveListingImpl(LocatedFileStatus lfs, long fileId,
+        List<FileStatus> fileStatuses) {
+      super(lfs, fileId);
+      this.fileStatuses = fileStatuses;
+    }
+
+    @Override
+    public List<FileStatus> getFileStatuses() {
+      return fileStatuses;
+    }
+  }
+
+  @Override
+  public List<HdfsFileStatusWithIdAndPartialRecursiveListing> listPartialLocatedHdfsStatusRecursive(FileSystem fs,
+      Path p, PathFilter filter) throws IOException {
+    DistributedFileSystem dfs = ensureDfs(fs);
+    DFSClient dfsc = dfs.getClient();
+    final String src = p.toUri().getPath();
+    DirectoryListing current = dfsc.listPaths(src, org.apache.hadoop.hdfs.protocol.HdfsFileStatus.EMPTY_NAME, true);
+    if (current == null) { // the directory does not exist
+      throw new FileNotFoundException("File " + p + " does not exist.");
+    }
+    final URI fsUri = fs.getUri();
+    List<HdfsFileStatusWithIdAndPartialRecursiveListing> result =
+        new ArrayList<HdfsFileStatusWithIdAndPartialRecursiveListing>(current.getPartialListing().length);
+    while (current != null) {
+      org.apache.hadoop.hdfs.protocol.HdfsFileStatus[] hfss = current.getPartialListing();
+      for (int i = 0; i < hfss.length; ++i) {
+        HdfsLocatedFileStatus next = (HdfsLocatedFileStatus) (hfss[i]);
+        if (filter != null) {
+          Path filterPath = next.getFullPath(p).makeQualified(fsUri, null);
+          if (!filter.accept(filterPath)) {
+            continue;
+          } else {
+            List<FileStatus> recursiveListing = getPartialRecursiveListing(fs, filterPath, filter);
+            LocatedFileStatus lfs = next.makeQualifiedLocated(fsUri, p);
+            result.add(new HdfsFileStatusWithIdAndPartialRecursiveListingImpl(lfs, next.getFileId(), recursiveListing));
+            for (FileStatus fStatus : recursiveListing) {
+              if (!fStatus.isDirectory()) {
+                return result;
+              }
+            }
+          }
+        }
+      }
+      current = current.hasMore() ? dfsc.listPaths(src, current.getLastName(), true) : null;
+    }
+    return result;
+  }
+
+  private List<FileStatus> getPartialRecursiveListing(FileSystem fs, Path directory, PathFilter filter)
+      throws FileNotFoundException, IOException {
+    final URI fsUri = fs.getUri();
+    List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
+    RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(directory, true);
+    while (fileStatusListIterator.hasNext()) {
+      LocatedFileStatus fileStatus = fileStatusListIterator.next();
+      Path filterPath = ((HdfsFileStatus) fileStatus).getFullPath(directory).makeQualified(fsUri, null);
+      if (!filter.accept(filterPath)) {
+        continue;
+      }
+      fileStatuses.add(fileStatus);
+      // If we find at least one file that passes the filter, return
+      if (!fileStatus.isDirectory()) {
+        return fileStatuses;
+      }
+    }
+    return fileStatuses;
+  }
+
   @Override
   public List<HdfsFileStatusWithId> listLocatedHdfsStatus(
       FileSystem fs, Path p, PathFilter filter) throws IOException {
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 49a2ab3616..1665400d1c 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -252,6 +252,9 @@ RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter repor
   List<HdfsFileStatusWithId> listLocatedHdfsStatus(
       FileSystem fs, Path path, PathFilter filter) throws IOException;
+
+  List<HdfsFileStatusWithIdAndPartialRecursiveListing> listPartialLocatedHdfsStatusRecursive(
+      FileSystem fs, Path path, PathFilter filter) throws IOException;
 
   /**
    * For file status returned by listLocatedStatus, convert them into a list
@@ -288,6 +291,14 @@ RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter repor
     public FileStatus getFileStatus();
     public Long getFileId();
   }
+
+  public interface HdfsFileStatusWithIdAndPartialRecursiveListing extends HdfsFileStatusWithId {
+    /**
+     * Partial list of the FileStatus entries found recursively under this directory; it should contain at least one file.
+     * @return the partial recursive listing
+     */
+    public List<FileStatus> getFileStatuses();
+  }
 
   public HCatHadoopShims getHCatShim();
 
   public interface HCatHadoopShims {
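
Editor's note, not part of the patch: the sketch below shows how a caller might exercise the new shim entry point. The class name PartialRecursiveListingExample and the lambda filter are hypothetical; the filter only mirrors the hiddenFileFilter semantics AcidUtils passes in (skip names starting with "_" or "."), and an HDFS-backed FileSystem is assumed because Hadoop23Shims resolves the path through DFSClient.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithIdAndPartialRecursiveListing;
    import org.apache.hadoop.hive.shims.ShimLoader;

    // Hypothetical caller of the new shim API added by this patch.
    public class PartialRecursiveListingExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path dir = new Path(args[0]); // e.g. a table or partition directory
        FileSystem fs = dir.getFileSystem(conf); // assumed HDFS-backed

        // Mirrors AcidUtils.hiddenFileFilter: skip "_"- and "."-prefixed names.
        PathFilter visible = p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");

        List<HdfsFileStatusWithIdAndPartialRecursiveListing> children =
            ShimLoader.getHadoopShims().listPartialLocatedHdfsStatusRecursive(fs, dir, visible);

        for (HdfsFileStatusWithIdAndPartialRecursiveListing child : children) {
          System.out.println(child.getFileStatus().getPath() + " fileId=" + child.getFileId());
          // The listing is partial by design: descent stops once a real file is found,
          // so treat it as "does this subtree contain data?" rather than a full scan.
          List<FileStatus> partial = child.getFileStatuses();
          if (partial != null) { // null when built via the FileStatus-only fallback constructor
            for (FileStatus fStatus : partial) {
              System.out.println("  " + fStatus.getPath() + (fStatus.isDirectory() ? " (dir)" : ""));
            }
          }
        }
      }
    }

Note the "partial" contract visible in the patch: getPartialRecursiveListing stops descending once a non-directory file passes the filter, and listPartialLocatedHdfsStatusRecursive itself returns early once any child subtree is known to contain a file. Callers should therefore treat the result as a cheap existence probe with file IDs, not a complete recursive listing.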