diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 295fe7cbd0..a34bb34022 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsDirSnapshot; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.Ref; @@ -447,13 +448,13 @@ else if (filename.startsWith(BUCKET_PREFIX)) { public static final class DirectoryImpl implements Directory { private final List abortedDirectories; private final boolean isBaseInRawFormat; - private final List original; + private final List original; private final List obsolete; private final List deltas; private final Path base; public DirectoryImpl(List abortedDirectories, - boolean isBaseInRawFormat, List original, + boolean isBaseInRawFormat, List original, List obsolete, List deltas, Path base) { this.abortedDirectories = abortedDirectories == null ? Collections.emptyList() : abortedDirectories; @@ -475,7 +476,7 @@ public boolean isBaseInRawFormat() { } @Override - public List getOriginalFiles() { + public List getOriginalFiles() { return original; } @@ -540,11 +541,11 @@ public static DataOperationType toDataOperationType(Operation op) { * treated as a base when split-update is enabled for acid). */ public static class AcidBaseFileInfo { - final private HdfsFileStatusWithId fileId; + final private HdfsDirSnapshot hdfsDirSnapshot; final private AcidBaseFileType acidBaseFileType; - public AcidBaseFileInfo(HdfsFileStatusWithId fileId, AcidBaseFileType acidBaseFileType) { - this.fileId = fileId; + public AcidBaseFileInfo(HdfsDirSnapshot hdfsDirSnapshot, AcidBaseFileType acidBaseFileType) { + this.hdfsDirSnapshot = hdfsDirSnapshot; this.acidBaseFileType = acidBaseFileType; } @@ -556,8 +557,8 @@ public boolean isAcidSchema() { return this.acidBaseFileType == AcidBaseFileType.ACID_SCHEMA; } - public HdfsFileStatusWithId getHdfsFileStatusWithId() { - return this.fileId; + public HdfsDirSnapshot getHdfsDirSnapshot() { + return this.hdfsDirSnapshot; } } @@ -748,7 +749,7 @@ public String toString() { * Get the list of original files. Not {@code null}. Must be sorted. * @return the list of original files (eg. 
000000_0) */ - List getOriginalFiles(); + List getOriginalFiles(); /** * Get the list of base and delta directories that are valid and not @@ -1048,7 +1049,7 @@ public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSys boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); if (filename.startsWith(deltaPrefix)) { //small optimization - delete delta can't be in raw format - boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs); + boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs, null); return parsedDelta(deltaDir, isRawFormat); } throw new IllegalArgumentException(deltaDir + " does not start with " + @@ -1130,6 +1131,7 @@ public static Directory getAcidState(Path directory, private long writeId = 0; private long oldestBaseWriteId = Long.MAX_VALUE; private Path oldestBase = null; + private List files = null; } /** @@ -1166,10 +1168,8 @@ public static Directory getAcidState(Path directory, Configuration conf, } public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf, - ValidWriteIdList writeIdList, - Ref useFileIds, - boolean ignoreEmptyFiles, - Map tblproperties) throws IOException { + ValidWriteIdList writeIdList, Ref useFileIds, boolean ignoreEmptyFiles, + Map tblproperties) throws IOException { ValidTxnList validTxnList = null; String s = conf.get(ValidTxnList.VALID_TXNS_KEY); if(!Strings.isNullOrEmpty(s)) { @@ -1194,14 +1194,15 @@ public static Directory getAcidState(FileSystem fileSystem, Path directory, Conf List originalDirectories = new ArrayList<>(); final List obsolete = new ArrayList<>(); final List abortedDirectories = new ArrayList<>(); - List childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory); + // List childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory); + List hdfsDirSnapshots = getHdfsDirSnapshots(useFileIds, fs, directory); TxnBase bestBase = new TxnBase(); - final List original = new ArrayList<>(); - if (childrenWithId != null) { - for (HdfsFileStatusWithId child : childrenWithId) { - getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, - obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); + final List original = new ArrayList<>(); + if (hdfsDirSnapshots != null) { + for (HdfsDirSnapshot hdfsDirSnapshot : hdfsDirSnapshots) { + getChildState(hdfsDirSnapshot.getFileStatus(), hdfsDirSnapshot, writeIdList, working, originalDirectories, + original, obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); } } else { List children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); @@ -1214,8 +1215,8 @@ public static Directory getAcidState(FileSystem fileSystem, Path directory, Conf // If we have a base, the original files are obsolete. if (bestBase.status != null) { // Add original files to obsolete list if any - for (HdfsFileStatusWithId fswid : original) { - obsolete.add(fswid.getFileStatus().getPath()); + for (HdfsDirSnapshot hdfsDirSnapshot : original) { + obsolete.add(hdfsDirSnapshot.getFileStatus().getPath()); } // Add original directories to obsolete list if any obsolete.addAll(originalDirectories); @@ -1300,6 +1301,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId } final Path base = bestBase.status == null ? 
null : bestBase.status.getPath(); + List files = bestBase.files; LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); /** @@ -1309,13 +1311,13 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns * {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen). */ - Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> { + Collections.sort(original, (HdfsDirSnapshot o1, HdfsDirSnapshot o2) -> { //this does "Path.uri.compareTo(that.uri)" return o1.getFileStatus().compareTo(o2.getFileStatus()); }); // Note: isRawFormat is invalid for non-ORC tables. It will always return true, so we're good. - final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs); + final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs, files); return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, obsolete, deltas, base); } @@ -1353,16 +1355,17 @@ private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) thr MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs); } - private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, + private static void getChildState(FileStatus child, HdfsDirSnapshot hdfsDirSnapshot, ValidWriteIdList writeIdList, List working, List originalDirectories, - List original, List obsolete, TxnBase bestBase, + List original, List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List aborted, Map tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException { + List files = hdfsDirSnapshot.getFiles(); Path p = child.getPath(); String fn = p.getName(); if (!child.isDirectory()) { if (!ignoreEmptyFiles || child.getLen() != 0) { - original.add(createOriginalObj(childWithId, child)); + original.add(createOriginalObj(hdfsDirSnapshot, child)); } return; } @@ -1376,17 +1379,26 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi //keep track for error reporting bestBase.oldestBase = p; bestBase.oldestBaseWriteId = writeId; + if (files != null) { + bestBase.files = files; + } } if (bestBase.status == null) { if(isValidBase(parsedBase, writeIdList, fs)) { bestBase.status = child; bestBase.writeId = writeId; + if (files != null) { + bestBase.files = files; + } } } else if (bestBase.writeId < writeId) { if(isValidBase(parsedBase, writeIdList, fs)) { obsolete.add(bestBase.status.getPath()); bestBase.status = child; bestBase.writeId = writeId; + if (files != null) { + bestBase.files = files; + } } } else { obsolete.add(child.getPath()); @@ -1431,11 +1443,34 @@ private static boolean isDirUsable(Path child, long visibilityTxnId, } return true; } - public static HdfsFileStatusWithId createOriginalObj( - HdfsFileStatusWithId childWithId, FileStatus child) { - return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child); + + public static HdfsDirSnapshot createOriginalObj(HdfsDirSnapshot hdfsDirSnapshot, FileStatus child) { + return hdfsDirSnapshot != null ? 
hdfsDirSnapshot : new HdfsDirSnapshotWithoutId(child); } + private static class HdfsDirSnapshotWithoutId implements HdfsDirSnapshot { + private final FileStatus fs; + + public HdfsDirSnapshotWithoutId(FileStatus fs) { + this.fs = fs; + } + + @Override + public FileStatus getFileStatus() { + return fs; + } + + @Override + public Long getFileId() { + return null; + } + + @Override + public List getFiles() { + return null; + } + } + private static class HdfsFileStatusWithoutId implements HdfsFileStatusWithId { private final FileStatus fs; @@ -1461,20 +1496,26 @@ public Long getFileId() { * @param original the list of original files * @throws IOException */ - public static void findOriginals(FileSystem fs, Path dir, - List original, Ref useFileIds, + public static void findOriginals(FileSystem fs, Path dir, List original, Ref useFileIds, boolean ignoreEmptyFiles, boolean recursive) throws IOException { - List childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, dir); - if (childrenWithId != null) { - for (HdfsFileStatusWithId child : childrenWithId) { - if (child.getFileStatus().isDirectory()) { - if (recursive) { - findOriginals(fs, child.getFileStatus().getPath(), original, useFileIds, - ignoreEmptyFiles, true); + List hdfsDirSnapshots = getHdfsDirSnapshots(useFileIds, fs, dir); + if (hdfsDirSnapshots != null) { + for (HdfsDirSnapshot hdfsDirSnapshot : hdfsDirSnapshots) { + if (hdfsDirSnapshot.getFileStatus().isDirectory()) { + // If we already have file listing for this directory, use them + if (hdfsDirSnapshot.getFiles() != null) { + for (FileStatus file : hdfsDirSnapshot.getFiles()) { + original.add(createOriginalObj(null, file)); + } + } else { + if (recursive) { + findOriginals(fs, hdfsDirSnapshot.getFileStatus().getPath(), original, useFileIds, ignoreEmptyFiles, + true); + } } } else { - if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) { - original.add(child); + if (!ignoreEmptyFiles || hdfsDirSnapshot.getFileStatus().getLen() > 0) { + original.add(hdfsDirSnapshot); } } } @@ -1486,7 +1527,7 @@ public static void findOriginals(FileSystem fs, Path dir, findOriginals(fs, child.getPath(), original, useFileIds, ignoreEmptyFiles, true); } } else { - if(!ignoreEmptyFiles || child.getLen() > 0) { + if (!ignoreEmptyFiles || child.getLen() > 0) { original.add(createOriginalObj(null, child)); } } @@ -1494,8 +1535,28 @@ public static void findOriginals(FileSystem fs, Path dir, } } - private static List tryListLocatedHdfsStatus(Ref useFileIds, - FileSystem fs, Path directory) { + private static List getHdfsDirSnapshots(Ref useFileIds, FileSystem fs, Path directory) { + Boolean val = useFileIds.value; + List hdfsDirSnapshots = null; + if (val == null || val) { + try { + hdfsDirSnapshots = SHIMS.getHdfsDirSnapshots(fs, directory, hiddenFileFilter); + if (val == null) { + useFileIds.value = true; + } + } catch (Throwable t) { + LOG.error("Failed to get files with ID; using regular API: " + t.getMessage()); + if (val == null && t instanceof UnsupportedOperationException) { + useFileIds.value = false; + } + } + } + return hdfsDirSnapshots; + } + + + private static List tryListLocatedHdfsStatus(Ref useFileIds, FileSystem fs, + Path directory) { Boolean val = useFileIds.value; List childrenWithId = null; if (val == null || val) { @@ -2061,12 +2122,20 @@ static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs) throws IOExceptio * Chooses 1 representative file from {@code baseOrDeltaDir} * This assumes that all files in the dir are of the same type: either written by an acid * write or 
Load Data. This should always be the case for an Acid table. + * @param files: file listing in this base or delta directory (could be null) */ - private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOException { + private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs, List files) throws IOException { if(!(baseOrDeltaDir.getName().startsWith(BASE_PREFIX) || baseOrDeltaDir.getName().startsWith(DELTA_PREFIX))) { throw new IllegalArgumentException(baseOrDeltaDir + " is not a base/delta"); } + if (files != null) { + for (FileStatus file : files) { + if (file.isFile()) { + return file.getPath(); + } + } + } FileStatus[] dataFiles = fs.listStatus(new Path[] {baseOrDeltaDir}, originalBucketFilter); return dataFiles != null && dataFiles.length > 0 ? dataFiles[0].getPath() : null; } @@ -2076,9 +2145,10 @@ private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOExce * and thus do not have ROW_IDs embedded in the data. * This is only meaningful for full CRUD tables - Insert-only tables have all their data * in raw format by definition. - * @param baseOrDeltaDir base or delta file. + * @param baseOrDeltaDir base or delta directory. + * @param files: file listing in this base or delta directory (could be null) */ - public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException { + public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs, List files) throws IOException { //todo: this could be optimized - for full CRUD table only base_x and delta_x_x could have // files in raw format delta_x_y (x != y) whether from streaming ingested or compaction // must be native Acid format by definition @@ -2099,13 +2169,14 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOE } } //if here, have to check the files - Path dataFile = chooseFile(baseOrDeltaDir, fs); + Path dataFile = chooseFile(baseOrDeltaDir, fs, files); if (dataFile == null) { //directory is empty or doesn't have any that could have been produced by load data return false; } return isRawFormatFile(dataFile, fs); } + public static boolean isRawFormatFile(Path dataFile, FileSystem fs) throws IOException { try { Reader reader = OrcFile.createReader(dataFile, OrcFile.readerOptions(fs.getConf())); @@ -2236,8 +2307,8 @@ public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) if (fs == null) { fs = dir.getFileSystem(jc); } - for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) { - fileList.add(hfs.getFileStatus()); + for (HdfsDirSnapshot hdfsDirSnapshot : acidInfo.getOriginalFiles()) { + fileList.add(hdfsDirSnapshot.getFileStatus()); } for (ParsedDelta delta : acidInfo.getCurrentDirectories()) { for (FileStatus f : HiveStatsUtils.getFileStatusRecurse(delta.getPath(), -1, fs)) { @@ -2266,8 +2337,8 @@ public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr); Directory acidInfo = AcidUtils.getAcidState(dataPath, conf, validWriteIdList); - for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) { - pathList.add(hfs.getFileStatus().getPath()); + for (HdfsDirSnapshot hdfsDirSnapshot : acidInfo.getOriginalFiles()) { + pathList.add(hdfsDirSnapshot.getFileStatus().getPath()); } for (ParsedDelta delta : acidInfo.getCurrentDirectories()) { pathList.add(delta.getPath()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java 
index bb75ebf983..0b8fd356b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsDirSnapshot; import org.apache.orc.impl.OrcTail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,7 +104,7 @@ public void configure(HiveConf queryConfig) { } @Override - public void getAndValidate(List files, boolean isOriginal, + public void getAndValidate(List files, boolean isOriginal, OrcTail[] result, ByteBuffer[] ppdResult) throws IOException, HiveException { assert result.length == files.size(); assert ppdResult == null || ppdResult.length == files.size(); @@ -152,7 +152,7 @@ public void getAndValidate(List files, boolean isOriginal, } private int getAndVerifyIndex(HashMap posMap, - List files, OrcTail[] result, Long fileId) { + List files, OrcTail[] result, Long fileId) { int ix = posMap.get(fileId); assert result[ix] == null; assert fileId != null && fileId.equals(files.get(ix).getFileId()); @@ -160,7 +160,7 @@ private int getAndVerifyIndex(HashMap posMap, } private boolean processBbResult( - ByteBuffer bb, int ix, HdfsFileStatusWithId file, OrcTail[] result) throws IOException { + ByteBuffer bb, int ix, HdfsDirSnapshot file, OrcTail[] result) throws IOException { if (bb == null) { return true; } @@ -173,7 +173,7 @@ private boolean processBbResult( return true; } - private void processPpdResult(MetadataPpdResult mpr, HdfsFileStatusWithId file, + private void processPpdResult(MetadataPpdResult mpr, HdfsDirSnapshot file, int ix, OrcTail[] result, ByteBuffer[] ppdResult) throws IOException { if (mpr == null) { @@ -190,12 +190,12 @@ private void processPpdResult(MetadataPpdResult mpr, HdfsFileStatusWithId file, } private List determineFileIdsToQuery( - List files, OrcTail[] result, HashMap posMap) { + List files, OrcTail[] result, HashMap posMap) { for (int i = 0; i < result.length; ++i) { if (result[i] != null) { continue; } - HdfsFileStatusWithId file = files.get(i); + HdfsDirSnapshot file = files.get(i); final FileStatus fs = file.getFileStatus(); Long fileId = file.getFileId(); if (fileId == null) { @@ -214,9 +214,9 @@ private void processPpdResult(MetadataPpdResult mpr, HdfsFileStatusWithId file, return Lists.newArrayList(posMap.keySet()); } - private Long generateTestFileId(final FileStatus fs, List files, int i) { + private Long generateTestFileId(final FileStatus fs, List files, int i) { final Long fileId = HdfsUtils.createTestFileId(fs.getPath().toUri().getPath(), fs, false, null); - files.set(i, new HdfsFileStatusWithId() { + files.set(i, new HdfsDirSnapshot() { @Override public FileStatus getFileStatus() { return fs; @@ -226,6 +226,11 @@ public FileStatus getFileStatus() { public Long getFileId() { return fileId; } + + @Override + public List getFiles() { + return null; + } }); return fileId; } @@ -302,7 +307,7 @@ public static void translateSargToTableColIndexes( } private static OrcTail createOrcTailFromMs( - HdfsFileStatusWithId file, ByteBuffer bb) throws IOException { + HdfsDirSnapshot file, ByteBuffer bb) throws IOException { if (bb == null) { return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java index 4ca328d92f..690fccb51a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java @@ -87,16 +87,16 @@ public void put(Path path, OrcTail tail) { } @Override - public void getAndValidate(final List files, + public void getAndValidate(final List files, final boolean isOriginal, final OrcTail[] result, final ByteBuffer[] ppdResult) throws IOException, HiveException { // TODO: should local cache also be by fileId? Preserve the original logic for now. assert result.length == files.size(); int i = -1; - for (HadoopShims.HdfsFileStatusWithId fileWithId : files) { + for (HadoopShims.HdfsDirSnapshot hdfsDirSnapshot : files) { ++i; - FileStatus file = fileWithId.getFileStatus(); + FileStatus file = hdfsDirSnapshot.getFileStatus(); Path path = file.getPath(); TailAndFileData tfd = cache.getIfPresent(path); if (LOG.isDebugEnabled()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 3878bba4d3..2a960f06de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -50,10 +50,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hive.common.BlobStorageUtils; -import org.apache.hadoop.hive.common.NoDynamicValuesException; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; @@ -108,7 +104,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.hive.shims.HadoopShims; -import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsDirSnapshot; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; @@ -826,7 +822,7 @@ boolean isEmpty() { static final class SplitInfo extends ACIDSplitStrategy { private final Context context; private final FileSystem fs; - private final HdfsFileStatusWithId fileWithId; + private final HdfsDirSnapshot hdfsDirSnapshot; private final OrcTail orcTail; private final List readerTypes; private final boolean isOriginal; @@ -834,13 +830,13 @@ boolean isEmpty() { private final boolean hasBase; private final ByteBuffer ppdResult; - SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, OrcTail orcTail, + SplitInfo(Context context, FileSystem fs, HdfsDirSnapshot hdfsDirSnapshot, OrcTail orcTail, List readerTypes, boolean isOriginal, List deltas, boolean hasBase, Path dir, boolean[] covered, ByteBuffer ppdResult) throws IOException { super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties); this.context = context; this.fs = fs; - this.fileWithId = fileWithId; + this.hdfsDirSnapshot = hdfsDirSnapshot; this.orcTail = orcTail; this.readerTypes = readerTypes; this.isOriginal = isOriginal; @@ -878,7 +874,7 @@ public ETLDir(Path dir, FileSystem fs, int fileCount) { Context context; final List dirs; - List files; + List files; private final List deltas; private final boolean[] covered; final 
boolean isOriginal; @@ -891,7 +887,7 @@ public ETLDir(Path dir, FileSystem fs, int fileCount) { private final boolean isDefaultFs; public ETLSplitStrategy(Context context, FileSystem fs, Path dir, - List children, List readerTypes, boolean isOriginal, + List children, List readerTypes, boolean isOriginal, List deltas, boolean[] covered, UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) { assert !children.isEmpty(); @@ -933,7 +929,7 @@ public ETLSplitStrategy(Context context, FileSystem fs, Path dir, } OrcTail orcTail = orcTails[i]; ByteBuffer ppdResult = ppdResults == null ? null : ppdResults[i]; - HdfsFileStatusWithId file = files.get(i); + HdfsDirSnapshot file = files.get(i); if (orcTail != null) { // Cached copy is valid context.cacheHitCounter.incrementAndGet(); @@ -947,7 +943,7 @@ public ETLSplitStrategy(Context context, FileSystem fs, Path dir, } else { int dirIx = -1, fileInDirIx = -1, filesInDirCount = 0; ETLDir dir = null; - for (HdfsFileStatusWithId file : files) { + for (HdfsDirSnapshot file : files) { if ((++fileInDirIx) == filesInDirCount) { dir = dirs.get(++dirIx); filesInDirCount = dir.fileCount; @@ -987,7 +983,7 @@ public String toString() { } public CombineResult combineWith(FileSystem fs, Path dir, - List otherFiles, boolean isOriginal) { + List otherFiles, boolean isOriginal) { if ((files.size() + otherFiles.size()) > ETL_COMBINE_FILE_LIMIT || this.isOriginal != isOriginal) {//todo: what is this checking???? return (files.size() > otherFiles.size()) @@ -1071,7 +1067,7 @@ private void runGetSplitsSync(List>> splitFutures, * as opposed to query execution (split generation does not read or cache file footers). */ static final class BISplitStrategy extends ACIDSplitStrategy { - private final List fileStatuses; + private final List hdfsDirSnapshots; private final boolean isOriginal; private final List deltas; private final FileSystem fs; @@ -1086,10 +1082,10 @@ private void runGetSplitsSync(List>> splitFutures, * @param dir - root of partition dir */ public BISplitStrategy(Context context, FileSystem fs, Path dir, - List fileStatuses, boolean isOriginal, List deltas, + List hdfsDirSnapshots, boolean isOriginal, List deltas, boolean[] covered, boolean allowSyntheticFileIds, boolean isDefaultFs) { super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties); - this.fileStatuses = fileStatuses; + this.hdfsDirSnapshots = hdfsDirSnapshots; this.isOriginal = isOriginal; this.deltas = deltas; this.fs = fs; @@ -1105,16 +1101,16 @@ public BISplitStrategy(Context context, FileSystem fs, Path dir, public List getSplits() throws IOException { List splits = Lists.newArrayList(); OrcSplit.OffsetAndBucketProperty offsetAndBucket = null; - for (HdfsFileStatusWithId file : fileStatuses) { + for (HdfsDirSnapshot hdfsDirSnapshot : hdfsDirSnapshots) { if (isOriginal && isAcid && vectorMode) { - offsetAndBucket = VectorizedOrcAcidRowBatchReader.computeOffsetAndBucket(file.getFileStatus(), dir, + offsetAndBucket = VectorizedOrcAcidRowBatchReader.computeOffsetAndBucket(hdfsDirSnapshot.getFileStatus(), dir, isOriginal, !deltas.isEmpty(), conf); } - FileStatus fileStatus = file.getFileStatus(); + FileStatus fileStatus = hdfsDirSnapshot.getFileStatus(); long logicalLen = AcidUtils.getLogicalLength(fs, fileStatus); if (logicalLen != 0) { - Object fileKey = isDefaultFs ? file.getFileId() : null; + Object fileKey = isDefaultFs ? 
hdfsDirSnapshot.getFileId() : null; if (fileKey == null && allowSyntheticFileIds) { fileKey = new SyntheticFileId(fileStatus); } @@ -1262,11 +1258,11 @@ private AcidDirInfo callInternal() throws IOException { // the originals could still be handled by AcidUtils like a regular non-txn table. boolean isRecursive = context.conf.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, context.conf.getBoolean("mapred.input.dir.recursive", false)); - List originals = new ArrayList<>(); + List originals = new ArrayList<>(); List baseFiles = new ArrayList<>(); AcidUtils.findOriginals(fs, dir, originals, useFileIds, true, isRecursive); - for (HdfsFileStatusWithId fileId : originals) { - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); + for (HdfsDirSnapshot file : originals) { + baseFiles.add(new AcidBaseFileInfo(file, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); } return new AcidDirInfo(fs, dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals, Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>()); @@ -1278,13 +1274,13 @@ private AcidDirInfo callInternal() throws IOException { List baseFiles = new ArrayList<>(); if (dirInfo.getBaseDirectory() == null) { // For non-acid tables (or paths), all data files are in getOriginalFiles() list - for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) { - baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); + for (HdfsDirSnapshot hdfsDirSnapshot : dirInfo.getOriginalFiles()) { + baseFiles.add(new AcidBaseFileInfo(hdfsDirSnapshot, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); } } else { - List compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds); - for (HdfsFileStatusWithId fileId : compactedBaseFiles) { - baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ? + List compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds); + for (HdfsDirSnapshot hdfsDirSnapshot : compactedBaseFiles) { + baseFiles.add(new AcidBaseFileInfo(hdfsDirSnapshot, dirInfo.isBaseInRawFormat() ? AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA)); } } @@ -1322,10 +1318,10 @@ private AcidDirInfo callInternal() throws IOException { Boolean val = useFileIds.value; if (val == null || val) { try { - List insertDeltaFiles = - SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter); - for (HdfsFileStatusWithId fileId : insertDeltaFiles) { - baseFiles.add(new AcidBaseFileInfo(fileId, deltaType)); + List insertDeltaFiles = + SHIMS.getHdfsDirSnapshots(fs, parsedDelta.getPath(), bucketFilter); + for (HdfsDirSnapshot hdfsDirSnapshot : insertDeltaFiles) { + baseFiles.add(new AcidBaseFileInfo(hdfsDirSnapshot, deltaType)); } if (val == null) { useFileIds.value = true; // The call succeeded, so presumably the API is there. @@ -1341,8 +1337,8 @@ private AcidDirInfo callInternal() throws IOException { // Fall back to regular API and create statuses without ID. 
List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter); for (FileStatus child : children) { - HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child); - baseFiles.add(new AcidBaseFileInfo(fileId, deltaType)); + HdfsDirSnapshot hdfsDirSnapshot = AcidUtils.createOriginalObj(null, child); + baseFiles.add(new AcidBaseFileInfo(hdfsDirSnapshot, deltaType)); } } } @@ -1361,13 +1357,12 @@ private AcidDirInfo callInternal() throws IOException { return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas); } - private List findBaseFiles( + private List findBaseFiles( Path base, Ref useFileIds) throws IOException { Boolean val = useFileIds.value; if (val == null || val) { try { - List result = SHIMS.listLocatedHdfsStatus( - fs, base, AcidUtils.hiddenFileFilter); + List result = SHIMS.getHdfsDirSnapshots(fs, base, AcidUtils.hiddenFileFilter); if (val == null) { useFileIds.value = true; // The call succeeded, so presumably the API is there. } @@ -1382,7 +1377,7 @@ private AcidDirInfo callInternal() throws IOException { // Fall back to regular API and create states without ID. List children = HdfsUtils.listLocatedStatus(fs, base, AcidUtils.hiddenFileFilter); - List result = new ArrayList<>(children.size()); + List result = new ArrayList<>(children.size()); for (FileStatus child : children) { result.add(AcidUtils.createOriginalObj(null, child)); } @@ -1427,8 +1422,8 @@ public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi, this.ugi = ugi; this.context = splitInfo.context; this.fs = splitInfo.fs; - this.file = splitInfo.fileWithId.getFileStatus(); - this.fsFileId = isDefaultFs ? splitInfo.fileWithId.getFileId() : null; + this.file = splitInfo.hdfsDirSnapshot.getFileStatus(); + this.fsFileId = isDefaultFs ? splitInfo.hdfsDirSnapshot.getFileId() : null; this.blockSize = this.file.getBlockSize(); this.orcTail = splitInfo.orcTail; this.readerTypes = splitInfo.readerTypes; @@ -1963,7 +1958,7 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte } private static SplitStrategy combineOrCreateETLStrategy(CombinedCtx combinedCtx, - Context context, FileSystem fs, Path dir, List files, + Context context, FileSystem fs, Path dir, List files, List deltas, boolean[] covered, List readerTypes, boolean isOriginal, UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) { @@ -2356,15 +2351,15 @@ private static boolean isStripeSatisfyPredicate( return splitStrategies; } - List acidSchemaFiles = new ArrayList<>(); - List originalSchemaFiles = new ArrayList(); + List acidSchemaFiles = new ArrayList<>(); + List originalSchemaFiles = new ArrayList(); // Separate the base files into acid schema and non-acid(original) schema files. 
for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) { if (acidBaseFileInfo.isOriginal()) { - originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId()); + originalSchemaFiles.add(acidBaseFileInfo.getHdfsDirSnapshot()); } else { assert acidBaseFileInfo.isAcidSchema(); - acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId()); + acidSchemaFiles.add(acidBaseFileInfo.getHdfsDirSnapshot()); } } @@ -2391,7 +2386,7 @@ private static boolean isStripeSatisfyPredicate( private static SplitStrategy determineSplitStrategy(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, - List baseFiles, + List baseFiles, boolean isOriginal, List parsedDeltas, List readerTypes, @@ -2402,7 +2397,7 @@ private static boolean isStripeSatisfyPredicate( // if we have a base to work from if (!baseFiles.isEmpty()) { long totalFileSize = 0; - for (HdfsFileStatusWithId child : baseFiles) { + for (HdfsDirSnapshot child : baseFiles) { totalFileSize += child.getFileStatus().getLen(); int b = AcidUtils.parseBucketId(child.getFileStatus().getPath()); // If the bucket is in the valid range, mark it as covered. @@ -2460,7 +2455,7 @@ private static boolean isStripeSatisfyPredicate( //it may also be null if there is no base - only deltas mergerOptions.baseDir(baseDirectory); if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { - isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf)); + isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf), null); mergerOptions.rootPath(baseDirectory.getParent()); } else { isOriginal = true; @@ -2482,7 +2477,7 @@ private static boolean isStripeSatisfyPredicate( public interface FooterCache { ByteBuffer NO_SPLIT_AFTER_PPD = ByteBuffer.wrap(new byte[0]); - void getAndValidate(List files, boolean isOriginal, + void getAndValidate(List files, boolean isOriginal, OrcTail[] result, ByteBuffer[] ppdResult) throws IOException, HiveException; boolean hasPpd(); boolean isBlocking(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index b1ede0556f..6c395dcb6e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -465,7 +465,7 @@ static int encodeBucketId(Configuration conf, int bucketId, int statementId) { AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true); - for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + for (HadoopShims.HdfsDirSnapshot f : directoryState.getOriginalFiles()) { int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath()); if (bucketIdFromPath != bucketId) { continue;//todo: HIVE-16952 @@ -554,7 +554,7 @@ public void next(OrcStruct next) throws IOException { * See {@link AcidUtils.Directory#getOriginalFiles()}. This list has a fixed sort order. * It includes all original files (for all buckets). 
*/ - private final List originalFiles; + private final List originalFiles; /** * index into {@link #originalFiles} */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 15f1f945ce..4e6bb36745 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -724,7 +724,7 @@ dropped by the Reader (I guess because of orc.impl.SchemaEvolution) .statementId(syntheticTxnInfo.statementId).bucket(bucketId)); AcidUtils.Directory directoryState = AcidUtils.getAcidState(syntheticTxnInfo.folder, conf, validWriteIdList, false, true); - for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + for (HadoopShims.HdfsDirSnapshot f : directoryState.getOriginalFiles()) { int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath()); if (bucketIdFromPath != bucketId) { continue;//HIVE-16952 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 67a5e6de46..9a91883f86 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -76,7 +76,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.util.DirectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsDirSnapshot; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -308,11 +308,11 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor // partition is just now being converted to ACID. baseDir = dir.getBaseDirectory(); if (baseDir == null) { - List originalFiles = dir.getOriginalFiles(); + List originalFiles = dir.getOriginalFiles(); if (!(originalFiles == null) && !(originalFiles.size() == 0)) { // There are original format files - for (HdfsFileStatusWithId stat : originalFiles) { - Path path = stat.getFileStatus().getPath(); + for (HdfsDirSnapshot hdfsDirSnapshot : originalFiles) { + Path path = hdfsDirSnapshot.getFileStatus().getPath(); //note that originalFiles are all original files recursively not dirs dirsToSearch.add(path); LOG.debug("Adding original file " + path + " to dirs to search"); @@ -1036,7 +1036,7 @@ public String toString() { dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX) - && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deltes can't be raw format + && AcidUtils.MetaDataFile.isRawFormat(dir, fs, null);//deltes can't be raw format FileStatus[] files = fs.listStatus(dir, isRawFormat ? 
AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 6168fc0f79..64c22b3975 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsDirSnapshot; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -304,8 +304,8 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi baseSize = sumDirSize(fs, base); } - List originals = dir.getOriginalFiles(); - for (HdfsFileStatusWithId origStat : originals) { + List originals = dir.getOriginalFiles(); + for (HdfsDirSnapshot origStat : originals) { baseSize += origStat.getFileStatus().getLen(); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index c5faec5e95..d816fb873d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -25,7 +25,6 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.common.ValidReadTxnList; @@ -39,7 +38,7 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath; import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger; -import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsDirSnapshot; import org.junit.Test; public class TestAcidUtils { @@ -167,7 +166,7 @@ public void testOriginal() throws Exception { assertEquals(null, dir.getBaseDirectory()); assertEquals(0, dir.getCurrentDirectories().size()); assertEquals(0, dir.getObsolete().size()); - List result = dir.getOriginalFiles(); + List result = dir.getOriginalFiles(); assertEquals(7, result.size()); assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString()); assertEquals("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "1", @@ -208,7 +207,7 @@ public void testOriginalDeltas() throws Exception { obsolete.get(0).toString()); assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(1).toString()); - List result = dir.getOriginalFiles(); + List result = dir.getOriginalFiles(); assertEquals(5, result.size()); assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString()); assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString()); diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 9a1e590f5d..833cafa59f 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -69,6 +69,7 @@ import 
org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.ipc.CallerContext;
@@ -763,7 +764,92 @@ public Long getFileId() {
       return fileId;
     }
   }
+
+
+  public class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
+    private LocatedFileStatus lfs;
+    private Long fileId = null;
+    private List<FileStatus> files = null;
+
+    public HdfsDirSnapshotImpl(LocatedFileStatus lfs, long fileId, List<FileStatus> files) {
+      this.lfs = lfs;
+      this.fileId = fileId;
+      this.files = files;
+    }
+
+    public HdfsDirSnapshotImpl(LocatedFileStatus lfs) {
+      this.lfs = lfs;
+    }
+
+    @Override
+    public FileStatus getFileStatus() {
+      return lfs;
+    }
+
+    @Override
+    public List<FileStatus> getFiles() {
+      return files;
+    }
+
+    @Override
+    public Long getFileId() {
+      return fileId;
+    }
+  }
+
+  @Override
+  public List<HdfsDirSnapshot> getHdfsDirSnapshots(FileSystem fs, Path path, PathFilter filter) throws IOException {
+    DistributedFileSystem dfs = ensureDfs(fs);
+    DFSClient dfsc = dfs.getClient();
+    final String src = path.toUri().getPath();
+    DirectoryListing current = dfsc.listPaths(src, org.apache.hadoop.hdfs.protocol.HdfsFileStatus.EMPTY_NAME, true);
+    if (current == null) {
+      throw new FileNotFoundException("File " + path + " does not exist.");
+    }
+    final URI fsUri = fs.getUri();
+    List<HdfsDirSnapshot> result = new ArrayList<>();
+    while (current != null) {
+      org.apache.hadoop.hdfs.protocol.HdfsFileStatus[] hfss = current.getPartialListing();
+      for (int i = 0; i < hfss.length; ++i) {
+        HdfsLocatedFileStatus next = (HdfsLocatedFileStatus) (hfss[i]);
+        if (filter != null) {
+          Path filterPath = next.getFullPath(path).makeQualified(fsUri, null);
+          if (!filter.accept(filterPath)) {
+            continue;
+          } else {
+            List<FileStatus> fileStatuses = getRecursiveListing(fs, filterPath, filter);
+            LocatedFileStatus lfs = next.makeQualifiedLocated(fsUri, path);
+            result.add(new HdfsDirSnapshotImpl(lfs, next.getFileId(), fileStatuses));
+            for (FileStatus fStatus : fileStatuses) {
+              if (!fStatus.isDirectory()) {
+                return result;
+              }
+            }
+          }
+        }
+      }
+      current = current.hasMore() ? dfsc.listPaths(src, current.getLastName(), true) : null;
+    }
+    return result;
+  }
+
+  private List<FileStatus> getRecursiveListing(FileSystem fs, Path directory, PathFilter filter)
+      throws FileNotFoundException, IOException {
+    final URI fsUri = fs.getUri();
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(directory, true);
+    while (fileStatusListIterator.hasNext()) {
+      LocatedFileStatus fileStatus = fileStatusListIterator.next();
+      Path filterPath = ((HdfsFileStatus) fileStatus).getFullPath(directory).makeQualified(fsUri, null);
+      if (!filter.accept(filterPath)) {
+        continue;
+      }
+      fileStatuses.add(fileStatus);
+    }
+    return fileStatuses;
+  }
+
+
   @Override
   public List<HdfsFileStatusWithId> listLocatedHdfsStatus(
       FileSystem fs, Path p, PathFilter filter) throws IOException {
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 49a2ab3616..7d9975330b 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -252,6 +252,8 @@ RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter repor
   List<HdfsFileStatusWithId> listLocatedHdfsStatus(
       FileSystem fs, Path path, PathFilter filter) throws IOException;
+
+  List<HdfsDirSnapshot> getHdfsDirSnapshots(FileSystem fs, Path path, PathFilter filter) throws IOException;
 
   /**
    * For file status returned by listLocatedStatus, convert them into a list
@@ -288,6 +290,22 @@ RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter repor
     public FileStatus getFileStatus();
     public Long getFileId();
   }
+
+  /**
+   * Recursive hdfs dir listing
+   */
+  public interface HdfsDirSnapshot {
+    // FileStatus of this HDFS directory
+    public FileStatus getFileStatus();
+
+    // Get the list of files if any within this directory
+    public List<FileStatus> getFiles();
+
+    // File id or null
+    public Long getFileId();
+  }
 
   public HCatHadoopShims getHCatShim();
   public interface HCatHadoopShims {
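For illustration only: a minimal sketch of how a caller can consume the new HdfsDirSnapshot shim API introduced in this patch. The DirSnapshotExample class, the listDataFiles() method, and the inline PathFilter are hypothetical stand-ins (the patch itself passes AcidUtils.hiddenFileFilter); production callers should keep the useFileIds guard and the listLocatedStatus() fallback shown in AcidUtils.getHdfsDirSnapshots(), since the snapshot call goes through the HDFS-specific shim and is not available on every file system.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsDirSnapshot;
import org.apache.hadoop.hive.shims.ShimLoader;

public class DirSnapshotExample {
  private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();

  // Hypothetical stand-in for AcidUtils.hiddenFileFilter: skip "_" and "." prefixed paths.
  private static final PathFilter VISIBLE_FILES = p ->
      !p.getName().startsWith("_") && !p.getName().startsWith(".");

  /**
   * Collects data files under a table/partition directory from a single recursive
   * snapshot call, instead of issuing a separate listing per base/delta directory.
   */
  public static List<FileStatus> listDataFiles(FileSystem fs, Path dir) throws IOException {
    List<FileStatus> files = new ArrayList<>();
    for (HdfsDirSnapshot snapshot : SHIMS.getHdfsDirSnapshots(fs, dir, VISIBLE_FILES)) {
      if (snapshot.getFiles() != null) {
        // Directory snapshot: the recursive file listing was already captured.
        files.addAll(snapshot.getFiles());
      } else if (!snapshot.getFileStatus().isDirectory()) {
        // Plain file at the top level (e.g. an original 000000_0 bucket file).
        files.add(snapshot.getFileStatus());
      }
    }
    return files;
  }
}

The design point, as the findOriginals() change above shows, is that a caller holding a snapshot can reuse getFiles() instead of re-listing each subdirectory, trading many small namenode round trips for one recursive listing per directory tree.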