diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 635ed3149c..7ce9b7a55e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1243,11 +1244,10 @@ public static boolean isAcid(FileSystem fileSystem, Path directory, /** State class for getChildState; cannot modify 2 things in a method. */ private static class TxnBase { - private FileStatus status; + private Path basePath; private long writeId = 0; private long oldestBaseWriteId = Long.MAX_VALUE; private Path oldestBase = null; - private HdfsDirSnapshot dirSnapShot; } /** @@ -1255,20 +1255,22 @@ public static boolean isAcid(FileSystem fileSystem, Path directory, * base and diff directories. Note that because major compactions don't * preserve the history, we can't use a base directory that includes a * write id that we must exclude. - * @param directory the partition directory to analyze + * @param fileSystem optional; if it is not provided, it will be derived from the candidateDirectory + * @param candidateDirectory the partition directory to analyze * @param conf the configuration * @param writeIdList the list of write ids that we are reading + * @param useFileIds will be set to true if the FileSystem supports listing with fileIds + * @param ignoreEmptyFiles Ignore files with 0 length * @return the state of the directory - * @throws IOException + * @throws IOException on filesystem errors */ @VisibleForTesting public static Directory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf, - ValidWriteIdList writeIdList, Ref useFileIds, boolean ignoreEmptyFiles, - Map tblproperties, boolean generateDirSnapshots) throws IOException { + ValidWriteIdList writeIdList, Ref useFileIds, boolean ignoreEmptyFiles) throws IOException { ValidTxnList validTxnList = null; String s = conf.get(ValidTxnList.VALID_TXNS_KEY); if(!Strings.isNullOrEmpty(s)) { - /** + /* * getAcidState() is sometimes called on non-transactional tables, e.g. * OrcInputFileFormat.FileGenerator.callInternal(). e.g. orc_merge3.q In that case * writeIdList is bogus - doesn't even have a table name. @@ -1284,36 +1286,29 @@ public static Directory getAcidState(FileSystem fileSystem, Path candidateDirect FileSystem fs = fileSystem == null ? candidateDirectory.getFileSystem(conf) : fileSystem; // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
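[Reviewer note, not part of the patch] For callers of the old 8-argument overload, a minimal caller-side sketch of the migration, mirroring the call sites updated later in this patch (Cleaner, Worker, the query compactors, the tests). The helper method, variable names and import locations below are illustrative assumptions, not code from this change:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hive.common.util.Ref;

    public final class AcidStateCallSketch {
      private AcidStateCallSketch() {}

      // Hypothetical helper showing the new 6-argument call shape.
      static AcidUtils.Directory listAcidState(FileSystem fs, Path partitionPath, Configuration conf,
          ValidWriteIdList writeIds) throws IOException {
        // Before this patch the call carried two extra arguments:
        //   AcidUtils.getAcidState(fs, partitionPath, conf, writeIds, Ref.from(false), false, tblProperties, true);
        // After: tblproperties and generateDirSnapshots are gone; a directory snapshot is built
        // internally whenever the fileId-based listing (tryListLocatedHdfsStatus) is unavailable.
        return AcidUtils.getAcidState(fs, partitionPath, conf, writeIds, Ref.from(false), false);
      }
    }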
- final List deltas = new ArrayList(); - List working = new ArrayList(); + final List deltas = new ArrayList<>(); + List working = new ArrayList<>(); List originalDirectories = new ArrayList<>(); final List obsolete = new ArrayList<>(); final List abortedDirectories = new ArrayList<>(); - List childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory); - TxnBase bestBase = new TxnBase(); final List original = new ArrayList<>(); Map dirSnapshots = null; + + List childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory); + if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { - getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete, - bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); + getChildState(child, writeIdList, working, originalDirectories, original, obsolete, + bestBase, ignoreEmptyFiles, abortedDirectories, fs, validTxnList); } } else { - if (generateDirSnapshots) { - dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory); - getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete, - bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); - } else { - List children = HdfsUtils.listLocatedStatus(fs, candidateDirectory, hiddenFileFilter); - for (FileStatus child : children) { - getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, bestBase, - ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); - } - } + dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory); + getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete, + bestBase, ignoreEmptyFiles, abortedDirectories, fs, validTxnList); } // If we have a base, the original files are obsolete. - if (bestBase.status != null) { + if (bestBase.basePath != null) { // Add original files to obsolete list if any for (HdfsFileStatusWithId fswid : original) { obsolete.add(fswid.getFileStatus().getPath()); @@ -1382,9 +1377,9 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId } } - if(bestBase.oldestBase != null && bestBase.status == null && + if(bestBase.oldestBase != null && bestBase.basePath == null && isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs, dirSnapshots)) { - /** + /* * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus * cannot have any data for an open txn. We could check {@link deltas} has files to cover @@ -1405,78 +1400,80 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId Path base = null; boolean isBaseInRawFormat = false; - if (bestBase.status != null) { - base = bestBase.status.getPath(); + if (bestBase.basePath != null) { + base = bestBase.basePath; isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, dirSnapshots != null ? 
dirSnapshots.get(base) : null); - if (isBaseInRawFormat && (bestBase.dirSnapShot != null)) { - for (FileStatus stat : bestBase.dirSnapShot.getFiles()) { - if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) { - original.add(createOriginalObj(null, stat)); - } - } - } } LOG.debug("in directory " + candidateDirectory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); - /** + /* * If this sort order is changed and there are tables that have been converted to transactional * and have had any update/delete/merge operations performed but not yet MAJOR compacted, it * may result in data loss since it may change how * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns * {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen). */ - Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> { - //this does "Path.uri.compareTo(that.uri)" - return o1.getFileStatus().compareTo(o2.getFileStatus()); - }); - return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, - obsolete, deltas, base); - } - - public static Map getHdfsDirSnapshots(final FileSystem fs, - final Path path) throws IOException { - try { - Map dirToSnapshots = new HashMap(); - RemoteIterator itr = fs.listFiles(path, true); - while (itr.hasNext()) { - FileStatus fStatus = itr.next(); - Path fPath = fStatus.getPath(); - if (acidHiddenFileFilter.accept(fPath)) { - if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) { - HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath); + // this does "Path.uri.compareTo(that.uri)" + original.sort(Comparator.comparing(HdfsFileStatusWithId::getFileStatus)); + return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, obsolete, deltas, base); + } + + public static Map getHdfsDirSnapshots(final FileSystem fs, final Path path) + throws IOException { + Map dirToSnapshots = new HashMap<>(); + RemoteIterator itr = fs.listFiles(path, true); + while (itr.hasNext()) { + FileStatus fStatus = itr.next(); + Path fPath = fStatus.getPath(); + if (acidHiddenFileFilter.accept(fPath)) { + if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) { + HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath); + if (dirSnapshot == null) { + dirSnapshot = new HdfsDirSnapshotImpl(fPath); + dirToSnapshots.put(fPath, dirSnapshot); + } + } else { + Path parentDirPath = fPath.getParent(); + if (acidTempDirFilter.accept(parentDirPath)) { + while (isChildOfDelta(parentDirPath, path)) { + // Some cases there are other directory layers between the delta and the datafiles + // (export-import mm table, insert with union all to mm table, skewed tables). 
+ // But it does not matter for the AcidState, we just need the deltas and the data files + // So build the snapshot with the files inside the delta directory + parentDirPath = parentDirPath.getParent(); + } + HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath); if (dirSnapshot == null) { - dirSnapshot = new HdfsDirSnapshotImpl(fPath, fStatus); - dirToSnapshots.put(fPath, dirSnapshot); + dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath); + dirToSnapshots.put(parentDirPath, dirSnapshot); } - } else { - Path parentDirPath = fPath.getParent(); - if (acidTempDirFilter.accept(parentDirPath)) { - HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath); - if (dirSnapshot == null) { - FileStatus parentDirFStatus = null; - if (!parentDirPath.equals(path)) { - parentDirFStatus = fs.getFileStatus(parentDirPath); - } - dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath, parentDirFStatus); - dirToSnapshots.put(parentDirPath, dirSnapshot); - } - // We're not filtering out the metadata file and acid format file, as they represent parts of a valid snapshot - // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task - if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) { - dirSnapshot.addMetadataFile(fStatus); - } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) { - dirSnapshot.addOrcAcidFormatFile(fStatus); - } else { - dirSnapshot.addFile(fStatus); - } + // We're not filtering out the metadata file and acid format file, + // as they represent parts of a valid snapshot + // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task + if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) { + dirSnapshot.addMetadataFile(fStatus); + } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) { + dirSnapshot.addOrcAcidFormatFile(fStatus); + } else { + dirSnapshot.addFile(fStatus); } } } } - return dirToSnapshots; - } catch (IOException e) { - throw new IOException(e); } + return dirToSnapshots; + } + + private static boolean isChildOfDelta(Path childDir, Path rootPath) { + if (childDir.toUri().toString().length() <= rootPath.toUri().toString().length()) { + return false; + } + // We do not want to look outside the original directory + String fullName = childDir.toUri().toString().substring(rootPath.toUri().toString().length() + 1); + String dirName = childDir.getName(); + return (fullName.startsWith(BASE_PREFIX) && !dirName.startsWith(BASE_PREFIX)) || + (fullName.startsWith(DELTA_PREFIX) && !dirName.startsWith(DELTA_PREFIX)) || + (fullName.startsWith(DELETE_DELTA_PREFIX) && !dirName.startsWith(DELETE_DELTA_PREFIX)); } /** @@ -1485,7 +1482,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId * with additional properties about the dir (like isBase etc) * */ - public static interface HdfsDirSnapshot { + public interface HdfsDirSnapshot { public Path getPath(); public void addOrcAcidFormatFile(FileStatus fStatus); @@ -1496,14 +1493,9 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId public FileStatus getMetadataFile(FileStatus fStatus); - // FileStatus of this HDFS directory - public FileStatus getFileStatus(); - // Get the list of files if any within this directory public List getFiles(); - public void setFileStatus(FileStatus fStatus); - public void addFile(FileStatus file); // File id or null @@ -1530,7 +1522,6 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId public static class 
HdfsDirSnapshotImpl implements HdfsDirSnapshot { private Path dirPath; - private FileStatus fStatus; private FileStatus metadataFStatus = null; private FileStatus orcAcidFormatFStatus = null; private List files = new ArrayList(); @@ -1540,31 +1531,19 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId private Boolean isValidBase = null; private Boolean isCompactedBase = null; - public HdfsDirSnapshotImpl(Path path, FileStatus fStatus, List files) { + public HdfsDirSnapshotImpl(Path path, List files) { this.dirPath = path; - this.fStatus = fStatus; this.files = files; } - public HdfsDirSnapshotImpl(Path path, FileStatus fStatus) { + public HdfsDirSnapshotImpl(Path path) { this.dirPath = path; - this.fStatus = fStatus; } @Override public Path getPath() { return dirPath; } - - @Override - public FileStatus getFileStatus() { - return fStatus; - } - - @Override - public void setFileStatus(FileStatus fStatus) { - this.fStatus = fStatus; - } @Override public List getFiles() { @@ -1676,35 +1655,28 @@ public String toString() { * causes anything written previously to be ignored (hence the overwrite). In this case, base_x * is visible if writeid:x is committed for current reader. */ - private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList, - FileSystem fs) throws IOException { - if(parsedBase.getWriteId() == Long.MIN_VALUE) { - //such base is created by 1st compaction in case of non-acid to acid table conversion - //By definition there are no open txns with id < 1. - return true; - } - if (writeIdList.getMinOpenWriteId() != null && parsedBase.getWriteId() <= writeIdList.getMinOpenWriteId()) { - return true; - } - if(isCompactedBase(parsedBase, fs, (HdfsDirSnapshot) null)) { - return writeIdList.isValidBase(parsedBase.getWriteId()); - } - //if here, it's a result of IOW - return writeIdList.isWriteIdValid(parsedBase.getWriteId()); - } - - private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parsedBase, ValidWriteIdList writeIdList, - FileSystem fs) throws IOException { + private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList, FileSystem fs, + HdfsDirSnapshot dirSnapshot) throws IOException { boolean isValidBase; - if (dirSnapshot.isValidBase() != null) { + if (dirSnapshot != null && dirSnapshot.isValidBase() != null) { isValidBase = dirSnapshot.isValidBase(); } else { - if (isCompactedBase(parsedBase, fs, dirSnapshot)) { + if (parsedBase.getWriteId() == Long.MIN_VALUE) { + //such base is created by 1st compaction in case of non-acid to acid table conversion + //By definition there are no open txns with id < 1. 
+ isValidBase = true; + } else if (writeIdList.getMinOpenWriteId() != null && parsedBase.getWriteId() <= writeIdList + .getMinOpenWriteId()) { + isValidBase = true; + } else if (isCompactedBase(parsedBase, fs, dirSnapshot)) { isValidBase = writeIdList.isValidBase(parsedBase.getWriteId()); } else { + // if here, it's a result of IOW isValidBase = writeIdList.isWriteIdValid(parsedBase.getWriteId()); } - dirSnapshot.setIsValidBase(isValidBase); + if (dirSnapshot != null) { + dirSnapshot.setIsValidBase(isValidBase); + } } return isValidBase; } @@ -1724,78 +1696,39 @@ private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs, HdfsDirSnapshot snapshot) throws IOException { return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs, snapshot); } - - private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, - ValidWriteIdList writeIdList, List working, List originalDirectories, - List original, List obsolete, TxnBase bestBase, - boolean ignoreEmptyFiles, List aborted, Map tblproperties, - FileSystem fs, ValidTxnList validTxnList) throws IOException { - Path p = child.getPath(); - String fn = p.getName(); - if (!child.isDirectory()) { - if (!ignoreEmptyFiles || child.getLen() != 0) { - original.add(createOriginalObj(childWithId, child)); - } - return; - } - if (fn.startsWith(BASE_PREFIX)) { - ParsedBase parsedBase = ParsedBase.parseBase(p); - if(!isDirUsable(child.getPath(), parsedBase.getVisibilityTxnId(), aborted, validTxnList)) { - return; - } - final long writeId = parsedBase.getWriteId(); - if(bestBase.oldestBaseWriteId > writeId) { - //keep track for error reporting - bestBase.oldestBase = p; - bestBase.oldestBaseWriteId = writeId; - } - if (bestBase.status == null) { - if(isValidBase(parsedBase, writeIdList, fs)) { - bestBase.status = child; - bestBase.writeId = writeId; - } - } else if (bestBase.writeId < writeId) { - if(isValidBase(parsedBase, writeIdList, fs)) { - obsolete.add(bestBase.status.getPath()); - bestBase.status = child; - bestBase.writeId = writeId; - } - } else { - obsolete.add(child.getPath()); + + private static void getChildState(HdfsFileStatusWithId childWithId, ValidWriteIdList writeIdList, + List working, List originalDirectories, List original, + List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List aborted, FileSystem fs, + ValidTxnList validTxnList) throws IOException { + Path childPath = childWithId.getFileStatus().getPath(); + String fn = childPath.getName(); + if (!childWithId.getFileStatus().isDirectory()) { + if (!ignoreEmptyFiles || childWithId.getFileStatus().getLen() != 0) { + original.add(childWithId); } + } else if (fn.startsWith(BASE_PREFIX)) { + processBaseDir(childPath, writeIdList, obsolete, bestBase, aborted, fs, validTxnList, null); } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) { - String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? 
DELTA_PREFIX : DELETE_DELTA_PREFIX; - ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs, null); - if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) { - return; - } - if(ValidWriteIdList.RangeResponse.ALL == - writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) { - aborted.add(child.getPath()); - } - else if (writeIdList.isWriteIdRangeValid( - delta.minWriteId, delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) { - working.add(delta); - } + processDeltaDir(childPath, writeIdList, working, aborted, fs, validTxnList, null); } else { // This is just the directory. We need to recurse and find the actual files. But don't // do this until we have determined there is no base. This saves time. Plus, // it is possible that the cleaner is running and removing these original files, // in which case recursing through them could cause us to get an error. - originalDirectories.add(child.getPath()); + originalDirectories.add(childPath); } } - + private static void getChildState(Path candidateDirectory, Map dirSnapshots, ValidWriteIdList writeIdList, List working, List originalDirectories, - List original, - List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List aborted, - Map tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException { + List original, List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, + List aborted, FileSystem fs, ValidTxnList validTxnList) throws IOException { for (HdfsDirSnapshot dirSnapshot : dirSnapshots.values()) { - FileStatus fStat = dirSnapshot.getFileStatus(); Path dirPath = dirSnapshot.getPath(); String dirName = dirPath.getName(); - if (dirPath.equals(candidateDirectory)) { + // dirPath may contains the filesystem prefix + if (dirPath.toString().endsWith(candidateDirectory.toString())) { // if the candidateDirectory is itself a delta directory, we need to add originals in that directory // and return. This is the case when compaction thread calls getChildState. for (FileStatus fileStatus : dirSnapshot.getFiles()) { @@ -1804,44 +1737,9 @@ private static void getChildState(Path candidateDirectory, Map writeId) { - //keep track for error reporting - bestBase.oldestBase = dirPath; - bestBase.oldestBaseWriteId = writeId; - } - if (bestBase.status == null) { - if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) { - bestBase.status = fStat; - bestBase.writeId = writeId; - } - } else if (bestBase.writeId < writeId) { - if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) { - obsolete.add(bestBase.status.getPath()); - bestBase.status = fStat; - bestBase.writeId = writeId; - } - } else { - obsolete.add(dirPath); - } + processBaseDir(dirPath, writeIdList, obsolete, bestBase, aborted, fs, validTxnList, dirSnapshot); } else if (dirName.startsWith(DELTA_PREFIX) || dirName.startsWith(DELETE_DELTA_PREFIX)) { - String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? 
DELTA_PREFIX : DELETE_DELTA_PREFIX; - ParsedDelta delta = parseDelta(dirPath, deltaPrefix, fs, dirSnapshot); - if (!isDirUsable(dirPath, delta.getVisibilityTxnId(), aborted, validTxnList)) { - continue; - } - if (ValidWriteIdList.RangeResponse.ALL == writeIdList - .isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) { - aborted.add(dirPath); - } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, delta.maxWriteId) - != ValidWriteIdList.RangeResponse.NONE) { - working.add(delta); - } + processDeltaDir(dirPath, writeIdList, working, aborted, fs, validTxnList, dirSnapshot); } else { originalDirectories.add(dirPath); for (FileStatus stat : dirSnapshot.getFiles()) { @@ -1852,27 +1750,72 @@ private static void getChildState(Path candidateDirectory, Map obsolete, TxnBase bestBase, + List aborted, FileSystem fs, ValidTxnList validTxnList, AcidUtils.HdfsDirSnapshot dirSnapshot) + throws IOException { + ParsedBase parsedBase = ParsedBase.parseBase(baseDir); + if (!isDirUsable(baseDir, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) { + return; + } + final long writeId = parsedBase.getWriteId(); + if (bestBase.oldestBaseWriteId > writeId) { + // keep track for error reporting + bestBase.oldestBase = baseDir; + bestBase.oldestBaseWriteId = writeId; + } + if (bestBase.basePath == null) { + if (isValidBase(parsedBase, writeIdList, fs, dirSnapshot)) { + bestBase.basePath = baseDir; + bestBase.writeId = writeId; + } + } else if (bestBase.writeId < writeId) { + if (isValidBase(parsedBase, writeIdList, fs, dirSnapshot)) { + obsolete.add(bestBase.basePath); + bestBase.basePath = baseDir; + bestBase.writeId = writeId; + } + } else { + obsolete.add(baseDir); + } + } + + private static void processDeltaDir(Path deltadir, ValidWriteIdList writeIdList, List working, + List aborted, FileSystem fs, ValidTxnList validTxnList, AcidUtils.HdfsDirSnapshot dirSnapshot) + throws IOException { + String dirName = deltadir.getName(); + String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? 
DELTA_PREFIX : DELETE_DELTA_PREFIX; + ParsedDelta delta = parseDelta(deltadir, deltaPrefix, fs, dirSnapshot); + if (!isDirUsable(deltadir, delta.getVisibilityTxnId(), aborted, validTxnList)) { + return; + } + if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) { + aborted.add(deltadir); + } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, delta.maxWriteId) + != ValidWriteIdList.RangeResponse.NONE) { + working.add(delta); + } + } + /** * checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot */ - private static boolean isDirUsable(Path child, long visibilityTxnId, - List aborted, ValidTxnList validTxnList) { - if(validTxnList == null) { + private static boolean isDirUsable(Path child, long visibilityTxnId, List aborted, ValidTxnList validTxnList) { + if (validTxnList == null) { throw new IllegalArgumentException("No ValidTxnList for " + child); } - if(!validTxnList.isTxnValid(visibilityTxnId)) { + if (!validTxnList.isTxnValid(visibilityTxnId)) { boolean isAborted = validTxnList.isTxnAborted(visibilityTxnId); - if(isAborted) { - aborted.add(child);//so we can clean it up + if (isAborted) { + aborted.add(child);// so we can clean it up } LOG.debug("getChildState() ignoring(" + aborted + ") " + child); return false; } return true; } - public static HdfsFileStatusWithId createOriginalObj( - HdfsFileStatusWithId childWithId, FileStatus child) { + + public static HdfsFileStatusWithId createOriginalObj(HdfsFileStatusWithId childWithId, FileStatus child) { return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child); } @@ -2157,7 +2100,7 @@ public static AcidOperationalProperties getAcidOperationalProperties( * Returns the logical end of file for an acid data file. * * This relies on the fact that if delta_x_y has no committed transactions it wil be filtered out - * by {@link #getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean, Map, boolean)} + * by {@link #getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean)} * and so won't be read at all. * @param file - data file to read/compute splits on */ @@ -2669,7 +2612,7 @@ public static Path getVersionFilePath(Path deltaOrBase) { + " from " + jc.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); return null; } - Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false, null, true); + Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false); // Assume that for an MM table, or if there's only the base directory, we are good. if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) { Utilities.FILE_OP_LOGGER.warn( @@ -2707,7 +2650,7 @@ public static Path getVersionFilePath(Path deltaOrBase) { // If ACID/MM tables, then need to find the valid state wrt to given ValidWriteIdList. 
ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr); Directory acidInfo = AcidUtils.getAcidState(dataPath.getFileSystem(conf), dataPath, conf, validWriteIdList, null, - false, null, false); + false); for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) { pathList.add(hfs.getFileStatus().getPath()); @@ -3200,17 +3143,14 @@ private static void initDirCache(int durationInMts) { * @param candidateDirectory the partition directory to analyze * @param conf the configuration * @param writeIdList the list of write ids that we are reading - * @param useFileIds - * @param ignoreEmptyFiles - * @param tblproperties - * @param generateDirSnapshots + * @param useFileIds It will be set to true, if the FileSystem supports listing with fileIds + * @param ignoreEmptyFiles Ignore files with 0 length * @return directory state * @throws IOException on errors */ public static Directory getAcidStateFromCache(Supplier fileSystem, Path candidateDirectory, Configuration conf, - ValidWriteIdList writeIdList, Ref useFileIds, boolean ignoreEmptyFiles, - Map tblproperties, boolean generateDirSnapshots) throws IOException { + ValidWriteIdList writeIdList, Ref useFileIds, boolean ignoreEmptyFiles) throws IOException { int dirCacheDuration = HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION); @@ -3218,7 +3158,7 @@ public static Directory getAcidStateFromCache(Supplier fileSystem, if (dirCacheDuration <= 0) { LOG.debug("dirCache is not enabled"); return getAcidState(fileSystem.get(), candidateDirectory, conf, writeIdList, - useFileIds, ignoreEmptyFiles, tblproperties, generateDirSnapshots); + useFileIds, ignoreEmptyFiles); } else { initDirCache(dirCacheDuration); } @@ -3257,8 +3197,7 @@ public static Directory getAcidStateFromCache(Supplier fileSystem, // compute and add to cache if (recompute || (value == null)) { Directory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf, - writeIdList, useFileIds, ignoreEmptyFiles, tblproperties, - generateDirSnapshots); + writeIdList, useFileIds, ignoreEmptyFiles); value = new DirInfoValue(writeIdList.writeToString(), dirInfo); if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index ca234cfb37..073a3cbd80 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -696,7 +696,7 @@ private static void processForWriteIdsForMmRead(Path dir, Configuration conf, } if (hasAcidDirs) { AcidUtils.Directory dirInfo = AcidUtils.getAcidState( - fs, dir, conf, validWriteIdList, Ref.from(false), true, null, false); + fs, dir, conf, validWriteIdList, Ref.from(false), true); // Find the base, created for IOW. Path base = dirInfo.getBaseDirectory(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 1059cb227f..bf871f0522 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1215,8 +1215,9 @@ public String toString() { /** * For plain or acid tables this is the root of the partition (or table if not partitioned). * For MM table this is delta/ or base/ dir. 
In MM case applying of the ValidTxnList that - * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} normally does has already - * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidWriteIdList)}. + * {@link AcidUtils#getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean)} + * normally does has already been done in + * {@link HiveInputFormat#processPathsForMmRead(List, Configuration, ValidWriteIdList, List, List)} */ private final Path dir; private final Ref useFileIds; @@ -1257,10 +1258,10 @@ public AcidDirInfo run() throws Exception { private Directory getAcidState() throws IOException { if (context.isAcid && context.splitStrategyKind == SplitStrategyKind.BI) { return AcidUtils.getAcidStateFromCache(fs, dir, context.conf, - context.writeIdList, useFileIds, true, null, true); + context.writeIdList, useFileIds, true); } else { return AcidUtils.getAcidState(fs.get(), dir, context.conf, context.writeIdList, - useFileIds, true, null, true); + useFileIds, true); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 16c915959c..6739a2a1ab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -463,7 +463,7 @@ static int encodeBucketId(Configuration conf, int bucketId, int statementId) { */ //the split is from something other than the 1st file of the logical bucket - compute offset AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, - validWriteIdList, Ref.from(false), true, null, true); + validWriteIdList, Ref.from(false), true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath()); if (bucketIdFromPath != bucketId) { @@ -577,7 +577,7 @@ public void next(OrcStruct next) throws IOException { assert options.getOffset() == 0; assert options.getMaxOffset() == Long.MAX_VALUE; AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, - validWriteIdList, Ref.from(false), true, null, true); + validWriteIdList, Ref.from(false), true); /** * Note that for reading base_x/ or delta_x_x/ with non-acid schema, * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 598220b0c4..51c06c7c43 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -725,7 +725,7 @@ dropped by the Reader (I guess because of orc.impl.SchemaEvolution) //statementId is from directory name (or 0 if there is none) .statementId(syntheticTxnInfo.statementId).bucket(bucketId)); AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, syntheticTxnInfo.folder, conf, - validWriteIdList, Ref.from(false), true, null, true); + validWriteIdList, Ref.from(false), true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath()); if (bucketIdFromPath != bucketId) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 2a15913f9f..abde748787 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -221,7 +221,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti throws IOException, NoSuchObjectException, MetaException { Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from( - false), false, null, false); + false), false); List obsoleteDirs = dir.getObsolete(); /** * add anything in 'dir' that only has data from aborted transactions - no one should be diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 4e5d5b003b..4365817a27 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -264,7 +264,7 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor maxDeltasToHandle, -1, conf, msc, ci.id, jobName); } //now recompute state since we've done minor compactions and have different 'best' set of deltas - dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false, null, false); + dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false); } StringableList dirsToSearch = new StringableList(); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 7913295380..89e6ed775b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -314,13 +314,13 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi boolean noBase = false; Path location = new Path(sd.getLocation()); FileSystem fs = location.getFileSystem(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false, tblproperties, false); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false); Path base = dir.getBaseDirectory(); long baseSize = 0; FileStatus stat = null; if (base != null) { stat = fs.getFileStatus(base); - if (!stat.isDir()) { + if (!stat.isDirectory()) { LOG.error("Was assuming base " + base.toString() + " is directory, but it's a file!"); return null; } @@ -336,7 +336,7 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi List deltas = dir.getCurrentDirectories(); for (AcidUtils.ParsedDelta delta : deltas) { stat = fs.getFileStatus(delta.getPath()); - if (!stat.isDir()) { + if (!stat.isDirectory()) { LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " + "but it's a file!"); return null; diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java index d83a50f555..934493309f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java @@ -49,8 +49,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD AcidUtils 
.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); AcidUtils.Directory dir = AcidUtils - .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, - table.getParameters(), false); + .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false); // Set up the session for driver. HiveConf conf = new HiveConf(hiveConf); conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index 5e11d8d2d8..bf7309a583 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -46,8 +46,7 @@ LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table .getTableName()); AcidUtils.Directory dir = AcidUtils - .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, - table.getParameters(), false); + .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false); QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir); // Set up the session for driver. diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java index 1bdec7df2d..9f1d379b51 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java @@ -48,9 +48,9 @@ "Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." 
+ table.getTableName()); - AcidUtils.Directory dir = AcidUtils.getAcidState(null, - new Path(storageDescriptor.getLocation()), hiveConf, writeIds, - Ref.from(false), false, table.getParameters(), false); + AcidUtils.Directory dir = AcidUtils + .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, + Ref.from(false), false); QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir); HiveConf driverConf = setUpDriverSession(hiveConf); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 75941b3f33..784c95b102 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -497,7 +497,7 @@ protected Boolean findNextCompactionAndExecute(boolean computeStats) throws Inte // Don't start compaction or cleaning if not necessary AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, - tblValidWriteIds, Ref.from(false), true, null, false); + tblValidWriteIds, Ref.from(false), true); if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) { if (needsCleaning(dir, sd)) { msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 337f469d1a..61f2bd227e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -932,11 +932,13 @@ public void updateDeletePartitioned() throws Exception { int[][] tableData = {{1,2},{3,4},{5,6}}; runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData)); runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData)); - txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR)); + CompactionRequest request = new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR); + request.setPartitionname("p=1"); + txnHandler.compact(request); runWorker(hiveConf); runCleaner(hiveConf); runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = b + 1 where a = 3"); - txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.toString(), CompactionType.MAJOR)); + txnHandler.compact(request); runWorker(hiveConf); runCleaner(hiveConf); List rs = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index f351f04b08..cadaeeb222 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -165,7 +165,7 @@ public void testOriginal() throws Exception { new MockFile("mock:/tbl/part1/_done", 0, new byte[0]), new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0])); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "/tbl/part1"), conf, - new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false); + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false); assertEquals(null, dir.getBaseDirectory()); assertEquals(0, dir.getCurrentDirectories().size()); assertEquals(0, dir.getObsolete().size()); @@ -201,7 +201,7 @@ public void testOriginalDeltas() throws 
Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf, - new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false); + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false); assertEquals(null, dir.getBaseDirectory()); List obsolete = dir.getObsolete(); assertEquals(2, obsolete.size()); @@ -244,7 +244,7 @@ public void testBaseDeltas() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf, - new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false); + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false); assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); List obsoletes = dir.getObsolete(); assertEquals(5, obsoletes.size()); @@ -275,7 +275,7 @@ public void testRecursiveDirListingIsReusedWhenSnapshotTrue() throws IOException conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf, - new ValidReaderWriteIdList(), null, false, null, true); + new ValidReaderWriteIdList(), null, false); assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString()); assertEquals(0, dir.getObsolete().size()); assertEquals(0, dir.getOriginalFiles().size()); @@ -283,23 +283,6 @@ public void testRecursiveDirListingIsReusedWhenSnapshotTrue() throws IOException assertEquals(0, fs.getNumOpenFileCalls()); } - @Test - public void testRecursiveDirListingIsNotReusedWhenSnapshotFalse() throws IOException { - Configuration conf = new Configuration(); - MockFileSystem fs = new MockFileSystem(conf, - new MockFile("mock:/tbl/part1/base_0/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_0/_orc_acid_version", 10, new byte[0])); - conf.set(ValidTxnList.VALID_TXNS_KEY, - new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); - AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf, - new ValidReaderWriteIdList(), null, false, null, false); - assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString()); - assertEquals(0, dir.getObsolete().size()); - assertEquals(0, dir.getOriginalFiles().size()); - assertEquals(0, dir.getCurrentDirectories().size()); - assertEquals(2, fs.getNumOpenFileCalls()); - } - @Test public void testObsoleteOriginals() throws Exception { Configuration conf = new Configuration(); @@ -312,7 +295,7 @@ public void testObsoleteOriginals() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:150:" - + Long.MAX_VALUE + ":"), null, false, null, false); + + Long.MAX_VALUE + ":"), null, false); // Obsolete list should include the two original bucket files, and the old base dir List obsoletes = dir.getObsolete(); assertEquals(3, obsoletes.size()); @@ -335,7 +318,7 @@ public void testOverlapingDelta() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new 
ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:" - + Long.MAX_VALUE + ":"), null, false, null, false); + + Long.MAX_VALUE + ":"), null, false); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); List obsolete = dir.getObsolete(); assertEquals(2, obsolete.size()); @@ -372,7 +355,7 @@ public void testOverlapingDelta2() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:" - + Long.MAX_VALUE + ":"), null, false, null, false); + + Long.MAX_VALUE + ":"), null, false); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); List obsolete = dir.getObsolete(); assertEquals(5, obsolete.size()); @@ -401,7 +384,7 @@ public void deltasWithOpenTxnInRead() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null, - false, null, false); + false); List delts = dir.getCurrentDirectories(); assertEquals(2, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -426,7 +409,7 @@ public void deltasWithOpenTxnInRead2() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null, - false, null, false); + false); List delts = dir.getCurrentDirectories(); assertEquals(2, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -443,7 +426,7 @@ public void deltasWithOpenTxnsNotInCompact() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:" - + Long.MAX_VALUE), null, false, null, false); + + Long.MAX_VALUE), null, false); List delts = dir.getCurrentDirectories(); assertEquals(1, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -462,7 +445,7 @@ public void deltasWithOpenTxnsNotInCompact2() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:3:" - + Long.MAX_VALUE), null, false, null, false); + + Long.MAX_VALUE), null, false); List delts = dir.getCurrentDirectories(); assertEquals(1, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -488,7 +471,7 @@ public void testBaseWithDeleteDeltas() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf, - new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false); + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false); 
assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); List obsoletes = dir.getObsolete(); assertEquals(7, obsoletes.size()); @@ -531,7 +514,7 @@ public void testOverlapingDeltaAndDeleteDelta() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:" - + Long.MAX_VALUE + ":"), null, false, null, false); + + Long.MAX_VALUE + ":"), null, false); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); List obsolete = dir.getObsolete(); assertEquals(3, obsolete.size()); @@ -562,7 +545,7 @@ public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exc conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:" - + Long.MAX_VALUE + ":"), null, false, null, false); + + Long.MAX_VALUE + ":"), null, false); List obsolete = dir.getObsolete(); assertEquals(1, obsolete.size()); assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).toString()); @@ -591,7 +574,7 @@ public void deltasAndDeleteDeltasWithOpenTxnsNotInCompact() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:" - + Long.MAX_VALUE + ":"), null, false, null, false); + + Long.MAX_VALUE + ":"), null, false); List delts = dir.getCurrentDirectories(); assertEquals(2, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -616,7 +599,7 @@ public void deleteDeltasWithOpenTxnInRead() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null, - false, null, false); + false); List delts = dir.getCurrentDirectories(); assertEquals(3, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index e4440e9136..0d7b69923c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -3692,7 +3692,7 @@ public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - assertEquals(6, readOpsDelta); + assertEquals(5, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3766,7 +3766,7 @@ public void testACIDReaderFooterSerializeWithDeltas() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - assertEquals(6, readOpsDelta); + assertEquals(5, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index f63c40a7b5..e6fae44a10 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ 
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -597,7 +597,7 @@ public void testEmpty() throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testEmpty:200:" + Long.MAX_VALUE); - AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, false, null, false); + AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, false); Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(), BUCKET); @@ -668,8 +668,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testNewBaseAndDelta:200:" + Long.MAX_VALUE); - AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, use130Format, null, - use130Format); + AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, use130Format); assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory()); assertEquals(new Path(root, use130Format ? diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 3a3b267927..5e0c38644a 100644 --- streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -943,7 +943,7 @@ public void testTableValidation() throws Exception { private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String... records) throws Exception { ValidWriteIdList writeIds = getTransactionContext(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false); + AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -997,7 +997,8 @@ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int */ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles, String validationQuery, boolean vectorize, String... 
records) throws Exception { - AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false); + AcidUtils.Directory dir = + AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -1048,7 +1049,8 @@ private ValidWriteIdList getTransactionContext(Configuration conf) throws Except return TxnCommonUtils.createValidReaderWriteIdList(v.get(0)); } private void checkNothingWritten(Path partitionPath) throws Exception { - AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false); + AcidUtils.Directory dir = + AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -1995,7 +1997,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception { /*now both batches have committed (but not closed) so we for each primary file we expect a side file to exist and indicate the true length of primary file*/ FileSystem fs = partLoc.getFileSystem(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false); + AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false); for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); @@ -2020,7 +2022,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception { //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0 //has now received more data(logically - it's buffered) but it is not yet committed. //lets check that side files exist, etc - dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false); + dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false); for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
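[Reviewer note, not part of the patch] The behavioral core of this change is that getAcidState() now always works from one recursive listing (getHdfsDirSnapshots) and attributes files nested below a base/delta (union-all subdirs, export/import MM layout, skewed tables) to the enclosing delta directory. A small standalone sketch of that attribution step, with the prefix constants inlined with the same values AcidUtils uses ("base_", "delta_", "delete_delta_"); the class name and example paths are made up:

    import org.apache.hadoop.fs.Path;

    public class DeltaAttributionSketch {
      private static final String BASE_PREFIX = "base_";
      private static final String DELTA_PREFIX = "delta_";
      private static final String DELETE_DELTA_PREFIX = "delete_delta_";

      // Mirrors the new AcidUtils.isChildOfDelta(): true when childDir sits somewhere below a
      // base/delta directory of rootPath but is not itself that base/delta directory.
      static boolean isChildOfDelta(Path childDir, Path rootPath) {
        String child = childDir.toUri().toString();
        String root = rootPath.toUri().toString();
        if (child.length() <= root.length()) {
          return false;
        }
        String fullName = child.substring(root.length() + 1); // path relative to the partition dir
        String dirName = childDir.getName();
        return (fullName.startsWith(BASE_PREFIX) && !dirName.startsWith(BASE_PREFIX))
            || (fullName.startsWith(DELTA_PREFIX) && !dirName.startsWith(DELTA_PREFIX))
            || (fullName.startsWith(DELETE_DELTA_PREFIX) && !dirName.startsWith(DELETE_DELTA_PREFIX));
      }

      public static void main(String[] args) {
        Path root = new Path("hdfs://nn:8020/warehouse/t/p=1"); // hypothetical partition dir
        Path parent = new Path(root, "delta_0000005_0000005_0000/HIVE_UNION_SUBDIR_1");
        // Climb until we reach the delta directory itself, exactly like getHdfsDirSnapshots()
        // does before registering the data file in a snapshot.
        while (isChildOfDelta(parent, root)) {
          parent = parent.getParent();
        }
        System.out.println(parent); // .../p=1/delta_0000005_0000005_0000
      }
    }

Because the prefix test is done on the path relative to the candidate directory, the walk never escapes the partition directory being analyzed, which matches the "We do not want to look outside the original directory" comment in the new code.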